Coverage for src / rhiza / models / bundle.py: 100%
183 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-15 18:22 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-15 18:22 +0000
1"""Bundle models for Rhiza configuration."""
3from dataclasses import dataclass, field
4from typing import Any
6from rhiza.models._base import YamlSerializable
7from rhiza.models._git_utils import _normalize_to_list
10@dataclass(frozen=True)
11class BundleFileEntry:
12 """A file entry in a bundle, with optional source→destination path remapping.
14 When ``source`` and ``dest`` differ, the file is read from ``source`` in the
15 template repository but written to ``dest`` in the downstream project.
17 Attributes:
18 source: Path of the file in the template repository.
19 dest: Path where the file should be placed in the downstream project.
20 """
22 source: str
23 dest: str
25 @property
26 def is_remapped(self) -> bool:
27 """True when the destination path differs from the source path."""
28 return self.source != self.dest
30 @classmethod
31 def from_config_entry(cls, entry: "str | dict[str, str]") -> "BundleFileEntry":
32 """Parse a file entry from a YAML config value (string or dict).
34 Args:
35 entry: Either a plain path string or a ``{source, dest}`` dict.
37 Returns:
38 A :class:`BundleFileEntry` instance.
40 Raises:
41 TypeError: If the entry is a dict without a ``source`` key.
42 """
43 if isinstance(entry, str):
44 return cls(source=entry, dest=entry)
45 if not isinstance(entry, dict) or "source" not in entry:
46 raise TypeError( # noqa: TRY003
47 f"File entry must be a string or a dict with a 'source' key, got: {entry!r}"
48 )
49 source = entry["source"]
50 dest = entry.get("dest", source)
51 return cls(source=source, dest=dest)
53 def to_config_entry(self) -> "str | dict[str, str]":
54 """Serialize back to the YAML config representation."""
55 if not self.is_remapped:
56 return self.source
57 return {"source": self.source, "dest": self.dest}
59 def remap_expanded_path(self, expanded_source: str) -> str:
60 """Map an expanded source path to its destination path.
62 Handles both exact file matches and directory-prefix matches.
64 Args:
65 expanded_source: A source path produced by expanding this entry.
67 Returns:
68 The corresponding destination path.
69 """
70 if not self.is_remapped:
71 return expanded_source
72 if expanded_source == self.source:
73 return self.dest
74 src_prefix = self.source.rstrip("/") + "/"
75 if expanded_source.startswith(src_prefix):
76 dest_prefix = self.dest.rstrip("/") + "/"
77 return dest_prefix + expanded_source[len(src_prefix) :]
78 return expanded_source
81@dataclass(frozen=True, kw_only=True)
82class ProfileDefinition:
83 """Represents a single profile from template-bundles.yml.
85 Attributes:
86 description: Human-readable description of the profile.
87 bundles: List of bundle names included in this profile.
88 """
90 description: str = ""
91 bundles: list[str] = field(default_factory=list)
94@dataclass(frozen=True, kw_only=True)
95class BundleDefinition:
96 """Represents a single bundle from template-bundles.yml.
98 Attributes:
99 description: Human-readable description of the bundle.
100 files: Explicit file entries (legacy format only — new bundles own files via
101 their ``bundles/<name>/`` directory in the template repository).
102 requires: List of bundle names that this bundle requires.
103 recommends: List of bundle names that this bundle recommends (soft deps).
104 standalone: Whether this bundle is standalone (no dependencies).
105 required: Whether this bundle is mandatory (always included).
106 notes: Free-form notes for maintainers (not synced to downstream projects).
107 """
109 description: str
110 standalone: bool = True
111 required: bool = False
112 files: list[BundleFileEntry] = field(default_factory=list)
113 requires: list[str] = field(default_factory=list)
114 recommends: list[str] = field(default_factory=list)
115 notes: str = ""
118@dataclass(frozen=True, kw_only=True)
119class RhizaBundles(YamlSerializable):
120 """Represents the structure of template-bundles.yml.
122 Attributes:
123 version: Optional version string of the bundles configuration format.
124 bundles: Dictionary mapping bundle names to their definitions.
125 """
127 version: str | None = None
128 bundles: dict[str, BundleDefinition] = field(default_factory=dict)
129 profiles: dict[str, ProfileDefinition] = field(default_factory=dict)
131 @property
132 def config(self) -> dict[str, Any]:
133 """Return the bundles' current state as a configuration dictionary."""
134 config: dict[str, Any] = {}
136 if self.version is not None:
137 config["version"] = self.version
139 bundles_dict: dict[str, Any] = {}
140 for name, bundle in self.bundles.items():
141 bundle_entry: dict[str, Any] = {"description": bundle.description}
142 if bundle.required:
143 bundle_entry["required"] = bundle.required
144 if bundle.standalone:
145 bundle_entry["standalone"] = bundle.standalone
146 if bundle.requires:
147 bundle_entry["requires"] = bundle.requires
148 if bundle.recommends:
149 bundle_entry["recommends"] = bundle.recommends
150 if bundle.files:
151 bundle_entry["files"] = [f.to_config_entry() for f in bundle.files]
152 if bundle.notes:
153 bundle_entry["notes"] = bundle.notes
154 bundles_dict[name] = bundle_entry
156 config["bundles"] = bundles_dict
158 if self.profiles:
159 profiles_dict: dict[str, Any] = {}
160 for name, profile in self.profiles.items():
161 profile_entry: dict[str, Any] = {}
162 if profile.description:
163 profile_entry["description"] = profile.description
164 profile_entry["bundles"] = profile.bundles
165 profiles_dict[name] = profile_entry
166 config["profiles"] = profiles_dict
168 return config
170 @classmethod
171 def from_config(cls, config: dict[str, Any]) -> "RhizaBundles":
172 """Create a RhizaBundles instance from a configuration dictionary.
174 Args:
175 config: Dictionary containing bundles configuration.
177 Returns:
178 A new RhizaBundles instance.
180 Raises:
181 TypeError: If bundle data has invalid types.
182 """
183 version = config.get("version")
185 bundles_config = config.get("bundles", {})
186 if not isinstance(bundles_config, dict):
187 msg = "Bundles must be a dictionary"
188 raise TypeError(msg)
190 bundles: dict[str, BundleDefinition] = {}
191 for bundle_name, bundle_data in bundles_config.items():
192 if not isinstance(bundle_data, dict):
193 msg = f"Bundle '{bundle_name}' must be a dictionary"
194 raise TypeError(msg)
196 raw_files = bundle_data.get("files")
197 if isinstance(raw_files, list):
198 files = [BundleFileEntry.from_config_entry(e) for e in raw_files]
199 elif isinstance(raw_files, str):
200 files = [BundleFileEntry.from_config_entry(e) for e in _normalize_to_list(raw_files)]
201 else:
202 files = []
203 requires = _normalize_to_list(bundle_data.get("requires"))
205 bundles[bundle_name] = BundleDefinition(
206 description=bundle_data.get("description", ""),
207 files=files,
208 requires=requires,
209 recommends=_normalize_to_list(bundle_data.get("recommends")),
210 standalone=bundle_data.get("standalone", True),
211 required=bool(bundle_data.get("required", False)),
212 notes=bundle_data.get("notes") or "",
213 )
215 profiles_config = config.get("profiles", {})
216 if profiles_config is None:
217 profiles_config = {}
218 elif not isinstance(profiles_config, dict):
219 msg = "Profiles must be a dictionary"
220 raise TypeError(msg)
222 profiles: dict[str, ProfileDefinition] = {}
223 for profile_name, profile_data in profiles_config.items():
224 if not isinstance(profile_data, dict):
225 msg = f"Profile '{profile_name}' must be a dictionary"
226 raise TypeError(msg)
227 profiles[profile_name] = ProfileDefinition(
228 description=profile_data.get("description", ""),
229 bundles=_normalize_to_list(profile_data.get("bundles")),
230 )
232 return cls(version=version, bundles=bundles, profiles=profiles)
234 def resolve_to_paths(self, bundle_names: list[str]) -> list[str]:
235 """Convert bundle names to deduplicated file paths.
237 Args:
238 bundle_names: List of bundle names to resolve.
240 Returns:
241 Deduplicated list of file paths from all bundles and their dependencies.
243 Raises:
244 ValueError: If a bundle doesn't exist or circular dependency detected.
245 """
246 bundles: list[str] = []
247 resolved: set[str] = set()
248 resolving: set[str] = set()
249 paths: list[str] = []
250 seen: set[str] = set()
252 def _collect(bundle_name: str) -> None:
253 """Recursively resolve bundle dependencies in topological order."""
254 if bundle_name not in self.bundles:
255 msg = f"Bundle '{bundle_name}' does not exist"
256 raise ValueError(msg)
257 if bundle_name in resolving:
258 msg = f"Circular dependency detected for bundle '{bundle_name}'"
259 raise ValueError(msg)
260 if bundle_name in resolved:
261 return
263 resolving.add(bundle_name)
264 for dependency in self.bundles[bundle_name].requires:
265 _collect(dependency)
266 resolving.remove(bundle_name)
267 resolved.add(bundle_name)
268 bundles.append(bundle_name)
270 for bundle_name in bundle_names:
271 _collect(bundle_name)
273 for bundle_name in bundles:
274 bundle = self.bundles[bundle_name]
275 if bundle.files:
276 for entry in bundle.files:
277 if entry.source not in seen:
278 paths.append(entry.source)
279 seen.add(entry.source)
280 else:
281 dir_path = f"bundles/{bundle_name}/"
282 if dir_path not in seen:
283 paths.append(dir_path)
284 seen.add(dir_path)
286 return paths
288 def resolve_to_path_map(self, bundle_names: list[str]) -> dict[str, str]:
289 """Return a source→destination mapping for all remapped file entries.
291 Plain (non-remapped) entries are excluded — callers can assume an
292 absent key means ``dest == source``.
294 Args:
295 bundle_names: List of bundle names to resolve (dependencies included).
297 Returns:
298 Dict mapping source path → destination path for remapped entries only.
299 """
300 path_map: dict[str, str] = {}
301 resolved = self.resolve_to_paths(bundle_names)
302 resolved_set = set(resolved)
304 seen: set[str] = set()
305 bundle_order: list[str] = []
306 resolving: set[str] = set()
308 def _collect(name: str) -> None:
309 """Recursively collect bundle dependency order, skipping already-seen entries."""
310 if name not in self.bundles or name in resolving or name in seen:
311 return
312 resolving.add(name)
313 for dep in self.bundles[name].requires:
314 _collect(dep)
315 resolving.remove(name)
316 seen.add(name)
317 bundle_order.append(name)
319 for name in bundle_names:
320 _collect(name)
322 for bundle_name in bundle_order:
323 bundle = self.bundles[bundle_name]
324 if bundle.files:
325 for entry in bundle.files:
326 if entry.source in resolved_set and entry.is_remapped:
327 path_map[entry.source] = entry.dest
328 else:
329 path_map[f"bundles/{bundle_name}/"] = ""
331 return path_map
333 def resolve_profile_to_paths(self, profile_name: str) -> list[str]:
334 """Resolve a profile name to deduplicated file paths.
336 Args:
337 profile_name: Name of the profile to resolve.
339 Returns:
340 Deduplicated list of file paths from all bundles in the profile.
342 Raises:
343 ValueError: If the profile doesn't exist or a referenced bundle doesn't exist.
344 """
345 if profile_name not in self.profiles:
346 msg = f"Profile '{profile_name}' does not exist"
347 raise ValueError(msg)
348 return self.resolve_to_paths(self.profiles[profile_name].bundles)