Skip to content

Commit c3cd421

Browse files
committed
feat(securitycenter): Add Resource SCC Mgt API Org ETD Cust Modules (Create, Get, Delete, List, Update)
1 parent fa9235a commit c3cd421

File tree

5 files changed

+433
-0
lines changed

5 files changed

+433
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2024 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# https://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import random
18+
import time
19+
from typing import Dict
20+
21+
from google.cloud import securitycentermanagement_v1
22+
from google.protobuf.struct_pb2 import Struct
23+
from google.protobuf.field_mask_pb2 import FieldMask
24+
25+
26+
# [START securitycenter_create_event_threat_detection_custom_module]
27+
def create_event_threat_detection_custom_module(parent: str) -> Dict:
28+
"""
29+
Creates a Event Threat Detection Custom Module.
30+
31+
This custom module evaluates Cloud KMS CryptoKeys to ensure their rotation period exceeds 30 days (2592000 seconds),
32+
as per security best practices. A shorter rotation period helps reduce the risk of exposure in the event of a compromise.
33+
34+
Args:
35+
parent: Use any one of the following options:
36+
- organizations/{organization_id}/locations/{location_id}
37+
- folders/{folder_id}/locations/{location_id}
38+
- projects/{project_id}/locations/{location_id}
39+
Returns:
40+
Dict: Created custom module details.
41+
"""
42+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
43+
44+
try:
45+
# Seed the random number generator
46+
random.seed(time.time())
47+
# Generate a unique suffix
48+
unique_suffix = f"{int(time.time())}-{random.randint(0, 999)}"
49+
# Create unique display name
50+
display_name = f"python_sample_etd_custom_module_{unique_suffix}"
51+
52+
# Define the metadata and other config parameters as a dictionary
53+
config_map = {
54+
"metadata": {
55+
"severity": "MEDIUM",
56+
"description": "Sample custom module for testing purposes. Please do not delete.",
57+
"recommendation": "na",
58+
},
59+
"ips": ["0.0.0.0"],
60+
}
61+
62+
# Convert the dictionary to a Struct
63+
config_struct = Struct()
64+
config_struct.update(config_map)
65+
66+
# Define the Event Threat Detection custom module configuration
67+
custom_module = securitycentermanagement_v1.EventThreatDetectionCustomModule(
68+
config=config_struct,
69+
display_name=display_name,
70+
enablement_state=securitycentermanagement_v1.EventThreatDetectionCustomModule.EnablementState.ENABLED,
71+
type_="CONFIGURABLE_BAD_IP",
72+
)
73+
74+
# Create the request
75+
request = securitycentermanagement_v1.CreateEventThreatDetectionCustomModuleRequest(
76+
parent=parent,
77+
event_threat_detection_custom_module=custom_module,
78+
)
79+
80+
# Make the API call
81+
response = client.create_event_threat_detection_custom_module(request=request)
82+
83+
print(f"Created EventThreatDetectionCustomModule: {response.name}")
84+
return response
85+
86+
except Exception as e:
87+
print(f"Failed to create EventThreatDetectionCustomModule: {e}")
88+
raise
89+
90+
# [END securitycenter_create_event_threat_detection_custom_module]
91+
92+
# [START securitycenter_get_event_threat_detection_custom_module]
93+
def get_event_threat_detection_custom_module(parent: str, module_id: str):
94+
"""
95+
Retrieves a Event Threat Detection custom module.
96+
Args:
97+
parent: Use any one of the following options:
98+
- organizations/{organization_id}/locations/{location_id}
99+
- folders/{folder_id}/locations/{location_id}
100+
- projects/{project_id}/locations/{location_id}
101+
Returns:
102+
The retrieved Event Threat Detection custom module.
103+
Raises:
104+
NotFound: If the specified custom module does not exist.
105+
"""
106+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
107+
108+
try:
109+
request = securitycentermanagement_v1.GetEventThreatDetectionCustomModuleRequest(
110+
name=f"{parent}/eventThreatDetectionCustomModules/{module_id}",
111+
)
112+
113+
response = client.get_event_threat_detection_custom_module(request=request)
114+
print(f"Retrieved Event Threat Detection Custom Module: {response.name}")
115+
return response
116+
except NotFound as e:
117+
print(f"Custom Module not found: {response.name}")
118+
raise e
119+
# [END securitycenter_get_event_threat_detection_custom_module]
120+
121+
# [START securitycenter_list_event_threat_detection_custom_module]
122+
def list_event_threat_detection_custom_module(parent: str):
123+
"""
124+
Retrieves list of Event Threat Detection custom module.
125+
Args:
126+
parent: Use any one of the following options:
127+
- organizations/{organization_id}/locations/{location_id}
128+
- folders/{folder_id}/locations/{location_id}
129+
- projects/{project_id}/locations/{location_id}
130+
Returns:
131+
List of retrieved Event Threat Detection custom modules.
132+
Raises:
133+
NotFound: If the specified custom module does not exist.
134+
"""
135+
136+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
137+
138+
try:
139+
request = securitycentermanagement_v1.ListEventThreatDetectionCustomModulesRequest(
140+
parent=parent,
141+
)
142+
143+
response = client.list_event_threat_detection_custom_modules(request=request)
144+
145+
custom_modules = []
146+
for custom_module in response:
147+
print(f"Custom Module: {custom_module.name}")
148+
custom_modules.append(custom_module)
149+
return custom_modules
150+
except NotFound as e:
151+
print(f"Parent resource not found: {parent}")
152+
raise e
153+
except Exception as e:
154+
print(f"An error occurred while listing custom modules: {e}")
155+
raise e
156+
# [END securitycenter_list_event_threat_detection_custom_module]
157+
158+
# [START securitycenter_update_event_threat_detection_custom_module]
159+
def update_event_threat_detection_custom_module(parent: str, module_id: str):
160+
"""
161+
Updates an Event Threat Detection Custom Module.
162+
163+
Args:
164+
parent: Use any one of the following options:
165+
- organizations/{organization_id}/locations/{location_id}
166+
- folders/{folder_id}/locations/{location_id}
167+
- projects/{project_id}/locations/{location_id}
168+
Returns:
169+
Dict: Created custom module details.
170+
"""
171+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
172+
173+
try:
174+
175+
custom_module = securitycentermanagement_v1.EventThreatDetectionCustomModule(
176+
name=f"{parent}/eventThreatDetectionCustomModules/{module_id}",
177+
enablement_state=securitycentermanagement_v1.EventThreatDetectionCustomModule.EnablementState.DISABLED,
178+
)
179+
180+
# Create the request
181+
request = securitycentermanagement_v1.UpdateEventThreatDetectionCustomModuleRequest(
182+
event_threat_detection_custom_module=custom_module,
183+
update_mask=FieldMask(paths=["enablement_state"]),
184+
)
185+
186+
# Make the API call
187+
response = client.update_event_threat_detection_custom_module(request=request)
188+
189+
print(f"Updated EventThreatDetectionCustomModule: {response.name}")
190+
return response
191+
192+
except Exception as e:
193+
print(f"Failed to update EventThreatDetectionCustomModule: {e}")
194+
raise
195+
196+
# [END securitycenter_update_event_threat_detection_custom_module]
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2024 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# https://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import os
18+
19+
import random
20+
21+
import time
22+
23+
import backoff
24+
25+
from google.api_core.exceptions import InternalServerError, NotFound, ServiceUnavailable
26+
27+
from google.cloud import securitycentermanagement_v1
28+
29+
from google.protobuf.struct_pb2 import Struct
30+
31+
import pytest
32+
33+
import event_threat_detection_custom_modules
34+
35+
# Replace these variables before running the sample.
36+
# GCLOUD_ORGANIZATION: The organization ID.
37+
ORGANIZATION_ID = os.environ["GCLOUD_ORGANIZATION"]
38+
LOCATION = "global"
39+
PREFIX = "python_sample_etd_custom_module" # Prefix used for identifying test modules
40+
41+
42+
@pytest.fixture(scope="session", autouse=True)
43+
def setup_environment():
44+
"""Fixture to ensure a clean environment by removing test modules before running tests."""
45+
if not ORGANIZATION_ID:
46+
pytest.fail("GCLOUD_ORGANIZATION environment variable is not set.")
47+
48+
print(f"Cleaning up existing custom modules for organization: {ORGANIZATION_ID}")
49+
cleanup_existing_custom_modules(ORGANIZATION_ID)
50+
51+
52+
def cleanup_existing_custom_modules(org_id: str):
53+
"""
54+
Deletes all custom modules matching a specific naming pattern.
55+
Args:
56+
org_id: The organization ID.
57+
"""
58+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
59+
parent = f"organizations/{org_id}/locations/global"
60+
print(f"Parent path: {parent}")
61+
try:
62+
custom_modules = client.list_event_threat_detection_custom_modules(
63+
request={"parent": parent}
64+
)
65+
for module in custom_modules:
66+
if module.display_name.startswith(PREFIX):
67+
client.delete_event_threat_detection_custom_module(
68+
request={"name": module.name}
69+
)
70+
print(f"Deleted custom module: {module.name}")
71+
except NotFound as e:
72+
print(f"Resource not found: {e}")
73+
except Exception as e:
74+
print(f"Unexpected error during cleanup: {e}")
75+
raise
76+
77+
78+
def add_custom_module(org_id: str):
79+
80+
parent = f"organizations/{org_id}/locations/global"
81+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
82+
83+
# Generate a unique display name
84+
unique_suffix = f"{int(time.time())}_{random.randint(0, 999)}"
85+
display_name = f"python_sample_etd_custom_module_test_{unique_suffix}"
86+
87+
# Define the metadata and other config parameters as a dictionary
88+
config_map = {
89+
"metadata": {
90+
"severity": "MEDIUM",
91+
"description": "Sample custom module for testing purposes. Please do not delete.",
92+
"recommendation": "na",
93+
},
94+
"ips": ["0.0.0.0"],
95+
}
96+
97+
# Convert the dictionary to a Struct
98+
config_struct = Struct()
99+
config_struct.update(config_map)
100+
101+
# Define the custom module configuration
102+
custom_module = {
103+
"display_name": display_name,
104+
"enablement_state": "ENABLED",
105+
"type_": "CONFIGURABLE_BAD_IP",
106+
"config": config_struct,
107+
}
108+
109+
request = securitycentermanagement_v1.CreateEventThreatDetectionCustomModuleRequest(
110+
parent=parent,
111+
event_threat_detection_custom_module=custom_module,
112+
)
113+
response = client.create_event_threat_detection_custom_module(request=request)
114+
print(f"Created Event Threat Detection Custom Module: {response.name}")
115+
module_name = response.name
116+
module_id = module_name.split("/")[-1]
117+
return module_name, module_id
118+
119+
120+
@backoff.on_exception(
121+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
122+
)
123+
def test_create_event_threat_detection_custom_module():
124+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
125+
126+
# Run the function to create the custom module
127+
response = event_threat_detection_custom_modules.create_event_threat_detection_custom_module(parent)
128+
129+
assert response is not None, "Custom module creation failed."
130+
# Verify that the custom module was created
131+
assert response.display_name.startswith(PREFIX)
132+
assert response.enablement_state == securitycentermanagement_v1.EventThreatDetectionCustomModule.EnablementState.ENABLED
133+
134+
@backoff.on_exception(
135+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
136+
)
137+
def test_get_event_threat_detection_custom_module():
138+
139+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
140+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
141+
142+
# Retrieve the custom module
143+
response = event_threat_detection_custom_modules.get_event_threat_detection_custom_module(parent, module_id)
144+
145+
assert response is not None, "Failed to retrieve the custom module."
146+
# Verify that the custom module was created
147+
assert response.display_name.startswith(PREFIX)
148+
assert response.enablement_state == securitycentermanagement_v1.EventThreatDetectionCustomModule.EnablementState.ENABLED
149+
print(f"Retrieved Custom Module: {response.name}")
150+
151+
@backoff.on_exception(
152+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
153+
)
154+
def test_list_event_threat_detection_custom_module():
155+
156+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
157+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
158+
# Retrieve the custom modules
159+
custom_modules = event_threat_detection_custom_modules.list_event_threat_detection_custom_module(parent)
160+
161+
assert custom_modules is not None, "Failed to retrieve the custom modules."
162+
assert len(custom_modules) > 0, "No custom modules were retrieved."
163+
164+
# Verify the created module is in the list
165+
created_module = next(
166+
(module for module in custom_modules if module.name == module_name), None
167+
)
168+
assert created_module is not None, "Created custom module not found in the list."
169+
assert created_module.display_name.startswith(PREFIX)
170+
assert (
171+
created_module.enablement_state
172+
== securitycentermanagement_v1.EventThreatDetectionCustomModule.EnablementState.ENABLED
173+
)
174+
175+
@backoff.on_exception(
176+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
177+
)
178+
def test_update_event_threat_detection_custom_module():
179+
180+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
181+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
182+
183+
# Retrieve the custom module
184+
response = event_threat_detection_custom_modules.update_event_threat_detection_custom_module(parent, module_id)
185+
186+
assert response is not None, "Failed to retrieve the custom module."
187+
# Verify that the custom module was created
188+
assert response.display_name.startswith(PREFIX)
189+
assert response.enablement_state == securitycentermanagement_v1.EventThreatDetectionCustomModule.EnablementState.DISABLED

0 commit comments

Comments
 (0)