diff --git a/docs/Makefile b/docs/Makefile index 09cc0a4a..987aea33 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -11,7 +11,10 @@ BUILDDIR = build help: @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) -.PHONY: help Makefile +.PHONY: help Makefile livehtml + +livehtml: + sphinx-autobuild "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). diff --git a/docs/api/http.md b/docs/api/http.md new file mode 100644 index 00000000..c6b2a3a5 --- /dev/null +++ b/docs/api/http.md @@ -0,0 +1,16 @@ +(http)= + +# HTTP APIs + +```{eval-rst} +.. automodule:: gvm.protocols.http +``` + +The following modules are provided: + +```{eval-rst} +.. toctree:: + :maxdepth: 1 + + openvasdv1 +``` diff --git a/docs/api/openvasdv1.md b/docs/api/openvasdv1.md new file mode 100644 index 00000000..948aa0e2 --- /dev/null +++ b/docs/api/openvasdv1.md @@ -0,0 +1,18 @@ +(openvasdv1)= + +# openvasd v1 + +```{eval-rst} +.. automodule:: gvm.protocols.http.openvasd + :members: +``` + +```{toctree} +:hidden: + +openvasdv1/health +openvasdv1/metadata +openvasdv1/notus +openvasdv1/scans +openvasdv1/vts +``` diff --git a/docs/api/openvasdv1/health.md b/docs/api/openvasdv1/health.md new file mode 100644 index 00000000..26ba8582 --- /dev/null +++ b/docs/api/openvasdv1/health.md @@ -0,0 +1,6 @@ +# Health API + +```{eval-rst} +.. automodule:: gvm.protocols.http.openvasd._health + :members: +``` diff --git a/docs/api/openvasdv1/metadata.md b/docs/api/openvasdv1/metadata.md new file mode 100644 index 00000000..5a70a016 --- /dev/null +++ b/docs/api/openvasdv1/metadata.md @@ -0,0 +1,6 @@ +# Metadata API + +```{eval-rst} +.. automodule:: gvm.protocols.http.openvasd._metadata + :members: +``` diff --git a/docs/api/openvasdv1/notus.md b/docs/api/openvasdv1/notus.md new file mode 100644 index 00000000..4ba959cd --- /dev/null +++ b/docs/api/openvasdv1/notus.md @@ -0,0 +1,6 @@ +# Notus API + +```{eval-rst} +.. automodule:: gvm.protocols.http.openvasd._notus + :members: +``` diff --git a/docs/api/openvasdv1/scans.md b/docs/api/openvasdv1/scans.md new file mode 100644 index 00000000..474bc3f6 --- /dev/null +++ b/docs/api/openvasdv1/scans.md @@ -0,0 +1,6 @@ +# Scans API + +```{eval-rst} +.. automodule:: gvm.protocols.http.openvasd._scans + :members: +``` diff --git a/docs/api/openvasdv1/vts.md b/docs/api/openvasdv1/vts.md new file mode 100644 index 00000000..e7479297 --- /dev/null +++ b/docs/api/openvasdv1/vts.md @@ -0,0 +1,6 @@ +# VTs API + +```{eval-rst} +.. automodule:: gvm.protocols.http.openvasd._vts + :members: +``` diff --git a/docs/api/protocols.md b/docs/api/protocols.md index 587379e2..9662d09a 100644 --- a/docs/api/protocols.md +++ b/docs/api/protocols.md @@ -11,6 +11,7 @@ gmp ospv1 +http ``` ## Dynamic diff --git a/gvm/protocols/http/__init__.py b/gvm/protocols/http/__init__.py new file mode 100644 index 00000000..7facd4c4 --- /dev/null +++ b/gvm/protocols/http/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +Package for supported Greenbone HTTP APIs. + +Currently only `openvasd version 1 `_ is supported. +""" diff --git a/gvm/protocols/http/openvasd/__init__.py b/gvm/protocols/http/openvasd/__init__.py new file mode 100644 index 00000000..ac92d345 --- /dev/null +++ b/gvm/protocols/http/openvasd/__init__.py @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +High-level API interface for interacting with openvasd HTTP services via +logical modules (health, metadata, scans, etc.). + +Usage: + +.. code-block:: python + + from gvm.protocols.http.openvasd import OpenvasdHttpAPIv1 +""" + +from ._openvasd1 import OpenvasdHttpAPIv1 + +__all__ = ["OpenvasdHttpAPIv1"] diff --git a/gvm/protocols/http/openvasd/_api.py b/gvm/protocols/http/openvasd/_api.py new file mode 100644 index 00000000..22aea0b2 --- /dev/null +++ b/gvm/protocols/http/openvasd/_api.py @@ -0,0 +1,23 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + + +import httpx + + +class OpenvasdAPI: + def __init__( + self, client: httpx.Client, *, suppress_exceptions: bool = False + ): + """ + Initialize the OpenvasdAPI entry point. + + Args: + client: An initialized `httpx.Client` configured for communicating + with the openvasd server. + suppress_exceptions: If True, suppress exceptions and return structured error responses. + Default is False, which means exceptions will be raised. + """ + self._client = client + self._suppress_exceptions = suppress_exceptions diff --git a/gvm/protocols/http/openvasd/_client.py b/gvm/protocols/http/openvasd/_client.py new file mode 100644 index 00000000..c82a5a09 --- /dev/null +++ b/gvm/protocols/http/openvasd/_client.py @@ -0,0 +1,80 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +http client for initializing a connection to the openvasd HTTP API using optional mTLS authentication. +""" + +import ssl +from os import PathLike +from typing import Optional, Tuple, Union + +from httpx import Client + +StrOrPathLike = Union[str, PathLike[str]] + + +def create_openvasd_http_client( + host_name: str, + *, + api_key: Optional[str] = None, + server_ca_path: Optional[StrOrPathLike] = None, + client_cert_paths: Optional[ + Union[StrOrPathLike, Tuple[StrOrPathLike, StrOrPathLike]] + ] = None, + port: int = 3000, +) -> Client: + """ + Create a `httpx.Client` configured for mTLS-secured or API KEY access + to an openvasd HTTP API instance. + + Args: + host_name: Hostname or IP of the OpenVASD server (e.g., "localhost"). + api_key: Optional API key used for authentication via HTTP headers. + server_ca_path: Path to the server's CA certificate (for verifying the server). + client_cert_paths: Path to the client certificate (str) or a tuple of + (cert_path, key_path) for mTLS authentication. + port: The port to connect to (default: 3000). + + Behavior: + - If both `server_ca_path` and `client_cert_paths` are set, an mTLS connection + is established using an SSLContext. + - If not, `verify` is set to False (insecure), and HTTP is used instead of HTTPS. + HTTP connection needs api_key for authorization. + """ + headers = {} + + context: Optional[ssl.SSLContext] = None + + # Prepare mTLS SSL context if needed + if client_cert_paths and server_ca_path: + context = ssl.create_default_context( + ssl.Purpose.SERVER_AUTH, cafile=server_ca_path + ) + if isinstance(client_cert_paths, tuple): + context.load_cert_chain( + certfile=client_cert_paths[0], keyfile=client_cert_paths[1] + ) + else: + context.load_cert_chain(certfile=client_cert_paths) + + context.check_hostname = False + context.verify_mode = ssl.CERT_REQUIRED + + # Set verify based on context presence + verify: Union[bool, ssl.SSLContext] = context if context else False + + if api_key: + headers["X-API-KEY"] = api_key + + protocol = "https" if context else "http" + base_url = f"{protocol}://{host_name}:{port}" + + return Client( + base_url=base_url, + headers=headers, + verify=verify, + http2=True, + timeout=10.0, + ) diff --git a/gvm/protocols/http/openvasd/_health.py b/gvm/protocols/http/openvasd/_health.py new file mode 100644 index 00000000..8701fa6c --- /dev/null +++ b/gvm/protocols/http/openvasd/_health.py @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +API wrapper for accessing the /health endpoints of the openvasd HTTP API. +""" + +import httpx + +from ._api import OpenvasdAPI + + +class HealthAPI(OpenvasdAPI): + """ + Provides access to the openvasd /health endpoints, which expose the + operational state of the scanner. + + All methods return the HTTP status code of the response and raise an exception + if the server returns an error response (4xx or 5xx). + """ + + def get_alive(self) -> int: + """ + Check if the scanner process is alive. + + Returns: + HTTP status code (e.g., 200 if alive). + + Raises: + httpx.HTTPStatusError: If the server response indicates failure and + exceptions are not suppressed. + + See: GET /health/alive in the openvasd API documentation. + """ + try: + response = self._client.get("/health/alive") + response.raise_for_status() + return response.status_code + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response.status_code + raise + + def get_ready(self) -> int: + """ + Check if the scanner is ready to accept requests (e.g., feed loaded). + + Returns: + HTTP status code (e.g., 200 if ready). + + Raises: + httpx.HTTPStatusError: If the server response indicates failure and + exceptions are not suppressed. + + See: GET /health/ready in the openvasd API documentation. + """ + try: + response = self._client.get("/health/ready") + response.raise_for_status() + return response.status_code + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response.status_code + raise + + def get_started(self) -> int: + """ + Check if the scanner has fully started. + + Returns: + HTTP status code (e.g., 200 if started). + + Raises: + httpx.HTTPStatusError: If the server response indicates failure and + exceptions are not suppressed. + + See: GET /health/started in the openvasd API documentation. + """ + try: + response = self._client.get("/health/started") + response.raise_for_status() + return response.status_code + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response.status_code + raise diff --git a/gvm/protocols/http/openvasd/_metadata.py b/gvm/protocols/http/openvasd/_metadata.py new file mode 100644 index 00000000..d701c375 --- /dev/null +++ b/gvm/protocols/http/openvasd/_metadata.py @@ -0,0 +1,113 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +API wrapper for retrieving metadata from the openvasd HTTP API using HEAD requests. +""" + +from dataclasses import dataclass +from typing import Union + +import httpx + +from ._api import OpenvasdAPI + + +@dataclass +class Metadata: + """ + Represents metadata returned by the openvasd API. + + Attributes: + api_version: Comma separated list of available API versions + feed_version: Version of the feed. + authentication: Supported authentication methods + """ + + api_version: str + feed_version: str + authentication: str + + +@dataclass +class MetadataError: + """ + Represents an error response from the metadata API. + + Attributes: + error: Error message. + status_code: HTTP status code of the error response. + """ + + error: str + status_code: int + + +class MetadataAPI(OpenvasdAPI): + """ + Provides access to metadata endpoints exposed by the openvasd server + using lightweight HTTP HEAD requests. + + These endpoints return useful information in HTTP headers such as: + - API version + - Feed version + - Authentication type + + If the scanner is protected and the request is unauthorized, a 401 response + is handled gracefully. + """ + + def get(self) -> Union[Metadata, MetadataError]: + """ + Perform a HEAD request to `/` to retrieve top-level API metadata. + + Returns: + A Metadata instance or MetadataError if exceptions are suppressed and an error occurs. + + Raises: + httpx.HTTPStatusError: For non-401 HTTP errors if exceptions are not suppressed. + + See: HEAD / in the openvasd API documentation. + """ + try: + response = self._client.head("/") + response.raise_for_status() + return Metadata( + api_version=response.headers.get("api-version"), + feed_version=response.headers.get("feed-version"), + authentication=response.headers.get("authentication"), + ) + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return MetadataError( + error=str(e), status_code=e.response.status_code + ) + raise + + def get_scans(self) -> Union[Metadata, MetadataError]: + """ + Perform a HEAD request to `/scans` to retrieve scan endpoint metadata. + + Returns: + A Metadata instance or MetadataError if exceptions are suppressed and an error occurs. + + Raises: + httpx.HTTPStatusError: For non-401 HTTP errors if exceptions are not suppressed. + + See: HEAD /scans in the openvasd API documentation. + """ + try: + response = self._client.head("/scans") + response.raise_for_status() + return Metadata( + api_version=response.headers.get("api-version"), + feed_version=response.headers.get("feed-version"), + authentication=response.headers.get("authentication"), + ) + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return MetadataError( + error=str(e), status_code=e.response.status_code + ) + raise diff --git a/gvm/protocols/http/openvasd/_notus.py b/gvm/protocols/http/openvasd/_notus.py new file mode 100644 index 00000000..1af06750 --- /dev/null +++ b/gvm/protocols/http/openvasd/_notus.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +API wrapper for interacting with the Notus component of the openvasd HTTP API. +""" + +import urllib + +import httpx + +from ._api import OpenvasdAPI + + +class NotusAPI(OpenvasdAPI): + """ + Provides access to the Notus-related endpoints of the openvasd HTTP API. + + This includes retrieving supported operating systems and triggering + package-based vulnerability scans for a specific OS. + """ + + def get_os_list(self) -> httpx.Response: + """ + Retrieve the list of supported operating systems from the Notus service. + + Returns: + The full `httpx.Response` on success. + + See: GET /notus in the openvasd API documentation. + """ + try: + response = self._client.get("/notus") + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def run_scan( + self, + os: str, + package_list: list[str], + ) -> httpx.Response: + """ + Trigger a Notus scan for a given OS and list of packages. + + Args: + os: Operating system name (e.g., "debian", "alpine"). + package_list: List of package names to evaluate for vulnerabilities. + + Returns: + The full `httpx.Response` on success. + + See: POST /notus/{os} in the openvasd API documentation. + """ + quoted_os = urllib.parse.quote(os) + try: + response = self._client.post( + f"/notus/{quoted_os}", json=package_list + ) + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise diff --git a/gvm/protocols/http/openvasd/_openvasd1.py b/gvm/protocols/http/openvasd/_openvasd1.py new file mode 100644 index 00000000..1793e5d3 --- /dev/null +++ b/gvm/protocols/http/openvasd/_openvasd1.py @@ -0,0 +1,128 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +openvasd HTTP API version 1 + +High-level API interface for interacting with openvasd HTTP services via +logical modules (health, metadata, scans, etc.). +""" + +from typing import Optional, Tuple, Union + +from ._client import StrOrPathLike, create_openvasd_http_client +from ._health import HealthAPI +from ._metadata import MetadataAPI +from ._notus import NotusAPI +from ._scans import ScansAPI +from ._vts import VtsAPI + + +class OpenvasdHttpAPIv1: + """ + High-level interface for accessing openvasd HTTP API v1 endpoints. + + This class encapsulates modular sub-APIs (health, metadata, notus, scans, vts) + and wires them to a shared `httpx.Client` configured for secure access. + + Each sub-API provides methods for interacting with a specific openvasd domain. + """ + + def __init__( + self, + host_name: str, + port: int = 3000, + *, + api_key: Optional[str] = None, + server_ca_path: Optional[StrOrPathLike] = None, + client_cert_paths: Optional[ + Union[StrOrPathLike, Tuple[StrOrPathLike, StrOrPathLike]] + ] = None, + suppress_exceptions: bool = False, + ): + """ + Initialize the OpenvasdHttpApiV1 entry point. + + Args: + host_name: Hostname or IP of the openvasd server (e.g., "localhost"). + port: Port of the openvasd service (default: 3000). + api_key: Optional API key to be used for authentication. + server_ca_path: Path to the server CA certificate (for HTTPS/mTLS). + client_cert_paths: Path to client certificate or (cert, key) tuple for mTLS. + suppress_exceptions: If True, suppress exceptions and return structured error + responses. Default is False, which means exceptions will be raised. + """ + self._client = create_openvasd_http_client( + host_name=host_name, + port=port, + api_key=api_key, + server_ca_path=server_ca_path, + client_cert_paths=client_cert_paths, + ) + + # Sub-API modules + self.__health = HealthAPI( + self._client, suppress_exceptions=suppress_exceptions + ) + self.__metadata = MetadataAPI( + self._client, suppress_exceptions=suppress_exceptions + ) + self.__notus = NotusAPI( + self._client, suppress_exceptions=suppress_exceptions + ) + self.__scans = ScansAPI( + self._client, suppress_exceptions=suppress_exceptions + ) + self.__vts = VtsAPI( + self._client, suppress_exceptions=suppress_exceptions + ) + + @property + def health(self) -> HealthAPI: + """ + Access the health API module. + + Provides methods to check the health status of the openvasd service. + """ + return self.__health + + @property + def metadata(self) -> MetadataAPI: + """ + Access the metadata API module. + + Provides methods to retrieve metadata about the openvasd service, + including version and feed information. + """ + return self.__metadata + + @property + def notus(self) -> NotusAPI: + """ + Access the Notus API module. + + Provides methods to interact with the Notus service for package-based + vulnerability scanning. + """ + return self.__notus + + @property + def scans(self) -> ScansAPI: + """ + Access the scans API module. + + Provides methods to manage and interact with vulnerability scans, + including starting, stopping, and retrieving scan results. + """ + return self.__scans + + @property + def vts(self) -> VtsAPI: + """ + Access the VTS API module. + + Provides methods to manage and interact with Vulnerability Tests + (VTs) for vulnerability assessment. + """ + return self.__vts diff --git a/gvm/protocols/http/openvasd/_scans.py b/gvm/protocols/http/openvasd/_scans.py new file mode 100644 index 00000000..ba869063 --- /dev/null +++ b/gvm/protocols/http/openvasd/_scans.py @@ -0,0 +1,562 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +API wrapper for interacting with the /scans endpoints of the openvasd HTTP API. +""" + +import urllib.parse +from dataclasses import asdict, dataclass, field +from enum import Enum +from typing import Any, Optional, Union +from uuid import UUID + +import httpx + +from gvm.errors import InvalidArgumentType + +from ._api import OpenvasdAPI + +ID = Union[str, UUID] + + +@dataclass +class PortRange: + """ + Represents a range of ports. + + Attributes: + start: The starting port number. + end: The ending port number. + """ + + start: int + end: int + + +@dataclass +class Port: + """ + Represents a port configuration for scanning. + + Attributes: + protocol: The protocol to use ("tcp" or "udp"). + range: A list of port ranges to scan. + """ + + protocol: str # e.g., "tcp", "udp" + range: list[PortRange] + + +@dataclass +class CredentialUP: + """ + Represents username/password credentials for a service. + + Attributes: + username: The login username. + password: The login password. + privilege_username: Optional privilege escalation username. + privilege_password: Optional privilege escalation password. + """ + + username: str + password: str + privilege_username: Optional[str] = None + privilege_password: Optional[str] = None + + +@dataclass +class CredentialKRB5: + """ + Represents Kerberos credentials. + + Attributes: + username: Kerberos username. + password: Kerberos password. + realm: Kerberos realm. + kdc: Key Distribution Center hostname. + """ + + username: str + password: str + realm: str + kdc: str + + +@dataclass +class CredentialUSK: + """ + Represents credentials using a user/SSH key combination. + + Attributes: + username: SSH username. + password: Password for SSH key (if encrypted). + private: Private key content or reference. + privilege_username: Optional privilege escalation username. + privilege_password: Optional privilege escalation password. + """ + + username: str + password: str + private: str + privilege_username: Optional[str] = None + privilege_password: Optional[str] = None + + +@dataclass +class CredentialSNMP: + """ + Represents SNMP credentials. + + Attributes: + username: SNMP username. + password: SNMP authentication password. + community: SNMP community string. + auth_algorithm: Authentication algorithm (e.g., "md5"). + privacy_password: Privacy password for SNMPv3. + privacy_algorithm: Privacy algorithm (e.g., "aes"). + """ + + username: str + password: str + community: str + auth_algorithm: str + privacy_password: str + privacy_algorithm: str + + +@dataclass +class Credential: + """ + Represents a full credential configuration for a specific service. + + Attributes: + service: Name of the service (e.g., "ssh", "snmp"). + port: Port number associated with the service. + up: Optional username/password credentials. + krb5: Optional Kerberos credentials. + usk: Optional user/SSH key credentials. + snmp: Optional SNMP credentials. + """ + + service: str + port: int + up: Optional[CredentialUP] = None + krb5: Optional[CredentialKRB5] = None + usk: Optional[CredentialUSK] = None + snmp: Optional[CredentialSNMP] = None + + +@dataclass +class Target: + """ + Represents the scan target configuration. + + Attributes: + hosts: List of target IPs or hostnames. + excluded_hosts: List of IPs or hostnames to exclude. + ports: List of port configurations. + credentials: List of credentials to use during scanning. + alive_test_ports: Port ranges used to test if hosts are alive. + alive_test_methods: Methods used to check if hosts are alive (e.g., "icmp"). + reverse_lookup_unify: Whether to unify reverse lookup results. + reverse_lookup_only: Whether to rely solely on reverse DNS lookups. + """ + + hosts: list[str] + excluded_hosts: list[str] = field(default_factory=list) + ports: list[Port] = field(default_factory=list) + credentials: list[Credential] = field(default_factory=list) + alive_test_ports: list[Port] = field(default_factory=list) + alive_test_methods: list[str] = field(default_factory=list) + reverse_lookup_unify: bool = False + reverse_lookup_only: bool = False + + +@dataclass +class VTParameter: + """ + Represents a parameter for a specific vulnerability test. + + Attributes: + id: Identifier of the VT parameter. + value: Value to assign to the parameter. + """ + + id: int + value: str + + +@dataclass +class VTSelection: + """ + Represents a selected vulnerability test (VT) and its parameters. + + Attributes: + oid: The OID (Object Identifier) of the VT. + parameters: A list of parameters to customize VT behavior. + """ + + oid: str + parameters: list[VTParameter] = field(default_factory=list) + + +@dataclass +class ScanPreference: + """ + Represents a scan-level preference or configuration option. + + Attributes: + id: Preference ID or name (e.g., "max_checks", "scan_speed"). + value: The value assigned to the preference. + """ + + id: str + value: str + + +def _to_dict(obj: Any) -> Any: + """Recursively convert dataclass instances to dictionaries.""" + if isinstance(obj, list): + return [_to_dict(item) for item in obj] + elif hasattr(obj, "__dataclass_fields__"): + return {k: _to_dict(v) for k, v in asdict(obj).items() if v is not None} + return obj + + +class ScanAction(str, Enum): + """ + Enumeration of valid scan actions supported by the openvasd API. + + This enum defines the allowed actions that can be performed on a scan. + Using an enum helps ensure type safety and prevents invalid action strings + from being passed to the API. + + Values: + START: Initiates the scan execution. + STOP: Terminates an ongoing scan. + """ + + START = "start" # Start the scan + STOP = "stop" # Stop the scan + + +class ScansAPI(OpenvasdAPI): + """ + Provides access to scan-related operations in the openvasd HTTP API. + + Includes methods for creating, starting, stopping, and retrieving scan details + and results. + """ + + def create( + self, + target: Target, + vt_selection: list[VTSelection], + *, + scan_preferences: Optional[list[ScanPreference]] = None, + ) -> httpx.Response: + """ + Create a new scan with the specified target configuration and VT selection. + + Args: + target: A `Target` dataclass instance describing the scan target(s), including hosts, ports, credentials, and alive test settings. + vt_selection: A list of `VTSelection` instances specifying which vulnerability tests (VTs) to include in the scan. + scan_preferences: Optional list of `ScanPreference` instances to customize scan behavior (e.g., number of threads, timeout values). + + Returns: + The full HTTP response returned by the POST /scans request. + + Raises: + httpx.HTTPStatusError: If the server responds with an error status + and the exception is not suppressed. + + See: POST /scans in the openvasd API documentation. + """ + request_json = { + "target": _to_dict(target), + "vts": _to_dict(vt_selection), + } + if scan_preferences: + request_json["scan_preferences"] = _to_dict(scan_preferences) + + try: + response = self._client.post("/scans", json=request_json) + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def delete(self, scan_id: ID) -> int: + """ + Delete a scan by its ID. + + Args: + scan_id: The scan identifier. + + Returns: + The HTTP status code returned by the server on success, + or the error status code returned by the server on failure. + + Raises: + httpx.HTTPStatusError: If the server responds with an error status + and the exception is not suppressed. + + See: DELETE /scans/{id} in the openvasd API documentation. + """ + try: + response = self._client.delete( + f"/scans/{urllib.parse.quote(str(scan_id))}" + ) + response.raise_for_status() + return response.status_code + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response.status_code + raise + + def get_all(self) -> httpx.Response: + """ + Retrieve the list of all available scans. + + Returns: + The full HTTP response of the GET /scans request. + + Raises: + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: GET /scans in the openvasd API documentation. + """ + try: + response = self._client.get("/scans") + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def get(self, scan_id: ID) -> httpx.Response: + """ + Retrieve metadata of a single scan by ID. + + Args: + scan_id: The scan identifier. + + Returns: + The full HTTP response of the GET /scans/{id} request. + + Raises: + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: GET /scans/{id} in the openvasd API documentation. + """ + try: + response = self._client.get( + f"/scans/{urllib.parse.quote(str(scan_id))}" + ) + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def get_results( + self, + scan_id: ID, + *, + range_start: Optional[int] = None, + range_end: Optional[int] = None, + ) -> httpx.Response: + """ + Retrieve a range of results for a given scan. + + Args: + scan_id: The scan identifier. + range_start: Optional start index for paginated results. + range_end: Optional end index for paginated results. + + Returns: + The full HTTP response of the GET /scans/{id}/results request. + + Raises: + InvalidArgumentType: If provided range values are not integers. + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: GET /scans/{id}/results in the openvasd API documentation. + """ + params = {} + if range_start is not None: + if not isinstance(range_start, int): + raise InvalidArgumentType("range_start") + if range_end is not None: + if not isinstance(range_end, int): + raise InvalidArgumentType("range_end") + params["range"] = f"{range_start}-{range_end}" + else: + params["range"] = str(range_start) + elif range_end is not None: + if not isinstance(range_end, int): + raise InvalidArgumentType("range_end") + params["range"] = f"0-{range_end}" + + try: + response = self._client.get( + f"/scans/{urllib.parse.quote(str(scan_id))}/results", + params=params, + ) + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def get_result( + self, + scan_id: ID, + result_id: ID, + ) -> httpx.Response: + """ + Retrieve a specific scan result. + + Args: + scan_id: The scan identifier. + result_id: The specific result ID to fetch. + + Returns: + The full HTTP response of the GET /scans/{id}/results/{rid} request. + + Raises: + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: GET /scans/{id}/results/{rid} in the openvasd API documentation. + """ + try: + response = self._client.get( + f"/scans/{urllib.parse.quote(str(scan_id))}/results/{urllib.parse.quote(str(result_id))}" + ) + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def get_status(self, scan_id: ID) -> httpx.Response: + """ + Retrieve the status of a scan. + + Args: + scan_id: The scan identifier. + + Returns: + The full HTTP response of the GET /scans/{id}/status request. + + Raises: + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: GET /scans/{id}/status in the openvasd API documentation. + """ + try: + response = self._client.get( + f"/scans/{urllib.parse.quote(str(scan_id))}/status" + ) + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def _run_action(self, scan_id: ID, action: ScanAction) -> int: + """ + Perform a scan action (start or stop) for the given scan ID. + + Args: + scan_id: The unique identifier of the scan. + action: A member of the ScanAction enum (e.g., ScanAction.START or ScanAction.STOP). + + Returns: + The HTTP status code returned by the server on success, + or the error status code returned by the server on failure. + + Raises: + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: POST /scans/{id} in the openvasd API documentation. + """ + try: + response = self._client.post( + f"/scans/{urllib.parse.quote(str(scan_id))}", + json={"action": str(action.value)}, + ) + response.raise_for_status() + return response.status_code + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response.status_code + raise + + def start(self, scan_id: ID) -> int: + """ + Start the scan identified by the given scan ID. + + Args: + scan_id: The unique identifier of the scan. + + Returns: + HTTP status code (e.g., 200) if successful, or error code (e.g., 404, 500) if the request fails, + and safe is False. + + Raises: + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: POST /scans/{id} with action=start in the openvasd API documentation. + """ + return self._run_action(scan_id, ScanAction.START) + + def stop(self, scan_id: ID) -> int: + """ + Stop the scan identified by the given scan ID. + + Args: + scan_id: The unique identifier of the scan. + + Returns: + HTTP status code (e.g., 200) if successful, or error code (e.g., 404, 500) if the request fails, + and safe is False. + + Raises: + httpx.HTTPStatusError: If the request fails and exceptions are not suppressed. + + See: POST /scans/{id} with action=stop in the openvasd API documentation. + """ + return self._run_action(scan_id, ScanAction.STOP) + + def get_preferences(self) -> httpx.Response: + """ + Retrieve all available scan preferences from the scanner. + + Returns: + The full HTTP response of the GET /scans/preferences request. + + Raises: + httpx.HTTPStatusError: If the server responds with an error status + and the exception is not suppressed. + + See: GET /scans/preferences in the openvasd API documentation. + """ + try: + response = self._client.get("/scans/preferences") + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise diff --git a/gvm/protocols/http/openvasd/_vts.py b/gvm/protocols/http/openvasd/_vts.py new file mode 100644 index 00000000..1753ac63 --- /dev/null +++ b/gvm/protocols/http/openvasd/_vts.py @@ -0,0 +1,72 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +API wrapper for accessing vulnerability test (VT) metadata from the openvasd HTTP API. +""" + +import urllib.parse + +import httpx + +from ._api import OpenvasdAPI + + +class VtsAPI(OpenvasdAPI): + """ + Provides access to the openvasd /vts endpoints. + + This includes retrieving the list of all available vulnerability tests + as well as fetching detailed information for individual VTs by OID. + """ + + def get_all(self) -> httpx.Response: + """ + Retrieve the list of all available vulnerability tests (VTs). + + This corresponds to a GET request to `/vts`. + + Returns: + The full `httpx.Response` containing a JSON list of VT entries. + + Raises: + httpx.HTTPStatusError: If the server returns a non-success status and exceptions are not suppressed. + + See: GET /vts in the openvasd API documentation. + """ + try: + response = self._client.get("/vts") + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise + + def get(self, oid: str) -> httpx.Response: + """ + Retrieve detailed information about a specific VT by OID. + + This corresponds to a GET request to `/vts/{oid}`. + + Args: + oid: The OID (object identifier) of the vulnerability test. + + Returns: + The full `httpx.Response` containing VT metadata for the given OID. + + Raises: + httpx.HTTPStatusError: If the server returns a non-success status and exceptions are not suppressed. + + See: GET /vts/{id} in the openvasd API documentation. + """ + quoted_oid = urllib.parse.quote(oid) + try: + response = self._client.get(f"/vts/{quoted_oid}") + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + if self._suppress_exceptions: + return e.response + raise diff --git a/poetry.lock b/poetry.lock index c8994d93..40811f7a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. [[package]] name = "alabaster" @@ -18,7 +18,7 @@ version = "4.9.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" optional = false python-versions = ">=3.9" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c"}, {file = "anyio-4.9.0.tar.gz", hash = "sha256:673c0c244e15788651a4ff38710fea9675823028a6f08a5eda409e0c9840a028"}, @@ -257,7 +257,7 @@ version = "2025.6.15" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.7" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "certifi-2025.6.15-py3-none-any.whl", hash = "sha256:2e0c7ce7cb5d8f8634ca55d2ba7e6ec2689a2fd6537d8dec1296a477a4910057"}, {file = "certifi-2025.6.15.tar.gz", hash = "sha256:d747aa5a8b9bbbb1bb8c22bb13e22bd1f18e9796defa16bab421f7f7a317323b"}, @@ -468,7 +468,7 @@ description = "Cross-platform colored terminal text." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" groups = ["dev"] -markers = "platform_system == \"Windows\" or sys_platform == \"win32\"" +markers = "sys_platform == \"win32\" or platform_system == \"Windows\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, @@ -647,7 +647,7 @@ version = "1.3.0" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" -groups = ["dev"] +groups = ["main", "dev"] markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"}, @@ -704,7 +704,7 @@ version = "0.16.0" description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" optional = false python-versions = ">=3.8" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"}, {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"}, @@ -716,7 +716,7 @@ version = "4.2.0" description = "Pure-Python HTTP/2 protocol implementation" optional = false python-versions = ">=3.9" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "h2-4.2.0-py3-none-any.whl", hash = "sha256:479a53ad425bb29af087f3458a61d30780bc818e4ebcf01f0b536ba916462ed0"}, {file = "h2-4.2.0.tar.gz", hash = "sha256:c8a52129695e88b1a0578d8d2cc6842bbd79128ac685463b887ee278126ad01f"}, @@ -732,7 +732,7 @@ version = "4.1.0" description = "Pure-Python HPACK header encoding" optional = false python-versions = ">=3.9" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496"}, {file = "hpack-4.1.0.tar.gz", hash = "sha256:ec5eca154f7056aa06f196a557655c5b009b382873ac8d1e66e79e87535f1dca"}, @@ -744,7 +744,7 @@ version = "1.0.9" description = "A minimal low-level HTTP client." optional = false python-versions = ">=3.8" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"}, {file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"}, @@ -766,7 +766,7 @@ version = "0.28.1" description = "The next generation HTTP client." optional = false python-versions = ">=3.8" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, @@ -792,7 +792,7 @@ version = "6.1.0" description = "Pure-Python HTTP/2 framing" optional = false python-versions = ">=3.9" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "hyperframe-6.1.0-py3-none-any.whl", hash = "sha256:b03380493a519fce58ea5af42e4a42317bf9bd425596f7a0835ffce80f1a42e5"}, {file = "hyperframe-6.1.0.tar.gz", hash = "sha256:f630908a00854a7adeabd6382b43923a4c4cd4b821fcb527e6ab9e15382a3b08"}, @@ -804,7 +804,7 @@ version = "3.10" description = "Internationalized Domain Names in Applications (IDNA)" optional = false python-versions = ">=3.6" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, @@ -1552,7 +1552,7 @@ version = "1.3.1" description = "Sniff out which async library your code is running under" optional = false python-versions = ">=3.7" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, @@ -1813,11 +1813,12 @@ version = "4.14.1" description = "Backported and Experimental Type Hints for Python 3.9+" optional = false python-versions = ">=3.9" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "typing_extensions-4.14.1-py3-none-any.whl", hash = "sha256:d1e1e3b58374dc93031d6eda2420a48ea44a36c2b4766a4fdeb3710755731d76"}, {file = "typing_extensions-4.14.1.tar.gz", hash = "sha256:38b39f4aeeab64884ce9f74c94263ef78f3c22467c8724005483154c26648d36"}, ] +markers = {main = "python_version < \"3.13\""} [[package]] name = "urllib3" @@ -1861,4 +1862,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.9.2" -content-hash = "5a19a437b167240b91207d5d281549beffcb24ee414425a88d3b6371bf7c96ad" +content-hash = "c4aea8eeef68616539cbf0c5393936a80b3565878a0367ed8a282390ea71c89c" diff --git a/pyproject.toml b/pyproject.toml index 0e808b3d..a6b59c9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ packages = [{ include = "gvm" }, { include = "tests", format = "sdist" }] python = "^3.9.2" paramiko = ">=2.7.1" lxml = ">=4.5.0" +httpx = {extras = ["http2"], version = "^0.28.1"} [tool.poetry.group.dev.dependencies] coverage = ">=7.2" diff --git a/tests/protocols/http/__init__.py b/tests/protocols/http/__init__.py new file mode 100644 index 00000000..9c0a68e7 --- /dev/null +++ b/tests/protocols/http/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later diff --git a/tests/protocols/http/openvasd/__init__.py b/tests/protocols/http/openvasd/__init__.py new file mode 100644 index 00000000..9c0a68e7 --- /dev/null +++ b/tests/protocols/http/openvasd/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later diff --git a/tests/protocols/http/openvasd/test_client.py b/tests/protocols/http/openvasd/test_client.py new file mode 100644 index 00000000..2224a5f8 --- /dev/null +++ b/tests/protocols/http/openvasd/test_client.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import ssl +import unittest +from unittest.mock import MagicMock, patch + +from gvm.protocols.http.openvasd._client import create_openvasd_http_client + + +class TestOpenvasdClient(unittest.TestCase): + @patch("gvm.protocols.http.openvasd._client.Client") + def test_init_without_tls_or_api_key(self, mock_httpx_client): + create_openvasd_http_client("localhost") + mock_httpx_client.assert_called_once() + _, kwargs = mock_httpx_client.call_args + self.assertEqual(kwargs["base_url"], "http://localhost:3000") + self.assertFalse(kwargs["verify"]) + self.assertNotIn("X-API-KEY", kwargs["headers"]) + + @patch("gvm.protocols.http.openvasd._client.Client") + def test_init_with_api_key_only(self, mock_httpx_client): + create_openvasd_http_client("localhost", api_key="secret") + _, kwargs = mock_httpx_client.call_args + self.assertEqual(kwargs["headers"]["X-API-KEY"], "secret") + self.assertEqual(kwargs["base_url"], "http://localhost:3000") + self.assertFalse(kwargs["verify"]) + + @patch("gvm.protocols.http.openvasd._client.ssl.create_default_context") + @patch("gvm.protocols.http.openvasd._client.Client") + def test_init_with_mtls_tuple( + self, mock_httpx_client, mock_ssl_ctx_factory + ): + mock_context = MagicMock(spec=ssl.SSLContext) + mock_ssl_ctx_factory.return_value = mock_context + + create_openvasd_http_client( + "localhost", + server_ca_path="/path/ca.pem", + client_cert_paths=("/path/cert.pem", "/path/key.pem"), + ) + + mock_ssl_ctx_factory.assert_called_once_with( + ssl.Purpose.SERVER_AUTH, cafile="/path/ca.pem" + ) + mock_context.load_cert_chain.assert_called_once_with( + certfile="/path/cert.pem", keyfile="/path/key.pem" + ) + mock_httpx_client.assert_called_once() + _, kwargs = mock_httpx_client.call_args + self.assertEqual(kwargs["base_url"], "https://localhost:3000") + self.assertEqual(kwargs["verify"], mock_context) + + @patch("gvm.protocols.http.openvasd._client.ssl.create_default_context") + @patch("gvm.protocols.http.openvasd._client.Client") + def test_init_with_mtls_single_cert( + self, mock_httpx_client, mock_ssl_ctx_factory + ): + mock_context = MagicMock(spec=ssl.SSLContext) + mock_ssl_ctx_factory.return_value = mock_context + + create_openvasd_http_client( + "localhost", + server_ca_path="/path/ca.pem", + client_cert_paths="/path/client.pem", + ) + + mock_context.load_cert_chain.assert_called_once_with( + certfile="/path/client.pem" + ) + _, kwargs = mock_httpx_client.call_args + self.assertEqual(kwargs["base_url"], "https://localhost:3000") + self.assertEqual(kwargs["verify"], mock_context) diff --git a/tests/protocols/http/openvasd/test_health.py b/tests/protocols/http/openvasd/test_health.py new file mode 100644 index 00000000..c2c202b7 --- /dev/null +++ b/tests/protocols/http/openvasd/test_health.py @@ -0,0 +1,110 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest +from unittest.mock import MagicMock + +import httpx + +from gvm.protocols.http.openvasd._health import HealthAPI + + +def _mock_response(status_code=200): + mock_response = MagicMock(spec=httpx.Response) + mock_response.status_code = status_code + mock_response.raise_for_status = MagicMock() + return mock_response + + +class TestHealthAPI(unittest.TestCase): + def setUp(self): + self.mock_client = MagicMock(spec=httpx.Client) + + def test_alive_returns_status_code(self): + mock_response = _mock_response(200) + self.mock_client.get.return_value = mock_response + + health_api = HealthAPI(self.mock_client) + result = health_api.get_alive() + self.mock_client.get.assert_called_once_with("/health/alive") + mock_response.raise_for_status.assert_called_once() + self.assertEqual(result, 200) + + def test_alive_raises_httpx_error_returns_503_with_suppress_exceptions( + self, + ): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Not OK", request=MagicMock(), response=MagicMock(status_code=503) + ) + + health_api = HealthAPI(self.mock_client, suppress_exceptions=True) + result = health_api.get_alive() + self.assertEqual(result, 503) + + def test_alive_raises_httpx_error(self): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Not OK", request=MagicMock(), response=MagicMock(status_code=503) + ) + health_api = HealthAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + health_api.get_alive() + + def test_ready_returns_status_code(self): + mock_response = _mock_response(204) + self.mock_client.get.return_value = mock_response + + health_api = HealthAPI(self.mock_client) + result = health_api.get_ready() + self.mock_client.get.assert_called_once_with("/health/ready") + mock_response.raise_for_status.assert_called_once() + self.assertEqual(result, 204) + + def test_ready_raises_httpx_error_returns_503_with_suppress_exceptions( + self, + ): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Not OK", request=MagicMock(), response=MagicMock(status_code=503) + ) + + health_api = HealthAPI(self.mock_client, suppress_exceptions=True) + result = health_api.get_ready() + self.assertEqual(result, 503) + + def test_ready_raises_httpx_error(self): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Not OK", request=MagicMock(), response=MagicMock(status_code=503) + ) + health_api = HealthAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + health_api.get_ready() + + def test_started_returns_status_code(self): + mock_response = _mock_response(202) + self.mock_client.get.return_value = mock_response + + health_api = HealthAPI(self.mock_client) + result = health_api.get_started() + self.mock_client.get.assert_called_once_with("/health/started") + mock_response.raise_for_status.assert_called_once() + self.assertEqual(result, 202) + + def test_started_raises_httpx_error_returns_503_with_suppress_exceptions( + self, + ): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Not OK", request=MagicMock(), response=MagicMock(status_code=503) + ) + + health_api = HealthAPI(self.mock_client, suppress_exceptions=True) + result = health_api.get_started() + self.assertEqual(result, 503) + + def test_started_raises_httpx_error(self): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Not OK", request=MagicMock(), response=MagicMock(status_code=503) + ) + + health_api = HealthAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + health_api.get_started() diff --git a/tests/protocols/http/openvasd/test_metadata.py b/tests/protocols/http/openvasd/test_metadata.py new file mode 100644 index 00000000..e6a4e4f5 --- /dev/null +++ b/tests/protocols/http/openvasd/test_metadata.py @@ -0,0 +1,129 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest +from unittest.mock import MagicMock + +import httpx + +from gvm.protocols.http.openvasd._metadata import ( + Metadata, + MetadataAPI, + MetadataError, +) + + +def _mock_head_response(status_code=200, headers=None): + response = MagicMock(spec=httpx.Response) + response.status_code = status_code + response.headers = headers or {} + response.raise_for_status = MagicMock() + return response + + +class TestMetadataAPI(unittest.TestCase): + def setUp(self): + self.mock_client = MagicMock(spec=httpx.Client) + + def test_get_successful(self): + headers = { + "api-version": "1.0", + "feed-version": "2025.01", + "authentication": "API-KEY", + } + mock_response = _mock_head_response(200, headers) + self.mock_client.head.return_value = mock_response + + api = MetadataAPI(self.mock_client) + result = api.get() + self.mock_client.head.assert_called_once_with("/") + mock_response.raise_for_status.assert_called_once() + self.assertEqual( + result, + Metadata( + api_version="1.0", + feed_version="2025.01", + authentication="API-KEY", + ), + ) + + def test_get_unauthorized_with_suppress_exceptions(self): + mock_response = MagicMock(spec=httpx.Response) + mock_response.status_code = 401 + + self.mock_client.head.side_effect = httpx.HTTPStatusError( + "Unauthorized", request=MagicMock(), response=mock_response + ) + + api = MetadataAPI(self.mock_client, suppress_exceptions=True) + result = api.get() + self.assertEqual( + result, MetadataError(error="Unauthorized", status_code=401) + ) + + def test_get_failure_raises_httpx_error(self): + mock_response = _mock_head_response(500, None) + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.head.return_value = mock_response + + api = MetadataAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get() + + self.mock_client.head.assert_called_once_with("/") + + def test_get_scans_successful(self): + headers = { + "api-version": "1.0", + "feed-version": "2025.01", + "authentication": "API-KEY", + } + mock_response = _mock_head_response(200, headers) + self.mock_client.head.return_value = mock_response + + api = MetadataAPI(self.mock_client) + result = api.get_scans() + self.mock_client.head.assert_called_once_with("/scans") + mock_response.raise_for_status.assert_called_once() + self.assertEqual( + result, + Metadata( + api_version="1.0", + feed_version="2025.01", + authentication="API-KEY", + ), + ) + + def test_get_scans_unauthorized_with_suppress_exceptions(self): + mock_response = MagicMock(spec=httpx.Response) + mock_response.status_code = 401 + + self.mock_client.head.side_effect = httpx.HTTPStatusError( + "Unauthorized", request=MagicMock(), response=mock_response + ) + + api = MetadataAPI(self.mock_client, suppress_exceptions=True) + result = api.get_scans() + self.assertEqual( + result, MetadataError(error="Unauthorized", status_code=401) + ) + + def test_get_scans_failure_raises_httpx_error(self): + mock_response = _mock_head_response(500, None) + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.head.return_value = mock_response + + api = MetadataAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get_scans() + + self.mock_client.head.assert_called_once_with("/scans") diff --git a/tests/protocols/http/openvasd/test_notus.py b/tests/protocols/http/openvasd/test_notus.py new file mode 100644 index 00000000..72cc63b5 --- /dev/null +++ b/tests/protocols/http/openvasd/test_notus.py @@ -0,0 +1,100 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest +from unittest.mock import MagicMock + +import httpx + +from gvm.protocols.http.openvasd._notus import NotusAPI + + +def _mock_response(status_code=200, json_data=None): + response = MagicMock(spec=httpx.Response) + response.status_code = status_code + response.json.return_value = json_data or [] + response.raise_for_status = MagicMock() + return response + + +class TestNotusAPI(unittest.TestCase): + def setUp(self): + self.mock_client = MagicMock(spec=httpx.Client) + + def test_get_os_list_success(self): + mock_response = _mock_response(json_data=["debian", "alpine"]) + self.mock_client.get.return_value = mock_response + + api = NotusAPI(self.mock_client) + result = api.get_os_list() + self.mock_client.get.assert_called_once_with("/notus") + mock_response.raise_for_status.assert_called_once() + self.assertEqual(result, mock_response) + + def test_get_os_list_http_error_with_suppress_exceptions(self): + mock_response = MagicMock() + mock_response.status_code = 500 + + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Internal Server Error", request=MagicMock(), response=mock_response + ) + + api = NotusAPI(self.mock_client, suppress_exceptions=True) + result = api.get_os_list() + + self.assertEqual(result, mock_response) + + def test_get_os_list_http_error(self): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Error", request=MagicMock(), response=MagicMock(status_code=500) + ) + + api = NotusAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get_os_list() + + def test_run_scan_success(self): + mock_response = _mock_response(json_data={"vulnerabilities": []}) + self.mock_client.post.return_value = mock_response + + api = NotusAPI(self.mock_client) + result = api.run_scan("debian", ["openssl", "bash"]) + self.mock_client.post.assert_called_once_with( + "/notus/debian", json=["openssl", "bash"] + ) + mock_response.raise_for_status.assert_called_once() + self.assertEqual(result, mock_response) + + def test_run_scan_with_special_os_name(self): + mock_response = _mock_response() + self.mock_client.post.return_value = mock_response + + os_name = "alpine linux" + encoded = "alpine%20linux" + api = NotusAPI(self.mock_client) + api.run_scan(os_name, ["musl"]) + self.mock_client.post.assert_called_once_with( + f"/notus/{encoded}", json=["musl"] + ) + + def test_run_scan_http_error(self): + self.mock_client.post.side_effect = httpx.HTTPStatusError( + "Error", request=MagicMock(), response=MagicMock(status_code=400) + ) + + api = NotusAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.run_scan("ubuntu", ["curl"]) + + def test_run_scan_http_error_with_suppress_exceptions(self): + mock_response = MagicMock() + mock_response.status_code = 500 + self.mock_client.post.side_effect = httpx.HTTPStatusError( + "Internal Server Error", request=MagicMock(), response=mock_response + ) + + api = NotusAPI(self.mock_client, suppress_exceptions=True) + response = api.run_scan("ubuntu", ["curl"]) + + self.assertEqual(response, mock_response) diff --git a/tests/protocols/http/openvasd/test_openvasd1.py b/tests/protocols/http/openvasd/test_openvasd1.py new file mode 100644 index 00000000..5f46de0b --- /dev/null +++ b/tests/protocols/http/openvasd/test_openvasd1.py @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest +from unittest.mock import MagicMock, patch + +from gvm.protocols.http.openvasd import OpenvasdHttpAPIv1 + + +class TestOpenvasdHttpApiV1(unittest.TestCase): + @patch("gvm.protocols.http.openvasd._openvasd1.create_openvasd_http_client") + def test_initializes_all_sub_apis(self, mock_crate_openvasd_client): + mock_httpx_client = MagicMock() + mock_crate_openvasd_client.return_value = mock_httpx_client + + api = OpenvasdHttpAPIv1( + host_name="localhost", + port=3000, + api_key="test-key", + server_ca_path="/path/to/ca.pem", + client_cert_paths=("/path/to/client.pem", "/path/to/key.pem"), + ) + + mock_crate_openvasd_client.assert_called_once_with( + host_name="localhost", + port=3000, + api_key="test-key", + server_ca_path="/path/to/ca.pem", + client_cert_paths=("/path/to/client.pem", "/path/to/key.pem"), + ) + + self.assertEqual(api._client, mock_httpx_client) + self.assertEqual(api.health._client, mock_httpx_client) + self.assertEqual(api.metadata._client, mock_httpx_client) + self.assertEqual(api.notus._client, mock_httpx_client) + self.assertEqual(api.scans._client, mock_httpx_client) + self.assertEqual(api.vts._client, mock_httpx_client) diff --git a/tests/protocols/http/openvasd/test_scans.py b/tests/protocols/http/openvasd/test_scans.py new file mode 100644 index 00000000..fbef1497 --- /dev/null +++ b/tests/protocols/http/openvasd/test_scans.py @@ -0,0 +1,636 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest +from unittest.mock import MagicMock + +import httpx + +from gvm.errors import InvalidArgumentType +from gvm.protocols.http.openvasd._scans import ( + Credential, + CredentialUP, + ScanAction, + ScanPreference, + ScansAPI, + Target, + VTParameter, + VTSelection, +) + + +def _mock_response(status_code=200, json_data=None): + resp = MagicMock(spec=httpx.Response) + resp.status_code = status_code + resp.json.return_value = json_data or {} + resp.raise_for_status = MagicMock() + return resp + + +class TestScansAPI(unittest.TestCase): + def setUp(self): + self.mock_client = MagicMock(spec=httpx.Client) + + def test_create_scan_success(self): + mock_resp = _mock_response() + self.mock_client.post.return_value = mock_resp + + api = ScansAPI(self.mock_client) + + target = Target( + hosts=["localhost"], + credentials=[ + Credential( + service="ssh", + port=22, + up=CredentialUP(username="user", password="pass"), + ) + ], + ) + vt_selection = [ + VTSelection( + oid="1.3.6.1.4.1.25623.1.0.100001", + parameters=[VTParameter(id=1, value="true")], + ) + ] + preferences = [ + ScanPreference(id="auto_enable_dependencies", value="True") + ] + + api.create(target, vt_selection, scan_preferences=preferences) + + self.mock_client.post.assert_called_once_with( + "/scans", + json={ + "target": { + "hosts": ["localhost"], + "excluded_hosts": [], + "ports": [], + "credentials": [ + { + "service": "ssh", + "port": 22, + "up": { + "username": "user", + "password": "pass", + "privilege_username": None, + "privilege_password": None, + }, + "krb5": None, + "usk": None, + "snmp": None, + } + ], + "alive_test_ports": [], + "alive_test_methods": [], + "reverse_lookup_unify": False, + "reverse_lookup_only": False, + }, + "vts": [ + { + "oid": "1.3.6.1.4.1.25623.1.0.100001", + "parameters": [{"id": 1, "value": "true"}], + } + ], + "scan_preferences": [ + {"id": "auto_enable_dependencies", "value": "True"} + ], + }, + ) + + def test_create_scan_without_scan_preferences(self): + mock_resp = _mock_response() + self.mock_client.post.return_value = mock_resp + + api = ScansAPI(self.mock_client) + + target = Target(hosts=["localhost"]) + vt_selection = [VTSelection(oid="1.3.6.1.4.1.25623.1.0.100001")] + + api.create(target, vt_selection) + + self.mock_client.post.assert_called_once_with( + "/scans", + json={ + "target": { + "hosts": ["localhost"], + "excluded_hosts": [], + "ports": [], + "credentials": [], + "alive_test_ports": [], + "alive_test_methods": [], + "reverse_lookup_unify": False, + "reverse_lookup_only": False, + }, + "vts": [ + {"oid": "1.3.6.1.4.1.25623.1.0.100001", "parameters": []} + ], + }, + ) + + def test_create_scan_failure_raises_httpx_error(self): + mock_response = _mock_response() + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Bad Request", + request=MagicMock(), + response=MagicMock(status_code=400), + ) + self.mock_client.post.return_value = mock_response + + api = ScansAPI(self.mock_client) + + target = Target(hosts=["localhost"]) + vt_selection = [VTSelection(oid="test-oid")] + preferences = [ScanPreference(id="test", value="4")] + + with self.assertRaises(httpx.HTTPStatusError): + api.create(target, vt_selection, scan_preferences=preferences) + + self.mock_client.post.assert_called_once_with( + "/scans", + json={ + "target": { + "hosts": ["localhost"], + "excluded_hosts": [], + "ports": [], + "credentials": [], + "alive_test_ports": [], + "alive_test_methods": [], + "reverse_lookup_unify": False, + "reverse_lookup_only": False, + }, + "vts": [{"oid": "test-oid", "parameters": []}], + "scan_preferences": [{"id": "test", "value": "4"}], + }, + ) + + def test_create_scan_failure_httpx_error_with_suppress_exceptions(self): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Bad Request", + request=MagicMock(), + response=MagicMock(status_code=400), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.post.return_value = mock_response + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + + target = Target(hosts=["localhost"]) + vt_selection = [VTSelection(oid="1")] + preferences = [ScanPreference(id="test", value="4")] + + response = api.create( + target, vt_selection, scan_preferences=preferences + ) + + self.mock_client.post.assert_called_once_with( + "/scans", + json={ + "target": { + "hosts": ["localhost"], + "excluded_hosts": [], + "ports": [], + "credentials": [], + "alive_test_ports": [], + "alive_test_methods": [], + "reverse_lookup_unify": False, + "reverse_lookup_only": False, + }, + "vts": [{"oid": "1", "parameters": []}], + "scan_preferences": [{"id": "test", "value": "4"}], + }, + ) + + self.assertEqual(response, mock_http_error.response) + self.assertEqual(response.status_code, 400) + + def test_delete_scan_success(self): + mock_resp = _mock_response(status_code=204) + self.mock_client.delete.return_value = mock_resp + + api = ScansAPI(self.mock_client) + result = api.delete("scan-123") + self.assertEqual(result, 204) + + def test_delete_scan_returns_500_on_httpx_error_with_suppress_exceptions( + self, + ): + mock_response = MagicMock(spec=httpx.Response) + mock_response.status_code = 500 + + self.mock_client.delete.side_effect = httpx.HTTPStatusError( + "failed", request=MagicMock(), response=mock_response + ) + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + status = api.delete("scan-1") + self.assertEqual(status, 500) + + def test_delete_scan_raise_on_httpx_error(self): + mock_response = MagicMock(spec=httpx.Response) + mock_response.status_code = 500 + + self.mock_client.delete.side_effect = httpx.HTTPStatusError( + "failed", request=MagicMock(), response=mock_response + ) + + api = ScansAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.delete("scan-1") + + def test_get_all_scans(self): + mock_resp = _mock_response() + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client) + result = api.get_all() + self.mock_client.get.assert_called_once_with("/scans") + self.assertEqual(result, mock_resp) + + def test_get_all_scans_failure_raises_httpx_error(self): + mock_response = _mock_response() + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get_all() + + self.mock_client.get.assert_called_once_with("/scans") + + def test_get_all_scans_failure_returns_500_with_suppress_exceptions(self): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + response = api.get_all() + + self.mock_client.get.assert_called_once_with("/scans") + self.assertEqual(response, mock_http_error.response) + self.assertEqual(response.status_code, 500) + + def test_get_scan_by_id(self): + mock_resp = _mock_response() + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client) + result = api.get("scan-1") + self.mock_client.get.assert_called_once_with("/scans/scan-1") + self.assertEqual(result, mock_resp) + + def test_get_scan_by_id_failure_raises_httpx_error(self): + mock_response = _mock_response() + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get("scan-1") + + self.mock_client.get.assert_called_once_with("/scans/scan-1") + + def test_get_scan_by_id_failure_return_500_httpx_error_with_suppress_exceptions( + self, + ): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + response = api.get("scan-1") + + self.mock_client.get.assert_called_once_with("/scans/scan-1") + self.assertEqual(response, mock_http_error.response) + self.assertEqual(response.status_code, 500) + + def test_get_results_with_range(self): + mock_resp = _mock_response() + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client) + api.get_results("scan-1", range_start=5, range_end=10) + self.mock_client.get.assert_called_once_with( + "/scans/scan-1/results", params={"range": "5-10"} + ) + + def test_get_results_only_range_start(self): + mock_response = _mock_response() + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + result = api.get_results("scan-1", range_start=7) + self.mock_client.get.assert_called_once_with( + "/scans/scan-1/results", params={"range": "7"} + ) + self.assertEqual(result, mock_response) + + def test_get_results_only_range_end(self): + mock_response = _mock_response() + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + result = api.get_results("scan-1", range_end=9) + self.mock_client.get.assert_called_once_with( + "/scans/scan-1/results", params={"range": "0-9"} + ) + self.assertEqual(result, mock_response) + + def test_get_results_without_range(self): + mock_response = _mock_response() + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + result = api.get_results("scan-1") + self.mock_client.get.assert_called_once_with( + "/scans/scan-1/results", params={} + ) + self.assertEqual(result, mock_response) + + def test_get_results_invalid_range_type_for_range_start(self): + api = ScansAPI(self.mock_client) + with self.assertRaises(InvalidArgumentType): + api.get_results("scan-1", range_start="wrong", range_end=5) + + def test_get_results_invalid_range_type_for_range_end(self): + api = ScansAPI(self.mock_client) + with self.assertRaises(InvalidArgumentType): + api.get_results("scan-1", range_start=5, range_end="wrong") + + def test_get_results_invalid_range_end_type_only(self): + api = ScansAPI(self.mock_client) + with self.assertRaises(InvalidArgumentType): + api.get_results("scan-1", range_end="not-an-int") + + def test_get_results_with_range_failure_raises_httpx_error(self): + mock_response = _mock_response() + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get_results("scan-1", range_start=5, range_end=10) + + self.mock_client.get.assert_called_once_with( + "/scans/scan-1/results", params={"range": "5-10"} + ) + + def test_get_results_with_range_failure_return_500_httpx_error_with_suppress_exceptions( + self, + ): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + response = api.get_results("scan-1", range_start=5, range_end=10) + + self.mock_client.get.assert_called_once_with( + "/scans/scan-1/results", params={"range": "5-10"} + ) + self.assertEqual(response, mock_http_error.response) + self.assertEqual(response.status_code, 500) + + def test_get_result(self): + mock_resp = _mock_response() + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client) + api.get_result("scan-1", "99") + self.mock_client.get.assert_called_once_with("/scans/scan-1/results/99") + + def test_get_result_failure_raises_httpx_error(self): + mock_response = _mock_response() + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get_result("scan-1", "99") + + self.mock_client.get.assert_called_once_with("/scans/scan-1/results/99") + + def test_get_result_failure_return_500_httpx_error_with_suppress_exceptions( + self, + ): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + response = api.get_result("scan-1", "99") + + self.mock_client.get.assert_called_once_with("/scans/scan-1/results/99") + self.assertEqual(response, mock_http_error.response) + self.assertEqual(response.status_code, 500) + + def test_get_status(self): + mock_resp = _mock_response() + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client) + api.get_status("scan-1") + self.mock_client.get.assert_called_once_with("/scans/scan-1/status") + + def test_get_status_failure_raises_httpx_error(self): + mock_response = _mock_response() + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get_status("scan-1") + + self.mock_client.get.assert_called_once_with("/scans/scan-1/status") + + def test_get_status_failure_return_500_httpx_error_with_suppress_exceptions( + self, + ): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.get.return_value = mock_response + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + response = api.get_status("scan-1") + + self.mock_client.get.assert_called_once_with("/scans/scan-1/status") + self.assertEqual(response, mock_http_error.response) + self.assertEqual(response.status_code, 500) + + def test_run_action_success(self): + mock_resp = _mock_response(status_code=202) + self.mock_client.post.return_value = mock_resp + + api = ScansAPI(self.mock_client) + status = api._run_action("scan-1", ScanAction.START) + self.assertEqual(status, 202) + + def test_run_action_failure_raise_httpx_error(self): + self.mock_client.post.side_effect = httpx.HTTPStatusError( + "failed", request=MagicMock(), response=MagicMock(status_code=500) + ) + api = ScansAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api._run_action("scan-1", ScanAction.STOP) + + def test_run_action_failure_returns_500_with_suppress_exceptions(self): + self.mock_client.post.side_effect = httpx.HTTPStatusError( + "failed", request=MagicMock(), response=MagicMock(status_code=500) + ) + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + status = api._run_action("scan-1", ScanAction.STOP) + self.assertEqual(status, 500) + + def test_start_scan(self): + api = ScansAPI(self.mock_client) + api._run_action = MagicMock(return_value=200) + result = api.start("scan-abc") + api._run_action.assert_called_once_with("scan-abc", ScanAction.START) + self.assertEqual(result, 200) + + def test_start_scan_failure_raises_httpx_error(self): + api = ScansAPI(self.mock_client) + api._run_action = MagicMock( + side_effect=httpx.HTTPStatusError( + "Scan start failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + ) + + with self.assertRaises(httpx.HTTPStatusError): + api.start("scan-abc") + + def test_start_scan_failure_returns_500_with_suppress_exceptions(self): + api = ScansAPI(self.mock_client, suppress_exceptions=True) + api._run_action = MagicMock(return_value=500) + result = api.start("scan-abc") + self.assertEqual(result, 500) + + def test_stop_scan(self): + api = ScansAPI(self.mock_client) + api._run_action = MagicMock(return_value=200) + result = api.stop("scan-abc") + api._run_action.assert_called_once_with("scan-abc", ScanAction.STOP) + self.assertEqual(result, 200) + + def test_stop_scan_failure_raises_httpx_error(self): + api = ScansAPI(self.mock_client) + api._run_action = MagicMock( + side_effect=httpx.HTTPStatusError( + "Scan stop failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + ) + + with self.assertRaises(httpx.HTTPStatusError): + api.stop("scan-abc") + + def test_stop_scan_failure_returns_500_with_suppress_exceptions(self): + api = ScansAPI(self.mock_client, suppress_exceptions=True) + api._run_action = MagicMock(return_value=500) + result = api.stop("scan-abc") + self.assertEqual(result, 500) + + def test_get_preferences_success(self): + mock_resp = _mock_response( + 200, + [ + { + "id": "optimize_test", + "name": "Optimize Test", + "default": True, + "description": "By default, optimize_test is enabled...", + } + ], + ) + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client) + response = api.get_preferences() + + self.mock_client.get.assert_called_once_with("/scans/preferences") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()[0]["id"], "optimize_test") + + def test_get_preferences_failure_raises_httpx_error(self): + mock_resp = _mock_response() + mock_resp.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server error", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client) + + with self.assertRaises(httpx.HTTPStatusError): + api.get_preferences() + + self.mock_client.get.assert_called_once_with("/scans/preferences") + + def test_get_preferences_suppressed_exception_returns_response(self): + mock_error_response = MagicMock(status_code=400) + mock_resp = MagicMock() + mock_resp.raise_for_status.side_effect = httpx.HTTPStatusError( + "Bad Request", + request=MagicMock(), + response=mock_error_response, + ) + self.mock_client.get.return_value = mock_resp + + api = ScansAPI(self.mock_client, suppress_exceptions=True) + response = api.get_preferences() + + self.mock_client.get.assert_called_once_with("/scans/preferences") + self.assertEqual(response, mock_error_response) + self.assertEqual(response.status_code, 400) diff --git a/tests/protocols/http/openvasd/test_vts.py b/tests/protocols/http/openvasd/test_vts.py new file mode 100644 index 00000000..bc14b1f6 --- /dev/null +++ b/tests/protocols/http/openvasd/test_vts.py @@ -0,0 +1,95 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest +from unittest.mock import MagicMock + +import httpx + +from gvm.protocols.http.openvasd._vts import VtsAPI + + +def _mock_response(status_code=200): + mock_response = MagicMock(spec=httpx.Response) + mock_response.status_code = status_code + mock_response.raise_for_status = MagicMock() + return mock_response + + +class TestVtsAPI(unittest.TestCase): + def setUp(self): + self.mock_client = MagicMock(spec=httpx.Client) + + def test_get_all_returns_response(self): + mock_response = _mock_response(200) + self.mock_client.get.return_value = mock_response + + api = VtsAPI(self.mock_client) + response = api.get_all() + self.mock_client.get.assert_called_once_with("/vts") + mock_response.raise_for_status.assert_called_once() + self.assertEqual(response, mock_response) + + def test_get_all_raises_httpx_error(self): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Error", request=MagicMock(), response=MagicMock(status_code=500) + ) + api = VtsAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get_all() + + def test_get_all_returns_response_on_httpx_error_with_suppress_exceptions( + self, + ): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Server failed", + request=MagicMock(), + response=MagicMock(status_code=500), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.get.return_value = mock_response + + api = VtsAPI(self.mock_client, suppress_exceptions=True) + result = api.get_all() + + self.mock_client.get.assert_called_once_with("/vts") + self.assertEqual(result, mock_http_error.response) + + def test_get_by_oid_returns_response(self): + mock_response = _mock_response(200) + self.mock_client.get.return_value = mock_response + + oid = "1.3.6.1.4.1.25623.1.0.123456" + api = VtsAPI(self.mock_client) + response = api.get(oid) + self.mock_client.get.assert_called_once_with(f"/vts/{oid}") + mock_response.raise_for_status.assert_called_once() + self.assertEqual(response, mock_response) + + def test_get_by_oid_raises_httpx_error(self): + self.mock_client.get.side_effect = httpx.HTTPStatusError( + "Not Found", + request=MagicMock(), + response=MagicMock(status_code=404), + ) + + api = VtsAPI(self.mock_client) + with self.assertRaises(httpx.HTTPStatusError): + api.get("nonexistent-oid") + + def test_get_by_oid_response_on_httpx_error_with_suppress_exceptions(self): + mock_response = _mock_response() + mock_http_error = httpx.HTTPStatusError( + "Not Found", + request=MagicMock(), + response=MagicMock(status_code=404), + ) + mock_response.raise_for_status.side_effect = mock_http_error + self.mock_client.get.return_value = mock_response + + api = VtsAPI(self.mock_client, suppress_exceptions=True) + response = api.get("nonexistent-oid") + + self.assertEqual(response, mock_http_error.response)