Coverage for src / rhiza / models / _git_utils.py: 100%

360 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-15 18:22 +0000

1"""Git utility helpers for Rhiza models.""" 

2 

3import logging 

4import os 

5import shutil 

6import subprocess # nosec B404 

7import tempfile 

8from dataclasses import dataclass, field 

9from pathlib import Path 

10from typing import TYPE_CHECKING, Any 

11 

12from loguru import logger 

13 

14if TYPE_CHECKING: 

15 from rhiza.models.lock import TemplateLock 

16 from rhiza.models.template import RhizaTemplate 

17 

18 

19@dataclass 

20class GitContext: 

21 """Bundles the git executable path and environment for subprocess calls. 

22 

23 All git-invoking functions in the sync helpers accept a 

24 :class:`GitContext` instead of resolving the executable on their own, 

25 making them easily testable via binary injection. 

26 

27 Attributes: 

28 executable: Absolute path to the git binary. 

29 env: Environment variables passed to every git subprocess. 

30 """ 

31 

32 executable: str 

33 env: dict[str, str] = field(default_factory=dict) 

34 

35 @classmethod 

36 def default(cls) -> "GitContext": 

37 """Create a GitContext using the system git and process environment. 

38 

39 Returns: 

40 A :class:`GitContext` populated with the real git executable path 

41 and a copy of the current process environment with 

42 ``GIT_TERMINAL_PROMPT`` set to ``"0"``. 

43 """ 

44 env = os.environ.copy() 

45 env["GIT_TERMINAL_PROMPT"] = "0" 

46 return cls(executable=get_git_executable(), env=env) 

47 

48 def assert_status_clean(self, target: Path) -> None: 

49 """Raise RuntimeError if the target repository has uncommitted changes. 

50 

51 Runs ``git status --porcelain`` and raises if the output is non-empty, 

52 preventing a sync from running on a dirty working tree. 

53 

54 Args: 

55 target: Path to the target repository. 

56 

57 Raises: 

58 RuntimeError: If the working tree has uncommitted changes. 

59 """ 

60 result = subprocess.run( # nosec B603 # noqa: S603 

61 [self.executable, "status", "--porcelain"], 

62 cwd=target, 

63 capture_output=True, 

64 text=True, 

65 env=self.env, 

66 ) 

67 if result.stdout.strip(): 

68 logger.error("Working tree is not clean. Please commit or stash your changes before syncing.") 

69 logger.error("Uncommitted changes:") 

70 for line in result.stdout.strip().splitlines(): 

71 logger.error(f" {line}") 

72 raise RuntimeError("Working tree is not clean. Please commit or stash your changes before syncing.") # noqa: TRY003 

73 

74 def handle_target_branch(self, target: Path, target_branch: str | None) -> None: 

75 """Handle target branch creation or checkout if specified. 

76 

77 Args: 

78 target: Path to the target repository. 

79 target_branch: Optional branch name to create/checkout. 

80 """ 

81 if not target_branch: 

82 return 

83 

84 logger.info(f"Creating/checking out target branch: {target_branch}") 

85 try: 

86 result = subprocess.run( # nosec B603 # noqa: S603 

87 [self.executable, "rev-parse", "--verify", target_branch], 

88 cwd=target, 

89 capture_output=True, 

90 text=True, 

91 env=self.env, 

92 ) 

93 

94 if result.returncode == 0: 

95 logger.info(f"Branch '{target_branch}' exists, checking out...") 

96 subprocess.run( # nosec B603 # noqa: S603 

97 [self.executable, "checkout", target_branch], 

98 cwd=target, 

99 check=True, 

100 capture_output=True, 

101 text=True, 

102 env=self.env, 

103 ) 

104 else: 

105 logger.info(f"Creating new branch '{target_branch}'...") 

106 subprocess.run( # nosec B603 # noqa: S603 

107 [self.executable, "checkout", "-b", target_branch], 

108 cwd=target, 

109 check=True, 

110 capture_output=True, 

111 text=True, 

112 env=self.env, 

113 ) 

114 except subprocess.CalledProcessError as e: 

115 logger.error(f"Failed to create/checkout branch '{target_branch}'") 

116 _log_git_stderr_errors(e.stderr) 

117 logger.error("Please ensure you have no uncommitted changes or conflicts") 

118 raise 

119 

120 def get_diff(self, repo0: Path, repo1: Path) -> str: 

121 """Compute the raw diff between two directory trees using ``git diff --no-index``. 

122 

123 Args: 

124 repo0: Path to the base (old) directory tree. 

125 repo1: Path to the upstream (new) directory tree. 

126 """ 

127 repo0_str = repo0.resolve().as_posix() 

128 repo1_str = repo1.resolve().as_posix() 

