Coverage for src / rhiza / models / _git_utils.py: 100%
319 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-02 07:04 +0000
1"""Git utility helpers for Rhiza models."""
3import logging
4import os
5import shutil
6import subprocess # nosec B404
7import tempfile
8from dataclasses import dataclass, field
9from pathlib import Path
10from typing import TYPE_CHECKING, Any
12from loguru import logger
14if TYPE_CHECKING:
15 from rhiza.models.lock import TemplateLock
16 from rhiza.models.template import RhizaTemplate
@dataclass
class GitContext:
    """Bundles the git executable path and environment for subprocess calls.

    All git-invoking functions in the sync helpers accept a
    :class:`GitContext` instead of resolving the executable on their own,
    making them easily testable via binary injection.

    Attributes:
        executable: Absolute path to the git binary.
        env: Environment variables passed to every git subprocess.
    """

    executable: str
    env: dict[str, str] = field(default_factory=dict)

    @classmethod
    def default(cls) -> "GitContext":
        """Create a GitContext using the system git and process environment.

        Returns:
            A :class:`GitContext` populated with the real git executable path
            and a copy of the current process environment with
            ``GIT_TERMINAL_PROMPT`` set to ``"0"`` so git never blocks on an
            interactive credential prompt.
        """
        environment = dict(os.environ)
        environment["GIT_TERMINAL_PROMPT"] = "0"
        return cls(executable=get_git_executable(), env=environment)
