Coverage for src/rhiza/models/_git_utils.py: 100%

319 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-02 07:04 +0000

1"""Git utility helpers for Rhiza models.""" 

2 

3import logging 

4import os 

5import shutil 

6import subprocess # nosec B404 

7import tempfile 

8from dataclasses import dataclass, field 

9from pathlib import Path 

10from typing import TYPE_CHECKING, Any 

11 

12from loguru import logger 

13 

14if TYPE_CHECKING: 

15 from rhiza.models.lock import TemplateLock 

16 from rhiza.models.template import RhizaTemplate 

17 

18 

@dataclass
class GitContext:
    """Bundles the git executable path and environment for subprocess calls.

    All git-invoking functions in the sync helpers accept a
    :class:`GitContext` instead of resolving the executable on their own,
    making them easily testable via binary injection.

    Attributes:
        executable: Absolute path to the git binary.
        env: Environment variables passed to every git subprocess.
    """

    # Absolute path to the git binary (resolved via get_git_executable()
    # in default(); tests may inject a fake binary path here).
    executable: str
    # Environment passed to every git subprocess; defaults to an empty
    # mapping so tests run with a fully-controlled environment.
    env: dict[str, str] = field(default_factory=dict)

34 

35 @classmethod 

36 def default(cls) -> "GitContext": 

37 """Create a GitContext using the system git and process environment. 

38 

39 Returns: 

40 A :class:`GitContext` populated with the real git executable path 

41 and a copy of the current process environment with 

42 ``GIT_TERMINAL_PROMPT`` set to ``"0"``. 

43 """ 

44 env = os.environ.copy() 

45 env["GIT_TERMINAL_PROMPT"] = "0" 

46 return cls(executable=get_git_executable(), env=env) 

47 

48 def assert_status_clean(self, target: Path) -> None: 

49 """Raise RuntimeError if the target repository has uncommitted changes. 

50 

51 Runs ``git status --porcelain`` and raises if the output is non-empty, 

52 preventing a sync from running on a dirty working tree. 

53 

54 Args: 

55 target: Path to the target repository. 

56 

57 Raises: 

58 RuntimeError: If the working tree has uncommitted changes. 

59 """ 

60 result = subprocess.run( # nosec B603 # noqa: S603 

61 [self.executable, "status", "--porcelain"], 

62 cwd=target, 

63 capture_output=True, 

64 text=True, 

65 env=self.env, 

66 ) 

67 if result.stdout.strip(): 

68 logger.error("Working tree is not clean. Please commit or stash your changes before syncing.") 

69 logger.error("Uncommitted changes:") 

70 for line in result.stdout.strip().splitlines(): 

71 logger.error(f" {line}") 

72 raise RuntimeError("Working tree is not clean. Please commit or stash your changes before syncing.") # noqa: TRY003 

73 

    def handle_target_branch(self, target: Path, target_branch: str | None) -> None:
        """Handle target branch creation or checkout if specified.

        Args:
            target: Path to the target repository.
            target_branch: Optional branch name to create/checkout. ``None``
                or an empty string makes this a no-op.

        Raises:
            subprocess.CalledProcessError: If the checkout or branch-creation
                git command fails.
        """
        if not target_branch:
            return

        logger.info(f"Creating/checking out target branch: {target_branch}")
        try:
            # rev-parse --verify exits non-zero when the ref does not exist;
            # check=True is deliberately omitted so the return code can be
            # inspected instead of raising.
            result = subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "rev-parse", "--verify", target_branch],
                cwd=target,
                capture_output=True,
                text=True,
                env=self.env,
            )

            if result.returncode == 0:
                # Branch already exists — switch to it.
                logger.info(f"Branch '{target_branch}' exists, checking out...")
                subprocess.run(  # nosec B603 # noqa: S603
                    [self.executable, "checkout", target_branch],
                    cwd=target,
                    check=True,
                    capture_output=True,
                    text=True,
                    env=self.env,
                )
            else:
                # Branch is missing — create it from the current HEAD.
                logger.info(f"Creating new branch '{target_branch}'...")
                subprocess.run(  # nosec B603 # noqa: S603
                    [self.executable, "checkout", "-b", target_branch],
                    cwd=target,
                    check=True,
                    capture_output=True,
                    text=True,
                    env=self.env,
                )
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to create/checkout branch '{target_branch}'")
            _log_git_stderr_errors(e.stderr)
            logger.error("Please ensure you have no uncommitted changes or conflicts")
            raise

