Skip to content

Commit b9f4abf

Browse files
authored
Refactor pkg_vuln module (#75)
Co-authored-by: Kenji Sugimura <kensugim@amazon.co.jp>
1 parent 6fbe2e0 commit b9f4abf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+4822
-694
lines changed

entrypoint/entrypoint/exporter.py

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import csv
2+
import logging
3+
import os
4+
from dataclasses import dataclass
5+
from io import StringIO
6+
7+
from entrypoint import pkg_vuln
8+
9+
10+
@dataclass
11+
class InspectorScanResult:
12+
vulnerabilities: list[pkg_vuln.Vulnerability]
13+
artifact_name: str = pkg_vuln.NULL_STR
14+
artifact_type: str = pkg_vuln.NULL_STR
15+
artifact_hash: str = pkg_vuln.NULL_STR
16+
build_id: str = pkg_vuln.NULL_STR
17+
criticals: str = pkg_vuln.NULL_STR
18+
highs: str = pkg_vuln.NULL_STR
19+
mediums: str = pkg_vuln.NULL_STR
20+
lows: str = pkg_vuln.NULL_STR
21+
others: str = pkg_vuln.NULL_STR
22+
23+
def total_vulns(self) -> int:
24+
total_vulns = int(self.criticals) + int(self.highs) + int(self.mediums) + int(self.lows) + int(self.others)
25+
return total_vulns
26+
27+
28+
def to_csv(scan_result: InspectorScanResult):
29+
csv_buffer = StringIO()
30+
csv_writer = csv.writer(csv_buffer, quoting=csv.QUOTE_ALL)
31+
32+
# insert hash rows; these are like properties for CSV
33+
artifact_info = [
34+
f"#artifact_name:{scan_result.artifact_name}",
35+
f"artifact_type:{scan_result.artifact_type}",
36+
f"artifact_hash:{scan_result.artifact_hash}",
37+
f"build_id:{scan_result.build_id}",
38+
]
39+
csv_writer.writerow(artifact_info)
40+
41+
vuln_summary = [
42+
f"#critical_vulnerabilities:{scan_result.criticals}",
43+
f"high_vulnerabilities:{scan_result.highs}",
44+
f"medium_vulnerabilities:{scan_result.mediums}",
45+
f"low_vulnerabilities:{scan_result.lows}",
46+
f"other_vulnerabilities:{scan_result.others}",
47+
]
48+
csv_writer.writerow(vuln_summary)
49+
50+
# write the header into the CSV
51+
header = [
52+
"ID",
53+
"Severity",
54+
"Source",
55+
"CVSS",
56+
"Installed Package",
57+
"Fixed Package",
58+
"Path",
59+
"EPSS",
60+
"Exploit Available",
61+
"Exploit Last Seen",
62+
"CWEs",
63+
]
64+
csv_writer.writerow(header)
65+
66+
# write each vuln into a CSV
67+
if scan_result.vulnerabilities:
68+
for v in scan_result.vulnerabilities:
69+
# if package vuln
70+
row = [
71+
v.vuln_id,
72+
v.severity,
73+
v.severity_provider,
74+
v.cvss_score,
75+
v.installed_ver,
76+
v.fixed_ver,
77+
v.pkg_path,
78+
v.epss_score,
79+
v.exploit_available,
80+
v.exploit_last_seen,
81+
v.cwes,
82+
]
83+
csv_writer.writerow(row)
84+
85+
csv_str = csv_buffer.getvalue()
86+
csv_buffer.close()
87+
88+
return csv_str
89+
90+
91+
def to_markdown(scan_result: InspectorScanResult):
92+
markdown = create_header_info(scan_result)
93+
markdown += create_summary_table(scan_result)
94+
95+
if not scan_result.vulnerabilities:
96+
markdown += (
97+
":green_circle: Your artifact was scanned with Amazon Inspector and no vulnerabilities were detected."
98+
)
99+
else:
100+
markdown += create_vulnerability_details_table(scan_result.vulnerabilities)
101+
102+
markdown += "\n\n"
103+
return markdown
104+
105+
106+
def create_header_info(scan_result: InspectorScanResult):
107+
markdown = "# Amazon Inspector Scan Results\n"
108+
109+
if not scan_result.artifact_name == "./":
110+
markdown += f"Artifact Name: {scan_result.artifact_name}\n\n"
111+
112+
artifact_type = "repository" if scan_result.artifact_type == "directory" else scan_result.artifact_type
113+
markdown += f"Artifact Type: {artifact_type}\n\n"
114+
115+
if scan_result.artifact_hash != pkg_vuln.NULL_STR:
116+
markdown += f"Artifact Hash: {scan_result.artifact_hash}\n\n"
117+
if scan_result.build_id != pkg_vuln.NULL_STR:
118+
markdown += f"Build ID: {scan_result.build_id}\n\n"
119+
return markdown
120+
121+
122+
def create_summary_table(scan_result: InspectorScanResult):
123+
markdown = "## Vulnerability Counts by Severity\n\n"
124+
markdown += "| Severity | Count |\n"
125+
markdown += "|----------|-------|\n"
126+
markdown += f"| Critical | {scan_result.criticals}|\n"
127+
markdown += f"| High | {scan_result.highs}|\n"
128+
markdown += f"| Medium | {scan_result.mediums}|\n"
129+
markdown += f"| Low | {scan_result.lows}|\n"
130+
markdown += f"| Other | {scan_result.others}|\n"
131+
markdown += "\n\n"
132+
return markdown
133+
134+
135+
def create_vulnerability_details_table(vulns: list[pkg_vuln.Vulnerability]):
136+
markdown = "## Vulnerability Findings\n\n"
137+
138+
rows = []
139+
details_table_header_columns = [
140+
"ID",
141+
"Severity",
142+
"Source",
143+
"[CVSS](https://www.first.org/cvss/)",
144+
"Installed Package ([PURL](https://github.com/package-url/purl-spec/tree/master?tab=readme-ov-file#purl))",
145+
"Fixed Package",
146+
"Path",
147+
"[EPSS](https://www.first.org/epss/)",
148+
"Exploit Available",
149+
"Exploit Last Seen",
150+
"CWEs",
151+
]
152+
details_table_header = [
153+
generate_markdown_row(*details_table_header_columns),
154+
generate_markdown_row(*["-" * 7 for i in range(len(details_table_header_columns))]),
155+
]
156+
rows.extend(details_table_header)
157+
158+
vulns = sort_vulns_by_cvss_score(vulns)
159+
for v in vulns:
160+
rows.append(
161+
generate_markdown_row(
162+
v.vuln_id,
163+
clean_null(v.severity),
164+
clean_null(v.severity_provider),
165+
clean_null(v.cvss_score),
166+
merge_cell(v.installed_ver),
167+
merge_cell(v.fixed_ver),
168+
merge_cell(clean_null(v.pkg_path)),
169+
clean_null(v.epss_score),
170+
clean_null(v.exploit_available),
171+
clean_null(v.exploit_last_seen),
172+
merge_cell(clean_null(v.cwes)),
173+
)
174+
)
175+
markdown += "\n".join(rows)
176+
return markdown
177+
178+
179+
def generate_markdown_row(*cells):
180+
row_text = "| " + " | ".join(cells) + " |"
181+
return row_text
182+
183+
184+
def clean_null(a_string: str):
185+
if a_string == pkg_vuln.NULL_STR:
186+
return ""
187+
else:
188+
return a_string
189+
190+
191+
def merge_cell(a_string: str):
192+
"""
193+
This function expects a string of data
194+
that is intended to be placed in a markdown
195+
table. The provided data may include multiple
196+
elements in a string, such as multiple PURLs
197+
separated with a semi-colon character ';'.
198+
This function splits the provided string
199+
so that multiple elements can fit in one
200+
cell in a markdown table.
201+
"""
202+
203+
# return early on empty string
204+
if a_string == "":
205+
return ""
206+
207+
# we may have multiple PURLs for a single CVE,
208+
# so split purls into a list we can iterate on
209+
original_list = a_string.split(";")
210+
seen = set()
211+
unique_list = []
212+
213+
# make each PURL list unique while preserving ordering
214+
# so that our pkg versions line up with the correct pkg names
215+
for item in original_list:
216+
if item not in seen:
217+
seen.add(item)
218+
unique_list.append(item)
219+
220+
unique_formatted = []
221+
for each in unique_list:
222+
# make each element preformatted text,
223+
# otherwise the markdown report renders
224+
# with malformed characters on GitHub
225+
each = f"`{each}`"
226+
unique_formatted.append(each)
227+
228+
# separate multiple elements in cell with HTML break characters
229+
merged_cell = "<br><br>".join(unique_formatted)
230+
return merged_cell
231+
232+
233+
def sort_vulns_by_cvss_score(vulns):
234+
for each in vulns:
235+
if each.cvss_score == pkg_vuln.NULL_STR:
236+
each.cvss_score = 0
237+
238+
sorted_vulns = sorted(vulns, key=lambda obj: float(obj.cvss_score), reverse=True)
239+
240+
for each in sorted_vulns:
241+
if each.cvss_score == 0:
242+
each.cvss_score = pkg_vuln.NULL_STR
243+
return sorted_vulns
244+
245+
246+
def post_github_step_summary(markdown):
247+
step_summary_path = "/tmp/inspector.md"
248+
if os.getenv("GITHUB_ACTIONS"):
249+
step_summary_path = os.environ["GITHUB_STEP_SUMMARY"]
250+
251+
try:
252+
with open(step_summary_path, "a") as f:
253+
f.write(markdown)
254+
except Exception as e:
255+
logging.error(e)
256+
return

0 commit comments

Comments
 (0)