48 def assert_status_clean(self, target: Path) -> None:
49 """Raise RuntimeError if the target repository has uncommitted changes.
51 Runs ``git status --porcelain`` and raises if the output is non-empty,
52 preventing a sync from running on a dirty working tree.
54 Args:
55 target: Path to the target repository.
57 Raises:
58 RuntimeError: If the working tree has uncommitted changes.
59 """
60 result = subprocess.run( # nosec B603 # noqa: S603
61 [self.executable, "status", "--porcelain"],
62 cwd=target,
63 capture_output=True,
64 text=True,
65 env=self.env,
66 )
67 if result.stdout.strip():
68 logger.error("Working tree is not clean. Please commit or stash your changes before syncing.")
69 logger.error("Uncommitted changes:")
70 for line in result.stdout.strip().splitlines():
71 logger.error(f" {line}")
72 raise RuntimeError("Working tree is not clean. Please commit or stash your changes before syncing.") # noqa: TRY003
74 def handle_target_branch(self, target: Path, target_branch: str | None) -> None:
75 """Handle target branch creation or checkout if specified.
77 Args:
78 target: Path to the target repository.
79 target_branch: Optional branch name to create/checkout.
80 """
81 if not target_branch:
82 return
84 logger.info(f"Creating/checking out target branch: {target_branch}")
85 try:
86 result = subprocess.run( # nosec B603 # noqa: S603
87 [self.executable, "rev-parse", "--verify", target_branch],
88 cwd=target,
89 capture_output=True,
90 text=True,
91 env=self.env,
92 )
94 if result.returncode == 0:
95 logger.info(f"Branch '{target_branch}' exists, checking out...")
96 subprocess.run( # nosec B603 # noqa: S603
97 [self.executable, "checkout", target_branch],
98 cwd=target,
99 check=True,
100 capture_output=True,
101 text=True,
102 env=self.env,
103 )
104 else:
105 logger.info(f"Creating new branch '{target_branch}'...")
106 subprocess.run( # nosec B603 # noqa: S603
107 [self.executable, "checkout", "-b", target_branch],
108 cwd=target,
109 check=True,
110 capture_output=True,
111 text=True,
112 env=self.env,
113 )
114 except subprocess.CalledProcessError as e:
115 logger.error(f"Failed to create/checkout branch '{target_branch}'")
116 _log_git_stderr_errors(e.stderr)
117 logger.error("Please ensure you have no uncommitted changes or conflicts")
118 raise
120 def get_diff(self, repo0: Path, repo1: Path) -> str:
121 """Compute the raw diff between two directory trees using ``git diff --no-index``.
123 Args:
124 repo0: Path to the base (old) directory tree.
125 repo1: Path to the upstream (new) directory tree.
126 """
127 repo0_str = repo0.resolve().as_posix()
128 repo1_str = repo1.resolve().as_posix()
129 result = subprocess.run( # nosec B603 # noqa: S603
130 [
131 self.executable,
132 "-c",
133 "diff.noprefix=",
134 "diff",
135 "--no-index",
136 "--relative",
137 "--binary",
138 "--src-prefix=upstream-template-old/",
139 "--dst-prefix=upstream-template-new/",
140 "--no-ext-diff",
141 "--no-color",
142 repo0_str,
143 repo1_str,
144 ],
145 cwd=repo0_str,
146 capture_output=True,
147 env=self.env,
148 )
149 diff = result.stdout.decode() if isinstance(result.stdout, bytes) else (result.stdout or "")
150 for repo in [repo0_str, repo1_str]:
151 from re import sub
153 repo_nix = sub("/[a-z]:", "", repo)
154 diff = diff.replace(f"upstream-template-old{repo_nix}", "upstream-template-old").replace(
155 f"upstream-template-new{repo_nix}", "upstream-template-new"
156 )
157 diff = diff.replace(repo0_str + "/", "").replace(repo1_str + "/", "")
158 return diff
160 def sync_diff(self, target: Path, upstream_snapshot: Path) -> None:
161 """Execute the diff (dry-run) strategy.
163 Shows what would change without modifying any files.
165 Args:
166 target: Path to the target repository.
167 upstream_snapshot: Path to the upstream snapshot directory.
168 """
169 diff = self.get_diff(target, upstream_snapshot)
170 if diff.strip():
171 logger.info(f"\n{diff}")
172 changes = diff.count("diff --git")
173 logger.info(f"{changes} file(s) would be changed")
174 else:
175 logger.success("No differences found")
177 def _parse_diff_filenames(self, diff: str) -> list[tuple[str, bool, bool]]:
178 """Parse a unified diff produced by :func:`GitContext.get_diff` into file entries.
180 Each entry is ``(rel_path, is_new, is_deleted)`` where *rel_path* is the
181 path relative to both snapshot directories.
183 Args:
184 diff: Unified diff string from :func:`GitContext.get_diff`.
186 Returns:
187 List of ``(rel_path, is_new, is_deleted)`` tuples, one per changed file.
188 """
189 src_prefix = "upstream-template-old/"
190 dst_prefix = "upstream-template-new/"
192 results: list[tuple[str, bool, bool]] = []
193 is_new = False
194 is_deleted = False
195 src_path: str | None = None
196 dst_path: str | None = None
197 in_diff = False
199 def _flush() -> None:
200 rel = dst_path if not is_deleted else src_path
201 if rel:
202 results.append((rel, is_new, is_deleted))
204 for line in diff.splitlines():
205 if line.startswith("diff --git "):
206 if in_diff:
207 _flush()
208 is_new = False
209 is_deleted = False
210 src_path = None
211 dst_path = None
212 in_diff = True
213 elif line.startswith("new file mode"):
214 is_new = True
215 elif line.startswith("deleted file mode"):
216 is_deleted = True
217 elif line.startswith("--- "):
218 raw = line[4:].strip().strip('"').split("\t")[0]
219 if raw != "/dev/null" and raw.startswith(src_prefix):
220 src_path = raw[len(src_prefix) :]
221 elif line.startswith("+++ "):
222 raw = line[4:].strip().strip('"').split("\t")[0]
223 if raw != "/dev/null" and raw.startswith(dst_prefix):
224 dst_path = raw[len(dst_prefix) :]
226 if in_diff:
227 _flush()
229 return results
    def _merge_file_fallback(
        self,
        diff: str,
        target: Path,
        base_snapshot: Path,
        upstream_snapshot: Path,
    ) -> bool:
        """Apply *diff* file-by-file using ``git merge-file``.

        Unlike ``git apply -3``, ``git merge-file`` works directly on the file
        contents from *base_snapshot* and *upstream_snapshot*, so it does not
        require the template's blob objects to exist in the target repository.

        Conflict markers (``<<<<<<<`` / ``=======`` / ``>>>>>>>``) are left in
        place for manual resolution when both sides changed the same region.

        Args:
            diff: Unified diff string (used only for file-list parsing).
            target: Path to the target repository.
            base_snapshot: Directory containing files at the previously-synced SHA.
            upstream_snapshot: Directory containing files at the new upstream SHA.

        Returns:
            True if every file merged cleanly, False if any conflicts remain.
        """
        file_entries = self._parse_diff_filenames(diff)
        all_clean = True
        conflict_files: list[str] = []

        for rel_path, is_new, is_deleted in file_entries:
            target_file = target / rel_path
            upstream_file = upstream_snapshot / rel_path
            base_file = base_snapshot / rel_path

            # New file: copy straight from the upstream snapshot.
            if is_new:
                if upstream_file.exists():
                    target_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(upstream_file, target_file)
                    logger.debug(f"[merge-file] Added: {rel_path}")
                continue

            # Deleted upstream: remove the target copy if present.
            if is_deleted:
                if target_file.exists():
                    target_file.unlink()
                    logger.debug(f"[merge-file] Deleted: {rel_path}")
                continue

            # Modified file — attempt a 3-way merge using the on-disk snapshots.
            if not target_file.exists():
                # Nothing local to merge against; just materialize upstream.
                if upstream_file.exists():
                    target_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(upstream_file, target_file)
                    logger.debug(f"[merge-file] Created (missing in target): {rel_path}")
                continue

            if not base_file.exists() or not upstream_file.exists():
                # Cannot 3-way-merge without both sides; just take upstream.
                if upstream_file.exists():
                    shutil.copy2(upstream_file, target_file)
                    logger.debug(f"[merge-file] Overwrite (no base): {rel_path}")
                continue

            # git merge-file merges in place into target_file ("ours"), using
            # base_file as the common ancestor and upstream_file as "theirs".
            result = subprocess.run(  # nosec B603 # noqa: S603
                [
                    self.executable,
                    "merge-file",
                    "-L",
                    "ours",
                    "-L",
                    "base",
                    "-L",
                    "upstream",
                    str(target_file),
                    str(base_file),
                    str(upstream_file),
                ],
                capture_output=True,
                env=self.env,
            )

            # git merge-file exits with the number of conflicts (> 0) when the
            # merge produced conflict markers, and a negative value on error.
            if result.returncode > 0:
                conflict_files.append(rel_path)
                all_clean = False
                logger.warning(f"[merge-file] Conflict in {rel_path} — resolve markers manually")
            elif result.returncode < 0:
                logger.warning(f"[merge-file] Error merging {rel_path}: {result.stderr.decode().strip()}")
                all_clean = False
            else:
                logger.debug(f"[merge-file] Clean merge: {rel_path}")

        if conflict_files:
            logger.warning(
                f"{len(conflict_files)} file(s) have conflict markers to resolve: " + ", ".join(conflict_files)
            )

        return all_clean
    def _apply_diff(
        self,
        diff: str,
        target: Path,
        base_snapshot: Path | None = None,
        upstream_snapshot: Path | None = None,
    ) -> bool:
        """Apply a diff to the target project using ``git apply -3`` (3-way merge).

        When ``git apply -3`` fails because the template's blob objects are absent
        from the target repository *and* both *base_snapshot* and
        *upstream_snapshot* are provided, falls back to :func:`_merge_file_fallback`
        which uses ``git merge-file`` on the on-disk snapshot files instead.

        Otherwise falls back to ``git apply --reject``, which applies what it
        can and writes ``*.rej`` files for the remaining hunks.

        Args:
            diff: Unified diff string.
            target: Path to the target repository.
            base_snapshot: Optional directory containing files at the base SHA.
            upstream_snapshot: Optional directory containing files at the upstream SHA.

        Returns:
            True if the diff applied cleanly, False if there were conflicts.
        """
        # Empty diff: trivially clean.
        if not diff.strip():
            return True

        try:
            subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "apply", "-3"],
                input=diff.encode() if isinstance(diff, str) else diff,
                cwd=target,
                check=True,
                capture_output=True,
                env=self.env,
            )
        except subprocess.CalledProcessError as e:
            stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "")

            # git apply -3 cannot do a real 3-way merge when the template blobs are
            # not present in the target repository's object store. If we have the
            # snapshot directories on disk, use git merge-file instead — it works
            # directly on file content and needs no shared git history.
            if "lacks the necessary blob" in stderr and base_snapshot is not None and upstream_snapshot is not None:
                logger.debug("git apply -3 lacks blob objects; switching to git merge-file fallback")
                return self._merge_file_fallback(diff, target, base_snapshot, upstream_snapshot)

            if stderr:
                logger.warning(f"3-way merge had conflicts:\n{stderr.strip()}")
            # Fall back to --reject for conflict files
            try:
                subprocess.run(  # nosec B603 # noqa: S603
                    [self.executable, "apply", "--reject"],
                    input=diff.encode() if isinstance(diff, str) else diff,
                    cwd=target,
                    check=True,
                    capture_output=True,
                    env=self.env,
                )
            except subprocess.CalledProcessError as e2:
                stderr2 = e2.stderr.decode() if isinstance(e2.stderr, bytes) else (e2.stderr or "")
                if stderr2:
                    logger.warning(stderr2.strip())
            # Reaching the fallback at all means the apply was not clean,
            # regardless of whether --reject itself succeeded.
            logger.warning(
                "Some changes could not be applied cleanly. Check for *.rej files and resolve conflicts manually."
            )
            return False
        else:
            # The -3 apply succeeded without raising: clean application.
            return True