129 result = subprocess.run( # nosec B603 # noqa: S603 

130 [ 

131 self.executable, 

132 "-c", 

133 "diff.noprefix=", 

134 "diff", 

135 "--no-index", 

136 "--relative", 

137 "--binary", 

138 "--src-prefix=upstream-template-old/", 

139 "--dst-prefix=upstream-template-new/", 

140 "--no-ext-diff", 

141 "--no-color", 

142 repo0_str, 

143 repo1_str, 

144 ], 

145 cwd=repo0_str, 

146 capture_output=True, 

147 env=self.env, 

148 ) 

149 diff = result.stdout.decode() if isinstance(result.stdout, bytes) else (result.stdout or "") 

150 for repo in [repo0_str, repo1_str]: 

151 from re import sub 

152 

153 repo_nix = sub("/[a-z]:", "", repo) 

154 diff = diff.replace(f"upstream-template-old{repo_nix}", "upstream-template-old").replace( 

155 f"upstream-template-new{repo_nix}", "upstream-template-new" 

156 ) 

157 diff = diff.replace(repo0_str + "/", "").replace(repo1_str + "/", "") 

158 return diff 

159 

160 def sync_diff(self, target: Path, upstream_snapshot: Path) -> None: 

161 """Execute the diff (dry-run) strategy. 

162 

163 Shows what would change without modifying any files. 

164 

165 Args: 

166 target: Path to the target repository. 

167 upstream_snapshot: Path to the upstream snapshot directory. 

168 """ 

169 diff = self.get_diff(target, upstream_snapshot) 

170 if diff.strip(): 

171 logger.info(f"\n{diff}") 

172 changes = diff.count("diff --git") 

173 logger.info(f"{changes} file(s) would be changed") 

174 else: 

175 logger.success("No differences found") 

176 

177 def _parse_diff_filenames(self, diff: str) -> list[tuple[str, bool, bool]]: 

178 """Parse a unified diff produced by :func:`GitContext.get_diff` into file entries. 

179 

180 Each entry is ``(rel_path, is_new, is_deleted)`` where *rel_path* is the 

181 path relative to both snapshot directories. 

182 

183 Args: 

184 diff: Unified diff string from :func:`GitContext.get_diff`. 

185 

186 Returns: 

187 List of ``(rel_path, is_new, is_deleted)`` tuples, one per changed file. 

188 """ 

189 src_prefix = "upstream-template-old/" 

190 dst_prefix = "upstream-template-new/" 

191 

192 results: list[tuple[str, bool, bool]] = [] 

193 is_new = False 

194 is_deleted = False 

195 src_path: str | None = None 

196 dst_path: str | None = None 

197 in_diff = False 

198 

199 def _flush() -> None: 

200 """Emit the current file entry into results if a path was captured.""" 

201 rel = dst_path if not is_deleted else src_path 

202 if rel: 

203 results.append((rel, is_new, is_deleted)) 

204 

205 for line in diff.splitlines(): 

206 if line.startswith("diff --git "): 

207 if in_diff: 

208 _flush() 

209 is_new = False 

210 is_deleted = False 

211 src_path = None 

212 dst_path = None 

213 in_diff = True 

214 elif line.startswith("new file mode"): 

215 is_new = True 

216 elif line.startswith("deleted file mode"): 

217 is_deleted = True 

218 elif line.startswith("--- "): 

219 raw = line[4:].strip().strip('"').split("\t")[0] 

220 if raw != "/dev/null" and raw.startswith(src_prefix): 

221 src_path = raw[len(src_prefix) :] 

222 elif line.startswith("+++ "): 

223 raw = line[4:].strip().strip('"').split("\t")[0] 

224 if raw != "/dev/null" and raw.startswith(dst_prefix): 

225 dst_path = raw[len(dst_prefix) :] 

226 

227 if in_diff: 

228 _flush() 

229 

230 return results 

231 

232 def _merge_file_fallback( 

233 self, 

234 diff: str, 

235 target: Path, 

236 base_snapshot: Path, 

237 upstream_snapshot: Path, 

238 ) -> bool: 

239 """Apply *diff* file-by-file using ``git merge-file``. 

240 

241 Unlike ``git apply -3``, ``git merge-file`` works directly on the file 

242 contents from *base_snapshot* and *upstream_snapshot*, so it does not 

243 require the template's blob objects to exist in the target repository. 

244 

245 Conflict markers (``<<<<<<< HEAD`` / ``=======`` / ``>>>>>>> rhiza-template``) are left in 

246 place for manual resolution when both sides changed the same region. 

247 

248 Args: 

249 diff: Unified diff string (used only for file-list parsing). 

250 target: Path to the target repository. 

251 base_snapshot: Directory containing files at the previously-synced SHA. 

252 upstream_snapshot: Directory containing files at the new upstream SHA. 

253 

254 Returns: 

255 True if every file merged cleanly, False if any conflicts remain. 

256 """ 

