|
10 | 10 | import os |
11 | 11 | import urllib.parse |
12 | 12 |
|
| 13 | +from dataclasses import dataclass |
13 | 14 | from io import StringIO |
14 | 15 | from typing import List |
15 | 16 |
|
@@ -38,6 +39,12 @@ def __init__(self): |
38 | 39 | self.cwes = "null" |
39 | 40 |
|
40 | 41 |
|
| 42 | +@dataclass |
| 43 | +class CvssRating: |
| 44 | + severity: str = "null" |
| 45 | + cvss_score: str = "null" |
| 46 | + |
| 47 | + |
41 | 48 | def vulns_to_obj(inspector_scan_json) -> List[Vulnerability]: |
42 | 49 | """ |
43 | 50 | this function parses JSON from Inspector's ScanSbom API |
@@ -70,18 +77,7 @@ def vulns_to_obj(inspector_scan_json) -> List[Vulnerability]: |
70 | 77 |
|
71 | 78 | # get vuln severity and EPSS score |
72 | 79 | ratings = v.get("ratings") |
73 | | - if ratings: |
74 | | - nvd_severity = get_nvd_severity(ratings) |
75 | | - if nvd_severity: |
76 | | - vuln_obj.severity = nvd_severity |
77 | | - |
78 | | - nvd_score = get_nvd_score(ratings) |
79 | | - if nvd_score: |
80 | | - vuln_obj.cvss_score = nvd_score |
81 | | - |
82 | | - epss_score = get_epss_score(ratings) |
83 | | - if epss_score: |
84 | | - vuln_obj.epss_score = epss_score |
| 80 | + add_ratings(ratings, vuln_obj) |
85 | 81 |
|
86 | 82 | # get vulnerability published date |
87 | 83 | published = v.get("created") |
@@ -207,34 +203,37 @@ def getPropertyValueFromKey(vuln_json, key): |
207 | 203 | return None |
208 | 204 |
|
209 | 205 |
|
210 | | -def get_nvd_severity(ratings): |
211 | | - for rating in ratings: |
212 | | - source = rating.get("source") |
213 | | - if not source: |
214 | | - continue |
| 206 | +def add_ratings(ratings, vulnerability): |
| 207 | + if ratings is None: |
| 208 | + return |
215 | 209 |
|
216 | | - if source["name"] == "NVD": |
217 | | - severity = rating["severity"] |
218 | | - if severity: |
219 | | - return severity |
220 | | - return None |
| 210 | + rating = get_cvss_rating(ratings, vulnerability) |
| 211 | + vulnerability.severity = rating.severity |
| 212 | + vulnerability.cvss_score = rating.cvss_score |
221 | 213 |
|
| 214 | + epss_score = get_epss_score(ratings) |
| 215 | + if epss_score: |
| 216 | + vulnerability.epss_score = epss_score |
222 | 217 |
|
223 | | -def get_nvd_score(ratings): |
224 | | - for rating in ratings: |
225 | | - source = rating.get("source") |
226 | | - if not source: |
227 | | - continue |
228 | 218 |
|
229 | | - method = rating.get("method") |
230 | | - if not method: |
231 | | - continue |
| 219 | +def get_cvss_rating(ratings, vulnerability) -> CvssRating: |
| 220 | + rating_provider_priority = [ |
| 221 | + 'NVD', |
| 222 | + 'MITRE', |
| 223 | + 'AMAZON_INSPECTOR' |
| 224 | + ] |
| 225 | + for provider in rating_provider_priority: |
| 226 | + for rating in ratings: |
| 227 | + if rating['source']['name'] != provider: |
| 228 | + continue |
232 | 229 |
|
233 | | - if source["name"] == "NVD" and method == "CVSSv31": |
234 | | - score = rating["score"] |
235 | | - if score: |
236 | | - return score |
237 | | - return None |
| 230 | + severity = rating["severity"] |
| 231 | + cvss_score = rating["score"] if rating['method'] == 'CVSSv31' else "null" |
| 232 | + if severity and cvss_score: |
| 233 | + return CvssRating(severity=severity, cvss_score=cvss_score) |
| 234 | + |
| 235 | + logging.info(f"No CVSS rating is provided for {vulnerability.vuln_id}") |
| 236 | + return CvssRating() |
238 | 237 |
|
239 | 238 |
|
240 | 239 | def get_epss_score(ratings): |
|
0 commit comments