399 def _copy_files_to_target(self, snapshot_dir: Path, target: Path, materialized: list[Path]) -> None:
400 """Copy all materialized files from a snapshot into the target project.
402 Args:
403 snapshot_dir: Directory containing the snapshot files.
404 target: Path to the target repository.
405 materialized: List of relative file paths to copy.
406 """
407 for rel_path in sorted(materialized):
408 src = snapshot_dir / rel_path
409 dst = target / rel_path
410 dst.parent.mkdir(parents=True, exist_ok=True)
411 shutil.copy2(src, dst)
412 logger.success(f"[COPY] {rel_path}")
    def sync_merge(
        self,
        target: Path,
        upstream_snapshot: Path,
        upstream_sha: str,
        base_sha: str | None,
        materialized: list[Path],
        template: "RhizaTemplate",
        excludes: set[str],
        lock: "TemplateLock",
        lock_file: "Path | None" = None,
    ) -> None:
        """Execute the merge strategy (cruft-style 3-way merge).

        When a base SHA exists, computes the diff between base and upstream
        snapshots and applies it via ``git apply -3``. On first sync (no base),
        falls back to a simple copy. Afterwards it restores template files
        missing from the target, warns about workflow files, cleans orphaned
        files, and writes the lock.

        Args:
            target: Path to the target repository.
            upstream_snapshot: Path to the upstream snapshot directory.
            upstream_sha: HEAD SHA of the upstream template.
            base_sha: Previously synced commit SHA, or None for first sync.
            materialized: List of relative file paths.
            template: The :class:`~rhiza.models.RhizaTemplate` driving this sync.
            excludes: Set of relative paths to exclude.
            lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync.
            lock_file: Optional explicit path for the lock file. When ``None``
                the default ``<target>/.rhiza/template.lock`` is used.
        """
        # Imported locally — NOTE(review): presumably to avoid an import cycle
        # between rhiza.models and rhiza.commands; confirm before hoisting.
        from rhiza.commands._sync_helpers import (
            _clean_orphaned_files,
            _read_previously_tracked_files,
            _warn_about_workflow_files,
            _write_lock,
        )

        # Snapshot the currently-tracked files before the merge runs. The merge
        # may write a new lock (e.g. on the "template unchanged" early-return path
        # in _merge_with_base), so we must read the old state first to ensure
        # orphan cleanup compares against the previous sync, not the new one.
        old_tracked_files = _read_previously_tracked_files(target, lock_file=lock_file)

        # Temporary directory that _merge_with_base fills with the base snapshot.
        base_snapshot = Path(tempfile.mkdtemp())
        try:
            if base_sha:
                self._merge_with_base(
                    target,
                    upstream_snapshot,
                    upstream_sha,
                    base_sha,
                    base_snapshot,
                    template,
                    excludes,
                    lock,
                    lock_file=lock_file,
                )
            else:
                logger.info("First sync — copying all template files")
                self._copy_files_to_target(upstream_snapshot, target, materialized)

            # Restore any template-managed files that are absent from the target.
            # This can happen when files tracked by the template do not exist in the
            # downstream repository — for example when the template snapshot was
            # unchanged since the last sync so no diff was applied, but the files
            # were never present or were manually deleted.
            missing_from_target = [p for p in materialized if not (target / p).exists()]
            if missing_from_target:
                logger.info(f"Restoring {len(missing_from_target)} template file(s) missing from target")
                self._copy_files_to_target(upstream_snapshot, target, missing_from_target)

            _warn_about_workflow_files(materialized)
            _clean_orphaned_files(
                target,
                materialized,
                excludes=excludes,
                base_snapshot=base_snapshot,
                previously_tracked_files=old_tracked_files if old_tracked_files else None,
                lock_file=lock_file,
            )
            _write_lock(target, lock, lock_file=lock_file)
            logger.success(f"Sync complete — {len(materialized)} file(s) processed")
        finally:
            # Always discard the temporary base snapshot, even if the merge raised.
            if base_snapshot.exists():
                shutil.rmtree(base_snapshot)
