1+ #!/usr/bin/env python3
2+ """
3+ Coverage script to load .coverage.json and integrate CodeQL query resolution data.
4+
5+ This script:
6+ 1. Loads the existing .coverage.json file from the project root
7+ 2. Runs 'codeql resolve queries --format=json ./ql/src' to get available queries
8+ 3. Integrates the query data into the coverage file
9+ """
10+
11+ import json
12+ import subprocess
13+ import sys
14+ import argparse
15+ from pathlib import Path
16+ from typing import Dict , List , Any
17+
18+
def find_project_root() -> Path:
    """Locate the project root by searching upwards for ``.coverage.json``.

    The search starts in the directory containing this script and visits
    every ancestor directory, including the filesystem root.

    Returns:
        The nearest directory that contains ``.coverage.json``. If no
        directory does, the parent of the script's directory is returned,
        on the assumption that the script lives one level below the
        project root.
    """
    # Resolve first: with a relative ``__file__`` the walk would otherwise
    # stop at "." instead of continuing up the real directory tree.
    script_dir = Path(__file__).resolve().parent

    for candidate in (script_dir, *script_dir.parents):
        if (candidate / ".coverage.json").exists():
            return candidate

    # Not found anywhere: assume the root is one level above the script.
    return script_dir.parent
32+
33+
def load_coverage_file(project_root: Path) -> Dict[str, Any]:
    """Read and parse ``.coverage.json`` from the project root.

    Args:
        project_root: Directory expected to contain ``.coverage.json``.

    Returns:
        The parsed top-level JSON object.

    Raises:
        SystemExit: With status 1 when the file is missing, cannot be
            read, is not valid JSON, or its top-level value is not a
            JSON object.
    """
    coverage_file = project_root / ".coverage.json"

    # Diagnostics go to stderr so stdout carries nothing but the report
    # when the script is run with --markdown-only.
    try:
        with open(coverage_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: .coverage.json not found at {coverage_file} ", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in .coverage.json: {e} ", file=sys.stderr)
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as e:
        # E.g. the path is a directory, is unreadable, or is not UTF-8.
        print(f"Error: Could not read {coverage_file}: {e}", file=sys.stderr)
        sys.exit(1)

    # Later steps assign keys on this value, so anything other than a
    # JSON object would only fail further on with an unhelpful TypeError.
    if not isinstance(data, dict):
        print(
            "Error: .coverage.json must contain a JSON object, "
            f"found {type(data).__name__}",
            file=sys.stderr,
        )
        sys.exit(1)

    return data