119 

120 def get_diff(self, repo0: Path, repo1: Path) -> str: 

121 """Compute the raw diff between two directory trees using ``git diff --no-index``. 

122 

123 Args: 

124 repo0: Path to the base (old) directory tree. 

125 repo1: Path to the upstream (new) directory tree. 

126 """ 

127 repo0_str = repo0.resolve().as_posix() 

128 repo1_str = repo1.resolve().as_posix() 

129 result = subprocess.run( # nosec B603 # noqa: S603 

130 [ 

131 self.executable, 

132 "-c", 

133 "diff.noprefix=", 

134 "diff", 

135 "--no-index", 

136 "--relative", 

137 "--binary", 

138 "--src-prefix=upstream-template-old/", 

139 "--dst-prefix=upstream-template-new/", 

140 "--no-ext-diff", 

141 "--no-color", 

142 repo0_str, 

143 repo1_str, 

144 ], 

145 cwd=repo0_str, 

146 capture_output=True, 

147 env=self.env, 

148 ) 

149 diff = result.stdout.decode() if isinstance(result.stdout, bytes) else (result.stdout or "") 

150 for repo in [repo0_str, repo1_str]: 

151 from re import sub 

152 

153 repo_nix = sub("/[a-z]:", "", repo) 

154 diff = diff.replace(f"upstream-template-old{repo_nix}", "upstream-template-old").replace( 

155 f"upstream-template-new{repo_nix}", "upstream-template-new" 

156 ) 

157 diff = diff.replace(repo0_str + "/", "").replace(repo1_str + "/", "") 

158 return diff 

159 

160 def sync_diff(self, target: Path, upstream_snapshot: Path) -> None: 

161 """Execute the diff (dry-run) strategy. 

162 

163 Shows what would change without modifying any files. 

164 

165 Args: 

166 target: Path to the target repository. 

167 upstream_snapshot: Path to the upstream snapshot directory. 

168 """ 

169 diff = self.get_diff(target, upstream_snapshot) 

170 if diff.strip(): 

171 logger.info(f"\n{diff}") 

172 changes = diff.count("diff --git") 

173 logger.info(f"{changes} file(s) would be changed") 

174 else: 

175 logger.success("No differences found") 

176 

177 def _parse_diff_filenames(self, diff: str) -> list[tuple[str, bool, bool]]: 

178 """Parse a unified diff produced by :func:`GitContext.get_diff` into file entries. 

179 

180 Each entry is ``(rel_path, is_new, is_deleted)`` where *rel_path* is the 

181 path relative to both snapshot directories. 

182 

183 Args: 

184 diff: Unified diff string from :func:`GitContext.get_diff`. 

185 

186 Returns: 

187 List of ``(rel_path, is_new, is_deleted)`` tuples, one per changed file. 

188 """ 

189 src_prefix = "upstream-template-old/" 

190 dst_prefix = "upstream-template-new/" 

191 

192 results: list[tuple[str, bool, bool]] = [] 

193 is_new = False 

194 is_deleted = False 

195 src_path: str | None = None 

196 dst_path: str | None = None 

197 in_diff = False 

198 

199 def _flush() -> None: 

200 rel = dst_path if not is_deleted else src_path 

201 if rel: 

202 results.append((rel, is_new, is_deleted)) 

203 

204 for line in diff.splitlines(): 

205 if line.startswith("diff --git "): 

206 if in_diff: 

207 _flush() 

208 is_new = False 

209 is_deleted = False 

210 src_path = None 

211 dst_path = None 

212 in_diff = True 

