Skip to content

Commit ba0bf51

Browse files
committed
Modify Elixir Security importer to support package-first mode #1933
* Update Elixir Security importer to filter and process advisories relevant to the purl passed in the constructor * Update Elixir Security v2 importer to filter and process advisories relevant to the purl passed in the constructor * Update Elixir Security importer tests to include testing package-first mode Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>
1 parent da873aa commit ba0bf51

File tree

3 files changed

+305
-5
lines changed

3 files changed

+305
-5
lines changed

vulnerabilities/importers/elixir_security.py

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,18 @@
66
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
77
# See https://aboutcode.org for more information about nexB OSS projects.
88
#
9-
import urllib.parse as urlparse
9+
import logging
10+
import os
11+
import tempfile
1012
from pathlib import Path
1113
from typing import Set
1214

15+
import requests
1316
from dateutil import parser as dateparser
1417
from packageurl import PackageURL
1518
from univers.version_constraint import VersionConstraint
1619
from univers.version_range import HexVersionRange
20+
from univers.versions import SemverVersion
1721

1822
from vulnerabilities.importer import AdvisoryData
1923
from vulnerabilities.importer import AffectedPackage
@@ -30,7 +34,22 @@ class ElixirSecurityImporter(Importer):
3034
spdx_license_expression = "CC0-1.0"
3135
importer_name = "Elixir Security Importer"
3236

37+
def __init__(self, purl=None, *args, **kwargs):
38+
super().__init__(*args, **kwargs)
39+
self.purl = purl
40+
if self.purl:
41+
if self.purl.type != "hex":
42+
print(
43+
f"Warning: PURL type {self.purl.type} is not 'hex', may not match any advisories"
44+
)
45+
3346
def advisory_data(self) -> Set[AdvisoryData]:
47+
if not self.purl:
48+
return self._batch_advisory_data()
49+
50+
return self._package_first_advisory_data()
51+
52+
def _batch_advisory_data(self) -> Set[AdvisoryData]:
3453
try:
3554
self.clone(self.repo_url)
3655
base_path = Path(self.vcs_response.dest_dir)
@@ -41,8 +60,77 @@ def advisory_data(self) -> Set[AdvisoryData]:
4160
if self.vcs_response:
4261
self.vcs_response.delete()
4362

44-
def process_file(self, file, base_path):
45-
relative_path = str(file.relative_to(base_path)).strip("/")
63+
def _package_first_advisory_data(self) -> Set[AdvisoryData]:
64+
if self.purl.type != "hex":
65+
logging.warning(
66+
f"PURL type {self.purl.type} is not supported by Elixir Security importer"
67+
)
68+
return []
69+
70+
package_name = self.purl.name
71+
72+
try:
73+
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}"
74+
response = requests.get(directory_url)
75+
76+
if response.status_code != 200:
77+
logging.info(f"No advisories found for {package_name} in Elixir Security Database")
78+
return []
79+
80+
yaml_files = [file["path"] for file in response.json() if file["name"].endswith(".yml")]
81+
82+
for file_path in yaml_files:
83+
content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}"
84+
content_response = requests.get(
85+
content_url, headers={"Accept": "application/vnd.github.v3.raw"}
86+
)
87+
88+
if content_response.status_code != 200:
89+
logging.warning(f"Failed to fetch file content for {file_path}")
90+
continue
91+
92+
# Create a temporary file to store the content
93+
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
94+
temp_file.write(content_response.text)
95+
temp_path = temp_file.name
96+
97+
try:
98+
for advisory in self.process_file(temp_path, Path(""), file_path=file_path):
99+
if self.purl.version and not self._advisory_affects_version(advisory):
100+
continue
101+
102+
yield advisory
103+
finally:
104+
if os.path.exists(temp_path):
105+
os.remove(temp_path)
106+
107+
except Exception as e:
108+
logging.error(f"Error fetching advisories for {self.purl}: {str(e)}")
109+
return []
110+
111+
def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
112+
if not self.purl.version:
113+
return True
114+
115+
for affected_package in advisory.affected_packages:
116+
if affected_package.affected_version_range:
117+
try:
118+
purl_version = SemverVersion(self.purl.version)
119+
120+
if purl_version in affected_package.affected_version_range:
121+
return True
122+
except Exception as e:
123+
logging.warning(f"Failed to parse version {self.purl.version}: {str(e)}")
124+
return True
125+
126+
return False
127+
128+
def process_file(self, file, base_path, file_path=None):
129+
if file_path:
130+
relative_path = file_path
131+
else:
132+
relative_path = str(Path(file).relative_to(base_path)).strip("/")
133+
46134
advisory_url = (
47135
f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
48136
)

