Coverage for src / rhiza / commands / _sync_helpers.py: 100%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-02 07:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-02 07:04 +0000
1"""Internal helpers for the ``sync`` command.
3This module exposes the private implementation functions used by
4:mod:`rhiza.commands.sync`. Placing them here gives tests a stable import
5path (``from rhiza.commands._sync_helpers import ...``) without coupling them
6to the command module's public API.
7"""
9import contextlib
10import dataclasses
11import os
12from pathlib import Path
14try:
15 import fcntl
17 _FCNTL_AVAILABLE = True
18except ImportError: # pragma: no cover - Windows
19 _FCNTL_AVAILABLE = False
21from loguru import logger
23from rhiza.models import TemplateLock
25# ---------------------------------------------------------------------------
26# Lock-file constant
27# ---------------------------------------------------------------------------
# Default location of the template lock file, relative to the target
# repository root; joined onto ``target`` when no explicit path is supplied.
LOCK_FILE = ".rhiza/template.lock"
32# Shared template helpers
33# ---------------------------------------------------------------------------
def _load_lock_or_warn(target: Path, lock_file: Path | None = None) -> TemplateLock | None:
    """Read the template lock for *target*, warning when there is none.

    Args:
        target: Root directory of the target repository.
        lock_file: Explicit location of the lock file. If ``None``, the lock
            is looked up at ``<target>/.rhiza/template.lock``.

    Returns:
        The parsed :class:`~rhiza.models.TemplateLock`, or ``None`` (after a
        warning has been logged) when the lock file is not on disk.
    """
    candidate = (target / LOCK_FILE) if lock_file is None else lock_file
    resolved = candidate.resolve()
    if resolved.exists():
        return TemplateLock.from_yaml(resolved)

    logger.warning("No template.lock found — run `rhiza sync` first")
    return None
def _warn_about_workflow_files(materialized_files: list[Path]) -> None:
    """Log a warning when any materialized file is a GitHub workflow file.

    A path counts as a workflow file when its first two components are
    ``.github`` and ``workflows``.

    Args:
        materialized_files: Paths of the files that were materialized.
    """
    workflow_prefix = (".github", "workflows")
    affected = sum(1 for path in materialized_files if path.parts[:2] == workflow_prefix)
    if not affected:
        return

    logger.warning(
        "Workflow files were materialized. Updating these files requires "
        "a token with the 'workflow' permission in GitHub Actions."
    )
    logger.info(f"Workflow files affected: {affected}")
def _files_from_snapshot(snapshot_dir: Path) -> set[Path]:
    """Collect every file found below *snapshot_dir*.

    Args:
        snapshot_dir: Top-level directory of the snapshot tree to scan.

    Returns:
        The files found anywhere under *snapshot_dir*, each expressed as a
        path relative to *snapshot_dir*. Directories are not included.
    """
    relative_files: set[Path] = set()
    for entry in snapshot_dir.rglob("*"):
        if entry.is_file():
            relative_files.add(entry.relative_to(snapshot_dir))
    return relative_files