213 elif line.startswith("new file mode"): 

214 is_new = True 

215 elif line.startswith("deleted file mode"): 

216 is_deleted = True 

217 elif line.startswith("--- "): 

218 raw = line[4:].strip().strip('"').split("\t")[0] 

219 if raw != "/dev/null" and raw.startswith(src_prefix): 

220 src_path = raw[len(src_prefix) :] 

221 elif line.startswith("+++ "): 

222 raw = line[4:].strip().strip('"').split("\t")[0] 

223 if raw != "/dev/null" and raw.startswith(dst_prefix): 

224 dst_path = raw[len(dst_prefix) :] 

225 

226 if in_diff: 

227 _flush() 

228 

229 return results 

230 

    def _merge_file_fallback(
        self,
        diff: str,
        target: Path,
        base_snapshot: Path,
        upstream_snapshot: Path,
    ) -> bool:
        """Apply *diff* file-by-file using ``git merge-file``.

        Unlike ``git apply -3``, ``git merge-file`` works directly on the file
        contents from *base_snapshot* and *upstream_snapshot*, so it does not
        require the template's blob objects to exist in the target repository.

        Conflict markers (``<<<<<<<`` / ``=======`` / ``>>>>>>>``) are left in
        place for manual resolution when both sides changed the same region.

        Args:
            diff: Unified diff string (used only for file-list parsing).
            target: Path to the target repository.
            base_snapshot: Directory containing files at the previously-synced SHA.
            upstream_snapshot: Directory containing files at the new upstream SHA.

        Returns:
            True if every file merged cleanly, False if any conflicts remain.
        """
        file_entries = self._parse_diff_filenames(diff)
        all_clean = True
        conflict_files: list[str] = []

        for rel_path, is_new, is_deleted in file_entries:
            target_file = target / rel_path
            upstream_file = upstream_snapshot / rel_path
            base_file = base_snapshot / rel_path

            # New upstream file: copy it straight into the target.
            if is_new:
                if upstream_file.exists():
                    target_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(upstream_file, target_file)
                    logger.debug(f"[merge-file] Added: {rel_path}")
                continue

            # Upstream deleted the file: mirror the deletion locally.
            if is_deleted:
                if target_file.exists():
                    target_file.unlink()
                    logger.debug(f"[merge-file] Deleted: {rel_path}")
                continue

            # Modified file — attempt a 3-way merge using the on-disk snapshots.
            if not target_file.exists():
                if upstream_file.exists():
                    target_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(upstream_file, target_file)
                    logger.debug(f"[merge-file] Created (missing in target): {rel_path}")
                continue

            if not base_file.exists() or not upstream_file.exists():
                # Cannot 3-way-merge without both sides; just take upstream.
                if upstream_file.exists():
                    shutil.copy2(upstream_file, target_file)
                    logger.debug(f"[merge-file] Overwrite (no base): {rel_path}")
                continue

            # git merge-file edits target_file in place; its exit status is
            # the number of conflicts (positive) or negative on error.
            result = subprocess.run(  # nosec B603 # noqa: S603
                [
                    self.executable,
                    "merge-file",
                    "-L",
                    "ours",
                    "-L",
                    "base",
                    "-L",
                    "upstream",
                    str(target_file),
                    str(base_file),
                    str(upstream_file),
                ],
                capture_output=True,
                env=self.env,
            )

            if result.returncode > 0:
                # Positive return code == number of conflict hunks left behind.
                conflict_files.append(rel_path)
                all_clean = False
                logger.warning(f"[merge-file] Conflict in {rel_path} — resolve markers manually")
            elif result.returncode < 0:
                # Negative return code == hard error from git merge-file.
                logger.warning(f"[merge-file] Error merging {rel_path}: {result.stderr.decode().strip()}")
                all_clean = False
            else:
                logger.debug(f"[merge-file] Clean merge: {rel_path}")

        if conflict_files:
            logger.warning(
                f"{len(conflict_files)} file(s) have conflict markers to resolve: " + ", ".join(conflict_files)
            )

        return all_clean

