Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 172 additions & 1 deletion gvm/protocols/gmp/_gmpnext.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import Any, Mapping, Optional, Sequence
from typing import Any, Mapping, Optional, Sequence, Union

from gvm.protocols.gmp.requests import EntityID

Expand All @@ -13,6 +13,9 @@
AgentGroups,
AgentInstallers,
Agents,
Credentials,
CredentialStoreCredentialType,
CredentialStores,
OCIImageTargets,
Tasks,
)
Expand Down Expand Up @@ -297,6 +300,174 @@ def clone_agent_group(
AgentGroups.clone_agent_group(agent_group_id)
)

def create_credential_store_credential(
self,
name: str,
credential_type: Union[CredentialStoreCredentialType, str],
*,
comment: Optional[str] = None,
credential_store_id: Optional[EntityID] = None,
vault_id: Optional[str] = None,
host_identifier: Optional[str] = None,
) -> T:
"""Create a new credential store type credential

Args:
name: Name of the credential
credential_type: Type of the credential
comment: Optional comment for the credential object
credential_store_id: Optional credential store id to fetch the credential from
vault_id: Vault id used to fetch the credential from credential store
host_identifier: Host identifier used to fetch the credential from credential store
"""
return self._send_request_and_transform_response(
Credentials.create_credential_store_credential(
name=name,
credential_type=credential_type,
comment=comment,
credential_store_id=credential_store_id,
vault_id=vault_id,
host_identifier=host_identifier,
)
)

def modify_credential_store_credential(
self,
credential_id: EntityID,
*,
name: Optional[str] = None,
comment: Optional[str] = None,
credential_store_id: Optional[EntityID] = None,
vault_id: Optional[str] = None,
host_identifier: Optional[str] = None,
) -> T:
"""Modify an existing credential stored in a credential store

Args:
credential_id: UUID of the credential to modify
name: Name of the credential
comment: Optional comment for the credential object
credential_store_id: Optional credential store id to fetch the credential from
vault_id: Vault id used to fetch the credential from credential store
host_identifier: Host identifier used to fetch the credential from credential store
"""
return self._send_request_and_transform_response(
Credentials.modify_credential_store_credential(
credential_id=credential_id,
name=name,
comment=comment,
credential_store_id=credential_store_id,
vault_id=vault_id,
host_identifier=host_identifier,
)
)

def get_credential_store(
self,
credential_store_id: EntityID,
*,
details: Optional[bool] = None,
) -> T:
"""Request a credential store

Args:
credential_store_id: ID of credential store to fetch
details: True to request all details
"""
return self._send_request_and_transform_response(
CredentialStores.get_credential_store(
credential_store_id=credential_store_id,
details=details,
)
)

def get_credential_stores(
self,
*,
filter_string: Optional[str] = None,
filter_id: Optional[EntityID] = None,
details: Optional[bool] = None,
) -> T:
"""Request a list of credential stores

Args:
filter_string: Filter term to use for the query
filter_id: UUID of an existing filter to use for the query
details: True to request all details
"""
return self._send_request_and_transform_response(
CredentialStores.get_credential_stores(
filter_string=filter_string,
filter_id=filter_id,
details=details,
)
)

def modify_credential_store(
self,
credential_store_id: EntityID,
*,
active: Optional[bool] = None,
host: Optional[str] = None,
port: Optional[int] = None,
path: Optional[str] = None,
app_id: Optional[str] = None,
client_cert: Optional[str] = None,
client_key: Optional[str] = None,
client_pkcs12_file: Optional[str] = None,
passphrase: Optional[str] = None,
server_ca_cert: Optional[str] = None,
comment: Optional[str] = None,
) -> T:
"""Modify an existing credential store

Args:
credential_store_id: ID of credential store to fetch
active: Whether the credential store is active
host: The host to use for reaching the credential store
port: The port to use for reaching the credential store
path: The URI path the credential store is using
app_id: Depends on the credential store used. Usually called the same in the credential store
client_cert: The client certificate to use for authorization, as a plain string
client_key: The client key to use for authorization, as a plain string
client_pkcs12_file: The pkcs12 file contents to use for authorization, as a plain string
(alternative to using client_cert and client_key)
passphrase: The passphrase to use to decrypt client_pkcs12_file or client_key file
server_ca_cert: The server certificate, so the credential store can be trusted
comment: An optional comment to store alongside the credential store
"""
return self._send_request_and_transform_response(
CredentialStores.modify_credential_store(
credential_store_id=credential_store_id,
active=active,
host=host,
port=port,
path=path,
app_id=app_id,
client_cert=client_cert,
client_key=client_key,
client_pkcs12_file=client_pkcs12_file,
passphrase=passphrase,
server_ca_cert=server_ca_cert,
comment=comment,
)
)

def verify_credential_store(
self,
credential_store_id: EntityID,
) -> T:
"""Verify that the connection to an existing credential store works

Args:
credential_store_id: The uuid of the credential store to verify
"""
return self._send_request_and_transform_response(
CredentialStores.verify_credential_store(
credential_store_id=credential_store_id,
)
)