vulnerabilities/pipelines/v2_importers/elixir_security_importer.py

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@
77
# See https://aboutcode.org for more information about nexB OSS projects.
88
#
99

10+
import os
11+
import tempfile
1012
from pathlib import Path
1113
from typing import Iterable
1214

15+
import requests
1316
from dateutil import parser as dateparser
1417
from fetchcode.vcs import fetch_via_vcs
1518
from packageurl import PackageURL
1619
from univers.version_constraint import VersionConstraint
1720
from univers.version_range import HexVersionRange
21+
from univers.versions import SemverVersion
1822

1923
from vulnerabilities.importer import AdvisoryData
2024
from vulnerabilities.importer import AffectedPackage
@@ -37,6 +41,15 @@ class ElixirSecurityImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
3741
repo_url = "git+https://github.com/dependabot/elixir-security-advisories"
3842
unfurl_version_ranges = True
3943

44+
def __init__(self, *args, **kwargs):
45+
super().__init__(*args, **kwargs)
46+
self.purl = kwargs.get("purl")
47+
if self.purl:
48+
if self.purl.type != "hex":
49+
self.log(
50+
f"Warning: PURL type {self.purl.type} is not 'hex', may not match any advisories"
51+
)
52+
4053
@classmethod
4154
def steps(cls):
4255
return (cls.collect_and_store_advisories,)
@@ -46,11 +59,36 @@ def clone(self):
4659
self.vcs_response = fetch_via_vcs(self.repo_url)
4760

4861
def advisories_count(self) -> int:
62+
if self.purl:
63+
return self._count_package_advisories()
64+
4965
base_path = Path(self.vcs_response.dest_dir)
5066
count = len(list((base_path / "packages").glob("**/*.yml")))
5167
return count
5268

69+
def _count_package_advisories(self) -> int:
70+
if self.purl.type != "hex":
71+
return 0
72+
73+
try:
74+
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{self.purl.name}"
75+
response = requests.get(directory_url)
76+
77+
if response.status_code != 200:
78+
return 0
79+
80+
yaml_files = [file for file in response.json() if file["name"].endswith(".yml")]
81+
return len(yaml_files)
82+
except Exception:
83+
return 0
84+
5385
def collect_advisories(self) -> Iterable[AdvisoryData]:
86+
if self.purl:
87+
return self._collect_package_advisories()
88+
89+
return self._collect_batch_advisories()
90+
91+
def _collect_batch_advisories(self) -> Iterable[AdvisoryData]:
5492
try:
5593
base_path = Path(self.vcs_response.dest_dir)
5694
vuln = base_path / "packages"
@@ -60,8 +98,77 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
6098
if self.vcs_response:
6199
self.vcs_response.delete()
62100

63-
def process_file(self, file, base_path) -> Iterable[AdvisoryData]:
64-
relative_path = str(file.relative_to(base_path)).strip("/")
101+
def _collect_package_advisories(self) -> Iterable[AdvisoryData]:
102+
if self.purl.type != "hex":
103+
self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer")
104+
return []
105+
106+
package_name = self.purl.name
107+
108+
try:
109+
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}"
110+
response = requests.get(directory_url)
111+
112+
if response.status_code != 200:
113+
self.log(f"No advisories found for {package_name} in Elixir Security Database")
114+
return []
115+
116+
yaml_files = [file["path"] for file in response.json() if file["name"].endswith(".yml")]
117+
118+
for file_path in yaml_files:
119+
content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}"
120+
content_response = requests.get(
121+
content_url, headers={"Accept": "application/vnd.github.v3.raw"}
122+
)
123+
124+
if content_response.status_code != 200:
125+
self.log(f"Failed to fetch file content for {file_path}")
126+
continue
127+
128+
# Create a temporary file to store the content
129+
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
130+
temp_file.write(content_response.text)
131+
temp_path = temp_file.name
132+
133+
try:
134+
for advisory in self.process_file(
135+
Path(temp_path), Path(""), file_path=file_path
136+
):
137+
if self.purl.version and not self._advisory_affects_version(advisory):
138+
continue
139+
140+
yield advisory
141+
finally:
142+
if os.path.exists(temp_path):
143+
os.remove(temp_path)
144+
145+
except Exception as e:
146+
self.log(f"Error fetching advisories for {self.purl}: {str(e)}")
147+
return []
148+
149+
def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
150+
if not self.purl.version:
151+
return True
152+
153+
for affected_package in advisory.affected_packages:
154+
if affected_package.affected_version_range:
155+
try:
156+
purl_version = SemverVersion(self.purl.version)
157+
158+
if purl_version in affected_package.affected_version_range:
159+
return True
160+
except Exception as e:
161+
self.log(f"Failed to parse version {self.purl.version}: {str(e)}")
162+
return True
163+
164+
return False
165+
166+
def process_file(self, file, base_path, file_path=None) -> Iterable[AdvisoryData]:
167+
if file_path:
168+
relative_path = file_path
169+
else:
170+
relative_path = str(file.relative_to(base_path)).strip("/")
171+
65172
advisory_url = (
66173
f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
67174
)

