|
1 | 1 | import logging |
2 | 2 | from pathlib import PurePath |
3 | | - |
| 3 | +from requests.exceptions import ReadTimeout |
4 | 4 | import requests |
5 | 5 | from urllib.parse import urlencode |
6 | 6 | import base64 |
7 | 7 | import json |
8 | 8 | from socketsecurity.core.exceptions import ( |
9 | | - APIFailure, APIKeyMissing, APIAccessDenied, APIInsufficientQuota, APIResourceNotFound, APICloudflareError |
| 9 | + APIFailure, |
| 10 | + APIKeyMissing, |
| 11 | + APIAccessDenied, |
| 12 | + APIInsufficientQuota, |
| 13 | + APIResourceNotFound, |
| 14 | + APICloudflareError, |
| 15 | + RequestTimeoutExceeded |
10 | 16 | ) |
11 | 17 | from socketsecurity import __version__ |
12 | 18 | from socketsecurity.core.licenses import Licenses |
@@ -182,15 +188,18 @@ def do_request( |
182 | 188 | verify = True |
183 | 189 | if allow_unverified_ssl: |
184 | 190 | verify = False |
185 | | - response = requests.request( |
186 | | - method.upper(), |
187 | | - url, |
188 | | - headers=headers, |
189 | | - data=payload, |
190 | | - files=files, |
191 | | - timeout=timeout, |
192 | | - verify=verify |
193 | | - ) |
| 191 | + try: |
| 192 | + response = requests.request( |
| 193 | + method.upper(), |
| 194 | + url, |
| 195 | + headers=headers, |
| 196 | + data=payload, |
| 197 | + files=files, |
| 198 | + timeout=timeout, |
| 199 | + verify=verify |
| 200 | + ) |
| 201 | + except ReadTimeout: |
| 202 | + raise RequestTimeoutExceeded(f"Configured timeout {timeout} reached for request for path {url}") |
194 | 203 | output_headers = headers.copy() |
195 | 204 | output_headers['Authorization'] = "API_KEY_REDACTED" |
196 | 205 | output = { |
@@ -794,15 +803,18 @@ def get_source_data(package: Package, packages: dict) -> list: |
794 | 803 | else: |
795 | 804 | for top_id in package.topLevelAncestors: |
796 | 805 | top_package: Package |
797 | | - top_package = packages[top_id] |
798 | | - manifests = "" |
799 | | - top_purl = f"{top_package.type}/{top_package.name}@{top_package.version}" |
800 | | - for manifest_data in top_package.manifestFiles: |
801 | | - manifest_file = manifest_data.get("file") |
802 | | - manifests += f"{manifest_file};" |
803 | | - manifests = manifests.rstrip(";") |
804 | | - source = (top_purl, manifests) |
805 | | - introduced_by.append(source) |
| 806 | + top_package = packages.get(top_id) |
| 807 | + if top_package: |
| 808 | + manifests = "" |
| 809 | + top_purl = f"{top_package.type}/{top_package.name}@{top_package.version}" |
| 810 | + for manifest_data in top_package.manifestFiles: |
| 811 | + manifest_file = manifest_data.get("file") |
| 812 | + manifests += f"{manifest_file};" |
| 813 | + manifests = manifests.rstrip(";") |
| 814 | + source = (top_purl, manifests) |
| 815 | + introduced_by.append(source) |
| 816 | + else: |
| 817 | + log.debug(f"Unable to get top level package info for {top_id}") |
806 | 818 | return introduced_by |
807 | 819 |
|
808 | 820 | @staticmethod |
@@ -841,21 +853,29 @@ def create_sbom_dict(sbom: list) -> dict: |
841 | 853 | """ |
842 | 854 | packages = {} |
843 | 855 | top_level_count = {} |
| 856 | + top_levels = {} |
844 | 857 | for item in sbom: |
845 | 858 | package = Package(**item) |
846 | 859 | if package.id in packages: |
847 | | - print("Duplicate package?") |
| 860 | + log.debug("Duplicate package?") |
848 | 861 | else: |
849 | 862 | package = Core.get_license_details(package) |
850 | 863 | packages[package.id] = package |
851 | 864 | for top_id in package.topLevelAncestors: |
852 | 865 | if top_id not in top_level_count: |
853 | 866 | top_level_count[top_id] = 1 |
| 867 | + top_levels[top_id] = [package.id] |
854 | 868 | else: |
855 | 869 | top_level_count[top_id] += 1 |
| 870 | + if package.id not in top_levels[top_id]: |
| 871 | + top_levels[top_id].append(package.id) |
856 | 872 | if len(top_level_count) > 0: |
857 | 873 | for package_id in top_level_count: |
858 | | - packages[package_id].transitives = top_level_count[package_id] |
| 874 | + if package_id not in packages: |
| 875 | + details = top_levels.get(package_id) |
| 876 | + log.debug(f"Orphaned top level package id {package_id} for packages {details}") |
| 877 | + else: |
| 878 | + packages[package_id].transitives = top_level_count[package_id] |
859 | 879 | return packages |
860 | 880 |
|
861 | 881 | @staticmethod |
|
0 commit comments