257 file_entries = self._parse_diff_filenames(diff) 

258 all_clean = True 

259 conflict_files: list[str] = [] 

260 

261 for rel_path, is_new, is_deleted in file_entries: 

262 target_file = target / rel_path 

263 upstream_file = upstream_snapshot / rel_path 

264 base_file = base_snapshot / rel_path 

265 

266 if is_new: 

267 if upstream_file.exists(): 

268 target_file.parent.mkdir(parents=True, exist_ok=True) 

269 shutil.copy2(upstream_file, target_file) 

270 logger.debug(f"[merge-file] Added: {rel_path}") 

271 continue 

272 

273 if is_deleted: 

274 if target_file.exists(): 

275 target_file.unlink() 

276 logger.debug(f"[merge-file] Deleted: {rel_path}") 

277 continue 

278 

279 # Modified file — attempt a 3-way merge using the on-disk snapshots. 

280 if not target_file.exists(): 

281 if upstream_file.exists(): 

282 target_file.parent.mkdir(parents=True, exist_ok=True) 

283 shutil.copy2(upstream_file, target_file) 

284 logger.debug(f"[merge-file] Created (missing in target): {rel_path}") 

285 continue 

286 

287 if not base_file.exists() or not upstream_file.exists(): 

288 # Cannot 3-way-merge without both sides; just take upstream. 

289 if upstream_file.exists(): 

290 shutil.copy2(upstream_file, target_file) 

291 logger.debug(f"[merge-file] Overwrite (no base): {rel_path}") 

292 continue 

293 

294 result = subprocess.run( # nosec B603 # noqa: S603 

295 [ 

296 self.executable, 

297 "merge-file", 

298 "-L", 

299 "HEAD", 

300 "-L", 

301 "base", 

302 "-L", 

303 "rhiza-template", 

304 str(target_file), 

305 str(base_file), 

306 str(upstream_file), 

307 ], 

308 capture_output=True, 

309 env=self.env, 

310 ) 

311 

312 if result.returncode > 0: 

313 conflict_files.append(rel_path) 

314 all_clean = False 

315 logger.warning(f"[merge-file] Conflict in {rel_path} — resolve markers manually") 

316 elif result.returncode < 0: 

317 logger.warning(f"[merge-file] Error merging {rel_path}: {result.stderr.decode().strip()}") 

318 all_clean = False 

319 else: 

320 logger.debug(f"[merge-file] Clean merge: {rel_path}") 

321 

322 if conflict_files: 

323 detail = "\n".join(f" {f}" for f in conflict_files) 

324 logger.warning( 

325 f"The following file(s) have conflict markers to resolve:\n{detail}\n" 

326 " Resolve each <<<<<<< / ======= / >>>>>>> block and remove the markers\n" 

327 " before committing." 

328 ) 

329 

330 return all_clean 

331 

332 def _scan_conflict_artifacts(self, target: Path) -> tuple[list[str], list[str]]: 

333 """Scan *target* for merge-conflict artifacts left by git. 

334 

335 Looks for: 

336 

337 - ``*.rej`` files produced by ``git apply --reject``. 

338 - Text files that contain ``<<<<<<<`` conflict markers (from 

339 ``git apply -3`` or ``git merge-file``). 

340 

341 Args: 

342 target: Root of the working tree to scan. 

343 

344 Returns: 

345 A ``(rej_files, marker_files)`` tuple, each a sorted list of 

346 paths relative to *target*. 

347 """ 

348 rej_files: list[str] = [] 

349 marker_files: list[str] = [] 

350 for path in sorted(target.rglob("*")): 

351 if not path.is_file(): 

352 continue 

353 rel = str(path.relative_to(target)) 

354 if path.suffix == ".rej": 

355 rej_files.append(rel) 

356 else: 

357 try: 

358 # Read up to 1 MB to avoid stalling on large binary files. 

359 content = path.read_bytes()[:1_048_576] 

360 if b"<<<<<<<" in content: 

361 marker_files.append(rel) 

362 except OSError: 

363 pass 

364 return rej_files, marker_files 

365 

366 def _apply_diff( 

367 self, 

368 diff: str, 

369 target: Path, 

370 base_snapshot: Path | None = None, 

371 upstream_snapshot: Path | None = None, 

372 ) -> bool: 

373 """Apply a diff to the target project using ``git apply -3`` (3-way merge). 

374 

375 When ``git apply -3`` fails because the template's blob objects are absent 

376 from the target repository *and* both *base_snapshot* and 

377 *upstream_snapshot* are provided, falls back to :func:`_merge_file_fallback` 

378 which uses ``git merge-file`` on the on-disk snapshot files instead. 

379 

380 Otherwise falls back to ``git apply --reject``. 

381 

382 Args: 

383 diff: Unified diff string. 

384 target: Path to the target repository. 

385 base_snapshot: Optional directory containing files at the base SHA. 

386 upstream_snapshot: Optional directory containing files at the upstream SHA. 

387 

388 Returns: 

389 True if the diff applied cleanly, False if there were conflicts. 

390 """ 

