1313from typing import Iterable
1414from typing import List
1515from typing import Tuple
16+ from urllib .parse import urljoin
1617
1718import pytz
1819import saneyaml
3132from vulnerabilities .utils import build_description
3233from vulnerabilities .utils import get_advisory_url
3334from vulnerabilities .utils import get_cwe_id
35+ from vulntotal .datasources .gitlab import get_casesensitive_slug
36+ from vulntotal .datasources .gitlab_api import fetch_gitlab_advisories_for_purl
37+ from vulntotal .datasources .gitlab_api import get_estimated_advisories_count
3438
3539
3640class GitLabImporterPipeline (VulnerableCodeBaseImporterPipeline ):
@@ -42,9 +46,16 @@ class GitLabImporterPipeline(VulnerableCodeBaseImporterPipeline):
4246 license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE"
4347 importer_name = "GitLab Importer"
4448 repo_url = "git+https://gitlab.com/gitlab-org/advisories-community/"
49+ is_batch_run = True
4550
4651 @classmethod
4752 def steps (cls ):
53+ if not cls .is_batch_run :
54+ return (
55+ cls .collect_and_store_advisories ,
56+ cls .import_new_advisories ,
57+ )
58+
4859 return (
4960 cls .clone ,
5061 cls .collect_and_store_advisories ,
@@ -66,15 +77,57 @@ def steps(cls):
6677
6778 gitlab_scheme_by_purl_type = {v : k for k , v in purl_type_by_gitlab_scheme .items ()}
6879
def __init__(self, *args, purl=None, **kwargs):
    """
    Set up the pipeline, optionally targeting a single package.

    Positional and keyword arguments other than ``purl`` are forwarded
    unchanged to the parent initializer. ``purl`` is stored on the instance;
    a truthy ``purl`` switches the pipeline to package-first mode by setting
    ``is_batch_run`` to False on ``GitLabImporterPipeline``.
    """
    super().__init__(*args, **kwargs)
    self.purl = purl
    # NOTE(review): the flag is written on the class, not on this instance, so
    # it remains False for any instance created afterwards in the same
    # process -- confirm this is intended.
    if purl:
        GitLabImporterPipeline.is_batch_run = False
86+
def clone(self):
    """
    Fetch ``repo_url`` with ``fetch_via_vcs`` and keep the response on
    ``self.vcs_response`` for the later steps.
    """
    repo_url = self.repo_url
    self.log(f"Cloning `{repo_url}`")
    self.vcs_response = fetch_via_vcs(repo_url)
7290
def advisories_count(self):
    """
    Return the number of advisories this run is expected to process.

    In batch mode this counts every ``*.yml`` file found recursively under
    ``self.vcs_response.dest_dir``. Otherwise it returns whatever
    ``get_estimated_advisories_count`` reports for ``self.purl``.
    """
    # Read the flag through ``self`` (as ``collect_advisories`` does) instead of
    # the hard-coded class name, so both methods always agree on the mode even
    # when a subclass or an instance overrides ``is_batch_run``.
    if self.is_batch_run:
        root = Path(self.vcs_response.dest_dir)
        return sum(1 for _ in root.rglob("*.yml"))

    return get_estimated_advisories_count(
        self.purl, self.purl_type_by_gitlab_scheme, get_casesensitive_slug
    )
7699
77100 def collect_advisories (self ) -> Iterable [AdvisoryData ]:
101+ if not self .is_batch_run :
102+ advisories = fetch_gitlab_advisories_for_purl (
103+ self .purl , self .purl_type_by_gitlab_scheme , get_casesensitive_slug
104+ )
105+
106+ input_version = self .purl .version
107+ vrc = RANGE_CLASS_BY_SCHEMES [self .purl .type ]
108+ version_obj = vrc .version_class (input_version ) if input_version else None
109+
110+ for advisory in advisories :
111+ advisory_data = self ._advisory_dict_to_advisory_data (advisory )
112+ # If purl has version, we need to check if advisory affects the version
113+ if input_version :
114+ affected = False
115+ for affected_package in advisory_data .affected_packages :
116+ vrange = affected_package .affected_version_range
117+ fixed_version = affected_package .fixed_version
118+ if vrange and version_obj in vrange :
119+ if fixed_version :
120+ fixed_version_obj = vrc .version_class (str (fixed_version ))
121+ if version_obj >= fixed_version_obj :
122+ continue
123+ affected = True
124+ break
125+ if affected :
126+ yield advisory_data
127+ else :
128+ yield advisory_data
129+ return
130+
78131 base_path = Path (self .vcs_response .dest_dir )
79132
80133 for file_path in base_path .rglob ("*.yml" ):
@@ -109,6 +162,135 @@ def clean_downloads(self):
def on_failure(self):
    """
    Failure hook: delegate to ``clean_downloads``.
    """
    self.clean_downloads()
111164
def _advisory_dict_to_advisory_data(self, advisory):
    """
    Convert ``advisory`` with the module-level
    ``advisory_dict_to_advisory_data``, supplying this pipeline's scheme
    mappings, logger and purl.
    """
    conversion_kwargs = {
        "advisory": advisory,
        "purl_type_by_gitlab_scheme": self.purl_type_by_gitlab_scheme,
        "gitlab_scheme_by_purl_type": self.gitlab_scheme_by_purl_type,
        "logger": self.log,
        "purl": self.purl,
    }
    return advisory_dict_to_advisory_data(**conversion_kwargs)
173+
174+
def advisory_dict_to_advisory_data(
    advisory: dict,
    purl_type_by_gitlab_scheme,
    gitlab_scheme_by_purl_type,
    logger,
    purl=None,
    advisory_url=None,
):
    """
    Convert a GitLab advisory dict to AdvisoryData.

    ``advisory`` is read through these keys: ``identifiers``, ``identifier``,
    ``title``, ``description``, ``urls``, ``cwe_ids``, ``pubdate``,
    ``package_slug``, ``fixed_versions`` and ``affected_range``. A key that is
    missing or holds a null value is treated as empty.

    ``purl`` selects the version classes used for parsing; when it is not
    given it is derived from ``package_slug`` with ``get_purl``.
    ``advisory_url`` is used as-is when given; otherwise it is built from
    ``package_slug`` and ``identifier`` under the advisories-community blob
    URL. ``logger`` is called as ``logger(message, level=...)``.

    Return an AdvisoryData. When no purl can be determined, the returned
    object carries only aliases, summary, references, publication date and
    url. Unparsable ranges or fixed versions are logged and skipped rather
    than raised.
    """
    # ``or []`` (rather than a ``.get`` default) also covers keys that are
    # present but null, which would otherwise break the iteration below.
    aliases = advisory.get("identifiers") or []
    identifier = advisory.get("identifier", "")
    summary = build_description(advisory.get("title"), advisory.get("description"))
    references = [Reference.from_url(url) for url in advisory.get("urls") or []]
    cwe_list = [get_cwe_id(cwe_id) for cwe_id in advisory.get("cwe_ids") or []]

    # A missing pubdate used to crash inside the parser; keep the date unset
    # instead.
    # NOTE(review): assumes AdvisoryData accepts date_published=None -- confirm.
    pubdate = advisory.get("pubdate")
    parsed_date = dateparser.parse(pubdate) if pubdate else None
    date_published = parsed_date.replace(tzinfo=pytz.UTC) if parsed_date else None

    package_slug = advisory.get("package_slug")

    # Determine purl if not provided, remembering the slug-derived purl so it
    # is not computed a second time further down.
    slug_purl = None
    if not purl:
        slug_purl = get_purl(
            package_slug=package_slug,
            purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
            logger=logger,
        )
        purl = slug_purl

    if not purl:
        logger(
            f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}",
            level=logging.ERROR,
        )
        return AdvisoryData(
            aliases=aliases,
            summary=summary,
            references=references,
            date_published=date_published,
            url=advisory_url,
        )

    fixed_versions = advisory.get("fixed_versions") or []
    affected_range = advisory.get("affected_range")
    gitlab_native_schemes = {"pypi", "gem", "npm", "go", "packagist", "conan"}
    vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type]
    gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]

    affected_version_range = None
    if affected_range:
        # Best effort: an unparsable range is logged and the advisory is kept.
        try:
            if gitlab_scheme in gitlab_native_schemes:
                affected_version_range = from_gitlab_native(
                    gitlab_scheme=gitlab_scheme, string=affected_range
                )
            else:
                affected_version_range = vrc.from_native(affected_range)
        except Exception as e:
            logger(
                f"advisory_dict_to_advisory_data: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}",
                level=logging.ERROR,
            )

    parsed_fixed_versions = []
    for fixed_version in fixed_versions:
        try:
            parsed_fixed_version = vrc.version_class(fixed_version)
        except Exception as e:
            logger(
                f"advisory_dict_to_advisory_data: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}",
                level=logging.ERROR,
            )
        else:
            parsed_fixed_versions.append(parsed_fixed_version)

    # Affected packages are keyed on the purl built from the slug, not on the
    # caller-supplied purl.
    purl_without_version = slug_purl or get_purl(
        package_slug=package_slug,
        purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
        logger=logger,
    )

    if not purl_without_version:
        # Only reachable when the caller supplied ``purl`` but the slug itself
        # yields no purl; report no affected packages instead of building
        # entries around a missing package.
        logger(
            f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}",
            level=logging.ERROR,
        )
        affected_packages = []
    elif parsed_fixed_versions:
        affected_packages = list(
            extract_affected_packages(
                affected_version_range=affected_version_range,
                fixed_versions=parsed_fixed_versions,
                purl=purl_without_version,
            )
        )
    elif affected_version_range:
        affected_packages = [
            AffectedPackage(
                package=purl_without_version,
                affected_version_range=affected_version_range,
            )
        ]
    else:
        affected_packages = []

    # Determine advisory_url if not provided
    if not advisory_url and package_slug and identifier:
        advisory_url = urljoin(
            "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/",
            f"{package_slug}/{identifier}.yml",
        )

    return AdvisoryData(
        aliases=aliases,
        summary=summary,
        references=references,
        date_published=date_published,
        affected_packages=affected_packages,
        weaknesses=cwe_list,
        url=advisory_url,
    )
