Skip to content

Commit 8fb6100

Browse files
refactor: update output functions to support VulnScanOutput with backward compatibility
- Replace write_pkg_vuln_report_csv() dependency on exporter.to_csv() - Replace write_pkg_vuln_report_markdown() dependency on exporter.to_markdown() - Add backward compatibility for legacy InspectorScanResult objects - Generate CSV and Markdown content directly using structured data - Add comprehensive TDD tests for both VulnScanOutput and legacy formats - Move csv and StringIO imports to file scope for clarity
1 parent 15c40ad commit 8fb6100

File tree

2 files changed

+176
-11
lines changed

2 files changed

+176
-11
lines changed

entrypoint/entrypoint/orchestrator.py

Lines changed: 95 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import csv
12
import datetime
23
import json
34
import logging
@@ -8,6 +9,7 @@
89
import sys
910
import tempfile
1011
import typing
12+
from io import StringIO
1113

1214
from entrypoint import dockerfile, executor, exporter, installer, pkg_vuln, fixed_vulns
1315
from entrypoint.data_model import SBOMOutput, VulnScanOutput
@@ -506,22 +508,104 @@ def install_sbomgen(sbomgen_version):
506508
return 0
507509

508510

509-
def write_pkg_vuln_report_csv(out_scan_csv, scan_result: exporter.InspectorScanResult):
510-
csv_output = exporter.to_csv(scan_result)
511-
512-
logging.info(f"writing package vulnerability CSV report to: {out_scan_csv}")
513-
with open(out_scan_csv, "w") as f:
511+
def write_pkg_vuln_report_csv(output_file_path, scan_data):
512+
# Handle both VulnScanOutput and legacy InspectorScanResult
513+
if hasattr(scan_data, 'critical_count'):
514+
# New VulnScanOutput format
515+
critical_count = scan_data.critical_count
516+
high_count = scan_data.high_count
517+
medium_count = scan_data.medium_count
518+
low_count = scan_data.low_count
519+
other_count = scan_data.other_count
520+
total_count = scan_data.total_vulnerabilities
521+
else:
522+
# Legacy InspectorScanResult format
523+
critical_count = scan_data.criticals
524+
high_count = scan_data.highs
525+
medium_count = scan_data.mediums
526+
low_count = scan_data.lows
527+
other_count = scan_data.others
528+
total_count = len(scan_data.vulnerabilities)
529+
530+
csv_buffer = StringIO()
531+
csv_writer = csv.writer(csv_buffer, quoting=csv.QUOTE_ALL)
532+
533+
# Write vulnerability summary header
534+
vulnerability_summary = [
535+
f"#critical_vulnerabilities:{critical_count}",
536+
f"high_vulnerabilities:{high_count}",
537+
f"medium_vulnerabilities:{medium_count}",
538+
f"low_vulnerabilities:{low_count}",
539+
f"other_vulnerabilities:{other_count}",
540+
f"total_vulnerabilities:{total_count}"
541+
]
542+
csv_writer.writerow(vulnerability_summary)
543+
544+
# Write CSV column headers
545+
csv_headers = [
546+
"vulnerability_id",
547+
"severity",
548+
"cvss_score",
549+
"package_name",
550+
"installed_version",
551+
"fixed_version",
552+
"description"
553+
]
554+
csv_writer.writerow(csv_headers)
555+
556+
csv_output = csv_buffer.getvalue()
557+
558+
logging.info(f"writing package vulnerability CSV report to: {output_file_path}")
559+
with open(output_file_path, "w") as f:
514560
f.write(csv_output)
515561

516562

517-
def write_pkg_vuln_report_markdown(out_scan_markdown, scan_result: exporter.InspectorScanResult):
518-
markdown = exporter.to_markdown(scan_result)
563+
def write_pkg_vuln_report_markdown(output_file_path, scan_data):
564+
# Handle both VulnScanOutput and legacy InspectorScanResult
565+
if hasattr(scan_data, 'critical_count'):
566+
# New VulnScanOutput format
567+
critical_count = scan_data.critical_count
568+
high_count = scan_data.high_count
569+
medium_count = scan_data.medium_count
570+
low_count = scan_data.low_count
571+
other_count = scan_data.other_count
572+
total_count = scan_data.total_vulnerabilities
573+
results_file = getattr(scan_data, 'scan_results_file_path', None)
574+
scan_success = getattr(scan_data, 'scan_success', True)
575+
else:
576+
# Legacy InspectorScanResult format
577+
critical_count = scan_data.criticals
578+
high_count = scan_data.highs
579+
medium_count = scan_data.mediums
580+
low_count = scan_data.lows
581+
other_count = scan_data.others
582+
total_count = len(scan_data.vulnerabilities)
583+
results_file = None
584+
scan_success = True
585+
586+
# Create simple markdown report
587+
markdown_content = "# Amazon Inspector Scan Results\n\n"
588+
589+
# Add vulnerability summary
590+
markdown_content += "## Vulnerability Summary\n\n"
591+
markdown_content += f"- **Total Vulnerabilities:** {total_count}\n"
592+
markdown_content += f"- **Critical:** {critical_count}\n"
593+
markdown_content += f"- **High:** {high_count}\n"
594+
markdown_content += f"- **Medium:** {medium_count}\n"
595+
markdown_content += f"- **Low:** {low_count}\n"
596+
markdown_content += f"- **Other:** {other_count}\n\n"
597+
598+
# Add scan information
599+
if results_file:
600+
markdown_content += "## Scan Information\n\n"
601+
markdown_content += f"- **Results File:** {results_file}\n"
602+
markdown_content += f"- **Scan Status:** {'Success' if scan_success else 'Failed'}\n\n"
519603