def _read_previously_tracked_files(
    target: Path,
    base_snapshot: Path | None = None,
    lock_file: Path | None = None,
) -> set[Path]:
    """Determine which files the previous sync was tracking.

    Sources are consulted in this order; the first that yields a non-empty
    result wins:

    1. The ``files`` list stored in ``template.lock``.
    2. The file listing of *base_snapshot*, when the lock file exists but has
       no ``files`` entries and *base_snapshot* is an existing directory
       (covers lock files that pre-date the ``files`` field).
    3. The legacy ``.rhiza/history`` file, kept for backward compatibility.

    Any exception raised while reading the lock or scanning the snapshot is
    logged at debug level, after which the legacy history file is tried.

    Args:
        target: Target repository path.
        base_snapshot: Optional directory holding the template snapshot at
            the previously-synced SHA. Used to rebuild the tracked-file list
            when the lock carries no ``files`` entry.
        lock_file: Optional explicit path to the lock file. When ``None`` the
            default ``<target>/.rhiza/template.lock`` is used.

    Returns:
        Set of previously tracked file paths, or an empty set when no
        tracking information is found.
    """
    lock_path = (target / ".rhiza" / "template.lock") if lock_file is None else lock_file

    if lock_path.exists():
        # The whole lock/snapshot lookup is best-effort: any failure falls
        # through to the legacy history file below.
        try:
            lock = TemplateLock.from_yaml(lock_path)
            if lock.files:
                tracked = {Path(entry) for entry in lock.files}
                logger.debug(f"Reading previous file list from template.lock ({len(tracked)} files)")
                return tracked
            # The lock has no file list; fall back to the snapshot that the
            # caller may already have on disk.
            if base_snapshot is not None and base_snapshot.is_dir():
                from_snapshot = _files_from_snapshot(base_snapshot)
                if from_snapshot:
                    logger.debug(f"Reconstructing previous file list from base snapshot ({len(from_snapshot)} files)")
                    return from_snapshot
        except Exception as e:
            logger.debug(f"Could not read template.lock for orphan cleanup: {e}")

    history_file = target / ".rhiza" / "history"
    if not history_file.exists():
        logger.debug("No previous file tracking found")
        return set()

    logger.debug(f"Reading existing history file: {history_file.relative_to(target)}")
    # One path per line; blank lines and ``#`` comment lines are ignored.
    with history_file.open("r", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return {Path(entry) for entry in stripped_lines if entry and not entry.startswith("#")}
def _delete_orphaned_file(target: Path, file_path: Path) -> None:
    """Remove one orphaned file from the target repository, if it is present.

    A failure while removing the file is logged as a warning and is not
    re-raised; a file that is no longer on disk is skipped with a debug
    message.

    Args:
        target: Target repository path.
        file_path: Path of the orphaned file, relative to *target*.
    """
    doomed = target / file_path
    if not doomed.exists():
        logger.debug(f"Skipping {file_path} (already deleted)")
        return

    try:
        doomed.unlink()
        logger.success(f"[DEL] {file_path}")
    except Exception as e:
        logger.warning(f"Failed to delete {file_path}: {e}")
165def _clean_orphaned_files(
166 target: Path,
167 materialized_files: list[Path],
168 base_snapshot: Path | None = None,
169 excludes: set[str] | None = None,
170 previously_tracked_files: set[Path] | None = None,
171 lock_file: Path | None = None,
172) -> None:
173 """Clean up files that are no longer maintained by template.
175 Files that are explicitly excluded via the ``exclude:`` setting in
176 ``template.yml`` are never deleted even if they appear in a previous lock
177 but are absent from *materialized_files*.
179 Args:
180 target: Target repository path.
181 materialized_files: List of currently materialized files.
182 base_snapshot: Optional directory containing the template snapshot at
183 the previously-synced SHA. Passed through to
184 :func:`_read_previously_tracked_files` as a fallback when the lock
185 file has no ``files`` entry. Ignored when *previously_tracked_files*
186 is supplied directly.
187 excludes: Optional set of relative path strings that are currently
188 excluded from the template sync. Any previously-tracked file
189 present in this set is kept (the user explicitly opted it out).
190 previously_tracked_files: Optional pre-read set of files that were
191 tracked by the previous sync. When supplied this takes precedence
192 over reading from the on-disk lock file, which allows callers to
193 snapshot the old state before the lock is overwritten by the merge.
194 lock_file: Optional explicit path to the lock file. When ``None`` the
195 default ``<target>/.rhiza/template.lock`` is used.
196 """
197 if previously_tracked_files is None:
198 previously_tracked_files = _read_previously_tracked_files(
199 target, base_snapshot=base_snapshot, lock_file=lock_file
200 )
201 if not previously_tracked_files:
202 return
204 logger.debug(f"Found {len(previously_tracked_files)} file(s) in previous tracking")
206 orphaned_files = previously_tracked_files - set(materialized_files)
208 # Don't delete files that the user has explicitly excluded — they have
209 # opted those files out of template management and want to keep them.
210 if excludes:
211 excluded_as_paths = {Path(e) for e in excludes}
212 orphaned_files = orphaned_files - excluded_as_paths
214 protected_files = {Path(".rhiza/template.yml")}
216 if not orphaned_files:
217 logger.debug("No orphaned files to clean up")
218 return
220 logger.info(f"Found {len(orphaned_files)} orphaned file(s) no longer maintained by template")
221 for file_path in sorted(orphaned_files):
222 if file_path in protected_files:
223 logger.info(f"Skipping protected file: {file_path}")
224 continue
225 _delete_orphaned_file(target, file_path)
228# ---------------------------------------------------------------------------
229# Lock-file helpers
230# ---------------------------------------------------------------------------
def _write_lock(target: Path, lock: TemplateLock, lock_file: Path | None = None) -> None:
    """Write *lock* to the YAML lock file, replacing any previous version.

    The data is first written to a ``.tmp`` sibling and then moved over the
    real lock file with ``os.replace()``. While that happens, an exclusive
    ``fcntl.flock`` is held on a separate ``.fd`` sibling file when ``fcntl``
    is available; without ``fcntl`` (e.g. on Windows) the write proceeds
    unlocked after a debug message.

    Entries of ``lock.files`` that do not exist under *target* are dropped
    (with a warning) before writing, and the remaining entries are sorted.

    Args:
        target: Path to the target repository.
        lock: The :class:`~rhiza.models.TemplateLock` to record.
        lock_file: Optional explicit path for the lock file. When ``None`` the
            default ``<target>/.rhiza/template.lock`` is used.
    """
    if lock_file is not None:
        lock_path = lock_file
    else:
        lock_path = target / LOCK_FILE
    tmp_path = Path(f"{lock_path}.tmp")
    # The flock is taken on a dedicated file so that it is unaffected by the
    # os.replace() of the real lock file.
    lock_fd_path = Path(f"{lock_path}.fd")

    # Record only paths that are present on disk, in sorted order.
    existing_files = sorted(name for name in lock.files if (target / name).exists())
    missing = sorted(set(lock.files) - set(existing_files))
    if missing:
        missing_str = ", ".join(missing)
        logger.warning(f"{len(missing)} file(s) in lock absent from target and excluded: {missing_str}")
    lock = dataclasses.replace(lock, files=existing_files)

    lock_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with lock_fd_path.open("a", encoding="utf-8") as lock_fd:
            if not _FCNTL_AVAILABLE:
                logger.debug("fcntl not available - skipping advisory lock on write")
            else:
                fcntl.flock(lock_fd, fcntl.LOCK_EX)
            lock.to_yaml(tmp_path)
            os.replace(tmp_path, lock_path)
    finally:
        # Best-effort removal of the helper file; errors are ignored.
        # NOTE(review): the helper file is unlinked after the flock is
        # released - confirm this is safe with concurrent writers.
        with contextlib.suppress(OSError):
            lock_fd_path.unlink(missing_ok=True)
    logger.info(f"Updated {lock_path.name} -> {lock.sha[:12]}")