391 if not diff.strip(): 

392 return True 

393 

394 try: 

395 subprocess.run( # nosec B603 # noqa: S603 

396 [self.executable, "apply", "-3"], 

397 input=diff.encode() if isinstance(diff, str) else diff, 

398 cwd=target, 

399 check=True, 

400 capture_output=True, 

401 env=self.env, 

402 ) 

403 except subprocess.CalledProcessError as e: 

404 stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "") 

405 

406 # git apply -3 cannot do a real 3-way merge when the template blobs are 

407 # not present in the target repository's object store. If we have the 

408 # snapshot directories on disk, use git merge-file instead — it works 

409 # directly on file content and needs no shared git history. 

410 if "lacks the necessary blob" in stderr and base_snapshot is not None and upstream_snapshot is not None: 

411 logger.debug("git apply -3 lacks blob objects; switching to git merge-file fallback") 

412 return self._merge_file_fallback(diff, target, base_snapshot, upstream_snapshot) 

413 

414 if stderr: 

415 logger.warning(f"3-way merge had conflicts:\n{stderr.strip()}") 

416 # Fall back to --reject for conflict files 

417 try: 

418 subprocess.run( # nosec B603 # noqa: S603 

419 [self.executable, "apply", "--reject"], 

420 input=diff.encode() if isinstance(diff, str) else diff, 

421 cwd=target, 

422 check=True, 

423 capture_output=True, 

424 env=self.env, 

425 ) 

426 except subprocess.CalledProcessError as e2: 

427 stderr2 = e2.stderr.decode() if isinstance(e2.stderr, bytes) else (e2.stderr or "") 

428 if stderr2: 

429 logger.warning(stderr2.strip()) 

430 

431 # Scan and report any conflict artifacts left behind so users know 

432 # exactly which files need attention. 

433 rej_files, marker_files = self._scan_conflict_artifacts(target) 

434 if rej_files: 

435 rej_detail = "\n".join( 

436 f" {f.removesuffix('.rej')} (unresolved hunks saved to {f})" for f in rej_files 

437 ) 

438 logger.warning( 

439 f"The following file(s) have unresolved hunks:\n{rej_detail}\n" 

440 " Open each .rej file, manually apply the diff hunks to the source file,\n" 

441 " then delete the .rej file before committing." 

442 ) 

443 if marker_files: 

444 marker_detail = "\n".join(f" {f}" for f in marker_files) 

445 logger.warning( 

446 f"The following file(s) contain conflict markers:\n{marker_detail}\n" 

447 " Resolve each <<<<<<< / ======= / >>>>>>> block and remove the markers\n" 

448 " before committing." 

449 ) 

450 if not rej_files and not marker_files: 

451 logger.warning("Some changes could not be applied cleanly — check the working tree for partial edits.") 

452 return False 

453 else: 

454 return True 

455 

456 def _copy_files_to_target(self, snapshot_dir: Path, target: Path, materialized: list[Path]) -> None: 

457 """Copy all materialized files from a snapshot into the target project. 

458 

459 Args: 

460 snapshot_dir: Directory containing the snapshot files. 

461 target: Path to the target repository. 

462 materialized: List of relative file paths to copy. 

463 """ 

464 for rel_path in sorted(materialized): 

465 src = snapshot_dir / rel_path 

466 dst = target / rel_path 

467 dst.parent.mkdir(parents=True, exist_ok=True) 

468 shutil.copy2(src, dst) 

469 logger.success(f"[COPY] {rel_path}") 

470 

471 def sync_merge( 

472 self, 

473 target: Path, 

474 upstream_snapshot: Path, 

475 upstream_sha: str, 

476 base_sha: str | None, 

477 materialized: list[Path], 

478 template: "RhizaTemplate", 

479 excludes: set[str], 

480 lock: "TemplateLock", 

481 lock_file: "Path | None" = None, 

482 path_map: "dict[str, str] | None" = None, 

483 ) -> bool: 

