1+ from collections import defaultdict
2+ from typing import Dict , List , Any
3+
4+
class Dedupe:
    """Static helpers that group package dicts and merge each group's alerts.

    Packages are grouped either by their existing ``batchIndex`` value or by
    runs of consecutive entries that share the same purl, and every group is
    collapsed into a single record whose alerts are de-duplicated.
    """

    @staticmethod
    def normalize_file_path(path: str) -> str:
        """Return *path* without its first ``/``-separated component.

        ``None`` and the empty string both yield ``""``; a path that contains
        no ``/`` is returned unchanged.
        """
        if not path:
            return ""
        _, separator, remainder = path.partition("/")
        return remainder if separator else path

    @staticmethod
    def alert_key(alert: dict) -> tuple:
        """Return the tuple that identifies an alert for de-duplication.

        The tuple is ``(type, severity, category, normalized file, start, end)``.

        Raises:
            KeyError: if ``type``, ``severity`` or ``category`` is missing.
        """
        return (
            alert["type"],
            alert["severity"],
            alert["category"],
            Dedupe.normalize_file_path(alert.get("file")),
            alert.get("start"),
            alert.get("end"),
        )

    @staticmethod
    def _format_purl(ecosystem: Any, namespace: Any, name: Any, version: Any) -> str:
        """Build ``pkg:<type>/[<namespace>/]<name>@<version>`` from resolved parts."""
        purl = f"pkg:{ecosystem}/"
        if namespace:
            purl += f"{namespace}/"
        return f"{purl}{name}@{version}"

    @staticmethod
    def consolidate_and_merge_alerts(package_group: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Collapse a group of package dicts into one record with merged alerts.

        Package-level fields are copied from the first entry of the group.
        Alerts sharing the same :meth:`alert_key` are merged into one entry
        that keeps the first ``key`` seen and lists every release it appeared
        in. A package's release is its ``release`` value, falling back to its
        ``type`` when ``release`` is ``None`` or absent.

        Raises:
            IndexError: if ``package_group`` is empty.
            KeyError: if an alert lacks ``key``, ``type``, ``severity`` or
                ``category``.
        """
        alert_map: Dict[tuple, dict] = {}
        releases = set()
        for pkg in package_group:
            release = pkg.get("release")
            if release is None:
                release = pkg.get("type")
            releases.add(release)

            # An explicit ``"alerts": None`` is treated like a missing list.
            for alert in pkg.get("alerts") or []:
                identity = Dedupe.alert_key(alert)
                merged = alert_map.get(identity)
                if merged is None:
                    alert_map[identity] = {
                        "key": alert["key"],  # keep the first key seen
                        "type": alert["type"],
                        "severity": alert["severity"],
                        "category": alert["category"],
                        "file": identity[3],  # already normalized by alert_key
                        "start": alert.get("start"),
                        "end": alert.get("end"),
                        "releases": [release],
                    }
                elif release not in merged["releases"]:
                    merged["releases"].append(release)

        base = package_group[0]
        return {
            "id": base.get("id"),
            "author": base.get("author"),
            "size": base.get("size"),
            "type": base.get("type"),
            "name": base.get("name"),
            "namespace": base.get("namespace"),
            "version": base.get("version"),
            # A release can be None when a package has neither "release" nor
            # "type"; the key sorts such an entry last instead of letting the
            # None-vs-str comparison raise TypeError.
            "releases": sorted(releases, key=lambda value: (value is None, value)),
            "alerts": list(alert_map.values()),
            "score": base.get("score", {}),
            "license": base.get("license"),
            "licenseDetails": base.get("licenseDetails", []),
            "batchIndex": base.get("batchIndex"),
            # Include the namespace so the purl agrees with the one that
            # consolidate_by_order uses to group packages.
            "purl": Dedupe._format_purl(
                base.get("type", "unknown"),
                base.get("namespace"),
                base.get("name", "unknown"),
                base.get("version", "0.0.0"),
            ),
        }

    @staticmethod
    def dedupe(packages: List[Dict[str, Any]], batched: bool = True) -> List[Dict[str, Any]]:
        """Group *packages* and return one merged record per group.

        Args:
            packages: package dicts to consolidate.
            batched: when true, group by each package's ``batchIndex``;
                otherwise group runs of consecutive packages with equal purls.
        """
        if batched:
            grouped = Dedupe.consolidate_by_batch_index(packages)
        else:
            grouped = Dedupe.consolidate_by_order(packages)
        return [Dedupe.consolidate_and_merge_alerts(group) for group in grouped.values()]

    @staticmethod
    def consolidate_by_batch_index(packages: List[Dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
        """Group *packages* by their ``batchIndex`` value.

        Raises:
            KeyError: if a package has no ``batchIndex``.
        """
        grouped: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        for pkg in packages:
            grouped[pkg["batchIndex"]].append(pkg)
        return grouped

    @staticmethod
    def consolidate_by_order(packages: List[Dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
        """Group runs of consecutive packages that share the same purl.

        Each package dict is mutated: its ``batchIndex`` is set to the index
        of the run it belongs to (starting at 0). Processing is best-effort:
        if a package raises (for example it lacks ``name`` or ``version``),
        the error is printed and the groups built so far are returned.
        """
        grouped: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        batch_index = 0
        previous_purl = None
        try:
            for pkg in packages:
                current_purl = Dedupe._format_purl(
                    pkg.get("type"),
                    pkg.get("namespace"),
                    pkg["name"],
                    pkg["version"],
                )
                if previous_purl is not None and current_purl != previous_purl:
                    batch_index += 1
                # Always track the most recent purl; comparing against only
                # the very first one would start a new batch for every later
                # package and could merge non-adjacent runs.
                previous_purl = current_purl
                pkg["batchIndex"] = batch_index
                grouped[batch_index].append(pkg)
        except Exception as error:
            print(error)
        return grouped
0 commit comments