vulnerabilities/tests/test_elixir_security.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010
import json
1111
import os
1212
from pathlib import Path
13+
from unittest.mock import Mock
1314
from unittest.mock import patch
1415

16+
from packageurl import PackageURL
17+
1518
from vulnerabilities.importer import AdvisoryData
1619
from vulnerabilities.importers.elixir_security import ElixirSecurityImporter
1720
from vulnerabilities.improvers.default import DefaultImprover
@@ -59,3 +62,105 @@ def test_elixir_improver(mock_response):
5962
result.extend(inference)
6063
expected_file = os.path.join(TEST_DIR, f"elixir-improver-expected.json")
6164
util_tests.check_results_against_json(result, expected_file)
65+
66+
67+
@patch("requests.get")
68+
def test_elixir_package_first_mode_success(mock_get):
69+
directory_response = Mock()
70+
directory_response.status_code = 200
71+
directory_response.json.return_value = [
72+
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
73+
]
74+
75+
test_file_path = os.path.join(TEST_DIR, "test_file.yml")
76+
with open(test_file_path, "r") as f:
77+
test_content = f.read()
78+
79+
content_response = Mock()
80+
content_response.status_code = 200
81+
content_response.text = test_content
82+
83+
mock_get.side_effect = [directory_response, content_response]
84+
85+
purl = PackageURL(type="hex", name="coherence")
86+
importer = ElixirSecurityImporter(purl=purl)
87+
88+
advisories = list(importer.advisory_data())
89+
90+
assert len(advisories) == 1
91+
advisory = advisories[0]
92+
assert "CVE-2018-20301" in advisory.aliases
93+
assert advisory.summary == 'The Coherence library has "Mass Assignment"-like vulnerabilities.'
94+
assert len(advisory.affected_packages) == 1
95+
assert advisory.affected_packages[0].package.name == "coherence"
96+
97+
98+
@patch("requests.get")
99+
def test_elixir_package_first_mode_with_version_filter(mock_get):
100+
directory_response = Mock()
101+
directory_response.status_code = 200
102+
directory_response.json.return_value = [
103+
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
104+
]
105+
106+
test_file_path = os.path.join(TEST_DIR, "test_file.yml")
107+
with open(test_file_path, "r") as f:
108+
test_content = f.read()
109+
110+
content_response = Mock()
111+
content_response.status_code = 200
112+
content_response.text = test_content
113+
114+
mock_get.side_effect = [directory_response, content_response]
115+
116+
purl = PackageURL(type="hex", name="coherence", version="0.5.1")
117+
importer = ElixirSecurityImporter(purl=purl)
118+
advisories = list(importer.advisory_data())
119+
assert len(advisories) == 1
120+
121+
mock_get.side_effect = [directory_response, content_response]
122+
purl = PackageURL(type="hex", name="coherence", version="0.5.2")
123+
importer = ElixirSecurityImporter(purl=purl)
124+
advisories = list(importer.advisory_data())
125+
assert len(advisories) == 0
126+
127+
128+
@patch("requests.get")
129+
def test_elixir_package_first_mode_no_advisories(mock_get):
130+
mock_response = Mock()
131+
mock_response.status_code = 404
132+
mock_get.return_value = mock_response
133+
134+
purl = PackageURL(type="hex", name="nonexistent-package")
135+
importer = ElixirSecurityImporter(purl=purl)
136+
137+
advisories = list(importer.advisory_data())
138+
assert len(advisories) == 0
139+
140+
141+
@patch("requests.get")
142+
def test_elixir_package_first_mode_api_error(mock_get):
143+
directory_response = Mock()
144+
directory_response.status_code = 200
145+
directory_response.json.return_value = [
146+
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
147+
]
148+
149+
content_response = Mock()
150+
content_response.status_code = 500
151+
152+
mock_get.side_effect = [directory_response, content_response]
153+
154+
purl = PackageURL(type="hex", name="coherence")
155+
importer = ElixirSecurityImporter(purl=purl)
156+
157+
advisories = list(importer.advisory_data())
158+
assert len(advisories) == 0
159+
160+
161+
def test_elixir_package_first_mode_non_hex_purl():
162+
purl = PackageURL(type="npm", name="some-package")
163+
importer = ElixirSecurityImporter(purl=purl)
164+
165+
advisories = list(importer.advisory_data())
166+
assert len(advisories) == 0

0 commit comments

Comments
 (0)