Coverage for src / rhiza / models / _git_utils.py: 100%
360 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-15 18:22 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-15 18:22 +0000
1"""Git utility helpers for Rhiza models."""
3import logging
4import os
5import shutil
6import subprocess # nosec B404
7import tempfile
8from dataclasses import dataclass, field
9from pathlib import Path
10from typing import TYPE_CHECKING, Any
12from loguru import logger
14if TYPE_CHECKING:
15 from rhiza.models.lock import TemplateLock
16 from rhiza.models.template import RhizaTemplate
19@dataclass
20class GitContext:
21 """Bundles the git executable path and environment for subprocess calls.
23 All git-invoking functions in the sync helpers accept a
24 :class:`GitContext` instead of resolving the executable on their own,
25 making them easily testable via binary injection.
27 Attributes:
28 executable: Absolute path to the git binary.
29 env: Environment variables passed to every git subprocess.
30 """
32 executable: str
33 env: dict[str, str] = field(default_factory=dict)
35 @classmethod
36 def default(cls) -> "GitContext":
37 """Create a GitContext using the system git and process environment.
39 Returns:
40 A :class:`GitContext` populated with the real git executable path
41 and a copy of the current process environment with
42 ``GIT_TERMINAL_PROMPT`` set to ``"0"``.
43 """
44 env = os.environ.copy()
45 env["GIT_TERMINAL_PROMPT"] = "0"
46 return cls(executable=get_git_executable(), env=env)
48 def assert_status_clean(self, target: Path) -> None:
49 """Raise RuntimeError if the target repository has uncommitted changes.
51 Runs ``git status --porcelain`` and raises if the output is non-empty,
52 preventing a sync from running on a dirty working tree.
54 Args:
55 target: Path to the target repository.
57 Raises:
58 RuntimeError: If the working tree has uncommitted changes.
59 """
60 result = subprocess.run( # nosec B603 # noqa: S603
61 [self.executable, "status", "--porcelain"],
62 cwd=target,
63 capture_output=True,
64 text=True,
65 env=self.env,
66 )
67 if result.stdout.strip():
68 logger.error("Working tree is not clean. Please commit or stash your changes before syncing.")
69 logger.error("Uncommitted changes:")
70 for line in result.stdout.strip().splitlines():
71 logger.error(f" {line}")
72 raise RuntimeError("Working tree is not clean. Please commit or stash your changes before syncing.") # noqa: TRY003
74 def handle_target_branch(self, target: Path, target_branch: str | None) -> None:
75 """Handle target branch creation or checkout if specified.
77 Args:
78 target: Path to the target repository.
79 target_branch: Optional branch name to create/checkout.
80 """
81 if not target_branch:
82 return
84 logger.info(f"Creating/checking out target branch: {target_branch}")
85 try:
86 result = subprocess.run( # nosec B603 # noqa: S603
87 [self.executable, "rev-parse", "--verify", target_branch],
88 cwd=target,
89 capture_output=True,
90 text=True,
91 env=self.env,
92 )
94 if result.returncode == 0:
95 logger.info(f"Branch '{target_branch}' exists, checking out...")
96 subprocess.run( # nosec B603 # noqa: S603
97 [self.executable, "checkout", target_branch],
98 cwd=target,
99 check=True,
100 capture_output=True,
101 text=True,
102 env=self.env,
103 )
104 else:
105 logger.info(f"Creating new branch '{target_branch}'...")
106 subprocess.run( # nosec B603 # noqa: S603
107 [self.executable, "checkout", "-b", target_branch],
108 cwd=target,
109 check=True,
110 capture_output=True,
111 text=True,
112 env=self.env,
113 )
114 except subprocess.CalledProcessError as e:
115 logger.error(f"Failed to create/checkout branch '{target_branch}'")
116 _log_git_stderr_errors(e.stderr)
117 logger.error("Please ensure you have no uncommitted changes or conflicts")
118 raise
120 def get_diff(self, repo0: Path, repo1: Path) -> str:
121 """Compute the raw diff between two directory trees using ``git diff --no-index``.
123 Args:
124 repo0: Path to the base (old) directory tree.
125 repo1: Path to the upstream (new) directory tree.
126 """
127 repo0_str = repo0.resolve().as_posix()
128 repo1_str = repo1.resolve().as_posix()
129 result = subprocess.run( # nosec B603 # noqa: S603
130 [
131 self.executable,
132 "-c",
133 "diff.noprefix=",
134 "diff",
135 "--no-index",
136 "--relative",
137 "--binary",
138 "--src-prefix=upstream-template-old/",
139 "--dst-prefix=upstream-template-new/",
140 "--no-ext-diff",
141 "--no-color",
142 repo0_str,
143 repo1_str,
144 ],
145 cwd=repo0_str,
146 capture_output=True,
147 env=self.env,
148 )
149 diff = result.stdout.decode() if isinstance(result.stdout, bytes) else (result.stdout or "")
150 for repo in [repo0_str, repo1_str]:
151 from re import sub
153 repo_nix = sub("/[a-z]:", "", repo)
154 diff = diff.replace(f"upstream-template-old{repo_nix}", "upstream-template-old").replace(
155 f"upstream-template-new{repo_nix}", "upstream-template-new"
156 )
157 diff = diff.replace(repo0_str + "/", "").replace(repo1_str + "/", "")
158 return diff
160 def sync_diff(self, target: Path, upstream_snapshot: Path) -> None:
161 """Execute the diff (dry-run) strategy.
163 Shows what would change without modifying any files.
165 Args:
166 target: Path to the target repository.
167 upstream_snapshot: Path to the upstream snapshot directory.
168 """
169 diff = self.get_diff(target, upstream_snapshot)
170 if diff.strip():
171 logger.info(f"\n{diff}")
172 changes = diff.count("diff --git")
173 logger.info(f"{changes} file(s) would be changed")
174 else:
175 logger.success("No differences found")
177 def _parse_diff_filenames(self, diff: str) -> list[tuple[str, bool, bool]]:
178 """Parse a unified diff produced by :func:`GitContext.get_diff` into file entries.
180 Each entry is ``(rel_path, is_new, is_deleted)`` where *rel_path* is the
181 path relative to both snapshot directories.
183 Args:
184 diff: Unified diff string from :func:`GitContext.get_diff`.
186 Returns:
187 List of ``(rel_path, is_new, is_deleted)`` tuples, one per changed file.
188 """
189 src_prefix = "upstream-template-old/"
190 dst_prefix = "upstream-template-new/"
192 results: list[tuple[str, bool, bool]] = []
193 is_new = False
194 is_deleted = False
195 src_path: str | None = None
196 dst_path: str | None = None
197 in_diff = False
199 def _flush() -> None:
200 """Emit the current file entry into results if a path was captured."""
201 rel = dst_path if not is_deleted else src_path
202 if rel:
203 results.append((rel, is_new, is_deleted))
205 for line in diff.splitlines():
206 if line.startswith("diff --git "):
207 if in_diff:
208 _flush()
209 is_new = False
210 is_deleted = False
211 src_path = None
212 dst_path = None
213 in_diff = True
214 elif line.startswith("new file mode"):
215 is_new = True
216 elif line.startswith("deleted file mode"):
217 is_deleted = True
218 elif line.startswith("--- "):
219 raw = line[4:].strip().strip('"').split("\t")[0]
220 if raw != "/dev/null" and raw.startswith(src_prefix):
221 src_path = raw[len(src_prefix) :]
222 elif line.startswith("+++ "):
223 raw = line[4:].strip().strip('"').split("\t")[0]
224 if raw != "/dev/null" and raw.startswith(dst_prefix):
225 dst_path = raw[len(dst_prefix) :]
227 if in_diff:
228 _flush()
230 return results
232 def _merge_file_fallback(
233 self,
234 diff: str,
235 target: Path,
236 base_snapshot: Path,
237 upstream_snapshot: Path,
238 ) -> bool:
239 """Apply *diff* file-by-file using ``git merge-file``.
241 Unlike ``git apply -3``, ``git merge-file`` works directly on the file
242 contents from *base_snapshot* and *upstream_snapshot*, so it does not
243 require the template's blob objects to exist in the target repository.
245 Conflict markers (``<<<<<<< HEAD`` / ``=======`` / ``>>>>>>> rhiza-template``) are left in
246 place for manual resolution when both sides changed the same region.
248 Args:
249 diff: Unified diff string (used only for file-list parsing).
250 target: Path to the target repository.
251 base_snapshot: Directory containing files at the previously-synced SHA.
252 upstream_snapshot: Directory containing files at the new upstream SHA.
254 Returns:
255 True if every file merged cleanly, False if any conflicts remain.
256 """
257 file_entries = self._parse_diff_filenames(diff)
258 all_clean = True
259 conflict_files: list[str] = []
261 for rel_path, is_new, is_deleted in file_entries:
262 target_file = target / rel_path
263 upstream_file = upstream_snapshot / rel_path
264 base_file = base_snapshot / rel_path
266 if is_new:
267 if upstream_file.exists():
268 target_file.parent.mkdir(parents=True, exist_ok=True)
269 shutil.copy2(upstream_file, target_file)
270 logger.debug(f"[merge-file] Added: {rel_path}")
271 continue
273 if is_deleted:
274 if target_file.exists():
275 target_file.unlink()
276 logger.debug(f"[merge-file] Deleted: {rel_path}")
277 continue
279 # Modified file — attempt a 3-way merge using the on-disk snapshots.
280 if not target_file.exists():
281 if upstream_file.exists():
282 target_file.parent.mkdir(parents=True, exist_ok=True)
283 shutil.copy2(upstream_file, target_file)
284 logger.debug(f"[merge-file] Created (missing in target): {rel_path}")
285 continue
287 if not base_file.exists() or not upstream_file.exists():
288 # Cannot 3-way-merge without both sides; just take upstream.
289 if upstream_file.exists():
290 shutil.copy2(upstream_file, target_file)
291 logger.debug(f"[merge-file] Overwrite (no base): {rel_path}")
292 continue
294 result = subprocess.run( # nosec B603 # noqa: S603
295 [
296 self.executable,
297 "merge-file",
298 "-L",
299 "HEAD",
300 "-L",
301 "base",
302 "-L",
303 "rhiza-template",
304 str(target_file),
305 str(base_file),
306 str(upstream_file),
307 ],
308 capture_output=True,
309 env=self.env,
310 )
312 if result.returncode > 0:
313 conflict_files.append(rel_path)
314 all_clean = False
315 logger.warning(f"[merge-file] Conflict in {rel_path} — resolve markers manually")
316 elif result.returncode < 0:
317 logger.warning(f"[merge-file] Error merging {rel_path}: {result.stderr.decode().strip()}")
318 all_clean = False
319 else:
320 logger.debug(f"[merge-file] Clean merge: {rel_path}")
322 if conflict_files:
323 detail = "\n".join(f" {f}" for f in conflict_files)
324 logger.warning(
325 f"The following file(s) have conflict markers to resolve:\n{detail}\n"
326 " Resolve each <<<<<<< / ======= / >>>>>>> block and remove the markers\n"
327 " before committing."
328 )
330 return all_clean
332 def _scan_conflict_artifacts(self, target: Path) -> tuple[list[str], list[str]]:
333 """Scan *target* for merge-conflict artifacts left by git.
335 Looks for:
337 - ``*.rej`` files produced by ``git apply --reject``.
338 - Text files that contain ``<<<<<<<`` conflict markers (from
339 ``git apply -3`` or ``git merge-file``).
341 Args:
342 target: Root of the working tree to scan.
344 Returns:
345 A ``(rej_files, marker_files)`` tuple, each a sorted list of
346 paths relative to *target*.
347 """
348 rej_files: list[str] = []
349 marker_files: list[str] = []
350 for path in sorted(target.rglob("*")):
351 if not path.is_file():
352 continue
353 rel = str(path.relative_to(target))
354 if path.suffix == ".rej":
355 rej_files.append(rel)
356 else:
357 try:
358 # Read up to 1 MB to avoid stalling on large binary files.
359 content = path.read_bytes()[:1_048_576]
360 if b"<<<<<<<" in content:
361 marker_files.append(rel)
362 except OSError:
363 pass
364 return rej_files, marker_files
366 def _apply_diff(
367 self,
368 diff: str,
369 target: Path,
370 base_snapshot: Path | None = None,
371 upstream_snapshot: Path | None = None,
372 ) -> bool:
373 """Apply a diff to the target project using ``git apply -3`` (3-way merge).
375 When ``git apply -3`` fails because the template's blob objects are absent
376 from the target repository *and* both *base_snapshot* and
377 *upstream_snapshot* are provided, falls back to :func:`_merge_file_fallback`
378 which uses ``git merge-file`` on the on-disk snapshot files instead.
380 Otherwise falls back to ``git apply --reject``.
382 Args:
383 diff: Unified diff string.
384 target: Path to the target repository.
385 base_snapshot: Optional directory containing files at the base SHA.
386 upstream_snapshot: Optional directory containing files at the upstream SHA.
388 Returns:
389 True if the diff applied cleanly, False if there were conflicts.
390 """
391 if not diff.strip():
392 return True
394 try:
395 subprocess.run( # nosec B603 # noqa: S603
396 [self.executable, "apply", "-3"],
397 input=diff.encode() if isinstance(diff, str) else diff,
398 cwd=target,
399 check=True,
400 capture_output=True,
401 env=self.env,
402 )
403 except subprocess.CalledProcessError as e:
404 stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "")
406 # git apply -3 cannot do a real 3-way merge when the template blobs are
407 # not present in the target repository's object store. If we have the
408 # snapshot directories on disk, use git merge-file instead — it works
409 # directly on file content and needs no shared git history.
410 if "lacks the necessary blob" in stderr and base_snapshot is not None and upstream_snapshot is not None:
411 logger.debug("git apply -3 lacks blob objects; switching to git merge-file fallback")
412 return self._merge_file_fallback(diff, target, base_snapshot, upstream_snapshot)
414 if stderr:
415 logger.warning(f"3-way merge had conflicts:\n{stderr.strip()}")
416 # Fall back to --reject for conflict files
417 try:
418 subprocess.run( # nosec B603 # noqa: S603
419 [self.executable, "apply", "--reject"],
420 input=diff.encode() if isinstance(diff, str) else diff,
421 cwd=target,
422 check=True,
423 capture_output=True,
424 env=self.env,
425 )
426 except subprocess.CalledProcessError as e2:
427 stderr2 = e2.stderr.decode() if isinstance(e2.stderr, bytes) else (e2.stderr or "")
428 if stderr2:
429 logger.warning(stderr2.strip())
431 # Scan and report any conflict artifacts left behind so users know
432 # exactly which files need attention.
433 rej_files, marker_files = self._scan_conflict_artifacts(target)
434 if rej_files:
435 rej_detail = "\n".join(
436 f" {f.removesuffix('.rej')} (unresolved hunks saved to {f})" for f in rej_files
437 )
438 logger.warning(
439 f"The following file(s) have unresolved hunks:\n{rej_detail}\n"
440 " Open each .rej file, manually apply the diff hunks to the source file,\n"
441 " then delete the .rej file before committing."
442 )
443 if marker_files:
444 marker_detail = "\n".join(f" {f}" for f in marker_files)
445 logger.warning(
446 f"The following file(s) contain conflict markers:\n{marker_detail}\n"
447 " Resolve each <<<<<<< / ======= / >>>>>>> block and remove the markers\n"
448 " before committing."
449 )
450 if not rej_files and not marker_files:
451 logger.warning("Some changes could not be applied cleanly — check the working tree for partial edits.")
452 return False
453 else:
454 return True
456 def _copy_files_to_target(self, snapshot_dir: Path, target: Path, materialized: list[Path]) -> None:
457 """Copy all materialized files from a snapshot into the target project.
459 Args:
460 snapshot_dir: Directory containing the snapshot files.
461 target: Path to the target repository.
462 materialized: List of relative file paths to copy.
463 """
464 for rel_path in sorted(materialized):
465 src = snapshot_dir / rel_path
466 dst = target / rel_path
467 dst.parent.mkdir(parents=True, exist_ok=True)
468 shutil.copy2(src, dst)
469 logger.success(f"[COPY] {rel_path}")
471 def sync_merge(
472 self,
473 target: Path,
474 upstream_snapshot: Path,
475 upstream_sha: str,
476 base_sha: str | None,
477 materialized: list[Path],
478 template: "RhizaTemplate",
479 excludes: set[str],
480 lock: "TemplateLock",
481 lock_file: "Path | None" = None,
482 path_map: "dict[str, str] | None" = None,
483 ) -> bool:
484 """Execute the merge strategy (cruft-style 3-way merge).
486 When a base SHA exists, computes the diff between base and upstream
487 snapshots and applies it via ``git apply -3``. On first sync (no base),
488 falls back to a simple copy.
490 Args:
491 target: Path to the target repository.
492 upstream_snapshot: Path to the upstream snapshot directory.
493 upstream_sha: HEAD SHA of the upstream template.
494 base_sha: Previously synced commit SHA, or None for first sync.
495 materialized: List of relative file paths.
496 template: The :class:`~rhiza.models.RhizaTemplate` driving this sync.
497 excludes: Set of relative paths to exclude.
498 lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync.
499 lock_file: Optional explicit path for the lock file. When ``None``
500 the default ``<target>/.rhiza/template.lock`` is used.
501 path_map: Optional source→destination path mapping for remapped
502 bundle file entries.
504 Returns:
505 True if all changes applied cleanly, False if any conflicts remain.
506 """
507 from rhiza.commands._sync_helpers import (
508 _clean_orphaned_files,
509 _read_previously_tracked_files,
510 _warn_about_workflow_files,
511 _write_lock,
512 )
514 # Snapshot the currently-tracked files before the merge runs. The merge
515 # may write a new lock (e.g. on the "template unchanged" early-return path
516 # in _merge_with_base), so we must read the old state first to ensure
517 # orphan cleanup compares against the previous sync, not the new one.
518 old_tracked_files = _read_previously_tracked_files(target, lock_file=lock_file)
520 base_snapshot = Path(tempfile.mkdtemp())
521 clean = True
522 try:
523 if base_sha:
524 clean = self._merge_with_base(
525 target,
526 upstream_snapshot,
527 upstream_sha,
528 base_sha,
529 base_snapshot,
530 template,
531 excludes,
532 lock,
533 lock_file=lock_file,
534 path_map=path_map,
535 )
536 else:
537 logger.info("First sync — copying all template files")
538 self._copy_files_to_target(upstream_snapshot, target, materialized)
540 # Restore any template-managed files that are absent from the target.
541 # This can happen when files tracked by the template do not exist in the
542 # downstream repository — for example when the template snapshot was
543 # unchanged since the last sync so no diff was applied, but the files
544 # were never present or were manually deleted.
545 missing_from_target = [p for p in materialized if not (target / p).exists()]
546 if missing_from_target:
547 logger.info(f"Restoring {len(missing_from_target)} template file(s) missing from target")
548 self._copy_files_to_target(upstream_snapshot, target, missing_from_target)
550 _warn_about_workflow_files(materialized)
551 _clean_orphaned_files(
552 target,
553 materialized,
554 excludes=excludes,
555 base_snapshot=base_snapshot,
556 previously_tracked_files=old_tracked_files if old_tracked_files else None,
557 lock_file=lock_file,
558 )
559 _write_lock(target, lock, lock_file=lock_file)
560 logger.success(f"Sync complete — {len(materialized)} file(s) processed")
561 finally:
562 if base_snapshot.exists():
563 shutil.rmtree(base_snapshot)
565 return clean
567 def update_sparse_checkout(
568 self,
569 tmp_dir: Path,
570 include_paths: list[str],
571 logger: logging.Logger | None = None,
572 ) -> None:
573 """Update sparse-checkout paths in an already-cloned repository.
575 Args:
576 tmp_dir: Temporary directory with cloned repository.
577 include_paths: Paths to include in sparse checkout.
578 logger: Optional logger; defaults to module logger.
579 """
580 logger = logger or logging.getLogger(__name__)
582 try:
583 logger.debug(f"Updating sparse checkout paths: {include_paths}")
584 subprocess.run( # nosec B603 # noqa: S603
585 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
586 cwd=tmp_dir,
587 check=True,
588 capture_output=True,
589 text=True,
590 env=self.env,
591 )
592 logger.debug("Sparse checkout paths updated")
593 except subprocess.CalledProcessError as e:
594 logger.exception("Failed to update sparse checkout paths")
595 _log_git_stderr_errors(e.stderr)
596 raise
598 def get_head_sha(self, repo_dir: Path) -> str:
599 """Return the HEAD commit SHA of a cloned repository.
601 Args:
602 repo_dir: Path to the git repository.
604 Returns:
605 The full HEAD SHA.
606 """
607 result = subprocess.run( # nosec B603 # noqa: S603
608 [self.executable, "rev-parse", "HEAD"],
609 cwd=repo_dir,
610 capture_output=True,
611 text=True,
612 check=True,
613 env=self.env,
614 )
615 return result.stdout.strip()
617 def clone_repository(
618 self,
619 git_url: str,
620 tmp_dir: Path,
621 branch: str,
622 include_paths: list[str],
623 logger: logging.Logger | None = None,
624 ) -> None:
625 """Clone template repository with sparse checkout.
627 Args:
628 git_url: URL of the repository to clone.
629 tmp_dir: Temporary directory for cloning.
630 branch: Branch to clone from the template repository.
631 include_paths: Paths to include in sparse checkout.
632 logger: Optional logger; defaults to module logger.
633 """
634 logger = logger or logging.getLogger(__name__)
636 try:
637 logger.debug("Executing git clone with sparse checkout")
638 subprocess.run( # nosec B603 # noqa: S603
639 [
640 self.executable,
641 "clone",
642 "--depth",
643 "1",
644 "--filter=blob:none",
645 "--sparse",
646 "--branch",
647 branch,
648 git_url,
649 str(tmp_dir),
650 ],
651 check=True,
652 capture_output=True,
653 text=True,
654 env=self.env,
655 )
656 logger.debug("Git clone completed successfully")
657 except subprocess.CalledProcessError as e:
658 logger.exception(f"Failed to clone repository from {git_url}")
659 _log_git_stderr_errors(e.stderr)
660 logger.exception("Please check that:")
661 logger.exception(" - The repository exists and is accessible")
662 logger.exception(f" - Branch '{branch}' exists in the repository")
663 logger.exception(" - You have network access to the git hosting service")
664 raise
666 try:
667 logger.debug("Initializing sparse checkout")
668 subprocess.run( # nosec B603 # noqa: S603
669 [self.executable, "sparse-checkout", "init", "--cone"],
670 cwd=tmp_dir,
671 check=True,
672 capture_output=True,
673 text=True,
674 env=self.env,
675 )
676 logger.debug("Sparse checkout initialized")
677 except subprocess.CalledProcessError as e:
678 logger.exception("Failed to initialize sparse checkout")
679 _log_git_stderr_errors(e.stderr)
680 raise
682 try:
683 logger.debug(f"Setting sparse checkout paths: {include_paths}")
684 subprocess.run( # nosec B603 # noqa: S603
685 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
686 cwd=tmp_dir,
687 check=True,
688 capture_output=True,
689 text=True,
690 env=self.env,
691 )
692 logger.debug("Sparse checkout paths configured")
693 except subprocess.CalledProcessError as e:
694 logger.exception("Failed to configure sparse checkout paths")
695 _log_git_stderr_errors(e.stderr)
696 raise
698 def clone_at_sha(
699 self,
700 git_url: str,
701 sha: str,
702 dest: Path,
703 include_paths: list[str],
704 logger: logging.Logger | None = None,
705 ) -> None:
706 """Clone the template repository and checkout a specific commit.
708 Args:
709 git_url: URL of the repository to clone.
710 sha: Commit SHA to check out.
711 dest: Target directory for the clone.
712 include_paths: Paths to include in sparse checkout.
713 logger: Optional logger; defaults to module logger.
714 """
715 logger = logger or logging.getLogger(__name__)
716 try:
717 subprocess.run( # nosec B603 # noqa: S603
718 [
719 self.executable,
720 "clone",
721 "--filter=blob:none",
722 "--sparse",
723 "--no-checkout",
724 git_url,
725 str(dest),
726 ],
727 check=True,
728 capture_output=True,
729 text=True,
730 env=self.env,
731 )
732 except subprocess.CalledProcessError as e:
733 logger.exception(f"Failed to clone repository for base snapshot: {git_url}")
734 _log_git_stderr_errors(e.stderr)
735 raise
737 try:
738 subprocess.run( # nosec B603 # noqa: S603
739 [self.executable, "sparse-checkout", "init", "--cone"],
740 cwd=dest,
741 check=True,
742 capture_output=True,
743 text=True,
744 env=self.env,
745 )
746 subprocess.run( # nosec B603 # noqa: S603
747 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
748 cwd=dest,
749 check=True,
750 capture_output=True,
751 text=True,
752 env=self.env,
753 )
754 except subprocess.CalledProcessError as e:
755 logger.exception("Failed to configure sparse checkout for base snapshot")
756 _log_git_stderr_errors(e.stderr)
757 raise
759 try:
760 subprocess.run( # nosec B603 # noqa: S603
761 [self.executable, "checkout", sha],
762 cwd=dest,
763 check=True,
764 capture_output=True,
765 text=True,
766 env=self.env,
767 )
768 except subprocess.CalledProcessError as e:
769 logger.exception(f"Failed to checkout base commit {sha[:12]}")
770 _log_git_stderr_errors(e.stderr)
771 raise
773 def _merge_with_base(
774 self,
775 target: Path,
776 upstream_snapshot: Path,
777 upstream_sha: str, # noqa: ARG002 # part of the merge-call signature; lock carries the sha
778 base_sha: str,
779 base_snapshot: Path,
780 template: "RhizaTemplate",
781 excludes: set[str],
782 lock: "TemplateLock",
783 lock_file: "Path | None" = None,
784 path_map: "dict[str, str] | None" = None,
785 ) -> bool:
786 """Compute and apply the diff between base and upstream snapshots.
788 Args:
789 target: Path to the target repository.
790 upstream_snapshot: Path to the upstream snapshot directory.
791 upstream_sha: HEAD SHA of the upstream template.
792 base_sha: Previously synced commit SHA.
793 base_snapshot: Directory to populate with the base snapshot.
794 template: The :class:`~rhiza.models.RhizaTemplate` driving this sync.
795 excludes: Set of relative paths to exclude.
796 lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync.
797 lock_file: Optional explicit path for the lock file. When ``None``
798 the default ``<target>/.rhiza/template.lock`` is used.
799 path_map: Optional source→destination path mapping for remapped
800 bundle file entries.
802 Returns:
803 True if all changes applied cleanly, False if any conflicts remain.
804 """
805 from rhiza.commands._sync_helpers import _write_lock
807 logger.info(f"Cloning base snapshot at {base_sha[:12]}")
808 base_clone = Path(tempfile.mkdtemp())
809 try:
810 self.clone_at_sha(template.git_url, base_sha, base_clone, template.include)
811 _prepare_snapshot(base_clone, template.include, excludes, base_snapshot, path_map=path_map)
812 except Exception: # noqa: BLE001 # clone/snapshot can fail many ways; on any failure treat all files as new
813 logger.warning("Could not checkout base commit — treating all files as new")
814 finally:
815 if base_clone.exists():
816 shutil.rmtree(base_clone)
818 diff = self.get_diff(base_snapshot, upstream_snapshot)
820 if not diff.strip():
821 logger.success("Template unchanged since last sync — nothing to apply")
822 _write_lock(target, lock, lock_file=lock_file)
823 return True
825 logger.info("Applying template changes via 3-way merge (cruft)...")
826 clean = self._apply_diff(diff, target, base_snapshot=base_snapshot, upstream_snapshot=upstream_snapshot)
828 if clean:
829 logger.success("All changes applied cleanly")
830 else:
831 logger.warning("Some changes had conflicts. Check for *.rej files and resolve manually.")
833 return clean
836def _normalize_to_list(value: Any | list[Any] | None) -> list[Any]:
837 r"""Convert a value to a list of strings.
839 Handles the case where YAML multi-line strings (using |) are parsed as
840 a single string instead of a list. Splits the string by newlines and
841 strips whitespace from each item.
843 Args:
844 value: A string, list of strings, or None.
846 Returns:
847 A list of strings. Empty list if value is None or empty.
849 Examples:
850 >>> _normalize_to_list(None)
851 []
852 >>> _normalize_to_list([])
853 []
854 >>> _normalize_to_list(['a', 'b', 'c'])
855 ['a', 'b', 'c']
856 >>> _normalize_to_list('single line')
857 ['single line']
858 >>> _normalize_to_list('line1\\n' + 'line2\\n' + 'line3')
859 ['line1', 'line2', 'line3']
860 >>> _normalize_to_list(' item1 \\n' + ' item2 ')
861 ['item1', 'item2']
862 """
863 if value is None:
864 return []
865 if isinstance(value, list):
866 return value
867 if isinstance(value, str):
868 # Split by newlines and filter out empty strings
869 # Handle both actual newlines (\n) and literal backslash-n (\\n)
870 items = value.split("\\n") if "\\n" in value and "\n" not in value else value.split("\n")
871 return [item.strip() for item in items if item.strip()]
872 return []
def get_git_executable() -> str:
    """Locate git on ``PATH`` and return the path that was found.

    Resolving the binary once via :func:`shutil.which` lets callers invoke git
    by full path, guarding against PATH manipulation between calls.

    Returns:
        str: Absolute path to the git executable.

    Raises:
        RuntimeError: If git executable is not found in PATH.
    """
    resolved = shutil.which("git")
    if resolved is not None:
        return resolved
    raise RuntimeError("git executable not found in PATH. Please ensure git is installed and available.")
