@@ -1,5 +1,6 @@
1 | 1 | from __future__ import annotations
2 | 2 |
| 3 | +import copy |
3 | 4 | import json |
4 | 5 | import pathlib |
5 | 6 | import typing as t |
@@ -106,6 +107,20 @@ def _from_file(cls, path: pathlib.Path) -> dict[str, t.Any]:
106 | 107 | assert isinstance(path, pathlib.Path) |
107 | 108 | content = path.open(encoding="utf-8").read() |
108 | 109 |
| 110 | + # TODO(#?): Align this loader with the duplicate-aware YAML handling that |
| 111 | + # ``vcspull fmt`` introduced in November 2025. The formatter now uses a |
| 112 | + # custom SafeLoader subclass to retain and merge duplicate workspace root |
| 113 | + # sections so repos are never overwritten. ConfigReader currently drops |
| 114 | +    # earlier duplicates because PyYAML keeps only the last value. Options:
| 115 | + # 1) Extract the formatter's loader/merge helpers into a shared utility |
| 116 | + # that ConfigReader can reuse here; |
| 117 | + # 2) Replace ConfigReader entirely when reading vcspull configs and call |
| 118 | + # the formatter helpers directly; |
| 119 | + # 3) Keep this basic loader but add an opt-in path for duplicate-aware |
| 120 | + # parsing so commands like ``vcspull add`` can avoid data loss. |
| 121 | + # Revisit once the new ``vcspull add`` flow lands so both commands share |
| 122 | + # the same duplication safeguards. |
| 123 | + |
109 | 124 | if path.suffix in {".yaml", ".yml"}: |
110 | 125 | fmt: FormatLiteral = "yaml" |
111 | 126 | elif path.suffix == ".json": |
@@ -204,3 +219,120 @@ def dump(self, fmt: FormatLiteral, indent: int = 2, **kwargs: t.Any) -> str: |
204 | 219 | indent=indent, |
205 | 220 | **kwargs, |
206 | 221 | ) |
| 222 | + |
| 223 | + |
| 224 | +class _DuplicateTrackingSafeLoader(yaml.SafeLoader): |
| 225 | + """SafeLoader that records duplicate top-level keys.""" |
| 226 | + |
| 227 | + def __init__(self, stream: str) -> None: |
| 228 | + super().__init__(stream) |
| 229 | + self.top_level_key_values: dict[t.Any, list[t.Any]] = {} |
| 230 | + self._mapping_depth = 0 |
| 231 | + |
| 232 | + |
| 233 | +def _duplicate_tracking_construct_mapping( |
| 234 | + loader: _DuplicateTrackingSafeLoader, |
| 235 | + node: yaml.nodes.MappingNode, |
| 236 | + deep: bool = False, |
| 237 | +) -> dict[t.Any, t.Any]: |
| 238 | + loader._mapping_depth += 1 |
| 239 | + loader.flatten_mapping(node) |
| 240 | + mapping: dict[t.Any, t.Any] = {} |
| 241 | + |
| 242 | + for key_node, value_node in node.value: |
| 243 | + construct = t.cast( |
| 244 | + t.Callable[[yaml.nodes.Node], t.Any], |
| 245 | + loader.construct_object, |
| 246 | + ) |
| 247 | + key = construct(key_node) |
| 248 | + value = construct(value_node) |
| 249 | + |
| 250 | + if loader._mapping_depth == 1: |
| 251 | + loader.top_level_key_values.setdefault(key, []).append(copy.deepcopy(value)) |
| 252 | + |
| 253 | + mapping[key] = value |
| 254 | + |
| 255 | + loader._mapping_depth -= 1 |
| 256 | + return mapping |
| 257 | + |
| 258 | + |
| 259 | +_DuplicateTrackingSafeLoader.add_constructor( |
| 260 | + yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, |
| 261 | + _duplicate_tracking_construct_mapping, |
| 262 | +) |
| 263 | + |
| 264 | + |
| 265 | +class DuplicateAwareConfigReader(ConfigReader): |
| 266 | + """ConfigReader that tracks duplicate top-level YAML sections.""" |
| 267 | + |
| 268 | + def __init__( |
| 269 | + self, |
| 270 | + content: RawConfigData, |
| 271 | + *, |
| 272 | + duplicate_sections: dict[str, list[t.Any]] | None = None, |
| 273 | + ) -> None: |
| 274 | + super().__init__(content) |
| 275 | + self._duplicate_sections = duplicate_sections or {} |
| 276 | + |
| 277 | + @property |
| 278 | + def duplicate_sections(self) -> dict[str, list[t.Any]]: |
| 279 | + """Mapping of top-level keys to the list of duplicated values.""" |
| 280 | + return self._duplicate_sections |
| 281 | + |
| 282 | + @classmethod |
| 283 | + def _load_yaml_with_duplicates( |
| 284 | + cls, |
| 285 | + content: str, |
| 286 | + ) -> tuple[dict[str, t.Any], dict[str, list[t.Any]]]: |
| 287 | + loader = _DuplicateTrackingSafeLoader(content) |
| 288 | + |
| 289 | + try: |
| 290 | + data = loader.get_single_data() |
| 291 | + finally: |
| 292 | + dispose = t.cast(t.Callable[[], None], loader.dispose) |
| 293 | + dispose() |
| 294 | + |
| 295 | + if data is None: |
| 296 | + loaded: dict[str, t.Any] = {} |
| 297 | + else: |
| 298 | + if not isinstance(data, dict): |
| 299 | + msg = "Loaded configuration is not a mapping" |
| 300 | + raise TypeError(msg) |
| 301 | + loaded = t.cast("dict[str, t.Any]", data) |
| 302 | + |
| 303 | + duplicate_sections = { |
| 304 | + t.cast(str, key): values |
| 305 | + for key, values in loader.top_level_key_values.items() |
| 306 | + if len(values) > 1 |
| 307 | + } |
| 308 | + |
| 309 | + return loaded, duplicate_sections |
| 310 | + |
| 311 | + @classmethod |
| 312 | + def _load_from_path( |
| 313 | + cls, |
| 314 | + path: pathlib.Path, |
| 315 | + ) -> tuple[dict[str, t.Any], dict[str, list[t.Any]]]: |
| 316 | + if path.suffix.lower() in {".yaml", ".yml"}: |
| 317 | + content = path.read_text(encoding="utf-8") |
| 318 | + return cls._load_yaml_with_duplicates(content) |
| 319 | + |
| 320 | + return ConfigReader._from_file(path), {} |
| 321 | + |
| 322 | + @classmethod |
| 323 | + def from_file(cls, path: pathlib.Path) -> DuplicateAwareConfigReader: |
| 324 | + content, duplicate_sections = cls._load_from_path(path) |
| 325 | + return cls(content, duplicate_sections=duplicate_sections) |
| 326 | + |
| 327 | + @classmethod |
| 328 | + def _from_file(cls, path: pathlib.Path) -> dict[str, t.Any]: |
| 329 | + content, _ = cls._load_from_path(path) |
| 330 | + return content |
| 331 | + |
| 332 | + @classmethod |
| 333 | + def load_with_duplicates( |
| 334 | + cls, |
| 335 | + path: pathlib.Path, |
| 336 | + ) -> tuple[dict[str, t.Any], dict[str, list[t.Any]]]: |
| 337 | + reader = cls.from_file(path) |
| 338 | + return reader.content, reader.duplicate_sections |
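
For reviewers who want to see the failure mode the TODO above describes, here is a minimal sketch. The workspace path and repository names are invented for illustration; only PyYAML's last-key-wins behaviour and the `top_level_key_values` attribute added in this diff are assumed:

```python
import yaml

# Two top-level workspace roots with the same key, as a user might
# accidentally write them in a vcspull-style config.
doc = """\
~/code/:
  repo-a:
    repo: git+https://example.com/repo-a.git
~/code/:
  repo-b:
    repo: git+https://example.com/repo-b.git
"""

# Plain SafeLoader keeps only the last occurrence, so repo-a silently vanishes.
assert yaml.safe_load(doc) == {
    "~/code/": {"repo-b": {"repo": "git+https://example.com/repo-b.git"}}
}

# The loader added here records every value seen for a duplicated top-level key.
loader = _DuplicateTrackingSafeLoader(doc)
try:
    data = loader.get_single_data()
finally:
    loader.dispose()

# The loaded mapping itself still matches plain safe_load (last wins), but the
# earlier section is preserved alongside it for callers to merge.
assert data == yaml.safe_load(doc)
assert len(loader.top_level_key_values["~/code/"]) == 2
```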
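
And a sketch of how a caller, such as the `vcspull add` flow mentioned in the TODO, might consume the new reader. The config path and the naive merge strategy below are assumptions for illustration only; `load_with_duplicates` and `duplicate_sections` are the APIs this diff adds:

```python
import pathlib

# Hypothetical location, used only for illustration; callers would pass
# whichever config file they already resolved.
config_path = pathlib.Path("~/.config/vcspull/vcspull.yaml").expanduser()

content, duplicates = DuplicateAwareConfigReader.load_with_duplicates(config_path)

# ``content`` is what ConfigReader would have returned (last duplicate wins);
# ``duplicates`` maps each repeated workspace root to every value seen, in
# file order, so a caller can merge instead of silently losing repos.
for workspace_root, values in duplicates.items():
    merged = {}
    for value in values:
        if isinstance(value, dict):
            merged.update(value)  # naive union; a real merge policy is out of scope here
    content[workspace_root] = merged
```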