def create_oci_image_target(
self,
name: str,
Expand Down
8 changes: 7 additions & 1 deletion gvm/protocols/gmp/requests/next/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
from gvm.protocols.gmp.requests.next._agent_groups import AgentGroups
from gvm.protocols.gmp.requests.next._agent_installers import AgentInstallers
from gvm.protocols.gmp.requests.next._agents import Agents
from gvm.protocols.gmp.requests.next._credential_stores import CredentialStores
from gvm.protocols.gmp.requests.next._credentials import (
Credentials,
CredentialStoreCredentialType,
)
from gvm.protocols.gmp.requests.next._oci_image_targets import OCIImageTargets
from gvm.protocols.gmp.requests.next._tasks import Tasks

Expand All @@ -24,7 +29,6 @@
CertBundAdvisories,
Cpes,
CredentialFormat,
Credentials,
CredentialType,
Cves,
DfnCertAdvisories,
Expand Down Expand Up @@ -98,6 +102,8 @@
"Credentials",
"CredentialFormat",
"CredentialType",
"CredentialStoreCredentialType",
"CredentialStores",
"Cves",
"DfnCertAdvisories",
"EntityID",
Expand Down
171 changes: 171 additions & 0 deletions gvm/protocols/gmp/requests/next/_credential_stores.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# SPDX-FileCopyrightText: 2025 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from base64 import b64encode
from typing import Optional

from gvm.errors import RequiredArgument
from gvm.protocols.core import Request
from gvm.utils import to_bool
from gvm.xml import XmlCommand

from .._entity_id import EntityID


class CredentialStores:
@classmethod
def get_credential_store(
cls,
credential_store_id: EntityID,
*,
details: Optional[bool] = None,
) -> Request:
"""Request a credential store

Args:
credential_store_id: ID of credential store to fetch
details: True to request all details
"""

if not credential_store_id:
raise RequiredArgument(
function=cls.get_credential_store.__name__,
argument="credential_store_id",
)

cmd = XmlCommand("get_credential_stores")
cmd.add_element("credential_store_id", str(credential_store_id))

if details is not None:
cmd.set_attribute("details", to_bool(details))

return cmd

@classmethod
def get_credential_stores(
cls,
*,
filter_string: Optional[str] = None,
filter_id: Optional[EntityID] = None,
details: Optional[bool] = None,
) -> Request:
"""Request a list of credential stores

Args:
filter_string: Filter term to use for the query
filter_id: UUID of an existing filter to use for the query
details: True to request all details
"""

cmd = XmlCommand("get_credential_stores")
cmd.add_filter(filter_string, filter_id)

if details is not None:
cmd.set_attribute("details", to_bool(details))

return cmd

@classmethod
def modify_credential_store(
cls,
credential_store_id: EntityID,
*,
active: Optional[bool] = None,
host: Optional[str] = None,
port: Optional[int] = None,
path: Optional[str] = None,
app_id: Optional[str] = None,
client_cert: Optional[str] = None,
client_key: Optional[str] = None,
client_pkcs12_file: Optional[str] = None,
passphrase: Optional[str] = None,
server_ca_cert: Optional[str] = None,
comment: Optional[str] = None,
) -> Request:
"""Modify a credential store

Args:
credential_store_id: ID of credential store to fetch
active: Whether the credential store is active
host: The host to use for reaching the credential store
port: The port to use for reaching the credential store
path: The URI path the credential store is using
app_id: Depends on the credential store used. Usually called the same in the credential store
client_cert: The client certificate to use for authorization, as a plain string
client_key: The client key to use for authorization, as a plain string
client_pkcs12_file: The pkcs12 file contents to use for authorization, as a plain string
(alternative to using client_cert and client_key)
passphrase: The passphrase to use to decrypt client_pkcs12_file or client_key file
server_ca_cert: The server certificate, so the credential store can be trusted
comment: An optional comment to store alongside the credential store
"""

if not credential_store_id:
raise RequiredArgument(
function=cls.verify_credential_store.__name__,
argument="credential_store_id",
)

cmd = XmlCommand("modify_credential_store")
cmd.set_attribute("credential_store_id", str(credential_store_id))

if active is not None:
cmd.add_element("active", to_bool(active))
if host:
cmd.add_element("host", host)
if port:
cmd.add_element("port", str(port))
if path:
cmd.add_element("path", path)
if comment:
cmd.add_element("comment", comment)

preferences = cmd.add_element("preferences")

if app_id:
preferences.add_element("app_id", app_id)
if client_cert:
preferences.add_element(
"client_cert",
b64encode(client_cert.encode("ascii")).decode("ascii"),
)
if client_key:
preferences.add_element(
"client_key",
b64encode(client_key.encode("ascii")).decode("ascii"),
)
if client_pkcs12_file:
preferences.add_element(
"client_pkcs12_file",
b64encode(client_pkcs12_file.encode("ascii")).decode("ascii"),
)
if passphrase:
preferences.add_element("passphrase", passphrase)
if server_ca_cert:
preferences.add_element(
"server_ca_cert",
b64encode(server_ca_cert.encode("ascii")).decode("ascii"),
)

return cmd

@classmethod
def verify_credential_store(
cls,
credential_store_id: EntityID,
) -> Request:
"""Verify that the connection to a credential store works

Args:
credential_store_id: The uuid of the credential store to verify
"""
if not credential_store_id:
raise RequiredArgument(
function=cls.verify_credential_store.__name__,
argument="credential_store_id",
)

cmd = XmlCommand("verify_credential_store")
cmd.set_attribute("credential_store_id", str(credential_store_id))
return cmd
Loading