diff --git a/gvm/protocols/gmp/_gmpnext.py b/gvm/protocols/gmp/_gmpnext.py index dec6654c..b09dc5c4 100644 --- a/gvm/protocols/gmp/_gmpnext.py +++ b/gvm/protocols/gmp/_gmpnext.py @@ -454,6 +454,49 @@ def create_agent_group_task( ) ) + def create_container_image_task( + self, + name: str, + oci_image_target_id: EntityID, + scanner_id: EntityID, + *, + comment: Optional[str] = None, + alterable: Optional[bool] = None, + schedule_id: Optional[EntityID] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + schedule_periods: Optional[int] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> T: + """Create a new scan task using an OCI image target. + + Args: + name: Name of the new task. + oci_image_target_id: UUID of the OCI image target to be scanned. + scanner_id: UUID of scanner to use for scanning the agents. + comment: Optional comment for the task. + alterable: Whether the task should be alterable. + alert_ids: List of UUIDs for alerts to be applied to the task. + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: Limit to number of scheduled runs, 0 for unlimited. + observers: List of usernames or IDs allowed to observe the task. + preferences: Scanner preferences as name/value pairs. + """ + return self._send_request_and_transform_response( + Tasks.create_container_image_task( + name=name, + oci_image_target_id=oci_image_target_id, + scanner_id=scanner_id, + comment=comment, + alterable=alterable, + schedule_id=schedule_id, + alert_ids=alert_ids, + schedule_periods=schedule_periods, + observers=observers, + preferences=preferences, + ) + ) + def create_container_task( self, name: str, *, comment: Optional[str] = None ) -> T: @@ -586,6 +629,7 @@ def modify_task( target_id: Optional[EntityID] = None, scanner_id: Optional[EntityID] = None, agent_group_id: Optional[EntityID] = None, + oci_image_target_id: Optional[EntityID] = None, alterable: Optional[bool] = None, hosts_ordering: Optional[HostsOrdering] = None, schedule_id: Optional[EntityID] = None, @@ -604,6 +648,7 @@ def modify_task( target_id: UUID of target to be scanned scanner_id: UUID of scanner to use for scanning the target agent_group_id: UUID of agent group to use for scanning + oci_image_target_id: UUID of the OCI Image target to be scanned. comment: The comment on the task. alert_ids: List of UUIDs for alerts to be applied to the task hosts_ordering: The order hosts are scanned in @@ -622,6 +667,7 @@ def modify_task( target_id=target_id, scanner_id=scanner_id, agent_group_id=agent_group_id, + oci_image_target_id=oci_image_target_id, alterable=alterable, hosts_ordering=hosts_ordering, schedule_id=schedule_id, diff --git a/gvm/protocols/gmp/requests/next/_tasks.py b/gvm/protocols/gmp/requests/next/_tasks.py index 6e81e1ee..f64cd729 100644 --- a/gvm/protocols/gmp/requests/next/_tasks.py +++ b/gvm/protocols/gmp/requests/next/_tasks.py @@ -119,6 +119,96 @@ def create_agent_group_task( return cmd + @classmethod + def create_container_image_task( + cls, + name: str, + oci_image_target_id: EntityID, + scanner_id: EntityID, + *, + comment: Optional[str] = None, + alterable: Optional[bool] = None, + schedule_id: Optional[EntityID] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + schedule_periods: Optional[int] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> Request: + """Create a new scan task using an OCI image target. + + Args: + name: Name of the new task. + oci_image_target_id: UUID of the OCI Image target to be scanned. + scanner_id: UUID of scanner to use for scanning the agents. + comment: Optional comment for the task. + alterable: Whether the task should be alterable. + alert_ids: List of UUIDs for alerts to be applied to the task. + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: Limit to number of scheduled runs, 0 for unlimited. + observers: List of usernames or IDs allowed to observe the task. + preferences: Scanner preferences as name/value pairs. + """ + if not name: + raise RequiredArgument( + function=cls.create_container_image_task.__name__, + argument="name", + ) + + if not oci_image_target_id: + raise RequiredArgument( + function=cls.create_container_image_task.__name__, + argument="oci_image_target_id", + ) + + if not scanner_id: + raise RequiredArgument( + function=cls.create_container_image_task.__name__, + argument="scanner_id", + ) + + cmd = XmlCommand("create_task") + cmd.add_element("name", name) + cmd.add_element("usage_type", "scan") + cmd.add_element( + "oci_image_target", attrs={"id": str(oci_image_target_id)} + ) + cmd.add_element("scanner", attrs={"id": str(scanner_id)}) + + if comment: + cmd.add_element("comment", comment) + + if alterable is not None: + cmd.add_element("alterable", to_bool(alterable)) + + if alert_ids: + for alert in alert_ids: + cmd.add_element("alert", attrs={"id": str(alert)}) + + if schedule_id: + cmd.add_element("schedule", attrs={"id": str(schedule_id)}) + + if schedule_periods is not None: + if ( + not isinstance(schedule_periods, Integral) + or schedule_periods < 0 + ): + raise InvalidArgument( + "schedule_periods must be an integer greater or equal than 0" + ) + cmd.add_element("schedule_periods", str(schedule_periods)) + + if observers: + cmd.add_element("observers", to_comma_list(observers)) + + if preferences is not None: + xml_prefs = cmd.add_element("preferences") + for pref_name, pref_value in preferences.items(): + xml_pref = xml_prefs.add_element("preference") + xml_pref.add_element("scanner_name", pref_name) + xml_pref.add_element("value", str(pref_value)) + + return cmd + @classmethod def create_container_task( cls, name: str, *, comment: Optional[str] = None @@ -350,6 +440,7 @@ def modify_task( target_id: Optional[EntityID] = None, scanner_id: Optional[EntityID] = None, agent_group_id: Optional[EntityID] = None, + oci_image_target_id: Optional[EntityID] = None, alterable: Optional[bool] = None, hosts_ordering: Optional[HostsOrdering] = None, schedule_id: Optional[EntityID] = None, @@ -368,6 +459,7 @@ def modify_task( target_id: UUID of target to be scanned scanner_id: UUID of scanner to use for scanning the target agent_group_id: UUID of agent group to use for scanning + oci_image_target_id: UUID of the OCI Image target to be scanned. comment: The comment on the task. alert_ids: List of UUIDs for alerts to be applied to the task hosts_ordering: The order hosts are scanned in @@ -383,11 +475,17 @@ def modify_task( function=cls.modify_task.__name__, argument="task_id" ) - if target_id and agent_group_id: + if ( + sum( + id is not None + for id in (target_id, agent_group_id, oci_image_target_id) + ) + > 1 + ): raise InvalidArgument( function=cls.modify_task.__name__, - argument="target_id/agent_group_id", - message="Only one of target_id or agent_group_id can be modified at a time", + argument="target_id/agent_group_id/oci_image_target_id", + message="Only one of target_id, agent_group_id or oci_image_target_id can be modified at a time", ) cmd = XmlCommand("modify_task") @@ -408,6 +506,11 @@ def modify_task( if agent_group_id: cmd.add_element("agent_group", attrs={"id": str(agent_group_id)}) + if oci_image_target_id: + cmd.add_element( + "oci_image_target", attrs={"id": str(oci_image_target_id)} + ) + if alterable is not None: cmd.add_element("alterable", to_bool(alterable)) diff --git a/tests/protocols/gmpnext/entities/tasks/__init__.py b/tests/protocols/gmpnext/entities/tasks/__init__.py index 36374649..8f709f64 100644 --- a/tests/protocols/gmpnext/entities/tasks/__init__.py +++ b/tests/protocols/gmpnext/entities/tasks/__init__.py @@ -5,6 +5,9 @@ from .test_clone_task import GmpCloneTaskTestMixin from .test_create_agent_group_task import GmpCreateAgentGroupTaskTestMixin +from .test_create_container_image_task import ( + GmpCreateContainerImageTaskTestMixin, +) from .test_create_container_task import GmpCreateContainerTaskTestMixin from .test_create_task import GmpCreateTaskTestMixin from .test_delete_task import GmpDeleteTaskTestMixin @@ -19,6 +22,7 @@ __all__ = ( "GmpCloneTaskTestMixin", "GmpCreateAgentGroupTaskTestMixin", + "GmpCreateContainerImageTaskTestMixin", "GmpCreateContainerTaskTestMixin", "GmpCreateTaskTestMixin", "GmpDeleteTaskTestMixin", diff --git a/tests/protocols/gmpnext/entities/tasks/test_create_container_image_task.py b/tests/protocols/gmpnext/entities/tasks/test_create_container_image_task.py new file mode 100644 index 00000000..ea53843a --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_create_container_image_task.py @@ -0,0 +1,219 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from collections import OrderedDict + +from gvm.errors import InvalidArgument, RequiredArgument + + +class GmpCreateContainerImageTaskTestMixin: + def test_create_container_image_task(self): + self.gmp.create_container_image_task( + name="foo", oci_image_target_id="it1", scanner_id="s1" + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + ) + + def test_create_container_image_task_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_container_image_task( + name=None, oci_image_target_id="it1", scanner_id="s1" + ) + with self.assertRaises(RequiredArgument): + self.gmp.create_container_image_task( + name="", oci_image_target_id="it1", scanner_id="s1" + ) + + def test_create_container_image_task_missing_oci_image_target_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_container_image_task( + name="foo", oci_image_target_id=None, scanner_id="s1" + ) + with self.assertRaises(RequiredArgument): + self.gmp.create_container_image_task( + name="foo", oci_image_target_id="", scanner_id="s1" + ) + + def test_create_container_image_task_missing_scanner_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_container_image_task( + name="foo", oci_image_target_id="it1", scanner_id=None + ) + with self.assertRaises(RequiredArgument): + self.gmp.create_container_image_task( + name="foo", oci_image_target_id="it1", scanner_id="" + ) + + def test_create_container_image_task_with_comment(self): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + comment="my comment", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"my comment" + b"" + ) + + def test_create_container_image_task_with_alerts(self): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + alert_ids=["a1", "a2"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b"" + ) + + def test_create_container_image_task_with_empty_alerts(self): + self.gmp.create_container_image_task( + name="foo", oci_image_target_id="it1", scanner_id="s1", alert_ids=[] + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + ) + + def test_create_container_image_task_with_schedule(self): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + schedule_id="s1", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"" + ) + + def test_create_create_container_image_task_with_schedule_periods(self): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + schedule_id="s1", + schedule_periods=5, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"5" + b"" + ) + + def test_create_container_image_task_with_invalid_schedule_periods(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + schedule_id="s1", + schedule_periods="invalid", + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + schedule_id="s1", + schedule_periods=-1, + ) + + def test_create_container_image_task_with_alterable(self): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + alterable=True, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"1" + b"" + ) + + def test_create__container_image_task_with_observers(self): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + observers=["u1", "u2"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"u1,u2" + b"" + ) + + def test_create_container_image_task_with_preferences(self): + self.gmp.create_container_image_task( + name="foo", + oci_image_target_id="it1", + scanner_id="s1", + preferences=OrderedDict([("pref1", "val1"), ("pref2", "val2")]), + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + b"pref1val1" + b"pref2val2" + b"" + b"" + ) diff --git a/tests/protocols/gmpnext/entities/tasks/test_modify_task.py b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py index b1281bd5..0cc81174 100644 --- a/tests/protocols/gmpnext/entities/tasks/test_modify_task.py +++ b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py @@ -62,12 +62,31 @@ def test_modify_task_with_agent_group_id(self): b'' ) + def test_modify_task_with_oci_image_target_id(self): + self.gmp.modify_task(task_id="t1", oci_image_target_id="it1") + + self.connection.send.has_been_called_with( + b'' + ) + def test_modify_task_with_target_and_agent_group(self): with self.assertRaises(InvalidArgument): self.gmp.modify_task( task_id="t1", target_id="t1", agent_group_id="ag1" ) + def test_modify_task_with_target_and_oci_image_target(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task( + task_id="t1", target_id="t1", oci_image_target_id="it1" + ) + + def test_modify_task_with_agent_group_and_oci_image_target(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task( + task_id="t1", agent_group_id="ag1", oci_image_target_id="it1" + ) + def test_modify_task_with_schedule_id(self): self.gmp.modify_task(task_id="t1", schedule_id="s1") diff --git a/tests/protocols/gmpnext/entities/test_tasks.py b/tests/protocols/gmpnext/entities/test_tasks.py index 386e816a..1a0ae32f 100644 --- a/tests/protocols/gmpnext/entities/test_tasks.py +++ b/tests/protocols/gmpnext/entities/test_tasks.py @@ -7,6 +7,7 @@ from ...gmpnext.entities.tasks import ( GmpCloneTaskTestMixin, GmpCreateAgentGroupTaskTestMixin, + GmpCreateContainerImageTaskTestMixin, GmpCreateContainerTaskTestMixin, GmpCreateTaskTestMixin, GmpDeleteTaskTestMixin, @@ -30,6 +31,12 @@ class GmpCreateAgentGroupTaskTestCase( pass +class GMPCreateContainerImageTaskTestCase( + GmpCreateContainerImageTaskTestMixin, GMPTestCase +): + pass + + class GMPCreateContainerTaskTestCase( GmpCreateContainerTaskTestMixin, GMPTestCase ):