Coverage for src / rhiza / commands / _sync_helpers.py: 100%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-02 07:04 +0000

1"""Internal helpers for the ``sync`` command. 

2 

3This module exposes the private implementation functions used by 

4:mod:`rhiza.commands.sync`. Placing them here gives tests a stable import 

5path (``from rhiza.commands._sync_helpers import ...``) without coupling them 

6to the command module's public API. 

7""" 

8 

9import contextlib 

10import dataclasses 

11import os 

12from pathlib import Path 

13 

14try: 

15 import fcntl 

16 

17 _FCNTL_AVAILABLE = True 

18except ImportError: # pragma: no cover - Windows 

19 _FCNTL_AVAILABLE = False 

20 

21from loguru import logger 

22 

23from rhiza.models import TemplateLock 

24 

25# --------------------------------------------------------------------------- 

26# Lock-file constant 

27# --------------------------------------------------------------------------- 

28 

# Location of the template lock file, expressed relative to the target
# repository root (the helpers in this module join it onto ``target``).
LOCK_FILE = ".rhiza/template.lock"

30 

31 

32# Shared template helpers 

33# --------------------------------------------------------------------------- 

34 

35 

def _load_lock_or_warn(target: Path, lock_file: Path | None = None) -> TemplateLock | None:
    """Return the parsed lock file, or warn and return ``None`` if it is absent.

    Args:
        target: Root directory of the target repository.
        lock_file: Explicit lock-file location. If omitted, the lock is
            looked up at ``<target>/.rhiza/template.lock``.

    Returns:
        The :class:`~rhiza.models.TemplateLock` read from disk, or ``None``
        when no file exists at the resolved location.
    """
    candidate = target / LOCK_FILE if lock_file is None else lock_file
    # Resolve first so both the existence check and the load use the same
    # absolute, symlink-free path.
    resolved = candidate.resolve()
    if resolved.exists():
        return TemplateLock.from_yaml(resolved)

    logger.warning("No template.lock found — run `rhiza sync` first")
    return None

55 

56 

def _warn_about_workflow_files(materialized_files: list[Path]) -> None:
    """Emit a warning when any materialized path lives under ``.github/workflows``.

    Args:
        materialized_files: Paths of the files that were materialized. Only
            paths whose first two components are ``.github`` and
            ``workflows`` count as workflow files.
    """
    workflow_prefix = (".github", "workflows")
    affected = sum(1 for path in materialized_files if path.parts[:2] == workflow_prefix)
    if affected == 0:
        # Nothing workflow-related was touched, so stay silent.
        return

    logger.warning(
        "Workflow files were materialized. Updating these files requires "
        "a token with the 'workflow' permission in GitHub Actions."
    )
    logger.info(f"Workflow files affected: {affected}")

71 

72 

def _files_from_snapshot(snapshot_dir: Path) -> set[Path]:
    """Collect every regular file below *snapshot_dir* as a relative path.

    Args:
        snapshot_dir: Root directory of the snapshot tree to walk.

    Returns:
        The set of file paths found anywhere under *snapshot_dir*, each
        expressed relative to *snapshot_dir*. Directories are not included.
    """
    relative_files: set[Path] = set()
    for entry in snapshot_dir.rglob("*"):
        if entry.is_file():
            relative_files.add(entry.relative_to(snapshot_dir))
    return relative_files

83 

84 

