diff --git a/classes/source-info.bbclass b/classes/source-info.bbclass new file mode 100644 index 00000000..d6c71b74 --- /dev/null +++ b/classes/source-info.bbclass @@ -0,0 +1,51 @@ + +python do_collect_source_info() { + import json + + log_file = f"{d.getVar('T')}/source-info.json" + + source_info = d.getVar("EMLINUX_SOURCE_FROM") + if source_info is None: + distro = d.getVar("DISTRO").split("-")[1] + source_info = distro + + data = { + "source_package_name": d.getVar("PN"), + "source_from": source_info, + } + + with open(log_file, "w") as f: + json.dump(data, f, indent=4) +} + +python do_merge_source_info() { + import glob + import json + + distro = d.getVar("DISTRO") + distro_arch = d.getVar("DISTRO_ARCH") + + tmpdir = d.getVar("TMPDIR") + + basepath = f"{tmpdir}/work/{distro}-{distro_arch}" + search_path = f"{basepath}/*/*/temp/source-info.json" + + source_info = {} + + for file in glob.glob(search_path): + with open(file) as f: + data = json.load(f) + pkg = data["source_package_name"] + source_info[pkg] = data + + source_info = dict(sorted(source_info.items(), key=lambda x: x[0], reverse=False)) + + source_info_file = d.getVar("DEPLOY_DIR_IMAGE") + "/all-source-info.json" + + with open(source_info_file, "w") as f: + json.dump(source_info, f, indent=4) + +} + +addtask collect_source_info after do_dpkg_source before do_dpkg_build +addtask merge_source_info after do_image before do_deploy diff --git a/conf/distro/emlinux-common.inc b/conf/distro/emlinux-common.inc index 1c9fddf1..457f0caf 100644 --- a/conf/distro/emlinux-common.inc +++ b/conf/distro/emlinux-common.inc @@ -11,7 +11,7 @@ require include/security_flags.inc -INHERIT += "sdk-installer" +INHERIT += "sdk-installer source-info" DISTRO_KERNELS ?= "linux-cip" KERNEL_NAME ?= "cip" diff --git a/docker/Dockerfile b/docker/Dockerfile index 40d4cd75..7c66e56d 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -76,6 +76,7 @@ RUN bash -c 'if test -n "$no_proxy"; then git config --global core.noproxy "$no_ RUN pip install --user 'cyclonedx-python-lib>=7.3.2,<=7.5.1' --break-system-packages --no-warn-script-location RUN pip install --user jsonschema==4.21.1 --break-system-packages --no-warn-script-location RUN pip install --user spdx-tools==0.8.2 --break-system-packages --no-warn-script-location +RUN pip install --user looseversion==1.3.0 --break-system-packages --no-warn-script-location RUN mkdir work WORKDIR /home/build/work diff --git a/recipes-core/emlinux-customization/emlinux-customization.bb b/recipes-core/emlinux-customization/emlinux-customization.bb index 881a27c2..330edad9 100644 --- a/recipes-core/emlinux-customization/emlinux-customization.bb +++ b/recipes-core/emlinux-customization/emlinux-customization.bb @@ -10,8 +10,9 @@ # FILESEXTRAPATHS:prepend := "${FILE_DIRNAME}/files:" +EMLINUX_SOURCE_FROM="non-debian" DESCRIPTION = "EMLinux 3.x specific customization" - +LICENSE = "MIT" DEBIAN_DEPENDS = "netbase" inherit dpkg-raw @@ -26,4 +27,3 @@ do_install:append() { echo "PS1=\"${EMLINUX_ENVIRONMENT_VARIABLE_PS1}\"" > "${D}/etc/profile.d/ps1.sh" fi } - diff --git a/recipes-kernel/linux/linux-cip-rt_6.1.bb b/recipes-kernel/linux/linux-cip-rt_6.1.bb index 43cc96f7..3bc73ef8 100644 --- a/recipes-kernel/linux/linux-cip-rt_6.1.bb +++ b/recipes-kernel/linux/linux-cip-rt_6.1.bb @@ -17,3 +17,5 @@ SRC_URI += " file://preempt-rt.cfg" SRC_URI:append:generic-x86-64 = " file://generic-x86-64_defconfig" SRC_URI:append:raspberrypi3bplus-64 = " file://raspberrypi3-64_defconfig" SRC_URI:append:raspberrypi4b-64 = " file://raspberrypi4-64_defconfig" + +EMLINUX_SOURCE_FROM="non-debian" diff --git a/recipes-kernel/linux/linux-cip_6.1.bb b/recipes-kernel/linux/linux-cip_6.1.bb index 0398092a..eb5d1bbb 100644 --- a/recipes-kernel/linux/linux-cip_6.1.bb +++ b/recipes-kernel/linux/linux-cip_6.1.bb @@ -19,3 +19,5 @@ SRC_URI:append:qemu-amd64 = " file://qemu-amd64_defconfig" SRC_URI:append:generic-x86-64 = " file://generic-x86-64_defconfig" SRC_URI:append:raspberrypi3bplus-64 = " file://raspberrypi3-64_defconfig" SRC_URI:append:raspberrypi4b-64 = " file://raspberrypi4-64_defconfig" + +EMLINUX_SOURCE_FROM="non-debian" diff --git a/recipes-kernel/linux/linux-cip_6.12.bb b/recipes-kernel/linux/linux-cip_6.12.bb index 2afc9b03..8aa19160 100644 --- a/recipes-kernel/linux/linux-cip_6.12.bb +++ b/recipes-kernel/linux/linux-cip_6.12.bb @@ -15,3 +15,5 @@ SRC_URI:append:qemu-arm = " file://qemu-arm_defconfig" SRC_URI:append:qemu-amd64 = " file://qemu-amd64_defconfig" SRC_URI:append:generic-x86-64 = " file://generic-x86-64_defconfig" SRC_URI:append:raspberrypi4b-64 = " file://raspberrypi4-64_defconfig" + +EMLINUX_SOURCE_FROM="non-debian" diff --git a/scripts/cve_check_ng.py b/scripts/cve_check_ng.py new file mode 100755 index 00000000..6bce1869 --- /dev/null +++ b/scripts/cve_check_ng.py @@ -0,0 +1,456 @@ +#!/usr/bin/python3 + +# +# EMLinux CVE checker +# +# Copyright (c) Cybertrust Japan Co., Ltd. +# +# SPDX-License-Identifier: MIT +# + +from lib.python.cve.plugin.eml_cve_plugin_base import EmlCvePlugin +import lib.python.cve.common_libs as cl +from lib.python.cve.nvd_lib import CveCheckMergedList, NvdCveInfoListCreator +from lib.python.cve.cve_reporter import CveReporter +from lib.python.package_info import PackageInfoHelper, PackageList +from lib.python.cve.cve_product import CveProductList +from lib.python.cve.kev_info import KevInfoList +import lib.python.cve.kev_cve as kev_cve +import lib.python.bitbake_runner as bitbake_runner + +import argparse +import sys +import os, os.path +import yaml +from typing import Any +import pathlib +import traceback +import re +import logging + +logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s: %(message)s") +logger = logging.getLogger("emlinux-cve-check") + +import glob + +import importlib.util +from concurrent.futures import ThreadPoolExecutor, as_completed + + +def create_ignore_list( + emlinux_layer_dir: str, + installed_packages: PackageList, + debian_codename: str, + extra_cve_check_ignore: str, +): + name = f"{emlinux_layer_dir}/conf/cve/cve_check_ignore.yml" + + def read_ignore_list(filename): + if filename is None: + return {} + + try: + with open(filename) as f: + return yaml.safe_load(f) + except FileNotFoundError: + logger.warn(f"File {filename} is not found") + return {} + + # 1. Read default ignore data + tmp_merge_list = read_ignore_list(name) + # 2. Read extra ignore list + extra_data = read_ignore_list(extra_cve_check_ignore) + + # backward compatibility + def make_backward_compatibility(data): + tmp = {} + for pkg in data: + if type(data[pkg]) is list: + tmp[pkg] = {"all": data[pkg]} + else: + tmp[pkg] = data[pkg] + return tmp + + tmp_merge_list = make_backward_compatibility(tmp_merge_list) + extra_data = make_backward_compatibility(extra_data) + + # 3. Merge default and extra list + if extra_data: + for pkg in extra_data: + if pkg in tmp_merge_list: + for ek in extra_data[pkg].keys(): + if ek in tmp_merge_list[pkg]: + tmp_merge_list[pkg][ek] += extra_data[pkg][ek] + else: + tmp_merge_list[pkg][ek] = extra_data[pkg][ek] + else: + tmp_merge_list[pkg] = extra_data[pkg] + + # 4. Create complete ignore list + # This step does not collect CVE IDs which are not target distribution/linux version. + ignore_list = {} + all_used = [] + for pkg in tmp_merge_list: + d = tmp_merge_list[pkg] + for version in d.keys(): + if debian_codename == version: + ignore_list[pkg] = d[version] + elif version == "all": + if not pkg in ignore_list: + ignore_list[pkg] = [] + all_used.append(pkg) + else: + if not pkg in installed_packages: + continue + pkg_version = str(installed_packages.get_upstream_version(pkg)) + if str(version) == pkg_version: + ignore_list[pkg] = d[version] + break + + ver_pattern = rf"\b{re.escape(str(version))}(?!\d)" + m = re.search(ver_pattern, pkg_version) + if m: + if not pkg in ignore_list: + ignore_list[pkg] = d[version] + else: + ignore_list[pkg].extend(d[version]) + + for pkg in all_used: + ignore_list[pkg].extend(tmp_merge_list[pkg]["all"]) + return ignore_list + + +def create_cve_check_merged_list( + src_pkg_names: list[str], check_results: Any +) -> CveCheckMergedList: + cve_check_merged_list = CveCheckMergedList() + + cve_ids = create_cve_id_list_by_src_pkg_name_from_check_result( + src_pkg_names, check_results + ) + for src_pkg_name in src_pkg_names: + for cveid in cve_ids: + for cr in check_results: + vulns = cr[src_pkg_name] + if vulns is None: + # Plugin doesn't have CVE information for the src_pkg_name + continue + if cveid in vulns: + cve_check_merged_list.add_data( + src_pkg_name, cveid, vulns[cveid], cr.priority + ) + else: + # Plugin doesn't have CVE information for the CVE + pass + # logger.debug(f"{cveid} {src_pkg_name} is not found") + return cve_check_merged_list + + +def create_cve_id_list_by_src_pkg_name_from_check_result( + src_pkg_names: list[str], check_results: Any +) -> list[str]: + tmp_cve_ids = [] + for src_pkg_name in src_pkg_names: + for cr in check_results: + ci = cr.cve_ids_by_src_pkg(src_pkg_name) + if ci: + tmp_cve_ids.extend(ci) + return list(dict.fromkeys(tmp_cve_ids)) + + +# make a source package name list which content has CVE information +def create_src_package_name_list_from_check_result(check_results: Any) -> list[str]: + tmp_list = [] + for result in check_results: + tmp_list.extend(result.src_pkg_names()) + return list(dict.fromkeys(tmp_list)) + + +def read_recipe_source_info(deploy_dir: str) -> Any: + filepath = deploy_dir + "/all-source-info.json" + return cl.read_json(filepath) + + +def load_plugin(plugin_file: str) -> EmlCvePlugin: + path = pathlib.Path(plugin_file).resolve() + if not path.exists(): + logger.error(f"Failed to find plugin {plugin_file}") + exit(1) + + spec = importlib.util.spec_from_file_location(path.stem, str(path)) + if not spec or not spec.loader: + logger.error(f"Fail to load spec from {plugin_file}") + exit(1) + + mod = importlib.util.module_from_spec(spec) + sys.modules[path.stem] = mod + try: + spec.loader.exec_module(mod) + except Exception as e: + logger.error(e) + traceback.print_exc() + exit(1) + + for obj in vars(mod).values(): + if ( + isinstance(obj, type) + and issubclass(obj, EmlCvePlugin) + and obj is not EmlCvePlugin + ): + return obj + + logger.info(f"Plugin is not found in {plugin_file}") + + return None + + +def load_plugins(plugin_files: list[str]): + plugins = [] + + for plugin_file in plugin_files: + logger.debug(f"loading {plugin_file}") + obj = load_plugin(plugin_file) + if obj: + plugins.append(obj) + + return plugins + + +# Plugin file name convention: +# 1. Plugin file must be located in scripts/lib/python/cve/plugin directory +# 2. Plugin file name must be start with eml_cve_ then ends with _plugin.py +# e.g. eml_cve_myplugin_plugin.py +def find_plugins() -> list[str]: + layer_dirs = bitbake_runner.find_layers() + plugins = [] + + plugin_dir = "/scripts/lib/python/cve/plugin/" + for ld in layer_dirs: + d = ld + plugin_dir + pattern = f"{d}/eml_cve_*_plugin.py" + for plugin in glob.glob(pattern): + plugins.append(plugin) + + return plugins + + +def cve_check_worker(plugin: EmlCvePlugin, args: Any): + logger.debug(f"run {plugin.plugin_name}") + + if not args.skip_update: + ret = plugin.update_database() + if not ret: + raise Exception(f"{plugin.plugin_name}: Failed to update datebase") + + if args.update_cve_databese_only: + return {} + + return plugin.run_check() + + +def fetch_kev_data(cve_data_dir: str) -> KevInfoList: + try: + kev_json = kev_cve.fetch_kev_data(cve_data_dir) + return KevInfoList(cl.read_json(kev_json)) + except: + return KevInfoList({}) + + +def main(args: dict): + if args.verbose_output: + logger.setLevel(logging.DEBUG) + + bitbakeinfo = bitbake_runner.get_bitbake_information(args.image_name) + + dpkg_status_file = ( + bitbakeinfo["dpkg_status"] + if not args.dpkg_status_file + else args.dpkg_status_file + ) + if not os.path.exists(dpkg_status_file): + logger.error(f"File {dpkg_status_file} is not found") + exit(1) + + # Read dpkg file to get installed package information + installed_packages = PackageInfoHelper.parse_dpkg_status_file( + dpkg_status_file, target_source_package=args.target_source_package + ) + + # Check recipe's source code provenance + recipe_source_info = read_recipe_source_info(bitbakeinfo["deploy_image_dir"]) + installed_packages.merge_recipe_source_info(recipe_source_info) + + # Read cve product list + cve_product_list = CveProductList() + cve_product_list.create_product_list( + installed_packages, bitbakeinfo["emlinux_layer_dir"], args.extra_cve_product + ) + + # Read ignore list + ignore_list = create_ignore_list( + bitbakeinfo["emlinux_layer_dir"], + installed_packages, + args.debian_codename, + args.extra_cve_check_ignore, + ) + + cve_data_dir = f"{bitbakeinfo['dl_dir']}/CVE" + cl.create_directory(cve_data_dir) + + # Find and load plugins + plugin_files = find_plugins() + plugin_objs = load_plugins(plugin_files) + + # Create plugin instance + plugins = [] + for obj in plugin_objs: + o = obj(cve_data_dir, args, bitbakeinfo, installed_packages, cve_product_list) + plugins.append(o) + + check_results = [] + max_workers = min(args.threads, len(plugins)) + + # Run all plugins + with ThreadPoolExecutor(max_workers=max_workers) as ex: + futures = [ex.submit(cve_check_worker, p, args) for p in plugins] + for f in as_completed(futures): + try: + check_results.append(f.result()) + except Exception as e: + logger.error(f"error: {e}") + traceback.print_exc() + exit(1) + + if args.update_cve_databese_only: + logger.info("Updating database finished.") + exit(0) + + # Sort CVE data by plugin priority + check_results = sorted(check_results, key=lambda d: d.priority) + + # Merge CVE results + src_pkg_names = create_src_package_name_list_from_check_result(check_results) + + cve_check_merged_list = create_cve_check_merged_list(src_pkg_names, check_results) + + cve_check_merged_list.apply_ignore_list_info(ignore_list) + + # Load KEV data + kev_info_list = fetch_kev_data(cve_data_dir) + + # Create CVE report data + creator = NvdCveInfoListCreator(cve_data_dir, installed_packages, kev_info_list) + creator.create_cve_info_list(cve_check_merged_list) + + cve_info_list = creator.get_nvd_info_list() + + # Write CVE report + output_base_dir = ( + f"{bitbakeinfo['deploy_dir']}/cve/{bitbakeinfo['image_full_name']}" + ) + # Use cve_check_ng scripts own directory for testing + output_base_dir = f"{output_base_dir}/cve_check_ng" + + reporter = CveReporter(output_base_dir, bitbakeinfo["image_full_name"]) + reporter.write_report(args.output_format, cve_info_list, installed_packages) + + +def parse_options(): + parser = argparse.ArgumentParser() + plugin_opts = parser.add_argument_group("arguments for plugins") + cve_check_opts = parser.add_argument_group("arguments for cve check") + + # misc options + parser.add_argument( + "--verbose", + dest="verbose_output", + help="Enable verbose output", + default=False, + action="store_true", + ) + + # CVE check core options + cve_check_opts.add_argument( + "--debian-codename", + dest="debian_codename", + help="debian codename(Debian 12 is bookworm)", + default="bookworm", + metavar="DEBIANCODENAME", + ) + cve_check_opts.add_argument( + "--output-format", + dest="output_format", + help="output format. available formats are text, json. formats can be comma separated string(e.g. text,json)", + default="text", + metavar="OUTPUTFORMAT", + ) + cve_check_opts.add_argument( + "--cve-product", + dest="extra_cve_product", + help="User defined cve-product file", + metavar="CVEPRODUCT", + ) + cve_check_opts.add_argument( + "--cve-ignore", + dest="extra_cve_check_ignore", + help="User defined cve-check-ignore file", + metavar="CVEIGNORE", + ) + cve_check_opts.add_argument( + "--image-name", + dest="image_name", + help="EMLinux image name(e.g. emlinux-image-base, emlinux-image-weston)", + metavar="IMAGENAME", + required=True, + ) + cve_check_opts.add_argument( + "--target-source-package", + dest="target_source_package", + help="Only check given debian source package(e.g. bash, util-linux", + metavar="DEBIAN SOURCE PACKAGE NAME", + ) + cve_check_opts.add_argument( + "--dpkg-status-file", + dest="dpkg_status_file", + help="Use specific dpkg_status file instead of default", + metavar="DPKG STATUS FILE", + ) + cve_check_opts.add_argument( + "--threads", default=1, help="Number of thread for cve check" + ) + + # options for plugins + plugin_opts.add_argument( + "--nvd-api-key", + dest="nvd_api_key", + help="API key for NVD API", + metavar="NVDAPIKEY", + ) + plugin_opts.add_argument( + "--cve-db-predownload", + dest="cve_db_predownload", + action="store_true", + help="Enable CVE database predownload.URL should be defined by CVE_DB_PREDOWNLOAD_URL in conf/local.conf.", + ) + plugin_opts.add_argument( + "--update-cve-databese-only", + dest="update_cve_databese_only", + default=False, + action="store_true", + help="Do not run cve check. Update CVE database only.", + ) + plugin_opts.add_argument( + "--skip-update", + default=False, + action="store_true", + help="Skip update CVE databases", + ) + + return parser.parse_args() + + +if __name__ == "__main__": + logger.info("|------------------------------|") + logger.info("| This is experimental version |") + logger.info("|------------------------------|") + main(parse_options()) diff --git a/scripts/lib/python/bitbake_runner.py b/scripts/lib/python/bitbake_runner.py index ceac08ad..1dc1eff9 100644 --- a/scripts/lib/python/bitbake_runner.py +++ b/scripts/lib/python/bitbake_runner.py @@ -10,6 +10,22 @@ import subprocess import sys +def find_layers(): + cmd = ["bitbake-layers", "show-layers"] + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) + output, errors = process.communicate() + + layers = [] + + lines = output.split("\n") + for line in lines: + tmp = [col for col in line.split(" ") if not col == ""] + if len(tmp) >= 2: + if "/repos/" in tmp[1]: + layers.append(tmp[1]) + + return layers + def get_linux_source_dir(kernel_name): cmd = ["bitbake", f"linux-{kernel_name}", "-e"] @@ -46,6 +62,7 @@ def get_bitbake_information(image): pattern_repo_isar_dir = r'\nREPO_ISAR_DIR="([^"]*)"' pattern_image_distro = r'\nDISTRO="([^"]*)"' pattern_cve_db_predownload = r'\nCVE_DB_PREDOWNLOAD_URL="([^"]*)"' + pattern_layer_emlinux = r'\nLAYERDIR_emlinux="([^"]*)"' deploy_image_dir = re.findall(pattern_deploy_image_dir, output)[0] deploy_dir = re.findall(pattern_deploy_dir, output)[0] @@ -57,6 +74,7 @@ def get_bitbake_information(image): distro_arch = re.findall(pattern_distro_arch, output)[0] repo_isar_dir = re.findall(pattern_repo_isar_dir, output)[0] image_distro = re.findall(pattern_image_distro, output)[0] + emlinux_layer_dir = re.findall(pattern_layer_emlinux, output)[0] cve_db_url = re.findall(pattern_cve_db_predownload, output) if not len(cve_db_url) == 0: @@ -67,6 +85,7 @@ def get_bitbake_information(image): dpkg_status = f"{deploy_image_dir}/{image_full_name}.dpkg_status" return { "deploy_dir": deploy_dir, + "deploy_image_dir": deploy_image_dir, "image_full_name": image_full_name, "dl_dir": dl_dir, "dpkg_status": dpkg_status, @@ -76,4 +95,5 @@ def get_bitbake_information(image): "repo_isar_dir": repo_isar_dir, "image_distro": image_distro, "cve_db_predownload": cve_db_predownload, + "emlinux_layer_dir": emlinux_layer_dir, } diff --git a/scripts/lib/python/cve/__init__.py b/scripts/lib/python/cve/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/lib/python/cve/common_libs.py b/scripts/lib/python/cve/common_libs.py new file mode 100644 index 00000000..9a058ee2 --- /dev/null +++ b/scripts/lib/python/cve/common_libs.py @@ -0,0 +1,68 @@ +import os +import json +from looseversion import LooseVersion + + +def read_json(jsonfile: str) -> dict: + with open(jsonfile, "r") as f: + return json.loads(f.read()) + + +def create_directory(target): + if not os.path.exists(target): + os.makedirs(target) + + +def check_affected( + target_version: str, + version_start: str, + operator_start: str, + version_end: str, + operator_end: str, +) -> bool: + lv_target_version = LooseVersion(target_version) + + if not (version_start is None or version_start == "-" or len(version_start) == 0): + lv_versoin_start = LooseVersion(version_start) + + if not (version_end is None or version_end == "-" or len(version_end) == 0): + lv_version_end = LooseVersion(version_end) + + vulnerable = True + if ( + operator_start == "=" and target_version == version_start + ) or version_start == "-": + vulnerable = True + else: + if operator_start: + try: + vulnerable_start = ( + operator_start == ">=" and lv_target_version >= lv_versoin_start + ) + vulnerable_start |= ( + operator_start == ">" and lv_target_version > lv_versoin_start + ) + except: + vulnerable_start = False + else: + vulnerable_start = False + + if operator_end: + try: + vulnerable_end = ( + operator_end == "<=" and lv_target_version <= lv_version_end + ) + vulnerable_end |= ( + operator_end == "<" and lv_target_version < lv_version_end + ) + except: + vulnerable_end = False + else: + vulnerable_end = False + + if operator_start and operator_end: + vulnerable = vulnerable_start and vulnerable_end + else: + vulnerable = vulnerable_start or vulnerable_end + + return vulnerable diff --git a/scripts/lib/python/cve/cve_info.py b/scripts/lib/python/cve/cve_info.py new file mode 100644 index 00000000..7f2e6df3 --- /dev/null +++ b/scripts/lib/python/cve/cve_info.py @@ -0,0 +1,87 @@ +import logging + +logger = logging.getLogger("emlinux-cve-check") + + +class CveStatus: + CVE_STATUS_PATCHED = "Patched" + CVE_STATUS_UNPATCHED = "Unpatched" + CVE_STATUS_REJECTED = "Rejected" + + +class CveCheckResult: + def __init__(self, cveid: str, src_pkg_name: str, status: str) -> None: + self.cveid = cveid + self.src_pkg_name = src_pkg_name + self.status = status + + def __repr__(self) -> str: + return f"{self.__dict__}" + + +class CveCheckResultList: + def __init__(self, plugin_name, priority) -> None: + self.plugin_name = plugin_name + self.priority = priority + self.cves = {} + + def __repr__(self): + return f"{self.__dict__}" + + def __iter__(self) -> str: + return iter(self.cves) + + def __getitem__(self, key: str) -> CveCheckResult: + if key in self.cves: + return self.cves[key] + return None + + def add_cve_info(self, src_pkg_name: str, check_result: CveCheckResult) -> None: + if not src_pkg_name in self.cves: + self.cves[src_pkg_name] = {check_result.cveid: check_result} + else: + if not check_result.cveid in self.cves[src_pkg_name]: + self.cves[src_pkg_name][check_result.cveid] = check_result + else: + # We already have this CVE information + pass + # logger.debug(f"Source:{src_pkg_name}: CVE:{check_result.cveid} is duplicated") + + def src_pkg_names(self) -> list[str]: + return sorted(self.cves.keys()) + + def cve_ids_by_src_pkg(self, src_pkg_name: str) -> list[str]: + if not src_pkg_name in self.cves: + return None + + return sorted(self.cves[src_pkg_name].keys()) + + +class CveInfo: + def __init__(self, cveid: str, src_pkg_name: str, status: str) -> None: + self.cveid = cveid + self.src_pkg_name = src_pkg_name + self.status = status + self.summary = None + self.cvssv2 = None + self.cvessv3 = None + self.vector = None + self.vector_string = None + self.more_information = None + + def __repr__(self) -> str: + return f"{self.__dict__}" + + +class CveInfoList: + def __init__(self) -> None: + self.cves = {} + + def __repr__(self): + return f"{self.__dict__}" + + def add_cve_info(self, src_pkg_name: str, cveinfo: CveInfo) -> None: + if not src_pkg_name in self.cves: + self.cves[src_pkg_name] = [cveinfo] + else: + self.cves[src_pkg_name].append(cveinfo) diff --git a/scripts/lib/python/cve/cve_product.py b/scripts/lib/python/cve/cve_product.py new file mode 100644 index 00000000..b372f147 --- /dev/null +++ b/scripts/lib/python/cve/cve_product.py @@ -0,0 +1,104 @@ +from typing import Any +from lib.python.package_info import PackageList +import yaml +import os + + +class CveProductVendorProructPair: + def __init__(self, vendor: str, product: str): + self.vendor = vendor + self.product = product + + def __repr__(self) -> str: + return f"{self.__dict__}" + + +class CveProduct: + def __init__(self, src_pkg_name: str): + self.src_pkg_name = src_pkg_name + self.pair = None + + def set_pair_list(self, pair_list: list[CveProductVendorProructPair]): + self.pair = pair_list + + def __repr__(self) -> str: + return f"{self.__dict__}" + + def __iter__(self) -> CveProductVendorProructPair: + return iter(self.pair) + + +class CveProductList: + def __init__(self): + self.cve_products = {} + + def __repr__(self) -> str: + return f"{self.__dict__}" + + def __iter__(self) -> Any: + return iter(self.cve_products) + + def __getitem__(self, key: str) -> CveProduct: + return self.cve_products[key] + + def create_product_list( + self, + installed_packages: PackageList, + emlinux_layer_dir: str, + extra_cve_product: str, + ): + self._read_cve_products_file(emlinux_layer_dir, extra_cve_product) + for src_pkg_name in installed_packages: + if not src_pkg_name in self.cve_products: + # Debian binary package is instaleld + cveproduct = CveProduct(src_pkg_name) + # Default vendor/product pair is vendor: and product: + pair = CveProductVendorProructPair(None, src_pkg_name) + cve_product = [pair] + cveproduct.set_pair_list(cve_product) + self.cve_products[src_pkg_name] = cveproduct + + def _read_cve_products_file(self, emlinux_layer_dir: str, extra_cve_product: str): + name = os.path.join( + os.path.dirname(__file__), f"{emlinux_layer_dir}/conf/cve/cve_products.yml" + ) + + data = None + with open(name, "r") as f: + data = yaml.safe_load(f) + + if extra_cve_product: + with open(extra_cve_product, "r") as f: + tmp = yaml.safe_load(f) + if tmp: + data.update(tmp) + + for pkgname in data: + pkg = data[pkgname] + tmp_product_list = [] + + if type(pkg) == list: + for d in pkg: + tmp = self._create_pair(d) + tmp_product_list.append(tmp) + + elif type(pkg) == dict: + tmp = self._create_pair(pkg) + tmp_product_list.append(tmp) + + if len(tmp_product_list) > 0: + cveproduct = CveProduct(pkgname) + cveproduct.set_pair_list(tmp_product_list) + + self.cve_products[pkgname] = cveproduct + + def _create_pair(self, data: dict) -> dict: + vendor = None + product = None + + if "product" in data: + product = data["product"] + if "vendor" in data: + vendor = data["vendor"] + + return CveProductVendorProructPair(vendor, product) diff --git a/scripts/lib/python/cve/cve_reporter.py b/scripts/lib/python/cve/cve_reporter.py new file mode 100644 index 00000000..56bbd339 --- /dev/null +++ b/scripts/lib/python/cve/cve_reporter.py @@ -0,0 +1,201 @@ +from lib.python.cve.nvd_lib import NvdCveNvdInfoList, NvdCveNvdInfo +from lib.python.package_info import PackageList + +import logging + +logger = logging.getLogger("emlinux-cve-check") +import json +import os + +CVE_REPORT_JSON_VERSION = "1" + + +class CveReporter: + def __init__(self, output_base_dir: str, image_name: str) -> None: + self.output_base_dir = output_base_dir + self.image_name = image_name + + def write_report( + self, + formats: str, + cve_info_list: NvdCveNvdInfoList, + installed_packages: PackageList, + ) -> None: + fmts = formats.split(",") + for fmt in fmts: + filenames = None + if fmt == "text": + filenames = self._write_text_report(cve_info_list) + self._create_all_in_one_text_report(filenames) + if fmt == "json": + filenames = self._write_json_report(cve_info_list, installed_packages) + self._create_all_in_one_json_report(filenames) + + def _create_dir(self, path: str) -> None: + if not os.path.exists(path): + os.makedirs(path) + + def _write_text_report(self, cve_info_list: NvdCveNvdInfoList) -> list[str]: + text_dir = f"{self.output_base_dir}/text" + self._create_dir(text_dir) + + filenames = [] + + for src_pkg_name in cve_info_list: + cve_list = cve_info_list[src_pkg_name] + filename = f"{text_dir}/{src_pkg_name}" + filenames.append(filename) + + with open(filename, "w") as f: + for ci in cve_list: + f.write(f"PACKAGE NAME: {ci.src_pkg_name}\n") + + bin_names = " ".join(ci.bin_pkg_names) + + f.write(f"BINARY PACKAGE NAME: {bin_names}\n") + f.write(f"VERSION: {ci.version}\n") + f.write(f"CVE: {ci.cveid}\n") + f.write(f"CVE STATUS: {ci.status}\n") + f.write(f"CVE SUMMARY: {ci.summary}\n") + f.write(f"CVSS v2 BASE SCORE: {ci.scorev2}\n") + f.write(f"CVSS v3 BASE SCORE: {ci.scorev3}\n") + f.write(f"VECTOR: {ci.vector}\n") + f.write(f"VECTORSTRING: {ci.vector_string}\n") + + if ci.kev == "Found": + f.write(f"KEV: Found\n") + f.write(f"KNOWN RANSOMWARE CAMPAIGN USE: {ci.kev_use}\n") + else: + f.write("KEV: Not Found\n") + + f.write(f"MORE INFORMATION: {ci.moreinfo}\n") + + f.write("\n") + + logger.info(f"Text report were written to {text_dir}") + return filenames + + def _write_json_report( + self, cve_info_list: NvdCveNvdInfoList, installed_packages: PackageList + ) -> list[str]: + json_dir = f"{self.output_base_dir}/json" + self._create_dir(json_dir) + filenames = [] + + for src_pkg_name in installed_packages: + has_cve = src_pkg_name in cve_info_list + cve_list = cve_info_list[src_pkg_name] + data = self._create_json_data_for_package( + src_pkg_name, cve_list, installed_packages, has_cve + ) + + filename = f"{json_dir}/{src_pkg_name}_cve.json" + filenames.append(filename) + + with open(filename, "w") as f: + json.dump(data, f, indent=4, sort_keys=False) + + logger.info(f"Json report were written to {json_dir}") + return filenames + + def _create_json_data_for_package( + self, + src_pkg_name: str, + cve_info: NvdCveNvdInfo, + installed_packages, + has_cve: bool, + ): + if has_cve: + cvesInRecord = "Yes" + issue = self._create_issue_data(cve_info) + version = cve_info[0].version + bin_pkg_names = cve_info[0].bin_pkg_names + else: + cvesInRecord = "No" + issue = [] + version = installed_packages.get_version(src_pkg_name) + bin_pkg_names = ( + installed_packages.get_binary_package_names_by_src_package_name( + src_pkg_name + ) + ) + + data = { + "version": CVE_REPORT_JSON_VERSION, + "package": [ + { + "name": src_pkg_name, + "binary package name": bin_pkg_names, + "version": version, + "products": [ + { + "product": src_pkg_name, + "cvesInRecord": cvesInRecord, + }, + ], + "issue": issue, + } + ], + } + + return data + + def _create_issue_data(self, cve_info: NvdCveNvdInfo) -> list[dict]: + issue = [] + + for ci in cve_info: + data = {} + + data["CVE"] = ci.cveid + data["PACKAGE NAME"] = ci.src_pkg_name + data["BINARY PACKAGE NAME"] = ci.bin_pkg_names + data["VERSION"] = ci.version + data["CVE STATUS"] = ci.status + data["CVE SUMMARY"] = ci.summary + data["CVSS v2 BASE SCORE"] = ci.scorev2 + data["CVSS v3 BASE SCORE"] = ci.scorev3 + data["VECTOR"] = ci.vector + data["VECTORSTRING"] = ci.vector_string + + if ci.kev == "Found": + data["KEV"] = "Found" + data["KNOWN RANSOMWARE CAMPAIGN USE"] = ci.kev_use + else: + data["KEV"] = "Not Found" + + data["MORE INFORMATION"] = ci.moreinfo + issue.append(data) + + return issue + + def _create_all_in_one_text_report(self, filenames: list[str]) -> None: + if filenames is None: + return + + output_file = f"{self.output_base_dir}/{self.image_name}_cve" + with open(output_file, "w") as out: + for filename in filenames: + with open(filename, "r") as f: + out.write(f.read()) + out.write("") + logger.info(f"All in one text report was written to {output_file}") + + def _create_all_in_one_json_report(self, filenames: list[str]) -> None: + if filenames is None: + return + + all_in_one_data = { + "version": "1", + "package": [], + } + + for filename in filenames: + with open(filename, "r") as f: + data = json.load(f) + all_in_one_data["package"].extend(data["package"]) + + output_file = f"{self.output_base_dir}/{self.image_name}_cve.json" + with open(output_file, "w") as f: + json.dump(all_in_one_data, f, indent=4, sort_keys=False) + + logger.info(f"All in one json report was written to {output_file}") diff --git a/scripts/lib/python/cve/kev_info.py b/scripts/lib/python/cve/kev_info.py new file mode 100644 index 00000000..5574dc5d --- /dev/null +++ b/scripts/lib/python/cve/kev_info.py @@ -0,0 +1,34 @@ +import logging + +logger = logging.getLogger("emlinux-cve-check") + + +class KevInfoList: + def __init__(self, kev_json_data) -> None: + self.kev_list = {} + self.kev_json_data = kev_json_data + + self._create_kev_list() + + def __contains__(self, value: str) -> bool: + return value in self.kev_list + + def __iter__(self) -> str: + return iter(self.kev_list) + + def __repr__(self) -> str: + return f"{self.kev_list}" + + def _create_kev_list(self): + for vul in self.kev_json_data["vulnerabilities"]: + self._add_kev_data(vul) + + def _add_kev_data(self, kev_data: dict) -> None: + cveid = kev_data["cveID"] + if not cveid in self.kev_list: + self.kev_list[cveid] = kev_data + else: + logger.info(f"CVE ID {cveid} is duplicated in the KEV data") + + def get_known_ransomware_campaign_use(self, cveid: str): + return self.kev_list[cveid]["knownRansomwareCampaignUse"] diff --git a/scripts/lib/python/cve/nvd_lib.py b/scripts/lib/python/cve/nvd_lib.py new file mode 100644 index 00000000..2d4979cf --- /dev/null +++ b/scripts/lib/python/cve/nvd_lib.py @@ -0,0 +1,244 @@ +from lib.python.cve.cve_info import CveStatus +from lib.python.package_info import PackageList +from lib.python.cve.kev_info import KevInfoList +import logging + +logger = logging.getLogger("emlinux-cve-check") + +import sqlite3 + +CVE_DATABASE_NAME = "nvd_cve_db.db" + + +class NvdCveNvdInfo: + def __init__( + self, + cveid: str, + src_pkg_name: str, + bin_pkg_names: list[str], + version: str, + summary: str, + scorev2: str, + scorev3: str, + vector: str, + vector_string: str, + status: str, + ) -> None: + self.cveid = cveid + self.src_pkg_name = src_pkg_name + self.bin_pkg_names = bin_pkg_names + self.version = version + if summary is None or len(summary) == 0: + self.summary = "" + else: + self.summary = summary.strip() + self.scorev2 = scorev2 + self.scorev3 = scorev3 + self.vector = vector + self.vector_string = vector_string + self.status = status + self.kev = "Not Found" + self.kev_use = None + self.moreinfo = f"https://nvd.nist.gov/vuln/detail/{cveid}" + + def set_kev(self, kev_use: str) -> None: + self.kev = "Found" + self.kev_use = kev_use + + def __repr__(self) -> str: + return f"{self.__dict__}" + + +class NvdCveNvdInfoList: + def __init__(self) -> None: + self.nvd_cve_nvd_info_list = {} + + def __repr__(self) -> str: + return f"{self.nvd_cve_nvd_info_list}" + + def __iter__(self) -> str: + return iter(self.nvd_cve_nvd_info_list) + + def __getitem__(self, key: str) -> NvdCveNvdInfo: + if key in self.nvd_cve_nvd_info_list: + return self.nvd_cve_nvd_info_list[key] + return None + + def add_nvd_cve_nvd_info(self, data: NvdCveNvdInfo) -> None: + if not data.src_pkg_name in self.nvd_cve_nvd_info_list: + self.nvd_cve_nvd_info_list[data.src_pkg_name] = [data] + else: + self.nvd_cve_nvd_info_list[data.src_pkg_name].append(data) + + +class NvdCveProductInfo: + def __init__( + self, + cveid: str, + src_pkg_name: str, + vendor: str, + product: str, + version_start: str, + operator_start: str, + version_end: str, + operator_end: str, + ) -> None: + self.cveid = cveid + self.src_pkg_name = src_pkg_name + self.vendor = vendor + self.product = product + self.version_start = version_start + self.operator_start = operator_start + self.version_end = version_end + self.operator_end = operator_end + + def __repr__(self) -> str: + return f"{self.__dict__}" + + +class CveCheckMergedList: + def __init__(self): + self.merged = {} + + def __repr__(self): + return f"{self.merged}" + + def __iter__(self) -> dict: + return iter(self.merged) + + def __getitem__(self, key: str) -> NvdCveProductInfo: + if key in self.merged: + return self.merged[key] + return None + + def add_data( + self, src_pkg_name: str, cveid: str, cve_data: dict, priority: int + ) -> None: + # print(f"found cve {cveid} in {cr.plugin_name}") + if not src_pkg_name in self.merged: + self.merged[src_pkg_name] = { + cveid: { + "cve_info": cve_data, + "priority": priority, + } + } + elif src_pkg_name in self.merged and not cveid in self.merged[src_pkg_name]: + # print(f"Append {cveid} to {src_pkg_name}") + self.merged[src_pkg_name][cveid] = { + "cve_info": cve_data, + "priority": priority, + } + else: + # print(f"CVE:{cveid} is already found") + if priority > self.merged[src_pkg_name][cveid]["priority"]: + # print(f"{cveid}:{src_pkg_name}: cr.priority > cve_check_merged_list[cveid].priority = {cr.priority > cve_check_merged_list[cveid]['priority']}: replace data") + self.merged[src_pkg_name][cveid]["cve_info"] = cve_data + self.merged[src_pkg_name][cveid]["priority"] = priority + + def apply_ignore_list_info(self, ignore_list: dict) -> None: + for src_pkg_name in ignore_list: + if src_pkg_name in self.merged: + for cveid in ignore_list[src_pkg_name]: + if cveid in self.merged[src_pkg_name]: + self.merged[src_pkg_name][cveid][ + "cve_info" + ].status = CveStatus.CVE_STATUS_PATCHED + + def get_cve_status(self, src_pkg_name: str, cveid: str) -> str: + return self.merged[src_pkg_name][cveid]["cve_info"].status + + +class NvdCveInfoListCreator: + def __init__( + self, + cve_data_dir: str, + package_info_list: PackageList, + kev_info_list: KevInfoList, + ) -> None: + self.db_file = f"{cve_data_dir}/{CVE_DATABASE_NAME}" + self.nvd_info_list = NvdCveNvdInfoList() + self.package_info_list = package_info_list + self.kev_info_list = kev_info_list + self.conn = None + + def get_nvd_info_list(self) -> NvdCveNvdInfoList: + return self.nvd_info_list + + def create_cve_info_list( + self, cve_check_merged_list: CveCheckMergedList + ) -> NvdCveNvdInfoList: + if self.conn is None: + self.conn = sqlite3.connect(self.db_file) + + for src_pkg_name in cve_check_merged_list: + for cveid in cve_check_merged_list[src_pkg_name]: + # status = cve_check_merged_list[src_pkg_name][cveid].status + status = cve_check_merged_list.get_cve_status(src_pkg_name, cveid) + binary_package_names = ( + self.package_info_list.get_binary_package_names_by_src_package_name( + src_pkg_name + ) + ) + debian_pkg_version = self.package_info_list.get_version(src_pkg_name) + + ni = self._get_cve_information( + cveid, + src_pkg_name, + binary_package_names, + debian_pkg_version, + status, + ) + if cveid in self.kev_info_list: + ni.set_kev( + self.kev_info_list.get_known_ransomware_campaign_use(cveid) + ) + + self.nvd_info_list.add_nvd_cve_nvd_info(ni) + self.conn.close() + self.conn = None + + def _get_cve_information( + self, + cveid: str, + src_pkg_name: str, + bin_pkg_names: list[str], + debian_pkg_version: str, + status: str, + ) -> NvdCveNvdInfo: + c = self.conn.cursor() + sql = f'SELECT VULNSTATUS, SUMMARY, SCOREV2, SCOREV3, VECTOR, VECTORSTRING FROM NVD WHERE ID="{cveid}"' + cursor = c.execute(sql) + data = cursor.fetchone() + c.close() + + vuln_status = status + summary = "" + scorev2 = "0.0" + scorev3 = "0.0" + vector = "UNKNOWN" + vector_string = "UNKNOWN" + more_info = f"https://security-tracker.debian.org/tracker/{cveid}" + + if data: + if data[0] == "Rejected": + vuln_status = CveInfo.CVE_STATUS_REJECTED + + summary = data[1] + scorev2 = data[2] + scorev3 = data[3] + vector = data[4] + vector_string = data[5] + more_info = f"https://nvd.nist.gov/vuln/detail/{cveid}" + + return NvdCveNvdInfo( + cveid, + src_pkg_name, + bin_pkg_names, + debian_pkg_version, + summary, + scorev2, + scorev3, + vector, + vector_string, + vuln_status, + ) diff --git a/scripts/lib/python/cve/plugin/__init__.py b/scripts/lib/python/cve/plugin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/lib/python/cve/plugin/eml_cve_nvd_plugin.py b/scripts/lib/python/cve/plugin/eml_cve_nvd_plugin.py new file mode 100644 index 00000000..bf275f01 --- /dev/null +++ b/scripts/lib/python/cve/plugin/eml_cve_nvd_plugin.py @@ -0,0 +1,487 @@ +from typing import Any, Tuple +from lib.python.cve.plugin.eml_cve_plugin_base import EmlCvePlugin +from lib.python.cve.cve_product import CveProduct, CveProductList +from lib.python.cve.cve_info import CveStatus, CveCheckResult, CveCheckResultList +from lib.python.package_info import PackageList +import lib.python.cve.common_libs as cl +import lib.python.cve.nvd_lib as nvd_lib + +import logging + +logger = logging.getLogger("emlinux-cve-check") + +import sqlite3 +import datetime +import urllib.request +import urllib.parse +import gzip +import time +import json +import os +import logging +import errno + +CVE_DB_UPDATE_INTERVAL = 86400 + +NVDCVE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" + + +class EmlNVDPlugin(EmlCvePlugin): + def __init__( + self, + cve_data_dir: str, + args: Any, + bitbakeinfo: Any, + installed_packages: PackageList, + cve_products: CveProductList, + ): + super().__init__( + "EmlNVDPlugin", + 1, + cve_data_dir, + args, + bitbakeinfo, + installed_packages, + cve_products, + ) + + self.predownload_url = self.bitbakeinfo["cve_db_predownload"] + self.predownload = self.args.cve_db_predownload + + self.db_file = f"{self.cve_data_dir}/{nvd_lib.CVE_DATABASE_NAME}" + self.nvd_api_key = args.nvd_api_key + + def update_database(self) -> bool: + return self._update_nvd_db() + + def run_check(self) -> CveCheckResultList: + logger.debug(f"{self.plugin_name}: run-check start") + if not os.path.exists(self.db_file): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.db_file) + self._collect_cves_from_installed_packages() + logger.debug(f"{self.plugin_name}: run-check finish") + return self.cve_check_result_list + + def _collect_cves_from_installed_packages(self): + conn = sqlite3.connect(self.db_file) + + for src_pkg_name in self.installed_packages: + # logger.debug(f"Check source package: {src_pkg_name}") + cve_product = self.cve_products[src_pkg_name] + pi = self.installed_packages[src_pkg_name] + version = self.installed_packages.get_version(src_pkg_name) + self._check_cves(conn, src_pkg_name, version, cve_product) + + conn.close() + + def _check_cves( + self, + conn: sqlite3.Connection, + src_pkg_name: str, + version: str, + cve_product: CveProduct, + ) -> None: + for cp in cve_product: + vendor = cp.vendor + product = cp.product + + if product is None: + continue + + if vendor is None: + vendor = "%" + + cve_cursor = conn.execute( + "SELECT DISTINCT ID FROM PRODUCTS WHERE PRODUCT IS ? AND VENDOR LIKE ?", + (product, vendor), + ) + for cverow in cve_cursor: + cveid = cverow[0] + vulnerable = False + product_cursor = conn.execute( + "SELECT VERSION_START, OPERATOR_START, VERSION_END, OPERATOR_START FROM PRODUCTS WHERE ID IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", + (cveid, product, vendor), + ) + for row in product_cursor: + (version_start, operator_start, version_end, operator_end) = row + + vuln_status = CveStatus.CVE_STATUS_PATCHED + vulnerable = cl.check_affected( + version, + version_start, + operator_start, + version_end, + operator_end, + ) + if vulnerable: + # logger.debug(f"{src_pkg_name}:{cve}: vulnerable = {vulnerable}") + vuln_status = CveStatus.CVE_STATUS_UNPATCHED + + ci = CveCheckResult(cveid, src_pkg_name, vuln_status) + self.cve_check_result_list.add_cve_info(src_pkg_name, ci) + + def _update_nvd_db(self) -> bool: + result = False + + conn = sqlite3.connect(self.db_file) + logger.debug(f"Initialize nvd cve database {self.db_file}") + self._initialize_nvd_cve_db(conn) + + skip_db_update, last_modified = self._check_skip_db_update(conn) + + if not skip_db_update and self.predownload: + # predownload database file is old, download latest file + conn.close() + logger.info("Predownload CVE database file.") + if not self._predownload_db(self.predownload_url): + return result + + conn = sqlite3.connect(self.db_file) + # re-check last modified date + skip_db_update, last_modified = self._check_skip_db_update(conn) + + if not skip_db_update: + logger.info("Update NVD CVE database") + if self._fetch_all_cves(conn, last_modified): + logger.info("Update last modified date") + self._update_last_modified_date(conn) + conn.commit() + result = True + else: + logger.info(f"Last database update is in 1day so skip NVD database update") + result = True + + conn.close() + return result + + def _initialize_nvd_cve_db(self, conn: sqlite3.Connection) -> None: + with conn: + c = conn.cursor() + + c.execute( + "CREATE TABLE IF NOT EXISTS META (ID NUMBER UNIQUE, LASTMODIFIED TEXT)" + ) + + c.execute( + "CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, VULNSTATUS TEXT, SUMMARY TEXT, SCOREV2 TEXT, \ + SCOREV3 TEXT, MODIFIED INTEGER, VECTOR TEXT, VECTORSTRING TEXT)" + ) + + c.execute( + "CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \ + VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, OPERATOR_START TEXT, \ + VERSION_END TEXT, OPERATOR_END TEXT)" + ) + + c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on PRODUCTS(ID);") + + c.close() + + def _check_skip_db_update(self, conn: sqlite3.Connection) -> Tuple[bool, str]: + skip_db_update = False + last_modified = self._get_last_modified_date(conn) + if last_modified: + d1 = datetime.datetime.fromisoformat(datetime.datetime.now().isoformat()) + d2 = datetime.datetime.fromisoformat(last_modified) + + date_delta = d1 - d2 + if date_delta.total_seconds() < CVE_DB_UPDATE_INTERVAL: + skip_db_update = True + else: + # Database is too old so that fetch all data + if date_delta.days > 120: + last_modified = None + + return skip_db_update, last_modified + + def _get_last_modified_date(self, conn: sqlite3.Connection) -> str: + with conn: + c = conn.cursor() + + cursor = c.execute("SELECT LASTMODIFIED from META") + + last = cursor.fetchone() + c.close() + + if last is None: + return None + + return last[0] + + def _predownload_db(self, predownload_url: str) -> bool: + logger.info(f"Download CVE database file from {predownload_url}.") + + request = urllib.request.Request(predownload_url) + for attempt in range(5): + try: + r = urllib.request.urlopen(request) + + if r.headers["content-encoding"] == "gzip": + buf = r.read() + raw_data = gzip.decompress(buf) + else: + raw_data = r.read() + + r.close() + except Exception as e: + logger.debug(f"CVE databese download: received error ({e}), retrying") + time.sleep(6) + pass + else: + with open(self.db_file, "wb") as f: + f.write(raw_data) + logger.info(f"Download CVE database file was succeeded.") + + return True + else: + # We failed at all attempts + return False + + def _parse_node_and_insert(self, conn, node, cveId): + def _cpe_generator(): + for cpe in node.get("cpeMatch", ()): + if not cpe["vulnerable"]: + return + cpe23 = cpe.get("criteria") + if not cpe23: + return + cpe23 = cpe23.split(":") + if len(cpe23) < 6: + return + vendor = cpe23[3] + product = cpe23[4] + version = cpe23[5] + + if cpe23[6] == "*" or cpe23[6] == "-": + version_suffix = "" + else: + version_suffix = "_" + cpe23[6] + + if version != "*" and version != "-": + # Version is defined, this is a '=' match + yield [ + cveId, + vendor, + product, + version + version_suffix, + "=", + "", + "", + ] + elif version == "-": + # no version information is available + yield [cveId, vendor, product, version, "", "", ""] + else: + # Parse start version, end version and operators + op_start = "" + op_end = "" + v_start = "" + v_end = "" + + if "versionStartIncluding" in cpe: + op_start = ">=" + v_start = cpe["versionStartIncluding"] + + if "versionStartExcluding" in cpe: + op_start = ">" + v_start = cpe["versionStartExcluding"] + + if "versionEndIncluding" in cpe: + op_end = "<=" + v_end = cpe["versionEndIncluding"] + + if "versionEndExcluding" in cpe: + op_end = "<" + v_end = cpe["versionEndExcluding"] + + if op_start or op_end or v_start or v_end: + yield [cveId, vendor, product, v_start, op_start, v_end, op_end] + else: + # This is no version information, expressed differently. + # Save processing by representing as -. + yield [cveId, vendor, product, "-", "", "", ""] + + conn.executemany( + "insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", _cpe_generator() + ).close() + + def _update_db(self, conn, elt): + """ + Update a single entry in the on-disk database + """ + + accessVector = None + vectorString = None + cveId = elt["cve"]["id"] + logger.debug(f"Processing CVE {cveId}") + + if "vulnStatus" in elt["cve"]: + vulnStatus = elt["cve"]["vulnStatus"] + else: + vulnStatus = "" + + cveDesc = "" + for desc in elt["cve"]["descriptions"]: + if desc["lang"] == "en": + cveDesc = desc["value"] + date = elt["cve"]["lastModified"] + try: + accessVector = elt["cve"]["metrics"]["cvssMetricV2"][0]["cvssData"][ + "accessVector" + ] + vectorString = elt["cve"]["metrics"]["cvssMetricV2"][0]["cvssData"][ + "vectorString" + ] + cvssv2 = elt["cve"]["metrics"]["cvssMetricV2"][0]["cvssData"]["baseScore"] + except KeyError: + cvssv2 = 0.0 + cvssv3 = None + try: + accessVector = ( + accessVector + or elt["cve"]["metrics"]["cvssMetricV30"][0]["cvssData"]["attackVector"] + ) + vectorString = ( + vectorString + or elt["cve"]["metrics"]["cvssMetricV30"][0]["cvssData"]["vectorString"] + ) + cvssv3 = elt["cve"]["metrics"]["cvssMetricV30"][0]["cvssData"]["baseScore"] + except KeyError: + pass + try: + accessVector = ( + accessVector + or elt["cve"]["metrics"]["cvssMetricV31"][0]["cvssData"]["attackVector"] + ) + vectorString = ( + vectorString + or elt["cve"]["metrics"]["cvssMetricV31"][0]["cvssData"]["vectorString"] + ) + cvssv3 = ( + cvssv3 + or elt["cve"]["metrics"]["cvssMetricV31"][0]["cvssData"]["baseScore"] + ) + except KeyError: + pass + accessVector = accessVector or "UNKNOWN" + vectorString = vectorString or "UNKNOWN" + cvssv3 = cvssv3 or 0.0 + + conn.execute( + "insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)", + [ + cveId.strip(), + vulnStatus.strip(), + cveDesc.strip(), + cvssv2, + cvssv3, + date.strip(), + accessVector.strip(), + vectorString.strip(), + ], + ).close() + + try: + # Remove any pre-existing CVE configuration. Even for partial database + # update, those will be repopulated. This ensures that old + # configuration is not kept for an updated CVE. + conn.execute("delete from PRODUCTS where ID = ?", [cveId]).close() + for config in elt["cve"]["configurations"]: + # This is suboptimal as it doesn't handle AND/OR and negate, but is better than nothing + for node in config["nodes"]: + self._parse_node_and_insert(conn, node, cveId) + except KeyError: + logger.debug("CVE %s has no configurations" % cveId) + + def _nvd_request_next(self, url: str, request_args: Any) -> str: + """ + Request next part of the NVD dabase + """ + + request = urllib.request.Request( + url + "?" + urllib.parse.urlencode(request_args) + ) + if self.nvd_api_key: + request.add_header("apiKey", self.nvd_api_key) + logger.debug(f"Requesting {request.full_url}") + + for attempt in range(5): + try: + r = urllib.request.urlopen(request) + + if r.headers["content-encoding"] == "gzip": + buf = r.read() + raw_data = gzip.decompress(buf) + else: + raw_data = r.read().decode("utf-8") + + r.close() + except Exception as e: + logger.debug(f"CVE database: received error ({e}), retrying") + time.sleep(6) + pass + else: + return raw_data + else: + # We failed at all attempts + return None + + def _fetch_all_cves(self, conn: sqlite3.Connection, last_modified: str) -> bool: + index = 0 + url = NVDCVE_URL + + req_args = {} + + if last_modified is not None: + req_args["lastModStartDate"] = last_modified + req_args["lastModEndDate"] = datetime.datetime.now().isoformat() + + # Recommended by NVD + sleep_time = 6 + if self.nvd_api_key: + sleep_time = 2 + + with open(self.db_file, "a") as cve_f: + while True: + logger.debug("Updating entries") + + req_args["startIndex"] = index + + raw_data = self._nvd_request_next(url, req_args) + if raw_data is None: + return False + + data = json.loads(raw_data) + + index = data["startIndex"] + total = data["totalResults"] + per_page = data["resultsPerPage"] + logger.debug(f"Got {per_page} entries") + for cve in data["vulnerabilities"]: + self._update_db(conn, cve) + + index += per_page + if index >= total: + break + + time.sleep(sleep_time) + + return True + + def _update_last_modified_date(self, conn: sqlite3.Connection) -> None: + d = datetime.datetime.now().isoformat() + + with conn: + c = conn.cursor() + + cursor = c.execute("SELECT LASTMODIFIED from META where ID=1") + last = cursor.fetchone() + + if last is None: + sql = f"INSERT INTO META VALUES (1, '{d}')" + else: + sql = f"UPDATE META set LASTMODIFIED='{d}' where ID=1" + + c.execute(sql) + + c.close() diff --git a/scripts/lib/python/cve/plugin/eml_cve_plugin_base.py b/scripts/lib/python/cve/plugin/eml_cve_plugin_base.py new file mode 100644 index 00000000..7ec0ca8f --- /dev/null +++ b/scripts/lib/python/cve/plugin/eml_cve_plugin_base.py @@ -0,0 +1,33 @@ +from typing import Any +from lib.python.cve.cve_product import CveProductList +from lib.python.cve.cve_info import CveCheckResultList +from lib.python.package_info import PackageList + + +class EmlCvePlugin: + def __init__( + self, + plugin_name: str, + priority: int, + cve_data_dir: str, + args: Any, + bitbakeinfo: Any, + installed_packages: PackageList, + cve_products: CveProductList, + ): + self.plugin_name = plugin_name + self.plugin_priority = priority + self.cve_data_dir = cve_data_dir + self.args = args + self.bitbakeinfo = bitbakeinfo + self.installed_packages = installed_packages + self.cve_check_result_list = CveCheckResultList( + self.plugin_name, self.plugin_priority + ) + self.cve_products = cve_products + + def update_database(self) -> bool: + raise NotImplementedError("It must be implemented in your plugin module") + + def run_check(self) -> CveCheckResultList: + raise NotImplementedError("It must be implemented in your plugin module") diff --git a/scripts/lib/python/package_info.py b/scripts/lib/python/package_info.py new file mode 100644 index 00000000..f97a1912 --- /dev/null +++ b/scripts/lib/python/package_info.py @@ -0,0 +1,146 @@ +from typing import Any +import json +import debian.debian_support + +import re + +SOURCE_VERSION_PATTERN = re.compile( + r"\s*(?P[^\s(]+)(?:\s*\(\s*(?P[^)]+?)\s*\))?\s*$" +) + + +class PackageInfo: + def __init__(self, binary_pkg_name, src_pkg_name, version): + self.binary_pkg_name = binary_pkg_name + self.source_from = "debian" # default is debian package + + self.version = debian.debian_support.Version(version) + self.upstream_version = self.version.upstream_version + + if src_pkg_name is None: + # If Source line is not in dpkg_status file, + # source package name and version is same as binary package name and version. + self.src_pkg_name = self.binary_pkg_name + self.src_pkg_version = self.version + else: + # If binary package was uploaded by binNMU, source version and binary package + # version is different. Source package version is in Source line. + # So, we need track both versions + """ + Source: bash (5.2.15-2) + Version: 5.2.15-2+b8 + + Source: bzip2 (1.0.8-5) + Version: 1.0.8-5+b1 + """ + m = SOURCE_VERSION_PATTERN.match(src_pkg_name) + tmp_src_pkg_name = m.group("pkg") + tmp_src_pkg_ver = m.group("ver") + + self.src_pkg_name = tmp_src_pkg_name + + if not tmp_src_pkg_ver: + self.src_pkg_version = self.version + else: + self.src_pkg_version = debian.debian_support.Version(tmp_src_pkg_ver) + + def __repr__(self) -> str: + return f"{self.__dict__}" + + +class PackageList: + def __init__(self) -> None: + self.packages = {} + + def __repr__(self) -> str: + return f"{self.__dict__}" + + def __iter__(self) -> str: + return iter(self.packages) + + def __getitem__(self, key: str) -> PackageInfo: + return self.packages[key] + + def add_package(self, pkginfo: PackageInfo) -> None: + name = pkginfo.src_pkg_name + if not name in self.packages: + self.packages[name] = [pkginfo] + else: + self.packages[name].append(pkginfo) + + def merge_recipe_source_info(self, recipe_source_info: Any) -> None: + for src_name in recipe_source_info: + recipe = recipe_source_info[src_name] + if src_name in self.packages: + # print(f"merge source info {src_name}") + for pkg in self.packages[src_name]: + pkg.source_from = recipe["source_from"] + pkg.src_pkg_name = recipe["source_package_name"] + + def get_binary_package_names_by_src_package_name( + self, src_pkg_name: str + ) -> list[str]: + ret = [] + for pkg in self.packages.get(src_pkg_name): + ret.append(pkg.binary_pkg_name) + return ret + + def get_upstream_version(self, src_pkg_name: str) -> str: + if not src_pkg_name in self.packages: + return None + + return str(self.packages[src_pkg_name][0].upstream_version) + + def get_version(self, src_pkg_name: str) -> str: + if not src_pkg_name in self.packages: + return None + + return str(self.packages[src_pkg_name][0].version) + + +class PackageInfoHelper: + # staticmethod + def parse_dpkg_status_file( + filepath: str, target_source_package: str = None + ) -> PackageList: + results = PackageList() + + with open(filepath) as f: + # split by package data + blocks = f.read().split("\n\n") + + for block in blocks: + binary_pkg_name = None + src_pkg_name = None + actual_src_pkg_name = None + version = None + + lines = block.split("\n") + if len(lines) < 2: + continue + + for line in lines: + if line.startswith("Package:"): + binary_pkg_name = line.split(":")[1].strip() + elif line.startswith("Source:"): + # If binary package is rebuilt by binNMU, source package line + # contains source package version. + # bash (5.2.15-2) + # This text will be normalized in PackageInfo's constroctor. + src_pkg_name = line.split(":")[1].strip() + actual_src_pkg_name = src_pkg_name.split(" ")[0].strip() + elif line.startswith("Version"): + # Version contains ":" in it(e.g. 5.36.0-7+deb12u3") + version = ":".join(line.split(":")[1:]).strip() + + pkginfo = None + if ( + target_source_package is None + or target_source_package == actual_src_pkg_name + ): + pkginfo = PackageInfo(binary_pkg_name, actual_src_pkg_name, version) + + if pkginfo: + results.add_package(pkginfo) + + return results