484 """Execute the merge strategy (cruft-style 3-way merge). 

485 

486 When a base SHA exists, computes the diff between base and upstream 

487 snapshots and applies it via ``git apply -3``. On first sync (no base), 

488 falls back to a simple copy. 

489 

490 Args: 

491 target: Path to the target repository. 

492 upstream_snapshot: Path to the upstream snapshot directory. 

493 upstream_sha: HEAD SHA of the upstream template. 

494 base_sha: Previously synced commit SHA, or None for first sync. 

495 materialized: List of relative file paths. 

496 template: The :class:`~rhiza.models.RhizaTemplate` driving this sync. 

497 excludes: Set of relative paths to exclude. 

498 lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync. 

499 lock_file: Optional explicit path for the lock file. When ``None`` 

500 the default ``<target>/.rhiza/template.lock`` is used. 

501 path_map: Optional source→destination path mapping for remapped 

502 bundle file entries. 

503 

504 Returns: 

505 True if all changes applied cleanly, False if any conflicts remain. 

506 """ 

507 from rhiza.commands._sync_helpers import ( 

508 _clean_orphaned_files, 

509 _read_previously_tracked_files, 

510 _warn_about_workflow_files, 

511 _write_lock, 

512 ) 

513 

514 # Snapshot the currently-tracked files before the merge runs. The merge 

515 # may write a new lock (e.g. on the "template unchanged" early-return path 

516 # in _merge_with_base), so we must read the old state first to ensure 

517 # orphan cleanup compares against the previous sync, not the new one. 

518 old_tracked_files = _read_previously_tracked_files(target, lock_file=lock_file) 

519 

520 base_snapshot = Path(tempfile.mkdtemp()) 

521 clean = True 

522 try: 

523 if base_sha: 

524 clean = self._merge_with_base( 

525 target, 

526 upstream_snapshot, 

527 upstream_sha, 

528 base_sha, 

529 base_snapshot, 

530 template, 

531 excludes, 

532 lock, 

533 lock_file=lock_file, 

534 path_map=path_map, 

535 ) 

536 else: 

537 logger.info("First sync — copying all template files") 

538 self._copy_files_to_target(upstream_snapshot, target, materialized) 

539 

540 # Restore any template-managed files that are absent from the target. 

541 # This can happen when files tracked by the template do not exist in the 

542 # downstream repository — for example when the template snapshot was 

543 # unchanged since the last sync so no diff was applied, but the files 

544 # were never present or were manually deleted. 

545 missing_from_target = [p for p in materialized if not (target / p).exists()] 

546 if missing_from_target: 

547 logger.info(f"Restoring {len(missing_from_target)} template file(s) missing from target") 

548 self._copy_files_to_target(upstream_snapshot, target, missing_from_target) 

549 

550 _warn_about_workflow_files(materialized) 

551 _clean_orphaned_files( 

552 target, 

553 materialized, 

554 excludes=excludes, 

555 base_snapshot=base_snapshot, 

556 previously_tracked_files=old_tracked_files if old_tracked_files else None, 

557 lock_file=lock_file, 

558 ) 

559 _write_lock(target, lock, lock_file=lock_file) 

560 logger.success(f"Sync complete — {len(materialized)} file(s) processed") 

561 finally: 

562 if base_snapshot.exists(): 

563 shutil.rmtree(base_snapshot) 

564 

565 return clean 

566 

567 def update_sparse_checkout( 

568 self, 

569 tmp_dir: Path, 

570 include_paths: list[str], 

571 logger: logging.Logger | None = None, 

572 ) -> None: 

573 """Update sparse-checkout paths in an already-cloned repository. 

574 

575 Args: 

576 tmp_dir: Temporary directory with cloned repository. 

577 include_paths: Paths to include in sparse checkout. 

578 logger: Optional logger; defaults to module logger. 

579 """ 

580 logger = logger or logging.getLogger(__name__) 

581 

582 try: 

583 logger.debug(f"Updating sparse checkout paths: {include_paths}") 

584 subprocess.run( # nosec B603 # noqa: S603 

585 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths], 

586 cwd=tmp_dir, 

587 check=True, 

588 capture_output=True, 

589 text=True, 

590 env=self.env, 

591 ) 

592 logger.debug("Sparse checkout paths updated") 

593 except subprocess.CalledProcessError as e: 

594 logger.exception("Failed to update sparse checkout paths") 

595 _log_git_stderr_errors(e.stderr) 

596 raise 

597 

598 def get_head_sha(self, repo_dir: Path) -> str: 

599 """Return the HEAD commit SHA of a cloned repository. 

600 

601 Args: 

602 repo_dir: Path to the git repository. 

603 

604 Returns: 

605 The full HEAD SHA. 

606 """ 

607 result = subprocess.run( # nosec B603 # noqa: S603 

608 [self.executable, "rev-parse", "HEAD"], 

609 cwd=repo_dir, 

610 capture_output=True, 

611 text=True, 

612 check=True, 

613 env=self.env, 

614 ) 

615 return result.stdout.strip() 

616 

617 def clone_repository( 

618 self, 

619 git_url: str, 

620 tmp_dir: Path, 

621 branch: str, 

622 include_paths: list[str], 

623 logger: logging.Logger | None = None, 

624 ) -> None: 

