Skip to content

Commit 9a19f99

Browse files
committed
feat(securitycenter): Add Resource SCC Mgt API Org SHA Cust Modules (Create, Get, Delete, List, Update)
1 parent b9b28d7 commit 9a19f99

File tree

5 files changed

+471
-0
lines changed

5 files changed

+471
-0
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Default TEST_CONFIG_OVERRIDE for python repos.
16+
17+
# You can copy this file into your directory, then it will be inported from
18+
# the noxfile.py.
19+
20+
# The source of truth:
21+
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py
22+
23+
TEST_CONFIG_OVERRIDE = {
24+
# You can opt out from the test for specific Python versions.
25+
"ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.11"],
26+
# An envvar key for determining the project id to use. Change it
27+
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
28+
# build specific Cloud project. You can also use your own string
29+
# to use your own Cloud project.
30+
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
31+
# "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT",
32+
# A dictionary you want to inject into your test. Don't put any
33+
# secrets here. These values will override predefined values.
34+
"envs": {
35+
"GCLOUD_ORGANIZATION": "1081635000895",
36+
"GCLOUD_PROJECT": "project-a-id",
37+
"GCLOUD_PUBSUB_TOPIC": "projects/project-a-id/topics/notifications-sample-topic",
38+
"GCLOUD_PUBSUB_SUBSCRIPTION": "projects/project-a-id/subscriptions/notification-sample-subscription",
39+
"GCLOUD_LOCATION": "global",
40+
},
41+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
backoff==2.2.1
2+
pytest==8.2.0
3+
google-cloud-bigquery==3.11.4
4+
google-cloud-securitycentermanagement==0.1.17
5+
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
google-cloud-securitycentermanagement==0.1.17
2+
google-cloud-bigquery==3.11.4
3+
google-cloud-pubsub==2.21.5
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2024 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# https://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
import os
17+
import uuid
18+
import backoff
19+
from google.api_core.exceptions import InternalServerError, ServiceUnavailable, NotFound
20+
from google.cloud import securitycentermanagement_v1
21+
import security_health_analytics_custom_modules
22+
import pytest
23+
import time
24+
import random
25+
26+
#Replace these variables before running the sample.
27+
ORGANIZATION_ID = os.environ["GCLOUD_ORGANIZATION"]
28+
LOCATION = "global"
29+
PREFIX = "python_sample_sha_custom_module" # Prefix used for identifying test modules
30+
31+
@pytest.fixture(scope="session", autouse=True)
32+
def setup_environment():
33+
"""Fixture to ensure a clean environment by removing test modules before running tests."""
34+
if not ORGANIZATION_ID:
35+
pytest.fail("GCLOUD_ORGANIZATION environment variable is not set.")
36+
37+
print(f"Cleaning up existing custom modules for organization: {ORGANIZATION_ID}")
38+
cleanup_existing_custom_modules(ORGANIZATION_ID)
39+
40+
def cleanup_existing_custom_modules(org_id: str):
41+
"""
42+
Deletes all custom modules matching a specific naming pattern.
43+
Args:
44+
org_id: The organization ID.
45+
"""
46+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
47+
parent = f"organizations/{org_id}/locations/global"
48+
49+
try:
50+
custom_modules = client.list_security_health_analytics_custom_modules(
51+
request={"parent": parent}
52+
)
53+
for module in custom_modules:
54+
if module.display_name.startswith(PREFIX):
55+
client.delete_security_health_analytics_custom_module(
56+
request={"name": module.name}
57+
)
58+
print(f"Deleted custom module: {module.name}")
59+
except NotFound:
60+
print(f"Custom Module not found for deletion: {module.name}")
61+
62+
def add_custom_module(org_id: str):
63+
64+
parent = f"organizations/{org_id}/locations/global"
65+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
66+
67+
# Generate a unique display name
68+
unique_suffix = f"{int(time.time())}_{random.randint(0, 999)}"
69+
display_name = f"python_sample_sha_custom_module_test_{unique_suffix}"
70+
71+
# Define the custom module configuration
72+
custom_module = {
73+
"display_name": display_name,
74+
"enablement_state": "ENABLED",
75+
"custom_config": {
76+
"description": "Sample custom module for testing purpose. Please do not delete.",
77+
"predicate": {
78+
"expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))",
79+
"title": "GCE Instance High Severity",
80+
"description": "Custom module to detect high severity issues on GCE instances.",
81+
},
82+
"recommendation": "Ensure proper security configurations on GCE instances.",
83+
"resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]},
84+
"severity": "CRITICAL",
85+
"custom_output": {
86+
"properties": [
87+
{
88+
"name": "example_property",
89+
"value_expression": {
90+
"description": "The name of the instance",
91+
"expression": "resource.name",
92+
"location": "global",
93+
"title": "Instance Name",
94+
},
95+
}
96+
]
97+
},
98+
},
99+
}
100+
101+
request = securitycentermanagement_v1.CreateSecurityHealthAnalyticsCustomModuleRequest(
102+
parent= parent,
103+
security_health_analytics_custom_module= custom_module,
104+
)
105+
106+
response = client.create_security_health_analytics_custom_module(request=request)
107+
print(f"Created Security Health Analytics Custom Module: {response.name}")
108+
module_name = response.name
109+
module_id = module_name.split("/")[-1]
110+
111+
return module_name, module_id
112+
113+
@backoff.on_exception(
114+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
115+
)
116+
def test_create_security_health_analytics_custom_module():
117+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
118+
119+
# Run the function to create the custom module
120+
response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent)
121+
122+
assert response is not None, "Custom module creation failed."
123+
# Verify that the custom module was created
124+
assert response.display_name.startswith(PREFIX)
125+
assert response.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
126+
127+
@backoff.on_exception(
128+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
129+
)
130+
def test_get_security_health_analytics_custom_module():
131+
132+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
133+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
134+
135+
# Retrieve the custom module
136+
response = security_health_analytics_custom_modules.get_security_health_analytics_custom_module(parent, module_id)
137+
138+
assert response is not None, "Failed to retrieve the custom module."
139+
# Verify that the custom module was created
140+
assert response.display_name.startswith(PREFIX)
141+
assert response.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
142+
143+
print(f"Retrieved Custom Module: {response.name}")
144+
145+
@backoff.on_exception(
146+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
147+
)
148+
def test_delete_security_health_analytics_custom_module():
149+
150+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
151+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
152+
153+
try:
154+
response = security_health_analytics_custom_modules.delete_security_health_analytics_custom_module(parent, module_id)
155+
except Exception as e:
156+
pytest.fail(f"delete_security_health_analytics_custom_module() failed: {e}")
157+
return
158+
159+
assert response is None
160+
161+
print(f"Custom module was deleted successfully: {module_id}")
162+
163+
@backoff.on_exception(
164+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
165+
)
166+
def test_list_security_health_analytics_custom_module():
167+
168+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
169+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
170+
# Retrieve the custom modules
171+
custom_modules = security_health_analytics_custom_modules.list_security_health_analytics_custom_module(parent)
172+
173+
assert custom_modules is not None, "Failed to retrieve the custom modules."
174+
assert len(custom_modules) > 0, "No custom modules were retrieved."
175+
176+
# Verify the created module is in the list
177+
created_module = next(
178+
(module for module in custom_modules if module.name == module_name), None
179+
)
180+
assert created_module is not None, "Created custom module not found in the list."
181+
assert created_module.display_name.startswith(PREFIX)
182+
assert (
183+
created_module.enablement_state
184+
== securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
185+
)
186+
187+
@backoff.on_exception(
188+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
189+
)
190+
def test_update_security_health_analytics_custom_module():
191+
192+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
193+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
194+
# Retrieve the custom modules
195+
updated_custom_module = security_health_analytics_custom_modules.update_security_health_analytics_custom_module(parent, module_id)
196+
197+
assert updated_custom_module is not None, "Failed to retrieve the updated custom module."
198+
response_org_id = updated_custom_module.name.split("/")[1] # Extract organization ID from the name field
199+
assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}."
200+
assert updated_custom_module.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.DISABLED

0 commit comments

Comments
 (0)