def _read_previously_tracked_files(
    target: Path,
    base_snapshot: Path | None = None,
    lock_file: Path | None = None,
) -> set[Path]:
    """Return the set of files tracked by the last sync.

    Resolution order:
    1. ``template.lock.files`` when the field is present and non-empty.
    2. *base_snapshot* directory listing when provided and non-empty (used as a
       fallback for lock files that pre-date the ``files`` field).
    3. Legacy ``.rhiza/history`` file for backward compatibility.

    Any exception raised while reading the lock file is logged at debug level
    and treated as "no usable lock", so resolution continues with the legacy
    history file.

    Args:
        target: Target repository path.
        base_snapshot: Optional directory containing the template snapshot at
            the previously-synced SHA. When the lock file has no ``files``
            entry this snapshot is used to reconstruct the tracked-file list,
            avoiding an extra network fetch.
        lock_file: Optional explicit path to the lock file. When ``None`` the
            default ``<target>/.rhiza/template.lock`` is used.

    Returns:
        Set of previously tracked file paths (relative to target), or an empty
        set when no tracking information is found.
    """
    if lock_file is None:
        # Use the shared constant so the default location cannot drift from
        # the one used when the lock is loaded and written.
        lock_file = target / LOCK_FILE

    if lock_file.exists():
        try:
            lock = TemplateLock.from_yaml(lock_file)
            if lock.files:
                files = {Path(f) for f in lock.files}
                logger.debug(f"Reading previous file list from template.lock ({len(files)} files)")
                return files
            # Lock exists but has no files list - try to reconstruct from the
            # base snapshot that was already fetched during this sync run.
            if base_snapshot is not None and base_snapshot.is_dir():
                snapshot_files = _files_from_snapshot(base_snapshot)
                if snapshot_files:
                    logger.debug(f"Reconstructing previous file list from base snapshot ({len(snapshot_files)} files)")
                    return snapshot_files
        except Exception as e:
            # Deliberately best-effort: an unreadable lock must not abort the
            # sync, it only means we fall back to the legacy history file.
            logger.debug(f"Could not read template.lock for orphan cleanup: {e}")

    history_file = target / ".rhiza" / "history"
    if not history_file.exists():
        logger.debug("No previous file tracking found")
        return set()

    logger.debug(f"Reading existing history file: {history_file.relative_to(target)}")
    with history_file.open("r", encoding="utf-8") as history:
        # One path per line; blank lines and ``#`` comment lines are skipped.
        entries = (raw_line.strip() for raw_line in history)
        return {Path(entry) for entry in entries if entry and not entry.startswith("#")}

145 

146 

def _delete_orphaned_file(target: Path, file_path: Path) -> None:
    """Remove one orphaned file from the target repository, logging the outcome.

    A path that no longer exists is skipped with a debug message. A failure
    while deleting is logged as a warning and not re-raised.

    Args:
        target: Target repository path.
        file_path: Path of the orphaned file, relative to *target*.
    """
    doomed = target / file_path
    if not doomed.exists():
        logger.debug(f"Skipping {file_path} (already deleted)")
        return

    try:
        doomed.unlink()
        logger.success(f"[DEL] {file_path}")
    except Exception as exc:
        logger.warning(f"Failed to delete {file_path}: {exc}")

163 

164 

165def _clean_orphaned_files( 

166 target: Path, 

167 materialized_files: list[Path], 

168 base_snapshot: Path | None = None, 

169 excludes: set[str] | None = None, 

170 previously_tracked_files: set[Path] | None = None, 

171 lock_file: Path | None = None, 

172) -> None: 

173 """Clean up files that are no longer maintained by template. 

174 

175 Files that are explicitly excluded via the ``exclude:`` setting in 

176 ``template.yml`` are never deleted even if they appear in a previous lock 

177 but are absent from *materialized_files*. 

178 

179 Args: 

180 target: Target repository path. 

181 materialized_files: List of currently materialized files. 

182 base_snapshot: Optional directory containing the template snapshot at 

183 the previously-synced SHA. Passed through to 

184 :func:`_read_previously_tracked_files` as a fallback when the lock 

185 file has no ``files`` entry. Ignored when *previously_tracked_files* 

186 is supplied directly. 

187 excludes: Optional set of relative path strings that are currently 

188 excluded from the template sync. Any previously-tracked file 

189 present in this set is kept (the user explicitly opted it out). 

190 previously_tracked_files: Optional pre-read set of files that were 

191 tracked by the previous sync. When supplied this takes precedence 

192 over reading from the on-disk lock file, which allows callers to 

193 snapshot the old state before the lock is overwritten by the merge. 

194 lock_file: Optional explicit path to the lock file. When ``None`` the 

195 default ``<target>/.rhiza/template.lock`` is used. 

196 """ 