894def _log_git_stderr_errors(stderr: str | None) -> None:
895 """Extract and log only relevant error messages from git stderr.
897 Args:
898 stderr: Git command stderr output.
899 """
900 if stderr:
901 for line in stderr.strip().split("\n"):
902 line = line.strip()
903 if line and (line.startswith(("fatal:", "error:"))):
904 logger.error(line)
907def _expand_paths(base_dir: Path, paths: list[str]) -> list[Path]:
908 """Expand file/directory paths relative to *base_dir* into individual files.
910 Args:
911 base_dir: Root directory to resolve against.
912 paths: Relative path strings.
914 Returns:
915 Flat list of file paths.
916 """
917 all_files: list[Path] = []
918 for p in paths:
919 full = base_dir / p
920 if full.is_file():
921 all_files.append(full)
922 elif full.is_dir():
923 all_files.extend(
924 Path(dirpath) / fname
925 for dirpath, _, filenames in os.walk(full, followlinks=True)
926 for fname in filenames
927 )
928 else:
929 logger.debug(f"Path not found in template repository: {p}")
930 return all_files
def _excluded_set(base_dir: Path, excluded_paths: list[str]) -> set[str]:
    """Collect the relative paths that must be left out of a sync.

    The user-configured excludes are expanded to individual files and stored
    relative to *base_dir*; the rhiza-internal entries are always added.

    Args:
        base_dir: Root of the template clone.
        excluded_paths: User-configured exclude list.

    Returns:
        Set of relative path strings (always includes rhiza internals).
    """
    excluded = {str(found.relative_to(base_dir)) for found in _expand_paths(base_dir, excluded_paths)}
    # Rhiza's own bookkeeping files are never synced.
    excluded.update((".rhiza/template.yml", ".rhiza/history"))
    return excluded