500 def update_sparse_checkout(
501 self,
502 tmp_dir: Path,
503 include_paths: list[str],
504 logger=None,
505 ) -> None:
506 """Update sparse-checkout paths in an already-cloned repository.
508 Args:
509 tmp_dir: Temporary directory with cloned repository.
510 include_paths: Paths to include in sparse checkout.
511 logger: Optional logger; defaults to module logger.
512 """
513 logger = logger or logging.getLogger(__name__)
515 try:
516 logger.debug(f"Updating sparse checkout paths: {include_paths}")
517 subprocess.run( # nosec B603 # noqa: S603
518 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
519 cwd=tmp_dir,
520 check=True,
521 capture_output=True,
522 text=True,
523 env=self.env,
524 )
525 logger.debug("Sparse checkout paths updated")
526 except subprocess.CalledProcessError as e:
527 logger.exception("Failed to update sparse checkout paths")
528 _log_git_stderr_errors(e.stderr)
529 raise
531 def get_head_sha(self, repo_dir: Path) -> str:
532 """Return the HEAD commit SHA of a cloned repository.
534 Args:
535 repo_dir: Path to the git repository.
537 Returns:
538 The full HEAD SHA.
539 """
540 result = subprocess.run( # nosec B603 # noqa: S603
541 [self.executable, "rev-parse", "HEAD"],
542 cwd=repo_dir,
543 capture_output=True,
544 text=True,
545 check=True,
546 env=self.env,
547 )
548 return result.stdout.strip()
550 def clone_repository(
551 self,
552 git_url: str,
553 tmp_dir: Path,
554 branch: str,
555 include_paths: list[str],
556 logger=None,
557 ) -> None:
558 """Clone template repository with sparse checkout.
560 Args:
561 git_url: URL of the repository to clone.
562 tmp_dir: Temporary directory for cloning.
563 branch: Branch to clone from the template repository.
564 include_paths: Paths to include in sparse checkout.
565 logger: Optional logger; defaults to module logger.
566 """
567 logger = logger or logging.getLogger(__name__)
569 try:
570 logger.debug("Executing git clone with sparse checkout")
571 subprocess.run( # nosec B603 # noqa: S603
572 [
573 self.executable,
574 "clone",
575 "--depth",
576 "1",
577 "--filter=blob:none",
578 "--sparse",
579 "--branch",
580 branch,
581 git_url,
582 str(tmp_dir),
583 ],
584 check=True,
585 capture_output=True,
586 text=True,
587 env=self.env,
588 )
589 logger.debug("Git clone completed successfully")
590 except subprocess.CalledProcessError as e:
591 logger.exception(f"Failed to clone repository from {git_url}")
592 _log_git_stderr_errors(e.stderr)
593 logger.exception("Please check that:")
594 logger.exception(" - The repository exists and is accessible")
595 logger.exception(f" - Branch '{branch}' exists in the repository")
596 logger.exception(" - You have network access to the git hosting service")
597 raise
599 try:
600 logger.debug("Initializing sparse checkout")
601 subprocess.run( # nosec B603 # noqa: S603
602 [self.executable, "sparse-checkout", "init", "--cone"],
603 cwd=tmp_dir,
604 check=True,
605 capture_output=True,
606 text=True,
607 env=self.env,
608 )
609 logger.debug("Sparse checkout initialized")
610 except subprocess.CalledProcessError as e:
611 logger.exception("Failed to initialize sparse checkout")
612 _log_git_stderr_errors(e.stderr)
613 raise
615 try:
616 logger.debug(f"Setting sparse checkout paths: {include_paths}")
617 subprocess.run( # nosec B603 # noqa: S603
618 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
619 cwd=tmp_dir,
620 check=True,
621 capture_output=True,
622 text=True,
623 env=self.env,
624 )
625 logger.debug("Sparse checkout paths configured")
626 except subprocess.CalledProcessError as e:
627 logger.exception("Failed to configure sparse checkout paths")
628 _log_git_stderr_errors(e.stderr)
629 raise
631 def clone_at_sha(
632 self,
633 git_url: str,
634 sha: str,
635 dest: Path,
636 include_paths: list[str],
637 logger=None,
638 ) -> None:
639 """Clone the template repository and checkout a specific commit.
641 Args:
642 git_url: URL of the repository to clone.
643 sha: Commit SHA to check out.
644 dest: Target directory for the clone.
645 include_paths: Paths to include in sparse checkout.
646 logger: Optional logger; defaults to module logger.
647 """
648 logger = logger or logging.getLogger(__name__)
649 try:
650 subprocess.run( # nosec B603 # noqa: S603
651 [
652 self.executable,
653 "clone",
654 "--filter=blob:none",
655 "--sparse",
656 "--no-checkout",
657 git_url,
658 str(dest),
659 ],
660 check=True,
661 capture_output=True,
662 text=True,
663 env=self.env,
664 )
665 except subprocess.CalledProcessError as e:
666 logger.exception(f"Failed to clone repository for base snapshot: {git_url}")
667 _log_git_stderr_errors(e.stderr)
668 raise
670 try:
671 subprocess.run( # nosec B603 # noqa: S603
672 [self.executable, "sparse-checkout", "init", "--cone"],
673 cwd=dest,
674 check=True,
675 capture_output=True,
676 text=True,
677 env=self.env,
678 )
679 subprocess.run( # nosec B603 # noqa: S603
680 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
681 cwd=dest,
682 check=True,
683 capture_output=True,
684 text=True,
685 env=self.env,
686 )
687 except subprocess.CalledProcessError as e:
688 logger.exception("Failed to configure sparse checkout for base snapshot")
689 _log_git_stderr_errors(e.stderr)
690 raise
692 try:
693 subprocess.run( # nosec B603 # noqa: S603
694 [self.executable, "checkout", sha],
695 cwd=dest,
696 check=True,
697 capture_output=True,
698 text=True,
699 env=self.env,
700 )
701 except subprocess.CalledProcessError as e:
702 logger.exception(f"Failed to checkout base commit {sha[:12]}")
703 _log_git_stderr_errors(e.stderr)
704 raise
    def _merge_with_base(
        self,
        target: Path,
        upstream_snapshot: Path,
        upstream_sha: str,
        base_sha: str,
        base_snapshot: Path,
        template: "RhizaTemplate",
        excludes: set[str],
        lock: "TemplateLock",
        lock_file: "Path | None" = None,
    ) -> None:
        """Compute and apply the diff between base and upstream snapshots.

        Args:
            target: Path to the target repository.
            upstream_snapshot: Path to the upstream snapshot directory.
            upstream_sha: HEAD SHA of the upstream template.
            base_sha: Previously synced commit SHA.
            base_snapshot: Directory to populate with the base snapshot.
            template: The :class:`~rhiza.models.RhizaTemplate` driving this sync.
            excludes: Set of relative paths to exclude.
            lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync.
            lock_file: Optional explicit path for the lock file. When ``None``
                the default ``<target>/.rhiza/template.lock`` is used.
        """
        # Imported locally — NOTE(review): presumably to avoid an import cycle
        # between rhiza.models and rhiza.commands; confirm before hoisting.
        from rhiza.commands._sync_helpers import _write_lock

        logger.info(f"Cloning base snapshot at {base_sha[:12]}")
        base_clone = Path(tempfile.mkdtemp())
        try:
            self.clone_at_sha(template.git_url, base_sha, base_clone, template.include)
            _prepare_snapshot(base_clone, template.include, excludes, base_snapshot)
        except Exception:
            # Best-effort: if the base cannot be reconstructed, base_snapshot
            # stays (partially) empty and the diff below treats upstream files
            # as additions rather than modifications.
            logger.warning("Could not checkout base commit — treating all files as new")
        finally:
            # The raw clone is only needed to build the snapshot; remove it.
            if base_clone.exists():
                shutil.rmtree(base_clone)

        diff = self.get_diff(base_snapshot, upstream_snapshot)

        if not diff.strip():
            # Nothing to apply, but still record this sync in the lock file.
            logger.success("Template unchanged since last sync — nothing to apply")
            _write_lock(target, lock, lock_file=lock_file)
            return

        logger.info("Applying template changes via 3-way merge (cruft)...")
        clean = self._apply_diff(diff, target, base_snapshot=base_snapshot, upstream_snapshot=upstream_snapshot)

        if clean:
            logger.success("All changes applied cleanly")
        else:
            logger.warning("Some changes had conflicts. Check for *.rej files and resolve manually.")
