1010import logging
1111from datetime import datetime
1212from datetime import timedelta
13+ from unittest .mock import patch
1314
1415import pytest
16+ from packageurl import PackageURL
17+ from univers .version_range import VersionRange
1518
1619from vulnerabilities .importer import AdvisoryData
20+ from vulnerabilities .importer import AffectedPackageV2
1721from vulnerabilities .models import AdvisoryV2
22+ from vulnerabilities .models import ImpactedPackage
23+ from vulnerabilities .models import PackageV2
1824from vulnerabilities .pipelines import VulnerableCodeBaseImporterPipelineV2
1925
2026
@@ -40,7 +46,18 @@ def dummy_advisory():
4046 references_v2 = [],
4147 severities = [],
4248 weaknesses = [],
43- affected_packages = [],
49+ affected_packages = [
50+ AffectedPackageV2 (
51+ package = PackageURL .from_string ("pkg:npm/foobar" ),
52+ affected_version_range = VersionRange .from_string ("vers:npm/<=1.2.3" ),
53+ fixed_version_range = VersionRange .from_string ("vers:npm/1.2.4" ),
54+ ),
55+ AffectedPackageV2 (
56+ package = PackageURL .from_string ("pkg:npm/foobar" ),
57+ affected_version_range = VersionRange .from_string ("vers:npm/<=3.2.3" ),
58+ fixed_version_range = VersionRange .from_string ("vers:npm/3.2.4" ),
59+ ),
60+ ],
4461 advisory_id = "ADV-123" ,
4562 date_published = datetime .now () - timedelta (days = 10 ),
4663 url = "https://example.com/advisory/1" ,
@@ -60,3 +77,19 @@ def test_collect_and_store_advisories(dummy_importer):
6077 assert len (dummy_importer .log_messages ) >= 2
6178 assert "Successfully collected" in dummy_importer .log_messages [- 1 ][1 ]
6279 assert AdvisoryV2 .objects .count () == 1
80+
81+
82+ @pytest .mark .django_db
83+ @patch ("vulnerabilities.pipes.advisory.get_exact_purls_v2" , side_effect = Exception ("error" ))
84+ def test_advisory_import_atomicity_no_partial_adv_import (mock_exception , dummy_importer ):
85+ dummy_importer .collect_and_store_advisories ()
86+ assert AdvisoryV2 .objects .count () == 0
87+ assert ImpactedPackage .objects .count () == 0
88+
89+
90+ @pytest .mark .django_db
91+ def test_advisory_import_atomicity (dummy_importer ):
92+ dummy_importer .collect_and_store_advisories ()
93+ assert AdvisoryV2 .objects .count () == 1
94+ assert ImpactedPackage .objects .count () == 2
95+ assert PackageV2 .objects .count () == 4
0 commit comments