625 """Clone template repository with sparse checkout. 

626 

627 Args: 

628 git_url: URL of the repository to clone. 

629 tmp_dir: Temporary directory for cloning. 

630 branch: Branch to clone from the template repository. 

631 include_paths: Paths to include in sparse checkout. 

632 logger: Optional logger; defaults to module logger. 

633 """ 

634 logger = logger or logging.getLogger(__name__) 

635 

636 try: 

637 logger.debug("Executing git clone with sparse checkout") 

638 subprocess.run( # nosec B603 # noqa: S603 

639 [ 

640 self.executable, 

641 "clone", 

642 "--depth", 

643 "1", 

644 "--filter=blob:none", 

645 "--sparse", 

646 "--branch", 

647 branch, 

648 git_url, 

649 str(tmp_dir), 

650 ], 

651 check=True, 

652 capture_output=True, 

653 text=True, 

654 env=self.env, 

655 ) 

656 logger.debug("Git clone completed successfully") 

657 except subprocess.CalledProcessError as e: 

658 logger.exception(f"Failed to clone repository from {git_url}") 

659 _log_git_stderr_errors(e.stderr) 

660 logger.exception("Please check that:") 

661 logger.exception(" - The repository exists and is accessible") 

662 logger.exception(f" - Branch '{branch}' exists in the repository") 

663 logger.exception(" - You have network access to the git hosting service") 

664 raise 

665 

666 try: 

667 logger.debug("Initializing sparse checkout") 

668 subprocess.run( # nosec B603 # noqa: S603 

669 [self.executable, "sparse-checkout", "init", "--cone"], 

670 cwd=tmp_dir, 

671 check=True, 

672 capture_output=True, 

673 text=True, 

674 env=self.env, 

675 ) 

676 logger.debug("Sparse checkout initialized") 

677 except subprocess.CalledProcessError as e: 

678 logger.exception("Failed to initialize sparse checkout") 

679 _log_git_stderr_errors(e.stderr) 

680 raise 

681 

682 try: 

683 logger.debug(f"Setting sparse checkout paths: {include_paths}") 

684 subprocess.run( # nosec B603 # noqa: S603 

685 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths], 

686 cwd=tmp_dir, 

687 check=True, 

688 capture_output=True, 

689 text=True, 

690 env=self.env, 

691 ) 

692 logger.debug("Sparse checkout paths configured") 

693 except subprocess.CalledProcessError as e: 

694 logger.exception("Failed to configure sparse checkout paths") 

695 _log_git_stderr_errors(e.stderr) 

696 raise 

697 

698 def clone_at_sha( 

699 self, 

700 git_url: str, 

701 sha: str, 

702 dest: Path, 

703 include_paths: list[str], 

704 logger: logging.Logger | None = None, 

705 ) -> None: 

706 """Clone the template repository and checkout a specific commit. 

707 

708 Args: 

709 git_url: URL of the repository to clone. 

710 sha: Commit SHA to check out. 

711 dest: Target directory for the clone. 

712 include_paths: Paths to include in sparse checkout. 

713 logger: Optional logger; defaults to module logger. 

714 """ 

715 logger = logger or logging.getLogger(__name__) 

716 try: 

717 subprocess.run( # nosec B603 # noqa: S603 

718 [ 

719 self.executable, 

720 "clone", 

721 "--filter=blob:none", 

722 "--sparse", 

723 "--no-checkout", 

724 git_url, 

725 str(dest), 

726 ], 

727 check=True, 

728 capture_output=True, 

729 text=True, 

730 env=self.env, 

731 ) 

732 except subprocess.CalledProcessError as e: 

733 logger.exception(f"Failed to clone repository for base snapshot: {git_url}") 

734 _log_git_stderr_errors(e.stderr) 

735 raise 

736 

737 try: 

738 subprocess.run( # nosec B603 # noqa: S603 

739 [self.executable, "sparse-checkout", "init", "--cone"], 

740 cwd=dest, 

741 check=True, 

742 capture_output=True, 

743 text=True, 

744 env=self.env, 

745 ) 

746 subprocess.run( # nosec B603 # noqa: S603 

747 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths], 

748 cwd=dest, 

749 check=True, 

750 capture_output=True, 

751 text=True, 

752 env=self.env, 

753 ) 

754 except subprocess.CalledProcessError as e: 

755 logger.exception("Failed to configure sparse checkout for base snapshot") 

756 _log_git_stderr_errors(e.stderr) 

757 raise 

758 

759 try: 

760 subprocess.run( # nosec B603 # noqa: S603 

761 [self.executable, "checkout", sha], 

762 cwd=dest, 

763 check=True, 

764 capture_output=True, 

765 text=True, 

766 env=self.env, 

767 ) 