293+
112294
113295def parse_advisory_path (base_path : Path , file_path : Path ) -> Tuple [str , str , str ]:
114296 """
@@ -219,94 +401,16 @@ def parse_gitlab_advisory(
219401 )
220402 return
221403
222- # refer to schema here https://gitlab.com/gitlab-org/advisories-community/-/blob/main/ci/schema/schema.json
223- aliases = gitlab_advisory .get ("identifiers" )
224- summary = build_description (gitlab_advisory .get ("title" ), gitlab_advisory .get ("description" ))
225- urls = gitlab_advisory .get ("urls" )
226- references = [Reference .from_url (u ) for u in urls ]
227-
228- cwe_ids = gitlab_advisory .get ("cwe_ids" ) or []
229- cwe_list = list (map (get_cwe_id , cwe_ids ))
230-
231- date_published = dateparser .parse (gitlab_advisory .get ("pubdate" ))
232- date_published = date_published .replace (tzinfo = pytz .UTC )
233- package_slug = gitlab_advisory .get ("package_slug" )
234404 advisory_url = get_advisory_url (
235405 file = file ,
236406 base_path = base_path ,
237407 url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/" ,
238408 )
239- purl : PackageURL = get_purl (
240- package_slug = package_slug ,
409+
410+ return advisory_dict_to_advisory_data (
411+ advisory = gitlab_advisory ,
241412 purl_type_by_gitlab_scheme = purl_type_by_gitlab_scheme ,
413+ gitlab_scheme_by_purl_type = gitlab_scheme_by_purl_type ,
242414 logger = logger ,
243- )
244- if not purl :
245- logger (
246- f"parse_yaml_file: purl is not valid: { file !r} { package_slug !r} " , level = logging .ERROR
247- )
248- return AdvisoryData (
249- aliases = aliases ,
250- summary = summary ,
251- references = references ,
252- date_published = date_published ,
253- url = advisory_url ,
254- )
255- affected_version_range = None
256- fixed_versions = gitlab_advisory .get ("fixed_versions" ) or []
257- affected_range = gitlab_advisory .get ("affected_range" )
258- gitlab_native_schemes = set (["pypi" , "gem" , "npm" , "go" , "packagist" , "conan" ])
259- vrc : VersionRange = RANGE_CLASS_BY_SCHEMES [purl .type ]
260- gitlab_scheme = gitlab_scheme_by_purl_type [purl .type ]
261- try :
262- if affected_range :
263- if gitlab_scheme in gitlab_native_schemes :
264- affected_version_range = from_gitlab_native (
265- gitlab_scheme = gitlab_scheme , string = affected_range
266- )
267- else :
268- affected_version_range = vrc .from_native (affected_range )
269- except Exception as e :
270- logger (
271- f"parse_yaml_file: affected_range is not parsable: { affected_range !r} for: { purl !s} error: { e !r} \n { traceback .format_exc ()} " ,
272- level = logging .ERROR ,
273- )
274-
275- parsed_fixed_versions = []
276- for fixed_version in fixed_versions :
277- try :
278- fixed_version = vrc .version_class (fixed_version )
279- parsed_fixed_versions .append (fixed_version )
280- except Exception as e :
281- logger (
282- f"parse_yaml_file: fixed_version is not parsable`: { fixed_version !r} error: { e !r} \n { traceback .format_exc ()} " ,
283- level = logging .ERROR ,
284- )
285-
286- if parsed_fixed_versions :
287- affected_packages = list (
288- extract_affected_packages (
289- affected_version_range = affected_version_range ,
290- fixed_versions = parsed_fixed_versions ,
291- purl = purl ,
292- )
293- )
294- else :
295- if not affected_version_range :
296- affected_packages = []
297- else :
298- affected_packages = [
299- AffectedPackage (
300- package = purl ,
301- affected_version_range = affected_version_range ,
302- )
303- ]
304- return AdvisoryData (
305- aliases = aliases ,
306- summary = summary ,
307- references = references ,
308- date_published = date_published ,
309- affected_packages = affected_packages ,
310- weaknesses = cwe_list ,
311- url = advisory_url ,
415+ advisory_url = advisory_url ,
312416 )
0 commit comments