|
6 | 6 | import json |
7 | 7 | import logging |
8 | 8 | import typing |
| 9 | +from collections import defaultdict |
9 | 10 | from pathlib import Path |
10 | 11 | from tempfile import TemporaryDirectory |
11 | 12 |
|
@@ -385,35 +386,36 @@ def _verify_attestation(args: argparse.Namespace) -> None: |
385 | 386 | """Verify the files passed as argument.""" |
386 | 387 | pol = policy.Identity(identity=args.identity) |
387 | 388 |
|
388 | | - # Validate that both the attestations and files exist |
| 389 | + # Validate that the files exist |
389 | 390 | _validate_files(args.files, should_exist=True) |
390 | | - _validate_files( |
391 | | - (Path(f"{file_path}.publish.attestation") for file_path in args.files), |
392 | | - should_exist=True, |
393 | | - ) |
394 | | - |
395 | | - inputs: list[Path] = [] |
396 | | - for file_path in args.files: |
397 | | - inputs.append(file_path) |
398 | 391 |
|
399 | | - for input in inputs: |
400 | | - attestation_path = Path(f"{input}.publish.attestation") |
401 | | - try: |
402 | | - attestation = Attestation.model_validate_json(attestation_path.read_text()) |
403 | | - except ValidationError as validation_error: |
404 | | - _die(f"Invalid attestation ({attestation_path}): {validation_error}") |
| 392 | + # artifact -> [attestation1, attestation2, ...] |
| 393 | + files_with_attestations: dict[Path, list[Path]] = defaultdict(list) |
| 394 | + for f in args.files: |
| 395 | + for attestation_file in (Path(f"{f}.publish.attestation"), Path(f"{f}.slsa.attestation")): |
| 396 | + if attestation_file.exists(): |
| 397 | + files_with_attestations[f].append(attestation_file) |
| 398 | + if not files_with_attestations[f]: |
| 399 | + _die(f"Couldn't find attestations for file {f}") |
| 400 | + |
| 401 | + for file_path, attestations in files_with_attestations.items(): |
| 402 | + for attestation_path in attestations: |
| 403 | + try: |
| 404 | + attestation = Attestation.model_validate_json(attestation_path.read_text()) |
| 405 | + except ValidationError as validation_error: |
| 406 | + _die(f"Invalid attestation ({attestation_path}): {validation_error}") |
405 | 407 |
|
406 | | - try: |
407 | | - dist = Distribution.from_file(input) |
408 | | - except ValidationError as e: |
409 | | - _die(f"Invalid Python package distribution: {e}") |
| 408 | + try: |
| 409 | + dist = Distribution.from_file(file_path) |
| 410 | + except ValidationError as e: |
| 411 | + _die(f"Invalid Python package distribution: {e}") |
410 | 412 |
|
411 | | - try: |
412 | | - attestation.verify(pol, dist, staging=args.staging) |
413 | | - except VerificationError as verification_error: |
414 | | - _die(f"Verification failed for {input}: {verification_error}") |
| 413 | + try: |
| 414 | + attestation.verify(pol, dist, staging=args.staging) |
| 415 | + except VerificationError as verification_error: |
| 416 | + _die(f"Verification failed for {file_path}: {verification_error}") |
415 | 417 |
|
416 | | - _logger.info(f"OK: {attestation_path}") |
| 418 | + _logger.info(f"OK: {attestation_path}") |
417 | 419 |
|
418 | 420 |
|
419 | 421 | def _verify_pypi(args: argparse.Namespace) -> None: |
|
0 commit comments