761def _normalize_to_list(value: Any | list[Any] | None) -> list[Any]:
762 r"""Convert a value to a list of strings.
764 Handles the case where YAML multi-line strings (using |) are parsed as
765 a single string instead of a list. Splits the string by newlines and
766 strips whitespace from each item.
768 Args:
769 value: A string, list of strings, or None.
771 Returns:
772 A list of strings. Empty list if value is None or empty.
774 Examples:
775 >>> _normalize_to_list(None)
776 []
777 >>> _normalize_to_list([])
778 []
779 >>> _normalize_to_list(['a', 'b', 'c'])
780 ['a', 'b', 'c']
781 >>> _normalize_to_list('single line')
782 ['single line']
783 >>> _normalize_to_list('line1\\n' + 'line2\\n' + 'line3')
784 ['line1', 'line2', 'line3']
785 >>> _normalize_to_list(' item1 \\n' + ' item2 ')
786 ['item1', 'item2']
787 """
788 if value is None:
789 return []
790 if isinstance(value, list):
791 return value
792 if isinstance(value, str):
793 # Split by newlines and filter out empty strings
794 # Handle both actual newlines (\n) and literal backslash-n (\\n)
795 items = value.split("\\n") if "\\n" in value and "\n" not in value else value.split("\n")
796 return [item.strip() for item in items if item.strip()]
797 return []
def get_git_executable() -> str:
    """Locate the git binary and return its absolute path.

    Resolving the full path up front avoids invoking whatever ``git`` happens
    to shadow the real binary through PATH manipulation.

    Returns:
        str: Absolute path to the git executable.

    Raises:
        RuntimeError: If git executable is not found in PATH.
    """
    located = shutil.which("git")
    if located is not None:
        return located
    msg = "git executable not found in PATH. Please ensure git is installed and available."
    raise RuntimeError(msg)
