Skip to content

Commit bccfb5a

Browse files
committed
add doctest and optimize code in rockylinux importer
Signed-off-by: ambuj <kulshreshthaak.12@gmail.com>
1 parent 1914bdc commit bccfb5a

File tree

1 file changed

+55
-4
lines changed

1 file changed

+55
-4
lines changed

vulnerabilities/importers/rockylinux.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,61 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
6969

7070

7171
def to_advisory(advisory_data):
72+
73+
"""
74+
Convert Rockylinux advisory data into an AdvisoryData object.
75+
76+
Args:
77+
advisory_data (dict): A dictionary containing advisory information.
78+
79+
Returns:
80+
AdvisoryData: An instance of AdvisoryData with processed information.
81+
82+
Example:
83+
>>> advisory_data = {
84+
... "name": "CVE-2023-1234",
85+
... "publishedAt": "2023-08-20T12:34:56Z",
86+
... "description": "A vulnerability in the system.",
87+
... "affectedProducts": ["product1"],
88+
... "rpms": {
89+
... "product1": {
90+
... "nvras": [
91+
... "package-1.0-1.el8.x86_64.rpm",
92+
... "package-2.0-1.el8.noarch.rpm"
93+
... ]
94+
... }
95+
... },
96+
... "fixes": [
97+
... {"sourceLink": "http://example.com/fix", "ticket": "12345"}
98+
... ],
99+
... "cves": [
100+
... {
101+
... "name": "CVE-2023-1234",
102+
... "cvss3BaseScore": "7.5",
103+
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
104+
... "sourceLink": "http://example.com/cve"
105+
... }
106+
... ]
107+
... }
108+
>>> advisory = to_advisory(advisory_data)
109+
>>> advisory.aliases
110+
'CVE-2023-1234'
111+
>>> advisory.date_published.year
112+
2023
113+
>>> len(advisory.affected_packages)
114+
2
115+
>>> len(advisory.references)
116+
2
117+
"""
72118
aliases = advisory_data.get("name") or ""
73119
date_published = dateparser.parse(advisory_data.get("publishedAt", ""))
74120

75121
summary = advisory_data.get("description") or ""
76122
affected_products = advisory_data.get("affectedProducts") or []
77123
affected_packages = []
78124
for products in affected_products:
79-
packages = advisory_data["rpms"][products]["nvras"]
125+
rpms = advisory_data.get("rpms", {})
126+
packages = rpms.get(products, {}).get("nvras", [])
80127
affected_packages.extend(packages)
81128
processed_affected_packages: List[AffectedPackage] = []
82129
for rpm in affected_packages:
@@ -97,8 +144,8 @@ def to_advisory(advisory_data):
97144
fixed_version=None,
98145
)
99146
)
100-
except Exception as e:
101-
logger.error(f"Failed to parse version range {purl.version} for {purl} {e}")
147+
except VersionParsingError as e:
148+
logger.error(f"Failed to parse version {purl.version} for {purl} {e}")
102149

103150
references = [
104151
Reference(
@@ -142,6 +189,10 @@ def to_advisory(advisory_data):
142189
)
143190

144191

192+
class VersionParsingError(Exception):
193+
pass
194+
195+
145196
def get_cwes_from_rockylinux_advisory(advisory_data) -> [int]:
146197
"""
147198
Extract CWE IDs from advisory data and validate them against a database.
@@ -194,6 +245,6 @@ def get_cwes_from_rockylinux_advisory(advisory_data) -> [int]:
194245
try:
195246
db.get(cwe_id)
196247
weaknesses.append(cwe_id)
197-
except Exception:
248+
except ValueError:
198249
logger.error("Invalid CWE id")
199250
return weaknesses

0 commit comments

Comments
 (0)