951def _remap_path(source: str, path_map: dict[str, str]) -> str:
952 """Translate *source* to its destination path using *path_map*.
954 Supports both exact file matches and directory-prefix matches. A prefix
955 match is triggered when a key ends with ``/`` or when *source* starts with
956 ``<key>/``.
958 Args:
959 source: Source-relative path from the template clone.
960 path_map: Mapping of source path → destination path.
962 Returns:
963 The destination path, or *source* unchanged when no mapping applies.
964 """
965 if source in path_map:
966 return path_map[source]
967 for src, dest in path_map.items():
968 src_prefix = src.rstrip("/") + "/"
969 if source.startswith(src_prefix):
970 suffix = source[len(src_prefix) :]
971 if dest.rstrip("/"):
972 return dest.rstrip("/") + "/" + suffix
973 return suffix
974 return source
def _prepare_snapshot(
    clone_dir: Path,
    include_paths: list[str],
    excludes: set[str],
    snapshot_dir: Path,
    path_map: dict[str, str] | None = None,
) -> list[Path]:
    """Build a snapshot directory from the included, non-excluded clone files.

    Exclusion is decided on the *source* path. Each surviving file is then
    written at its remapped destination path, so later diffs and merges work
    on the locations used in the target project.

    Args:
        clone_dir: Root of the template clone.
        include_paths: Source paths to include.
        excludes: Set of relative source paths to exclude.
        snapshot_dir: Destination directory for the snapshot.
        path_map: Optional source→destination path mapping. Keys may be exact
            file paths or directory prefixes.

    Returns:
        List of relative destination file paths that were copied.
    """
    mapping = path_map or {}
    copied: list[Path] = []
    for source_file in _expand_paths(clone_dir, include_paths):
        source_rel = str(source_file.relative_to(clone_dir))
        if source_rel in excludes:
            continue
        dest_rel = _remap_path(source_rel, mapping)
        destination = snapshot_dir / dest_rel
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source_file, destination)
        copied.append(Path(dest_rel))
    return copied
1011 return materialized