48+
49+
def run_codeql_resolve_queries(project_root: Path) -> List[str]:
    """Run ``codeql resolve queries`` on ``ql/src`` and return the query paths.

    The command is executed with ``project_root`` as the working directory
    and its standard output is parsed as JSON.

    Args:
        project_root: Directory that contains the ``ql/src`` tree.

    Returns:
        The list of query paths decoded from the command's JSON output.

    Raises:
        SystemExit: With status 1 when ``ql/src`` does not exist, the
            ``codeql`` executable cannot be started, the command exits
            with a non-zero status, or its output is not a JSON array
            of strings.
    """
    ql_src_path = project_root / "ql" / "src"

    if not ql_src_path.exists():
        print(f"Error: ql/src directory not found at {ql_src_path} ", file=sys.stderr)
        sys.exit(1)

    cmd = ["codeql", "resolve", "queries", "--format=json", str(ql_src_path)]
    try:
        result = subprocess.run(
            cmd,
            cwd=project_root,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error running codeql command: {e} ", file=sys.stderr)
        print(f"stderr: {e.stderr} ", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Raised before the command runs at all, most commonly because
        # the codeql executable is not on PATH.
        print(f"Error: could not execute 'codeql': {e}", file=sys.stderr)
        sys.exit(1)

    try:
        queries = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        print(f"Error parsing codeql output as JSON: {e} ", file=sys.stderr)
        sys.exit(1)

    # Callers iterate this value as a list of path strings; reject any
    # other shape here instead of failing obscurely downstream.
    if not isinstance(queries, list) or not all(isinstance(q, str) for q in queries):
        print("Error: codeql output is not a JSON array of query paths", file=sys.stderr)
        sys.exit(1)

    return queries
79+
80+
def process_query_paths(queries: List[str], project_root: Path) -> List[Dict[str, Any]]:
    """Build one coverage entry per query path.

    Each entry records the path relative to ``project_root`` (or the path
    as given when it does not lie under the root), the original path, the
    file stem as the query name, the category and CWE derived from the
    path, and initial "not covered" bookkeeping fields.
    """

    def build_entry(raw_path: str) -> Dict[str, Any]:
        as_given = Path(raw_path)
        try:
            shown_path = as_given.relative_to(project_root)
        except ValueError:
            # Not located under the project root: keep the path untouched.
            shown_path = as_given

        return {
            "path": str(shown_path),
            "absolute_path": raw_path,
            "name": as_given.stem,
            "category": extract_category_from_path(shown_path),
            "cwe": extract_cwe_from_path(shown_path),
            "covered": False,  # nothing is marked as covered at this stage
            "test_files": [],  # placeholder for test file paths, if any
        }

    return [build_entry(raw_path) for raw_path in queries]
107+
108+
def extract_category_from_path(path: Path) -> str:
    """Return the directory name directly below ``ql/src`` in ``path``.

    The path must start with the components ``ql`` and ``src`` and have at
    least one further component; otherwise ``"unknown"`` is returned.
    """
    segments = path.parts
    starts_with_ql_src = segments[:2] == ("ql", "src")
    return segments[2] if starts_with_ql_src and len(segments) > 2 else "unknown"
115+
116+
def extract_cwe_from_path(path: Path) -> str:
    """Return the first path component beginning with ``CWE-``, or ``""``."""
    cwe_components = (segment for segment in path.parts if segment.startswith("CWE-"))
    return next(cwe_components, "")
124+
125+
def update_coverage_file(coverage_data: Dict[str, Any], queries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Store the query entries and summary metadata in ``coverage_data``.

    The ``"queries"`` and ``"metadata"`` keys are replaced; every other
    key of ``coverage_data`` is left as it is.

    Args:
        coverage_data: Coverage mapping to update in place.
        queries: Query entries, each providing the ``"covered"``,
            ``"category"`` and ``"cwe"`` keys.

    Returns:
        The same ``coverage_data`` object that was passed in.
    """
    total = len(queries)
    covered = sum(1 for q in queries if q["covered"])

    coverage_data["queries"] = queries
    coverage_data["metadata"] = {
        "total_queries": total,
        "covered_queries": covered,
        # Sorted so the saved file and printed summary are stable between
        # runs; plain set iteration order changes with string hashing.
        "categories": sorted({q["category"] for q in queries}),
        "cwes": sorted({q["cwe"] for q in queries if q["cwe"]}),
        "coverage_percentage": (covered / total * 100) if total > 0 else 0,
    }

    return coverage_data
145+
146+
def save_coverage_file(coverage_data: Dict[str, Any], project_root: Path) -> None:
    """Write ``coverage_data`` to ``.coverage.json`` in the project root.

    The data is written as UTF-8 JSON with a two-space indent and without
    ASCII escaping.

    Args:
        coverage_data: JSON-serialisable coverage mapping.
        project_root: Directory in which ``.coverage.json`` is written.

    Raises:
        SystemExit: With status 1 when the data cannot be serialised or
            the file cannot be written.
    """
    coverage_file = project_root / ".coverage.json"

    try:
        # Serialise before opening the file: mode "w" truncates at once,
        # so a serialisation error after that point would leave an empty
        # or half-written .coverage.json behind.
        serialized = json.dumps(coverage_data, indent=2, ensure_ascii=False)
        with open(coverage_file, "w", encoding="utf-8") as f:
            f.write(serialized)
    except (OSError, TypeError, ValueError) as e:
        print(f"Error saving coverage file: {e} ", file=sys.stderr)
        sys.exit(1)

    print(f"Successfully updated {coverage_file} ")
158+
159+
# Short labels for the CWE identifiers this report knows about; any other
# identifier is shown with a generic description.
_CWE_DESCRIPTIONS = {
    "CWE-200": "Information Exposure",
    "CWE-284": "Improper Access Control",
    "CWE-306": "Missing Authentication",
    "CWE-319": "Cleartext Transmission",
    "CWE-327": "Broken/Risky Crypto Algorithm",
    "CWE-352": "Cross-Site Request Forgery",
    "CWE-272": "Least Privilege Violation",
    "CWE-311": "Missing Encryption",
    "CWE-400": "Resource Exhaustion",
    "CWE-942": "Overly Permissive CORS",
    "CWE-693": "Protection Mechanism Failure",
    "CWE-295": "Improper Certificate Validation",
    "CWE-798": "Hard-coded Credentials",
    "CWE-404": "Improper Resource Shutdown",
}


def _tally_coverage(queries: List[Dict[str, Any]], key: str) -> Dict[str, Dict[str, int]]:
    """Count total and covered queries for each distinct ``query[key]``."""
    tallies: Dict[str, Dict[str, int]] = {}
    for query in queries:
        counts = tallies.setdefault(query[key], {"total": 0, "covered": 0})
        counts["total"] += 1
        if query["covered"]:
            counts["covered"] += 1
    return tallies


def _percentage(counts: Dict[str, int]) -> float:
    """Return covered/total as a percentage, or 0 when total is zero."""
    return (counts["covered"] / counts["total"] * 100) if counts["total"] > 0 else 0


def generate_coverage_markdown(coverage_data: Dict[str, Any]) -> str:
    """Render the coverage data as a markdown report.

    The report consists of a coverage badge, a summary table, a
    per-category table (when there are queries), a per-CWE table (when
    any query carries a CWE) and a "last updated" line.

    Args:
        coverage_data: Mapping with a ``"metadata"`` entry (providing
            ``total_queries``, ``covered_queries``, ``coverage_percentage``,
            ``categories`` and ``cwes``) and a ``"queries"`` list whose
            items provide ``category``, ``cwe`` and ``covered``.

    Returns:
        The report as a single newline-separated string.
    """
    from datetime import datetime, timezone

    metadata = coverage_data["metadata"]
    queries = coverage_data["queries"]
    coverage_pct = metadata["coverage_percentage"]

    # Badge colour reflects the overall coverage percentage.
    if coverage_pct >= 80:
        badge_color = "brightgreen"
    elif coverage_pct >= 60:
        badge_color = "yellow"
    elif coverage_pct >= 40:
        badge_color = "orange"
    else:
        badge_color = "red"

    md_content = [
        # shields.io static badge; "%25" is the URL-encoded percent sign.
        f"![Coverage](https://img.shields.io/badge/coverage-{coverage_pct:.1f}%25-{badge_color})",
        "",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Total Queries | {metadata['total_queries']} |",
        f"| Covered Queries | {metadata['covered_queries']} |",
        f"| Coverage Percentage | {coverage_pct:.1f} % |",
        f"| Categories | {len(metadata['categories'])} |",
        f"| CWE Categories | {len(metadata['cwes'])} |",
        "",
    ]

    # Coverage by category
    if queries:
        category_stats = _tally_coverage(queries, "category")

        md_content.append("### Coverage by Category")
        md_content.append("")
        md_content.append("| Category | Covered | Total | Percentage |")
        md_content.append("|----------|---------|-------|------------|")
        for category in sorted(category_stats):
            stats = category_stats[category]
            md_content.append(
                f"| {category.title()} | {stats['covered']} | {stats['total']} "
                f"| {_percentage(stats):.1f} % |"
            )
        md_content.append("")

    # Coverage by CWE; queries without a CWE are left out of this table.
    if metadata["cwes"]:
        cwe_stats = _tally_coverage([q for q in queries if q["cwe"]], "cwe")

        if cwe_stats:
            md_content.append("### Coverage by CWE")
            md_content.append("")
            md_content.append("| CWE | Description | Covered | Total | Percentage |")
            md_content.append("|-----|-------------|---------|-------|------------|")
            for cwe in sorted(cwe_stats):
                stats = cwe_stats[cwe]
                description = _CWE_DESCRIPTIONS.get(cwe, "Security Vulnerability")
                md_content.append(
                    f"| {cwe} | {description} | {stats['covered']} | {stats['total']} "
                    f"| {_percentage(stats):.1f} % |"
                )
            md_content.append("")

    # The label says UTC, so take the time in UTC rather than local time.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    # No space before the closing "*", otherwise the emphasis does not render.
    md_content.append(f"*Last updated: {timestamp}*")

    return "\n".join(md_content)
268+
269+
def update_readme_coverage(project_root: Path, coverage_markdown: str) -> None:
    """Insert the coverage report between the markers in ``README.md``.

    Everything between ``<!-- COVERAGE-REPORT -->`` and the following
    ``<!-- COVERAGE-REPORT:END -->`` is replaced by the report, separated
    from each marker by one blank line. The function is best-effort:
    problems are reported with a printed message and the README is left
    unchanged; nothing is raised for a missing file, missing markers or
    I/O errors.

    Args:
        project_root: Directory that contains ``README.md``.
        coverage_markdown: Markdown text to place between the markers.
    """
    readme_file = project_root / "README.md"

    if not readme_file.exists():
        print(f"Warning: README.md not found at {readme_file} ")
        return

    start_marker = "<!-- COVERAGE-REPORT -->"
    end_marker = "<!-- COVERAGE-REPORT:END -->"

    try:
        with open(readme_file, "r", encoding="utf-8") as f:
            content = f.read()

        start_idx = content.find(start_marker)
        # Only accept an end marker that follows the start marker; one
        # found earlier in the file would make the slices below overlap
        # and duplicate README content.
        end_idx = -1
        if start_idx != -1:
            end_idx = content.find(end_marker, start_idx + len(start_marker))

        if start_idx == -1 or end_idx == -1:
            print(f"Warning: Coverage report markers not found in {readme_file} ")
            print("Please add the following markers to your README.md where you want the coverage report:")
            print(f" {start_marker} ")
            print(f" {end_marker} ")
            return

        # Keep both markers in place and swap only the text between them.
        new_content = (
            content[:start_idx + len(start_marker)]
            + "\n\n" + coverage_markdown + "\n\n"
            + content[end_idx:]
        )

        with open(readme_file, "w", encoding="utf-8") as f:
            f.write(new_content)

        print(f"Successfully updated coverage report in {readme_file} ")

    except (OSError, UnicodeDecodeError) as e:
        print(f"Error updating README.md: {e} ")
310+
311+
def main():
    """Command-line entry point: refresh the coverage data and its reports.

    Loads ``.coverage.json``, resolves the CodeQL queries, rebuilds the
    coverage data and renders the markdown report. With ``--markdown-only``
    the report is printed and nothing is written; otherwise the coverage
    file is saved, the README is updated unless ``--no-readme-update`` is
    given, and a summary is printed.
    """
    cli = argparse.ArgumentParser(description="Generate CodeQL query coverage report")
    cli.add_argument(
        "--markdown-only",
        action="store_true",
        help="Generate only the markdown report and print to stdout (don't update files)",
    )
    cli.add_argument(
        "--no-readme-update",
        action="store_true",
        help="Don't update the README.md file with the coverage report",
    )
    options = cli.parse_args()

    # Progress messages are suppressed when only the markdown is wanted.
    verbose = not options.markdown_only

    if verbose:
        print("Loading CodeQL query coverage data...")

    root = find_project_root()
    if verbose:
        print(f"Project root: {root} ")

    existing = load_coverage_file(root)
    if verbose:
        print("Loaded existing coverage data")
        print("Running codeql resolve queries...")

    resolved_paths = run_codeql_resolve_queries(root)
    if verbose:
        print(f"Found {len(resolved_paths)} queries")

    entries = process_query_paths(resolved_paths, root)
    refreshed = update_coverage_file(existing, entries)
    report = generate_coverage_markdown(refreshed)

    if options.markdown_only:
        # Report only: print it and leave every file untouched.
        print(report)
        return

    save_coverage_file(refreshed, root)

    if not options.no_readme_update:
        print("Generating coverage report...")
        update_readme_coverage(root, report)

    summary = refreshed["metadata"]
    print("\n Coverage Summary:")
    print(f" Total queries: {summary['total_queries']} ")
    print(f" Covered queries: {summary['covered_queries']} ")
    print(f" Coverage percentage: {summary['coverage_percentage']:.1f} %")
    print(f" Categories: {', '.join(summary['categories'])} ")
    print(f" CWEs covered: {len(summary['cwes'])} ")


if __name__ == "__main__":
    main()
0 commit comments