768 except subprocess.CalledProcessError as e: 

769 logger.exception(f"Failed to checkout base commit {sha[:12]}") 

770 _log_git_stderr_errors(e.stderr) 

771 raise 

772 

773 def _merge_with_base( 

774 self, 

775 target: Path, 

776 upstream_snapshot: Path, 

777 upstream_sha: str, # noqa: ARG002 # part of the merge-call signature; lock carries the sha 

778 base_sha: str, 

779 base_snapshot: Path, 

780 template: "RhizaTemplate", 

781 excludes: set[str], 

782 lock: "TemplateLock", 

783 lock_file: "Path | None" = None, 

784 path_map: "dict[str, str] | None" = None, 

785 ) -> bool: 

786 """Compute and apply the diff between base and upstream snapshots. 

787 

788 Args: 

789 target: Path to the target repository. 

790 upstream_snapshot: Path to the upstream snapshot directory. 

791 upstream_sha: HEAD SHA of the upstream template. 

792 base_sha: Previously synced commit SHA. 

793 base_snapshot: Directory to populate with the base snapshot. 

794 template: The :class:`~rhiza.models.RhizaTemplate` driving this sync. 

795 excludes: Set of relative paths to exclude. 

796 lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync. 

797 lock_file: Optional explicit path for the lock file. When ``None`` 

798 the default ``<target>/.rhiza/template.lock`` is used. 

799 path_map: Optional source→destination path mapping for remapped 

800 bundle file entries. 

801 

802 Returns: 

803 True if all changes applied cleanly, False if any conflicts remain. 

804 """ 

805 from rhiza.commands._sync_helpers import _write_lock 

806 

807 logger.info(f"Cloning base snapshot at {base_sha[:12]}") 

808 base_clone = Path(tempfile.mkdtemp()) 

809 try: 

810 self.clone_at_sha(template.git_url, base_sha, base_clone, template.include) 

811 _prepare_snapshot(base_clone, template.include, excludes, base_snapshot, path_map=path_map) 

812 except Exception: # noqa: BLE001 # clone/snapshot can fail many ways; on any failure treat all files as new 

813 logger.warning("Could not checkout base commit — treating all files as new") 

814 finally: 

815 if base_clone.exists(): 

816 shutil.rmtree(base_clone) 

817 

818 diff = self.get_diff(base_snapshot, upstream_snapshot) 

819 

820 if not diff.strip(): 

821 logger.success("Template unchanged since last sync — nothing to apply") 

822 _write_lock(target, lock, lock_file=lock_file) 

823 return True 

824 

825 logger.info("Applying template changes via 3-way merge (cruft)...") 

826 clean = self._apply_diff(diff, target, base_snapshot=base_snapshot, upstream_snapshot=upstream_snapshot) 

827 

828 if clean: 

829 logger.success("All changes applied cleanly") 

830 else: 

831 logger.warning("Some changes had conflicts. Check for *.rej files and resolve manually.") 

832 

833 return clean 

834 

835 

836def _normalize_to_list(value: Any | list[Any] | None) -> list[Any]: 

837 r"""Convert a value to a list of strings. 

838 

839 Handles the case where YAML multi-line strings (using |) are parsed as 

840 a single string instead of a list. Splits the string by newlines and 

841 strips whitespace from each item. 

842 

843 Args: 

844 value: A string, list of strings, or None. 

845 

846 Returns: 

847 A list of strings. Empty list if value is None or empty. 

848 

849 Examples: 

850 >>> _normalize_to_list(None) 

851 [] 

852 >>> _normalize_to_list([]) 

853 [] 

854 >>> _normalize_to_list(['a', 'b', 'c']) 

855 ['a', 'b', 'c'] 

856 >>> _normalize_to_list('single line') 

857 ['single line'] 

858 >>> _normalize_to_list('line1\\n' + 'line2\\n' + 'line3') 

859 ['line1', 'line2', 'line3'] 

860 >>> _normalize_to_list(' item1 \\n' + ' item2 ') 

861 ['item1', 'item2'] 

862 """ 

863 if value is None: 

864 return [] 

865 if isinstance(value, list): 

866 return value 

867 if isinstance(value, str): 

868 # Split by newlines and filter out empty strings 

869 # Handle both actual newlines (\n) and literal backslash-n (\\n) 

870 items = value.split("\\n") if "\\n" in value and "\n" not in value else value.split("\n") 

871 return [item.strip() for item in items if item.strip()] 

872 return [] 

873 

874 

def get_git_executable() -> str:
    """Locate git on ``PATH`` and return the path that was found.

    Resolving the executable once, up front, means later subprocess calls
    use an explicit path instead of relying on PATH lookup at call time.

    Returns:
        str: Path to the git executable as reported by :func:`shutil.which`.

    Raises:
        RuntimeError: If git executable is not found in PATH.
    """
    resolved = shutil.which("git")
    if resolved is not None:
        return resolved
    msg = "git executable not found in PATH. Please ensure git is installed and available."
    raise RuntimeError(msg)