327 

    def _apply_diff(
        self,
        diff: str,
        target: Path,
        base_snapshot: Path | None = None,
        upstream_snapshot: Path | None = None,
    ) -> bool:
        """Apply a diff to the target project using ``git apply -3`` (3-way merge).

        When ``git apply -3`` fails because the template's blob objects are absent
        from the target repository *and* both *base_snapshot* and
        *upstream_snapshot* are provided, falls back to :func:`_merge_file_fallback`
        which uses ``git merge-file`` on the on-disk snapshot files instead.

        Otherwise falls back to ``git apply --reject``.

        Args:
            diff: Unified diff string.
            target: Path to the target repository.
            base_snapshot: Optional directory containing files at the base SHA.
            upstream_snapshot: Optional directory containing files at the upstream SHA.

        Returns:
            True if the diff applied cleanly, False if there were conflicts.
        """
        # Empty diff means nothing to apply — trivially clean.
        if not diff.strip():
            return True

        try:
            subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "apply", "-3"],
                input=diff.encode() if isinstance(diff, str) else diff,
                cwd=target,
                check=True,
                capture_output=True,
                env=self.env,
            )
        except subprocess.CalledProcessError as e:
            stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "")

            # git apply -3 cannot do a real 3-way merge when the template blobs are
            # not present in the target repository's object store. If we have the
            # snapshot directories on disk, use git merge-file instead — it works
            # directly on file content and needs no shared git history.
            if "lacks the necessary blob" in stderr and base_snapshot is not None and upstream_snapshot is not None:
                logger.debug("git apply -3 lacks blob objects; switching to git merge-file fallback")
                return self._merge_file_fallback(diff, target, base_snapshot, upstream_snapshot)

            if stderr:
                logger.warning(f"3-way merge had conflicts:\n{stderr.strip()}")
            # Fall back to --reject for conflict files
            try:
                subprocess.run(  # nosec B603 # noqa: S603
                    [self.executable, "apply", "--reject"],
                    input=diff.encode() if isinstance(diff, str) else diff,
                    cwd=target,
                    check=True,
                    capture_output=True,
                    env=self.env,
                )
            except subprocess.CalledProcessError as e2:
                # --reject applied what it could and wrote *.rej files for the rest.
                stderr2 = e2.stderr.decode() if isinstance(e2.stderr, bytes) else (e2.stderr or "")
                if stderr2:
                    logger.warning(stderr2.strip())
                logger.warning(
                    "Some changes could not be applied cleanly. Check for *.rej files and resolve conflicts manually."
                )
            return False
        else:
            return True

398 

399 def _copy_files_to_target(self, snapshot_dir: Path, target: Path, materialized: list[Path]) -> None: 

400 """Copy all materialized files from a snapshot into the target project. 

401 

402 Args: 

403 snapshot_dir: Directory containing the snapshot files. 

404 target: Path to the target repository. 

405 materialized: List of relative file paths to copy. 

406 """ 

407 for rel_path in sorted(materialized): 

408 src = snapshot_dir / rel_path 

409 dst = target / rel_path 

410 dst.parent.mkdir(parents=True, exist_ok=True) 

411 shutil.copy2(src, dst) 

412 logger.success(f"[COPY] {rel_path}") 

