|
19 | 19 |
|
20 | 20 | from vulnerabilities.api_v2 import PackageV2Serializer |
21 | 21 | from vulnerabilities.api_v2 import VulnerabilityListSerializer |
| 22 | +from vulnerabilities.models import AdvisoryV2 |
22 | 23 | from vulnerabilities.models import Alias |
23 | 24 | from vulnerabilities.models import ApiUser |
| 25 | +from vulnerabilities.models import CodeFixV2 |
24 | 26 | from vulnerabilities.models import Package |
| 27 | +from vulnerabilities.models import PackageV2 |
25 | 28 | from vulnerabilities.models import PipelineRun |
26 | 29 | from vulnerabilities.models import PipelineSchedule |
27 | 30 | from vulnerabilities.models import Vulnerability |
@@ -782,3 +785,123 @@ def test_schedule_update_with_staff_session_permitted(self, mock_create_new_job) |
782 | 785 | self.assertEqual(response.status_code, status.HTTP_200_OK) |
783 | 786 | self.assertNotEqual(response.status_code, status.HTTP_403_FORBIDDEN) |
784 | 787 | self.assertEqual(self.schedule1.run_interval, 2) |
| 788 | + |
| 789 | + |
| 790 | +class CodeFixV2APITest(APITestCase): |
| 791 | + def setUp(self): |
| 792 | + self.advisory = AdvisoryV2.objects.create( |
| 793 | + datasource_id="test_source", |
| 794 | + advisory_id="TEST-2025-001", |
| 795 | + avid="test_source/TEST-2025-001", |
| 796 | + unique_content_id="a" * 64, |
| 797 | + url="https://example.com/advisory", |
| 798 | + date_collected="2025-07-01T00:00:00Z", |
| 799 | + ) |
| 800 | + |
| 801 | + self.affected_package = PackageV2.objects.from_purl(purl="pkg:pypi/affected_package@1.0.0") |
| 802 | + self.fixed_package = PackageV2.objects.from_purl(purl="pkg:pypi/fixed_package@1.0.1") |
| 803 | + |
| 804 | + self.codefix = CodeFixV2.objects.create( |
| 805 | + advisory=self.advisory, |
| 806 | + affected_package=self.affected_package, |
| 807 | + fixed_package=self.fixed_package, |
| 808 | + notes="Security patch", |
| 809 | + is_reviewed=True, |
| 810 | + ) |
| 811 | + self.user = ApiUser.objects.create_api_user(username="e@mail.com") |
| 812 | + self.auth = f"Token {self.user.auth_token.key}" |
| 813 | + self.client = APIClient(enforce_csrf_checks=True) |
| 814 | + self.client.credentials(HTTP_AUTHORIZATION=self.auth) |
| 815 | + |
| 816 | + self.url = reverse("advisory-codefix-list") |
| 817 | + |
| 818 | + def test_list_all_codefixes(self): |
| 819 | + with self.assertNumQueries(10): |
| 820 | + response = self.client.get(self.url) |
| 821 | + assert response.status_code == status.HTTP_200_OK |
| 822 | + assert response.data["count"] == 1 |
| 823 | + assert response.data["results"][0]["affected_advisory_id"] == self.advisory.avid |
| 824 | + |
| 825 | + def test_filter_codefix_by_advisory_id_success(self): |
| 826 | + with self.assertNumQueries(10): |
| 827 | + response = self.client.get(self.url, {"advisory_id": self.advisory.avid}) |
| 828 | + assert response.status_code == status.HTTP_200_OK |
| 829 | + assert response.data["count"] == 1 |
| 830 | + assert response.data["results"][0]["affected_advisory_id"] == self.advisory.avid |
| 831 | + |
| 832 | + def test_filter_codefix_by_advisory_id_not_found(self): |
| 833 | + with self.assertNumQueries(6): |
| 834 | + response = self.client.get(self.url, {"advisory_id": "nonexistent/ADVISORY-ID"}) |
| 835 | + assert response.status_code == status.HTTP_200_OK |
| 836 | + assert response.data["count"] == 0 |
| 837 | + |
| 838 | + |
| 839 | +class AdvisoriesPackageV2Tests(APITestCase): |
| 840 | + def setUp(self): |
| 841 | + self.advisory = AdvisoryV2.objects.create( |
| 842 | + datasource_id="ghsa", |
| 843 | + advisory_id="GHSA-1234", |
| 844 | + avid="ghsa/GHSA-1234", |
| 845 | + unique_content_id="f" * 64, |
| 846 | + url="https://example.com/advisory", |
| 847 | + date_collected="2025-07-01T00:00:00Z", |
| 848 | + ) |
| 849 | + |
| 850 | + self.package = PackageV2.objects.from_purl(purl="pkg:pypi/sample@1.0.0") |
| 851 | + |
| 852 | + self.user = ApiUser.objects.create_api_user(username="e@mail.com") |
| 853 | + self.auth = f"Token {self.user.auth_token.key}" |
| 854 | + self.client = APIClient(enforce_csrf_checks=True) |
| 855 | + self.client.credentials(HTTP_AUTHORIZATION=self.auth) |
| 856 | + |
| 857 | + self.package.affected_by_advisories.add(self.advisory) |
| 858 | + self.package.save() |
| 859 | + |
| 860 | + def test_list_with_purl_filter(self): |
| 861 | + url = reverse("advisories-package-v2-list") |
| 862 | + with self.assertNumQueries(18): |
| 863 | + response = self.client.get(url, {"purl": "pkg:pypi/sample@1.0.0"}) |
| 864 | + assert response.status_code == 200 |
| 865 | + assert "packages" in response.data["results"] |
| 866 | + assert "advisories" in response.data["results"] |
| 867 | + assert self.advisory.avid in response.data["results"]["advisories"] |
| 868 | + |
| 869 | + def test_bulk_lookup(self): |
| 870 | + url = reverse("advisories-package-v2-bulk-lookup") |
| 871 | + with self.assertNumQueries(13): |
| 872 | + response = self.client.post(url, {"purls": ["pkg:pypi/sample@1.0.0"]}, format="json") |
| 873 | + assert response.status_code == 200 |
| 874 | + assert "packages" in response.data |
| 875 | + assert "advisories" in response.data |
| 876 | + assert self.advisory.avid in response.data["advisories"] |
| 877 | + |
| 878 | + def test_bulk_search_plain(self): |
| 879 | + url = reverse("advisories-package-v2-bulk-search") |
| 880 | + payload = {"purls": ["pkg:pypi/sample@1.0.0"], "plain_purl": True, "purl_only": False} |
| 881 | + with self.assertNumQueries(13): |
| 882 | + response = self.client.post(url, payload, format="json") |
| 883 | + assert response.status_code == 200 |
| 884 | + assert "packages" in response.data |
| 885 | + assert "advisories" in response.data |
| 886 | + |
| 887 | + def test_bulk_search_purl_only(self): |
| 888 | + url = reverse("advisories-package-v2-bulk-search") |
| 889 | + payload = {"purls": ["pkg:pypi/sample@1.0.0"], "plain_purl": False, "purl_only": True} |
| 890 | + with self.assertNumQueries(13): |
| 891 | + response = self.client.post(url, payload, format="json") |
| 892 | + assert response.status_code == 200 |
| 893 | + assert "pkg:pypi/sample@1.0.0" in response.data |
| 894 | + |
| 895 | + def test_lookup_single_package(self): |
| 896 | + url = reverse("advisories-package-v2-lookup") |
| 897 | + with self.assertNumQueries(11): |
| 898 | + response = self.client.post(url, {"purl": "pkg:pypi/sample@1.0.0"}, format="json") |
| 899 | + assert response.status_code == 200 |
| 900 | + assert any(pkg["purl"] == "pkg:pypi/sample@1.0.0" for pkg in response.data) |
| 901 | + |
| 902 | + def test_get_all_vulnerable_purls(self): |
| 903 | + url = reverse("advisories-package-v2-all") |
| 904 | + with self.assertNumQueries(6): |
| 905 | + response = self.client.get(url) |
| 906 | + assert response.status_code == 200 |
| 907 | + assert "pkg:pypi/sample@1.0.0" in response.data |
0 commit comments