def _log_git_stderr_errors(stderr: str | None) -> None:
    """Log only the ``fatal:`` / ``error:`` lines from git stderr output.

    Args:
        stderr: Git command stderr output.
    """
    if not stderr:
        return
    for raw_line in stderr.strip().split("\n"):
        message = raw_line.strip()
        if message and message.startswith(("fatal:", "error:")):
            logger.error(message)
def _expand_paths(base_dir: Path, paths: list[str]) -> list[Path]:
    """Expand file/directory paths relative to *base_dir* into individual files.

    A path naming a file contributes itself; a directory contributes every
    file beneath it recursively; anything else is logged at debug level and
    skipped.

    Args:
        base_dir: Root directory to resolve against.
        paths: Relative path strings.

    Returns:
        Flat list of file paths.
    """
    collected: list[Path] = []
    for rel in paths:
        candidate = base_dir / rel
        if candidate.is_file():
            collected.append(candidate)
        elif candidate.is_dir():
            collected.extend(child for child in candidate.rglob("*") if child.is_file())
        else:
            logger.debug(f"Path not found in template repository: {rel}")
    return collected
def _excluded_set(base_dir: Path, excluded_paths: list[str]) -> set[str]:
    """Build a set of relative path strings that should be excluded.

    Args:
        base_dir: Root of the template clone.
        excluded_paths: User-configured exclude list.

    Returns:
        Set of relative path strings (always includes rhiza internals).
    """
    excluded = {str(f.relative_to(base_dir)) for f in _expand_paths(base_dir, excluded_paths)}
    # Rhiza's own bookkeeping files are never synced into the target.
    excluded.update({".rhiza/template.yml", ".rhiza/history"})
    return excluded
def _prepare_snapshot(
    clone_dir: Path,
    include_paths: list[str],
    excludes: set[str],
    snapshot_dir: Path,
) -> list[Path]:
    """Copy included (non-excluded) files from a clone into a snapshot directory.

    Args:
        clone_dir: Root of the template clone.
        include_paths: Paths to include.
        excludes: Set of relative paths to exclude.
        snapshot_dir: Destination directory for the snapshot.

    Returns:
        List of relative file paths that were copied.
    """
    copied: list[Path] = []
    for source in _expand_paths(clone_dir, include_paths):
        rel = str(source.relative_to(clone_dir))
        if rel in excludes:
            continue
        destination = snapshot_dir / rel
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, destination)
        copied.append(Path(rel))
    return copied