Skip to content

Commit 2b0d198

Browse files
committed
Add v2 pipeline to collect Ubuntu OSV advisories
Signed-off-by: Keshav Priyadarshi <git@keshav.space>
1 parent 385088d commit 2b0d198

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from pathlib import Path
11+
from typing import Iterable
12+
13+
from fetchcode.vcs import fetch_via_vcs
14+
15+
from vulnerabilities.importer import AdvisoryData
16+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
17+
from vulnerabilities.pipes.osv_v2 import parse_advisory_data_v3
18+
from vulnerabilities.utils import get_advisory_url
19+
from vulnerabilities.utils import load_json
20+
21+
22+
class UbuntuOSVImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
23+
"""
24+
Collect Ubuntu OSV format advisories.
25+
26+
Collect advisories from the GitHub Ubuntu Vulnerability Data repository.
27+
"""
28+
29+
pipeline_id = "ubuntu_osv_importer_v2"
30+
spdx_license_expression = "CC-BY-4.0"
31+
license_url = "https://github.com/canonical/ubuntu-security-notices/blob/main/LICENSE"
32+
repo_url = "git+https://github.com/canonical/ubuntu-security-notices/"
33+
34+
progress_step = 1
35+
36+
@classmethod
37+
def steps(cls):
38+
return (
39+
cls.clone,
40+
cls.collect_and_store_advisories,
41+
cls.clean_downloads,
42+
)
43+
44+
def clone(self):
45+
self.log(f"Cloning `{self.repo_url}`")
46+
self.vcs_response = fetch_via_vcs(self.repo_url)
47+
self.advisories_path = Path(self.vcs_response.dest_dir)
48+
49+
def advisories_count(self):
50+
cve_directory = self.advisories_path / "osv" / "cve"
51+
return sum(1 for _ in cve_directory.rglob("*.json"))
52+
53+
def collect_advisories(self) -> Iterable[AdvisoryData]:
54+
supported_ecosystems = ["deb"]
55+
cve_directory = self.advisories_path / "osv" / "cve"
56+
57+
for file in cve_directory.rglob("*.json"):
58+
advisory_url = get_advisory_url(
59+
file=file,
60+
base_path=self.advisories_path,
61+
url="https://github.com/canonical/ubuntu-security-notices/blob/main/",
62+
)
63+
raw_data = load_json(file)
64+
advisory_text = file.read_text()
65+
66+
yield parse_advisory_data_v3(
67+
raw_data=raw_data,
68+
supported_ecosystems=supported_ecosystems,
69+
advisory_url=advisory_url,
70+
advisory_text=advisory_text,
71+
)
72+
73+
def clean_downloads(self):
74+
if self.vcs_response:
75+
self.log("Removing cloned repository")
76+
self.vcs_response.delete()
77+
78+
def on_failure(self):
79+
self.clean_downloads()

0 commit comments

Comments
 (0)