diff --git a/gvm/protocols/gmp/_gmpnext.py b/gvm/protocols/gmp/_gmpnext.py
index dec6654c..b09dc5c4 100644
--- a/gvm/protocols/gmp/_gmpnext.py
+++ b/gvm/protocols/gmp/_gmpnext.py
@@ -454,6 +454,49 @@ def create_agent_group_task(
)
)
+ def create_container_image_task(
+ self,
+ name: str,
+ oci_image_target_id: EntityID,
+ scanner_id: EntityID,
+ *,
+ comment: Optional[str] = None,
+ alterable: Optional[bool] = None,
+ schedule_id: Optional[EntityID] = None,
+ alert_ids: Optional[Sequence[EntityID]] = None,
+ schedule_periods: Optional[int] = None,
+ observers: Optional[Sequence[str]] = None,
+ preferences: Optional[Mapping[str, SupportsStr]] = None,
+ ) -> T:
+ """Create a new scan task using an OCI image target.
+
+ Args:
+ name: Name of the new task.
+ oci_image_target_id: UUID of the OCI image target to be scanned.
+ scanner_id: UUID of scanner to use for scanning the agents.
+ comment: Optional comment for the task.
+ alterable: Whether the task should be alterable.
+ alert_ids: List of UUIDs for alerts to be applied to the task.
+ schedule_id: UUID of a schedule when the task should be run.
+ schedule_periods: Limit to number of scheduled runs, 0 for unlimited.
+ observers: List of usernames or IDs allowed to observe the task.
+ preferences: Scanner preferences as name/value pairs.
+ """
+ return self._send_request_and_transform_response(
+ Tasks.create_container_image_task(
+ name=name,
+ oci_image_target_id=oci_image_target_id,
+ scanner_id=scanner_id,
+ comment=comment,
+ alterable=alterable,
+ schedule_id=schedule_id,
+ alert_ids=alert_ids,
+ schedule_periods=schedule_periods,
+ observers=observers,
+ preferences=preferences,
+ )
+ )
+
def create_container_task(
self, name: str, *, comment: Optional[str] = None
) -> T:
@@ -586,6 +629,7 @@ def modify_task(
target_id: Optional[EntityID] = None,
scanner_id: Optional[EntityID] = None,
agent_group_id: Optional[EntityID] = None,
+ oci_image_target_id: Optional[EntityID] = None,
alterable: Optional[bool] = None,
hosts_ordering: Optional[HostsOrdering] = None,
schedule_id: Optional[EntityID] = None,
@@ -604,6 +648,7 @@ def modify_task(
target_id: UUID of target to be scanned
scanner_id: UUID of scanner to use for scanning the target
agent_group_id: UUID of agent group to use for scanning
+ oci_image_target_id: UUID of the OCI Image target to be scanned.
comment: The comment on the task.
alert_ids: List of UUIDs for alerts to be applied to the task
hosts_ordering: The order hosts are scanned in
@@ -622,6 +667,7 @@ def modify_task(
target_id=target_id,
scanner_id=scanner_id,
agent_group_id=agent_group_id,
+ oci_image_target_id=oci_image_target_id,
alterable=alterable,
hosts_ordering=hosts_ordering,
schedule_id=schedule_id,
diff --git a/gvm/protocols/gmp/requests/next/_tasks.py b/gvm/protocols/gmp/requests/next/_tasks.py
index 6e81e1ee..f64cd729 100644
--- a/gvm/protocols/gmp/requests/next/_tasks.py
+++ b/gvm/protocols/gmp/requests/next/_tasks.py
@@ -119,6 +119,96 @@ def create_agent_group_task(
return cmd
+ @classmethod
+ def create_container_image_task(
+ cls,
+ name: str,
+ oci_image_target_id: EntityID,
+ scanner_id: EntityID,
+ *,
+ comment: Optional[str] = None,
+ alterable: Optional[bool] = None,
+ schedule_id: Optional[EntityID] = None,
+ alert_ids: Optional[Sequence[EntityID]] = None,
+ schedule_periods: Optional[int] = None,
+ observers: Optional[Sequence[str]] = None,
+ preferences: Optional[Mapping[str, SupportsStr]] = None,
+ ) -> Request:
+ """Create a new scan task using an OCI image target.
+
+ Args:
+ name: Name of the new task.
+ oci_image_target_id: UUID of the OCI Image target to be scanned.
+ scanner_id: UUID of scanner to use for scanning the agents.
+ comment: Optional comment for the task.
+ alterable: Whether the task should be alterable.
+ alert_ids: List of UUIDs for alerts to be applied to the task.
+ schedule_id: UUID of a schedule when the task should be run.
+ schedule_periods: Limit to number of scheduled runs, 0 for unlimited.
+ observers: List of usernames or IDs allowed to observe the task.
+ preferences: Scanner preferences as name/value pairs.
+ """
+ if not name:
+ raise RequiredArgument(
+ function=cls.create_container_image_task.__name__,
+ argument="name",
+ )
+
+ if not oci_image_target_id:
+ raise RequiredArgument(
+ function=cls.create_container_image_task.__name__,
+ argument="oci_image_target_id",
+ )
+
+ if not scanner_id:
+ raise RequiredArgument(
+ function=cls.create_container_image_task.__name__,
+ argument="scanner_id",
+ )
+
+ cmd = XmlCommand("create_task")
+ cmd.add_element("name", name)
+ cmd.add_element("usage_type", "scan")
+ cmd.add_element(
+ "oci_image_target", attrs={"id": str(oci_image_target_id)}
+ )
+ cmd.add_element("scanner", attrs={"id": str(scanner_id)})
+
+ if comment:
+ cmd.add_element("comment", comment)
+
+ if alterable is not None:
+ cmd.add_element("alterable", to_bool(alterable))
+
+ if alert_ids:
+ for alert in alert_ids:
+ cmd.add_element("alert", attrs={"id": str(alert)})
+
+ if schedule_id:
+ cmd.add_element("schedule", attrs={"id": str(schedule_id)})
+
+ if schedule_periods is not None:
+ if (
+ not isinstance(schedule_periods, Integral)
+ or schedule_periods < 0
+ ):
+ raise InvalidArgument(
+ "schedule_periods must be an integer greater or equal than 0"
+ )
+ cmd.add_element("schedule_periods", str(schedule_periods))
+
+ if observers:
+ cmd.add_element("observers", to_comma_list(observers))
+
+ if preferences is not None:
+ xml_prefs = cmd.add_element("preferences")
+ for pref_name, pref_value in preferences.items():
+ xml_pref = xml_prefs.add_element("preference")
+ xml_pref.add_element("scanner_name", pref_name)
+ xml_pref.add_element("value", str(pref_value))
+
+ return cmd
+
@classmethod
def create_container_task(
cls, name: str, *, comment: Optional[str] = None
@@ -350,6 +440,7 @@ def modify_task(
target_id: Optional[EntityID] = None,
scanner_id: Optional[EntityID] = None,
agent_group_id: Optional[EntityID] = None,
+ oci_image_target_id: Optional[EntityID] = None,
alterable: Optional[bool] = None,
hosts_ordering: Optional[HostsOrdering] = None,
schedule_id: Optional[EntityID] = None,
@@ -368,6 +459,7 @@ def modify_task(
target_id: UUID of target to be scanned
scanner_id: UUID of scanner to use for scanning the target
agent_group_id: UUID of agent group to use for scanning
+ oci_image_target_id: UUID of the OCI Image target to be scanned.
comment: The comment on the task.
alert_ids: List of UUIDs for alerts to be applied to the task
hosts_ordering: The order hosts are scanned in
@@ -383,11 +475,17 @@ def modify_task(
function=cls.modify_task.__name__, argument="task_id"
)
- if target_id and agent_group_id:
+ if (
+ sum(
+ id is not None
+ for id in (target_id, agent_group_id, oci_image_target_id)
+ )
+ > 1
+ ):
raise InvalidArgument(
function=cls.modify_task.__name__,
- argument="target_id/agent_group_id",
- message="Only one of target_id or agent_group_id can be modified at a time",
+ argument="target_id/agent_group_id/oci_image_target_id",
+ message="Only one of target_id, agent_group_id or oci_image_target_id can be modified at a time",
)
cmd = XmlCommand("modify_task")
@@ -408,6 +506,11 @@ def modify_task(
if agent_group_id:
cmd.add_element("agent_group", attrs={"id": str(agent_group_id)})
+ if oci_image_target_id:
+ cmd.add_element(
+ "oci_image_target", attrs={"id": str(oci_image_target_id)}
+ )
+
if alterable is not None:
cmd.add_element("alterable", to_bool(alterable))
diff --git a/tests/protocols/gmpnext/entities/tasks/__init__.py b/tests/protocols/gmpnext/entities/tasks/__init__.py
index 36374649..8f709f64 100644
--- a/tests/protocols/gmpnext/entities/tasks/__init__.py
+++ b/tests/protocols/gmpnext/entities/tasks/__init__.py
@@ -5,6 +5,9 @@
from .test_clone_task import GmpCloneTaskTestMixin
from .test_create_agent_group_task import GmpCreateAgentGroupTaskTestMixin
+from .test_create_container_image_task import (
+ GmpCreateContainerImageTaskTestMixin,
+)
from .test_create_container_task import GmpCreateContainerTaskTestMixin
from .test_create_task import GmpCreateTaskTestMixin
from .test_delete_task import GmpDeleteTaskTestMixin
@@ -19,6 +22,7 @@
__all__ = (
"GmpCloneTaskTestMixin",
"GmpCreateAgentGroupTaskTestMixin",
+ "GmpCreateContainerImageTaskTestMixin",
"GmpCreateContainerTaskTestMixin",
"GmpCreateTaskTestMixin",
"GmpDeleteTaskTestMixin",
diff --git a/tests/protocols/gmpnext/entities/tasks/test_create_container_image_task.py b/tests/protocols/gmpnext/entities/tasks/test_create_container_image_task.py
new file mode 100644
index 00000000..ea53843a
--- /dev/null
+++ b/tests/protocols/gmpnext/entities/tasks/test_create_container_image_task.py
@@ -0,0 +1,219 @@
+# SPDX-FileCopyrightText: 2018-2025 Greenbone AG
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+
+from collections import OrderedDict
+
+from gvm.errors import InvalidArgument, RequiredArgument
+
+
+class GmpCreateContainerImageTaskTestMixin:
+ def test_create_container_image_task(self):
+ self.gmp.create_container_image_task(
+ name="foo", oci_image_target_id="it1", scanner_id="s1"
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b""
+ )
+
+ def test_create_container_image_task_missing_name(self):
+ with self.assertRaises(RequiredArgument):
+ self.gmp.create_container_image_task(
+ name=None, oci_image_target_id="it1", scanner_id="s1"
+ )
+ with self.assertRaises(RequiredArgument):
+ self.gmp.create_container_image_task(
+ name="", oci_image_target_id="it1", scanner_id="s1"
+ )
+
+ def test_create_container_image_task_missing_oci_image_target_id(self):
+ with self.assertRaises(RequiredArgument):
+ self.gmp.create_container_image_task(
+ name="foo", oci_image_target_id=None, scanner_id="s1"
+ )
+ with self.assertRaises(RequiredArgument):
+ self.gmp.create_container_image_task(
+ name="foo", oci_image_target_id="", scanner_id="s1"
+ )
+
+ def test_create_container_image_task_missing_scanner_id(self):
+ with self.assertRaises(RequiredArgument):
+ self.gmp.create_container_image_task(
+ name="foo", oci_image_target_id="it1", scanner_id=None
+ )
+ with self.assertRaises(RequiredArgument):
+ self.gmp.create_container_image_task(
+ name="foo", oci_image_target_id="it1", scanner_id=""
+ )
+
+ def test_create_container_image_task_with_comment(self):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ comment="my comment",
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b"my comment"
+ b""
+ )
+
+ def test_create_container_image_task_with_alerts(self):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ alert_ids=["a1", "a2"],
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b''
+ b''
+ b""
+ )
+
+ def test_create_container_image_task_with_empty_alerts(self):
+ self.gmp.create_container_image_task(
+ name="foo", oci_image_target_id="it1", scanner_id="s1", alert_ids=[]
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b""
+ )
+
+ def test_create_container_image_task_with_schedule(self):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ schedule_id="s1",
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b''
+ b""
+ )
+
+ def test_create_create_container_image_task_with_schedule_periods(self):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ schedule_id="s1",
+ schedule_periods=5,
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b''
+ b"5"
+ b""
+ )
+
+ def test_create_container_image_task_with_invalid_schedule_periods(self):
+ with self.assertRaises(InvalidArgument):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ schedule_id="s1",
+ schedule_periods="invalid",
+ )
+
+ with self.assertRaises(InvalidArgument):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ schedule_id="s1",
+ schedule_periods=-1,
+ )
+
+ def test_create_container_image_task_with_alterable(self):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ alterable=True,
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b"1"
+ b""
+ )
+
+ def test_create__container_image_task_with_observers(self):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ observers=["u1", "u2"],
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b"u1,u2"
+ b""
+ )
+
+ def test_create_container_image_task_with_preferences(self):
+ self.gmp.create_container_image_task(
+ name="foo",
+ oci_image_target_id="it1",
+ scanner_id="s1",
+ preferences=OrderedDict([("pref1", "val1"), ("pref2", "val2")]),
+ )
+
+ self.connection.send.has_been_called_with(
+ b""
+ b"foo"
+ b"scan"
+ b''
+ b''
+ b""
+ b"pref1val1"
+ b"pref2val2"
+ b""
+ b""
+ )
diff --git a/tests/protocols/gmpnext/entities/tasks/test_modify_task.py b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py
index b1281bd5..0cc81174 100644
--- a/tests/protocols/gmpnext/entities/tasks/test_modify_task.py
+++ b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py
@@ -62,12 +62,31 @@ def test_modify_task_with_agent_group_id(self):
b''
)
+ def test_modify_task_with_oci_image_target_id(self):
+ self.gmp.modify_task(task_id="t1", oci_image_target_id="it1")
+
+ self.connection.send.has_been_called_with(
+ b''
+ )
+
def test_modify_task_with_target_and_agent_group(self):
with self.assertRaises(InvalidArgument):
self.gmp.modify_task(
task_id="t1", target_id="t1", agent_group_id="ag1"
)
+ def test_modify_task_with_target_and_oci_image_target(self):
+ with self.assertRaises(InvalidArgument):
+ self.gmp.modify_task(
+ task_id="t1", target_id="t1", oci_image_target_id="it1"
+ )
+
+ def test_modify_task_with_agent_group_and_oci_image_target(self):
+ with self.assertRaises(InvalidArgument):
+ self.gmp.modify_task(
+ task_id="t1", agent_group_id="ag1", oci_image_target_id="it1"
+ )
+
def test_modify_task_with_schedule_id(self):
self.gmp.modify_task(task_id="t1", schedule_id="s1")
diff --git a/tests/protocols/gmpnext/entities/test_tasks.py b/tests/protocols/gmpnext/entities/test_tasks.py
index 386e816a..1a0ae32f 100644
--- a/tests/protocols/gmpnext/entities/test_tasks.py
+++ b/tests/protocols/gmpnext/entities/test_tasks.py
@@ -7,6 +7,7 @@
from ...gmpnext.entities.tasks import (
GmpCloneTaskTestMixin,
GmpCreateAgentGroupTaskTestMixin,
+ GmpCreateContainerImageTaskTestMixin,
GmpCreateContainerTaskTestMixin,
GmpCreateTaskTestMixin,
GmpDeleteTaskTestMixin,
@@ -30,6 +31,12 @@ class GmpCreateAgentGroupTaskTestCase(
pass
+class GMPCreateContainerImageTaskTestCase(
+ GmpCreateContainerImageTaskTestMixin, GMPTestCase
+):
+ pass
+
+
class GMPCreateContainerTaskTestCase(
GmpCreateContainerTaskTestMixin, GMPTestCase
):