413 

    def sync_merge(
        self,
        target: Path,
        upstream_snapshot: Path,
        upstream_sha: str,
        base_sha: str | None,
        materialized: list[Path],
        template: "RhizaTemplate",
        excludes: set[str],
        lock: "TemplateLock",
        lock_file: "Path | None" = None,
    ) -> None:
        """Execute the merge strategy (cruft-style 3-way merge).

        When a base SHA exists, computes the diff between base and upstream
        snapshots and applies it via ``git apply -3``. On first sync (no base),
        falls back to a simple copy.

        Args:
            target: Path to the target repository.
            upstream_snapshot: Path to the upstream snapshot directory.
            upstream_sha: HEAD SHA of the upstream template.
            base_sha: Previously synced commit SHA, or None for first sync.
            materialized: List of relative file paths.
            template: The :class:`~rhiza.models.RhizaTemplate` driving this sync.
            excludes: Set of relative paths to exclude.
            lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync.
            lock_file: Optional explicit path for the lock file. When ``None``
                the default ``<target>/.rhiza/template.lock`` is used.
        """
        # Imported lazily to avoid a circular import between the models and
        # commands packages.
        from rhiza.commands._sync_helpers import (
            _clean_orphaned_files,
            _read_previously_tracked_files,
            _warn_about_workflow_files,
            _write_lock,
        )

        # Snapshot the currently-tracked files before the merge runs. The merge
        # may write a new lock (e.g. on the "template unchanged" early-return path
        # in _merge_with_base), so we must read the old state first to ensure
        # orphan cleanup compares against the previous sync, not the new one.
        old_tracked_files = _read_previously_tracked_files(target, lock_file=lock_file)

        base_snapshot = Path(tempfile.mkdtemp())
        try:
            if base_sha:
                # Incremental sync: 3-way merge base..upstream into the target.
                self._merge_with_base(
                    target,
                    upstream_snapshot,
                    upstream_sha,
                    base_sha,
                    base_snapshot,
                    template,
                    excludes,
                    lock,
                    lock_file=lock_file,
                )
            else:
                logger.info("First sync — copying all template files")
                self._copy_files_to_target(upstream_snapshot, target, materialized)

            # Restore any template-managed files that are absent from the target.
            # This can happen when files tracked by the template do not exist in the
            # downstream repository — for example when the template snapshot was
            # unchanged since the last sync so no diff was applied, but the files
            # were never present or were manually deleted.
            missing_from_target = [p for p in materialized if not (target / p).exists()]
            if missing_from_target:
                logger.info(f"Restoring {len(missing_from_target)} template file(s) missing from target")
                self._copy_files_to_target(upstream_snapshot, target, missing_from_target)

            _warn_about_workflow_files(materialized)
            _clean_orphaned_files(
                target,
                materialized,
                excludes=excludes,
                base_snapshot=base_snapshot,
                previously_tracked_files=old_tracked_files if old_tracked_files else None,
                lock_file=lock_file,
            )
            _write_lock(target, lock, lock_file=lock_file)
            logger.success(f"Sync complete — {len(materialized)} file(s) processed")
        finally:
            # Always remove the temporary base snapshot, even on failure.
            if base_snapshot.exists():
                shutil.rmtree(base_snapshot)

499 

500 def update_sparse_checkout( 

501 self, 

502 tmp_dir: Path, 

503 include_paths: list[str], 

504 logger=None, 

505 ) -> None: 

506 """Update sparse-checkout paths in an already-cloned repository. 

507 

508 Args: 

509 tmp_dir: Temporary directory with cloned repository. 

510 include_paths: Paths to include in sparse checkout. 

511 logger: Optional logger; defaults to module logger. 

512 """ 

513 logger = logger or logging.getLogger(__name__) 

514 

515 try: 

516 logger.debug(f"Updating sparse checkout paths: {include_paths}") 

517 subprocess.run( # nosec B603 # noqa: S603 

518 [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths], 

519 cwd=tmp_dir, 

520 check=True, 

521 capture_output=True, 

522 text=True, 

523 env=self.env, 

524 ) 

525 logger.debug("Sparse checkout paths updated") 

526 except subprocess.CalledProcessError as e: 

527 logger.exception("Failed to update sparse checkout paths") 

528 _log_git_stderr_errors(e.stderr) 

529 raise 

530 

531 def get_head_sha(self, repo_dir: Path) -> str: 

532 """Return the HEAD commit SHA of a cloned repository. 

533 

534 Args: 

535 repo_dir: Path to the git repository. 

536 

537 Returns: 

538 The full HEAD SHA. 

539 """ 

540 result = subprocess.run( # nosec B603 # noqa: S603 

541 [self.executable, "rev-parse", "HEAD"], 

542 cwd=repo_dir, 

543 capture_output=True, 

544 text=True, 

545 check=True, 

546 env=self.env, 

547 ) 