892 

893 

894def _log_git_stderr_errors(stderr: str | None) -> None: 

895 """Extract and log only relevant error messages from git stderr. 

896 

897 Args: 

898 stderr: Git command stderr output. 

899 """ 

900 if stderr: 

901 for line in stderr.strip().split("\n"): 

902 line = line.strip() 

903 if line and (line.startswith(("fatal:", "error:"))): 

904 logger.error(line) 

905 

906 

907def _expand_paths(base_dir: Path, paths: list[str]) -> list[Path]: 

908 """Expand file/directory paths relative to *base_dir* into individual files. 

909 

910 Args: 

911 base_dir: Root directory to resolve against. 

912 paths: Relative path strings. 

913 

914 Returns: 

915 Flat list of file paths. 

916 """ 

917 all_files: list[Path] = [] 

918 for p in paths: 

919 full = base_dir / p 

920 if full.is_file(): 

921 all_files.append(full) 

922 elif full.is_dir(): 

923 all_files.extend( 

924 Path(dirpath) / fname 

925 for dirpath, _, filenames in os.walk(full, followlinks=True) 

926 for fname in filenames 

927 ) 

928 else: 

929 logger.debug(f"Path not found in template repository: {p}") 

930 return all_files 

931 

932 

def _excluded_set(base_dir: Path, excluded_paths: list[str]) -> set[str]:
    """Collect the relative paths that must be left out.

    Every user-configured exclude is expanded to individual files and stored
    relative to *base_dir*; the rhiza-internal entries are always added.

    Args:
        base_dir: Root of the template clone.
        excluded_paths: User-configured exclude list.

    Returns:
        Set of relative path strings (always includes rhiza internals).
    """
    excluded = {str(match.relative_to(base_dir)) for match in _expand_paths(base_dir, excluded_paths)}
    # Rhiza's own bookkeeping entries are excluded unconditionally.
    excluded.update((".rhiza/template.yml", ".rhiza/history"))
    return excluded

949 

950 

951def _remap_path(source: str, path_map: dict[str, str]) -> str: 

952 """Translate *source* to its destination path using *path_map*. 

953 

954 Supports both exact file matches and directory-prefix matches. A prefix 

955 match is triggered when a key ends with ``/`` or when *source* starts with 

956 ``<key>/``. 

957 

958 Args: 

959 source: Source-relative path from the template clone. 

960 path_map: Mapping of source path → destination path. 

961 

962 Returns: 

963 The destination path, or *source* unchanged when no mapping applies. 

964 """ 

965 if source in path_map: 

966 return path_map[source] 

967 for src, dest in path_map.items(): 

968 src_prefix = src.rstrip("/") + "/" 

969 if source.startswith(src_prefix): 

970 suffix = source[len(src_prefix) :] 

971 if dest.rstrip("/"): 

972 return dest.rstrip("/") + "/" + suffix 

973 return suffix 

974 return source 

975 

976 

def _prepare_snapshot(
    clone_dir: Path,
    include_paths: list[str],
    excludes: set[str],
    snapshot_dir: Path,
    path_map: dict[str, str] | None = None,
) -> list[Path]:
    """Materialise the included, non-excluded files of a clone as a snapshot.

    Files are copied with their metadata. When *path_map* is given, each file
    is written at its remapped destination path instead of its source path,
    so later diffs and merges operate on the final target locations.

    Args:
        clone_dir: Root of the template clone.
        include_paths: Source paths to include.
        excludes: Set of relative source paths to exclude.
        snapshot_dir: Destination directory for the snapshot.
        path_map: Optional source→destination path mapping. Keys may be exact
            file paths or directory prefixes.

    Returns:
        List of relative destination file paths that were copied.
    """
    mapping = path_map or {}
    copied: list[Path] = []
    for source_file in _expand_paths(clone_dir, include_paths):
        rel_source = str(source_file.relative_to(clone_dir))
        # Exclusion is decided on the source path, before any remapping.
        if rel_source in excludes:
            continue
        rel_dest = _remap_path(rel_source, mapping)
        destination = snapshot_dir / rel_dest
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source_file, destination)
        copied.append(Path(rel_dest))
    return copied

1004 rel_source = str(f.relative_to(clone_dir)) 

1005 if rel_source not in excludes: 

1006 rel_dest = _remap_path(rel_source, effective_map) 

1007 dst = snapshot_dir / rel_dest 

1008 dst.parent.mkdir(parents=True, exist_ok=True) 

1009 shutil.copy2(f, dst) 

1010 materialized.append(Path(rel_dest)) 

1011 return materialized