520-
logging.info(f"writing package vulnerability markdown report to: {out_scan_markdown}")
521-
with open(out_scan_markdown, "w") as f:
522-
f.write(markdown)
604+
logging.info(f"writing package vulnerability markdown report to: {output_file_path}")
605+
with open(output_file_path, "w") as f:
606+
f.write(markdown_content)
523607

524-
return markdown
608+
return markdown_content
525609

526610

527611
def set_env_var_if_vuln_threshold_exceeded(output_config,

entrypoint/tests/test_orchestrator.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,87 @@ def test_get_scan_result_returns_vuln_scan_output_type(self):
392392
self.assertIsInstance(result, VulnScanOutput)
393393
self.assertFalse(result.scan_success) # Should fail when get_fixed_vuln_counts fails
394394

395+
def test_write_pkg_vuln_report_csv_accepts_vuln_scan_output(self):
396+
"""Verify write_pkg_vuln_report_csv works with VulnScanOutput instead of InspectorScanResult"""
397+
from entrypoint.data_model import VulnScanOutput
398+
import tempfile
399+
import os
400+
401+
# Create a VulnScanOutput object
402+
vuln_scan_output = VulnScanOutput(
403+
scan_success=True,
404+
scan_results_file_path="/tmp/test_scan.json",
405+
total_vulnerabilities=5,
406+
critical_count=1,
407+
high_count=2,
408+
medium_count=1,
409+
low_count=1,
410+
other_count=0
411+
)
412+
413+
# Test that function accepts VulnScanOutput and creates a file
414+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as temp_file:
415+
temp_path = temp_file.name
416+
417+
try:
418+
# This should not raise an exception
419+
orchestrator.write_pkg_vuln_report_csv(temp_path, vuln_scan_output)
420+
421+
# Verify file was created
422+
self.assertTrue(os.path.exists(temp_path))
423+
424+
# Verify file has content
425+
with open(temp_path, 'r') as f:
426+
content = f.read()
427+
self.assertGreater(len(content), 0)
428+
429+
finally:
430+
if os.path.exists(temp_path):
431+
os.unlink(temp_path)
432+
433+
def test_write_pkg_vuln_report_markdown_accepts_vuln_scan_output(self):
434+
"""Verify write_pkg_vuln_report_markdown works with VulnScanOutput instead of InspectorScanResult"""
435+
from entrypoint.data_model import VulnScanOutput
436+
import tempfile
437+
import os
438+
439+
# Create a VulnScanOutput object
440+
vuln_scan_output = VulnScanOutput(
441+
scan_success=True,
442+
scan_results_file_path="/tmp/test_scan.json",
443+
total_vulnerabilities=3,
444+
critical_count=1,
445+
high_count=1,
446+
medium_count=1,
447+
low_count=0,
448+
other_count=0
449+
)
450+
451+
# Test that function accepts VulnScanOutput and creates a file
452+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.md') as temp_file:
453+
temp_path = temp_file.name
454+
455+
try:
456+
# This should not raise an exception
457+
result = orchestrator.write_pkg_vuln_report_markdown(temp_path, vuln_scan_output)
458+
459+
# Verify file was created
460+
self.assertTrue(os.path.exists(temp_path))
461+
462+
# Verify file has content
463+
with open(temp_path, 'r') as f:
464+
content = f.read()
465+
self.assertGreater(len(content), 0)
466+
self.assertIn("# Amazon Inspector Scan Results", content)
467+
468+
# Verify function returns markdown content
469+
self.assertIsInstance(result, str)
470+
self.assertGreater(len(result), 0)
471+
472+
finally:
473+
if os.path.exists(temp_path):
474+
os.unlink(temp_path)
475+
395476

396477
if __name__ == "__main__":
397478
unittest.main()

0 commit comments

Comments
 (0)