548 return result.stdout.strip() 

549 

    def clone_repository(
        self,
        git_url: str,
        tmp_dir: Path,
        branch: str,
        include_paths: list[str],
        logger=None,
    ) -> None:
        """Clone template repository with sparse checkout.

        Runs three git commands in sequence: a shallow blobless sparse clone,
        ``sparse-checkout init --cone``, then ``sparse-checkout set`` for the
        requested paths.

        Args:
            git_url: URL of the repository to clone.
            tmp_dir: Temporary directory for cloning.
            branch: Branch to clone from the template repository.
            include_paths: Paths to include in sparse checkout.
            logger: Optional logger; defaults to module logger.

        Raises:
            subprocess.CalledProcessError: If any of the git commands fails.
        """
        logger = logger or logging.getLogger(__name__)

        try:
            logger.debug("Executing git clone with sparse checkout")
            # --depth 1 + --filter=blob:none keeps the transfer minimal; the
            # sparse checkout below limits which blobs are actually fetched.
            subprocess.run(  # nosec B603 # noqa: S603
                [
                    self.executable,
                    "clone",
                    "--depth",
                    "1",
                    "--filter=blob:none",
                    "--sparse",
                    "--branch",
                    branch,
                    git_url,
                    str(tmp_dir),
                ],
                check=True,
                capture_output=True,
                text=True,
                env=self.env,
            )
            logger.debug("Git clone completed successfully")
        except subprocess.CalledProcessError as e:
            logger.exception(f"Failed to clone repository from {git_url}")
            _log_git_stderr_errors(e.stderr)
            logger.exception("Please check that:")
            logger.exception("  - The repository exists and is accessible")
            logger.exception(f"  - Branch '{branch}' exists in the repository")
            logger.exception("  - You have network access to the git hosting service")
            raise

        try:
            logger.debug("Initializing sparse checkout")
            subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "sparse-checkout", "init", "--cone"],
                cwd=tmp_dir,
                check=True,
                capture_output=True,
                text=True,
                env=self.env,
            )
            logger.debug("Sparse checkout initialized")
        except subprocess.CalledProcessError as e:
            logger.exception("Failed to initialize sparse checkout")
            _log_git_stderr_errors(e.stderr)
            raise

        try:
            logger.debug(f"Setting sparse checkout paths: {include_paths}")
            # --skip-checks allows plain file paths (non-cone patterns).
            subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
                cwd=tmp_dir,
                check=True,
                capture_output=True,
                text=True,
                env=self.env,
            )
            logger.debug("Sparse checkout paths configured")
        except subprocess.CalledProcessError as e:
            logger.exception("Failed to configure sparse checkout paths")
            _log_git_stderr_errors(e.stderr)
            raise