197 if previously_tracked_files is None: 

198 previously_tracked_files = _read_previously_tracked_files( 

199 target, base_snapshot=base_snapshot, lock_file=lock_file 

200 ) 

201 if not previously_tracked_files: 

202 return 

203 

204 logger.debug(f"Found {len(previously_tracked_files)} file(s) in previous tracking") 

205 

206 orphaned_files = previously_tracked_files - set(materialized_files) 

207 

208 # Don't delete files that the user has explicitly excluded — they have 

209 # opted those files out of template management and want to keep them. 

210 if excludes: 

211 excluded_as_paths = {Path(e) for e in excludes} 

212 orphaned_files = orphaned_files - excluded_as_paths 

213 

214 protected_files = {Path(".rhiza/template.yml")} 

215 

216 if not orphaned_files: 

217 logger.debug("No orphaned files to clean up") 

218 return 

219 

220 logger.info(f"Found {len(orphaned_files)} orphaned file(s) no longer maintained by template") 

221 for file_path in sorted(orphaned_files): 

222 if file_path in protected_files: 

223 logger.info(f"Skipping protected file: {file_path}") 

224 continue 

225 _delete_orphaned_file(target, file_path) 

226 

227 

228# --------------------------------------------------------------------------- 

229# Lock-file helpers 

230# --------------------------------------------------------------------------- 

231 

232 

def _write_lock(target: Path, lock: TemplateLock, lock_file: Path | None = None) -> None:
    """Write *lock* to the YAML lock file via a temporary sibling file.

    The data is first written to ``<lock file>.tmp`` and then moved over the
    real lock file with ``os.replace()``. While that happens, a separate
    ``<lock file>.fd`` file is opened and, when ``fcntl`` is available, locked
    exclusively with ``fcntl.flock``; without ``fcntl`` (e.g. on Windows) the
    write proceeds unlocked. The ``.fd`` file is removed afterwards on a
    best-effort basis.

    Entries of ``lock.files`` that do not exist under *target* are dropped
    (with a warning) before writing, and the remaining entries are sorted.

    Args:
        target: Path to the target repository.
        lock: The :class:`~rhiza.models.TemplateLock` to record.
        lock_file: Optional explicit path for the lock file. When ``None`` the
            default ``<target>/.rhiza/template.lock`` is used.
    """
    # Keep only entries that are really on disk; the list is re-sorted even
    # when nothing had to be dropped.
    present = sorted(name for name in lock.files if (target / name).exists())
    absent = sorted(set(lock.files) - set(present))
    if absent:
        absent_listing = ", ".join(absent)
        logger.warning(f"{len(absent)} file(s) in lock absent from target and excluded: {absent_listing}")
    lock = dataclasses.replace(lock, files=present)

    if lock_file is None:
        lock_path = target / LOCK_FILE
    else:
        lock_path = lock_file
    staging_path = Path(f"{lock_path}.tmp")
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    # The advisory lock is taken on a dedicated file rather than on the lock
    # file itself, so it stays valid across the os.replace() below.
    guard_path = Path(f"{lock_path}.fd")
    try:
        with guard_path.open("a", encoding="utf-8") as guard:
            if not _FCNTL_AVAILABLE:
                logger.debug("fcntl not available - skipping advisory lock on write")
            else:
                fcntl.flock(guard, fcntl.LOCK_EX)
            lock.to_yaml(staging_path)
            os.replace(staging_path, lock_path)
    finally:
        # Removing the guard file is housekeeping only; ignore OS errors.
        with contextlib.suppress(OSError):
            guard_path.unlink(missing_ok=True)
    logger.info(f"Updated {lock_path.name} -> {lock.sha[:12]}")