630 

    def clone_at_sha(
        self,
        git_url: str,
        sha: str,
        dest: Path,
        include_paths: list[str],
        logger=None,
    ) -> None:
        """Clone the template repository and checkout a specific commit.

        Unlike :meth:`clone_repository`, this performs a full-history blobless
        clone (no ``--depth 1``) because an arbitrary *sha* may not be
        reachable from a shallow clone; ``--no-checkout`` defers materializing
        files until the sparse filter is configured.

        Args:
            git_url: URL of the repository to clone.
            sha: Commit SHA to check out.
            dest: Target directory for the clone.
            include_paths: Paths to include in sparse checkout.
            logger: Optional logger; defaults to module logger.

        Raises:
            subprocess.CalledProcessError: If any of the git commands fails.
        """
        logger = logger or logging.getLogger(__name__)
        try:
            subprocess.run(  # nosec B603 # noqa: S603
                [
                    self.executable,
                    "clone",
                    "--filter=blob:none",
                    "--sparse",
                    "--no-checkout",
                    git_url,
                    str(dest),
                ],
                check=True,
                capture_output=True,
                text=True,
                env=self.env,
            )
        except subprocess.CalledProcessError as e:
            logger.exception(f"Failed to clone repository for base snapshot: {git_url}")
            _log_git_stderr_errors(e.stderr)
            raise

        try:
            subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "sparse-checkout", "init", "--cone"],
                cwd=dest,
                check=True,
                capture_output=True,
                text=True,
                env=self.env,
            )
            # --skip-checks allows plain file paths (non-cone patterns).
            subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "sparse-checkout", "set", "--skip-checks", *include_paths],
                cwd=dest,
                check=True,
                capture_output=True,
                text=True,
                env=self.env,
            )
        except subprocess.CalledProcessError as e:
            logger.exception("Failed to configure sparse checkout for base snapshot")
            _log_git_stderr_errors(e.stderr)
            raise

        try:
            # Checkout last, so only the sparse subset at *sha* is written.
            subprocess.run(  # nosec B603 # noqa: S603
                [self.executable, "checkout", sha],
                cwd=dest,
                check=True,
                capture_output=True,
                text=True,
                env=self.env,
            )
        except subprocess.CalledProcessError as e:
            logger.exception(f"Failed to checkout base commit {sha[:12]}")
            _log_git_stderr_errors(e.stderr)
            raise

705 

    def _merge_with_base(
        self,
        target: Path,
        upstream_snapshot: Path,
        upstream_sha: str,
        base_sha: str,
        base_snapshot: Path,
        template: "RhizaTemplate",
        excludes: set[str],
        lock: "TemplateLock",
        lock_file: "Path | None" = None,
    ) -> None:
        """Compute and apply the diff between base and upstream snapshots.

        Args:
            target: Path to the target repository.
            upstream_snapshot: Path to the upstream snapshot directory.
            upstream_sha: HEAD SHA of the upstream template.
            base_sha: Previously synced commit SHA.
            base_snapshot: Directory to populate with the base snapshot.
            template: The :class:`~rhiza.models.RhizaTemplate` driving this sync.
            excludes: Set of relative paths to exclude.
            lock: Pre-built :class:`~rhiza.models.TemplateLock` for this sync.
            lock_file: Optional explicit path for the lock file. When ``None``
                the default ``<target>/.rhiza/template.lock`` is used.
        """
        # Lazy import to avoid a circular import with the commands package.
        from rhiza.commands._sync_helpers import _write_lock

        logger.info(f"Cloning base snapshot at {base_sha[:12]}")
        base_clone = Path(tempfile.mkdtemp())
        try:
            self.clone_at_sha(template.git_url, base_sha, base_clone, template.include)
            _prepare_snapshot(base_clone, template.include, excludes, base_snapshot)
        except Exception:
            # Deliberately broad best-effort: with an empty base snapshot the
            # diff below treats every upstream file as newly added.
            logger.warning("Could not checkout base commit — treating all files as new")
        finally:
            if base_clone.exists():
                shutil.rmtree(base_clone)

        diff = self.get_diff(base_snapshot, upstream_snapshot)

        if not diff.strip():
            # Still record the new SHA so the next sync uses it as the base.
            logger.success("Template unchanged since last sync — nothing to apply")
            _write_lock(target, lock, lock_file=lock_file)
            return

        logger.info("Applying template changes via 3-way merge (cruft)...")
        clean = self._apply_diff(diff, target, base_snapshot=base_snapshot, upstream_snapshot=upstream_snapshot)

        if clean:
            logger.success("All changes applied cleanly")
        else:
            logger.warning("Some changes had conflicts. Check for *.rej files and resolve manually.")

759 

760 

761def _normalize_to_list(value: Any | list[Any] | None) -> list[Any]: 

762 r"""Convert a value to a list of strings. 

763 

764 Handles the case where YAML multi-line strings (using |) are parsed as 

765 a single string instead of a list. Splits the string by newlines and 

766 strips whitespace from each item. 

767 

768 Args: 

769 value: A string, list of strings, or None. 

770 

771 Returns: 

772 A list of strings. Empty list if value is None or empty. 

773 

774 Examples: 

775 >>> _normalize_to_list(None) 

776 [] 

777 >>> _normalize_to_list([]) 

778 [] 

779 >>> _normalize_to_list(['a', 'b', 'c']) 

780 ['a', 'b', 'c'] 

781 >>> _normalize_to_list('single line') 

782 ['single line'] 

783 >>> _normalize_to_list('line1\\n' + 'line2\\n' + 'line3') 

784 ['line1', 'line2', 'line3'] 

785 >>> _normalize_to_list(' item1 \\n' + ' item2 ') 

786 ['item1', 'item2'] 

787 """ 

788 if value is None: 

789 return [] 

790 if isinstance(value, list): 

791 return value 

792 if isinstance(value, str): 

793 # Split by newlines and filter out empty strings 

794 # Handle both actual newlines (\n) and literal backslash-n (\\n) 

795 items = value.split("\\n") if "\\n" in value and "\n" not in value else value.split("\n") 

796 return [item.strip() for item in items if item.strip()] 

797 return [] 

798 

799 

def get_git_executable() -> str:
    """Get the absolute path to the git executable.

    This function ensures we use the full path to git to prevent
    security issues related to PATH manipulation.

    Returns:
        str: Absolute path to the git executable.

    Raises:
        RuntimeError: If git executable is not found in PATH.
    """
    located = shutil.which("git")
    if located is not None:
        return located
    msg = "git executable not found in PATH. Please ensure git is installed and available."
    raise RuntimeError(msg)

817 

818 

def _log_git_stderr_errors(stderr: str | None) -> None:
    """Extract and log only relevant error messages from git stderr.

    Args:
        stderr: Git command stderr output.
    """
    if not stderr:
        return
    for raw in stderr.strip().split("\n"):
        message = raw.strip()
        # Only surface the lines git itself marks as fatal or error;
        # progress and hint lines are dropped.
        if message.startswith(("fatal:", "error:")):
            logger.error(message)

830 

831 

832def _expand_paths(base_dir: Path, paths: list[str]) -> list[Path]: 

833 """Expand file/directory paths relative to *base_dir* into individual files. 

834 

835 Args: 

836 base_dir: Root directory to resolve against. 

837 paths: Relative path strings. 

838 

839 Returns: 

840 Flat list of file paths. 

841 """ 

842 all_files: list[Path] = [] 

843 for p in paths: 

844 full = base_dir / p 

845 if full.is_file(): 

846 all_files.append(full) 

847 elif full.is_dir(): 

848 all_files.extend(f for f in full.rglob("*") if f.is_file()) 

849 else: 

850 logger.debug(f"Path not found in template repository: {p}") 

851 return all_files 

852 

853 

def _excluded_set(base_dir: Path, excluded_paths: list[str]) -> set[str]:
    """Build a set of relative path strings that should be excluded.

    Args:
        base_dir: Root of the template clone.
        excluded_paths: User-configured exclude list.

    Returns:
        Set of relative path strings (always includes rhiza internals).
    """
    excluded = {str(f.relative_to(base_dir)) for f in _expand_paths(base_dir, excluded_paths)}
    # Rhiza's own metadata must never be synced into a target project.
    excluded.update({".rhiza/template.yml", ".rhiza/history"})
    return excluded

870 

871 

def _prepare_snapshot(
    clone_dir: Path,
    include_paths: list[str],
    excludes: set[str],
    snapshot_dir: Path,
) -> list[Path]:
    """Copy included (non-excluded) files from a clone into a snapshot directory.

    Args:
        clone_dir: Root of the template clone.
        include_paths: Paths to include.
        excludes: Set of relative paths to exclude.
        snapshot_dir: Destination directory for the snapshot.

    Returns:
        List of relative file paths that were copied.
    """
    copied: list[Path] = []
    for source in _expand_paths(clone_dir, include_paths):
        rel = str(source.relative_to(clone_dir))
        if rel in excludes:
            continue
        destination = snapshot_dir / rel
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, destination)
        copied.append(Path(rel))
    return copied