11#!/usr/bin/env python
22#
3- # Copyright 2024 Google LLC
3+ # Copyright 2025 Google LLC
44#
55# Licensed under the Apache License, Version 2.0 (the "License");
66# you may not use this file except in compliance with the License.
1919
2020import time
2121
22+ import uuid
23+
2224import backoff
2325
2426from google .api_core .exceptions import InternalServerError , NotFound , ServiceUnavailable
3335# GCLOUD_ORGANIZATION: The organization ID.
3436ORGANIZATION_ID = os .environ ["GCLOUD_ORGANIZATION" ]
3537LOCATION = "global"
36- PREFIX = "python_sample_sha_custom_module" # Prefix used for identifying test modules
38+ PREFIX = "python_sample_sha_custom_module"
3739
38- # Global list to track created modules
39- created_modules = []
40+ # Global list to track created shared modules
41+ shared_modules = []
4042
4143
4244@pytest .fixture (scope = "session" , autouse = True )
4345def setup_environment ():
4446 if not ORGANIZATION_ID :
4547 pytest .fail ("GCLOUD_ORGANIZATION environment variable is not set." )
4648
49+ setup_shared_modules ()
50+
4751
4852@pytest .fixture (scope = "session" , autouse = True )
4953def cleanup_after_tests (request ):
5054 """Fixture to clean up created custom modules after the test session."""
5155 def teardown ():
52- print ("\n Created Custom Modules:" )
53- print_all_created_modules ()
54- print ("Cleaning up created custom modules..." )
55- cleanup_created_custom_modules ()
56+ print_all_shared_modules ()
57+ cleanup_shared_modules ()
5658
5759 request .addfinalizer (teardown )
5860
5961
62+ def setup_shared_modules ():
63+ for _ in range (3 ) :
64+ _ , module_id = add_custom_module (ORGANIZATION_ID )
65+ if module_id != "" :
66+ shared_modules .append (module_id )
67+
68+
69+ def add_module_to_cleanup (module_id ):
70+ shared_modules .append (module_id )
71+
72+
73+ def print_all_shared_modules ():
74+ """Print all created custom modules."""
75+ if not shared_modules :
76+ print ("No custom modules were created." )
77+ else :
78+ print ("\n Created Custom Modules:" )
79+ for module_id in shared_modules :
80+ print (module_id )
81+
82+
83+ def cleanup_shared_modules ():
84+ """
85+ Deletes all created custom modules in this test session.
86+ """
87+ client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
88+
89+ print ("Cleaning up created custom modules..." )
90+
91+ for module_id in list (shared_modules ):
92+ if not custom_module_exists (module_id ):
93+ print (f"Module not found (already deleted): { module_id } " )
94+ shared_modules .remove (module_id )
95+ continue
96+ try :
97+ client .delete_security_health_analytics_custom_module (
98+ request = {"name" : f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } /securityHealthAnalyticsCustomModules/{ module_id } " }
99+ )
100+ print (f"Deleted custom module: { module_id } " )
101+ shared_modules .remove (module_id )
102+ except Exception as e :
103+ print (f"Failed to delete module { module_id } : { e } " )
104+ raise
105+
106+
107+ def custom_module_exists (module_id ):
108+ client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
109+ try :
110+ client .get_security_health_analytics_custom_module (
111+ request = {"name" : f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } /securityHealthAnalyticsCustomModules/{ module_id } " }
112+ )
113+ return True
114+ except Exception as e :
115+ if "404" in str (e ):
116+ return False
117+ raise
118+
119+
120+ def get_random_shared_module ():
121+ if not shared_modules :
122+ return ""
123+ random .seed (int (time .time () * 1000000 ))
124+ return shared_modules [random .randint (0 , len (shared_modules ) - 1 )]
125+
126+
127+ def extract_custom_module_id (module_name ):
128+ trimmed_full_name = module_name .strip ()
129+ parts = trimmed_full_name .split ("/" )
130+ if parts :
131+ return parts [- 1 ]
132+ return ""
133+
134+
60135def add_custom_module (org_id : str ):
61136 """
62137 Adds a new SHA custom module.
@@ -69,7 +144,7 @@ def add_custom_module(org_id: str):
69144 client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
70145
71146 # Generate a unique display name
72- unique_suffix = f" { int ( time . time ())} _ { random . randint ( 0 , 999 ) } "
147+ unique_suffix = str ( uuid . uuid4 ()). replace ( "-" , "_" )
73148 display_name = f"python_sample_sha_custom_module_test_{ unique_suffix } "
74149
75150 # Define the custom module configuration
@@ -109,7 +184,7 @@ def add_custom_module(org_id: str):
109184 response = client .create_security_health_analytics_custom_module (request = request )
110185 print (f"Created Security Health Analytics Custom Module: { response .name } " )
111186 module_name = response .name
112- module_id = module_name . split ( "/" )[ - 1 ]
187+ module_id = extract_custom_module_id ( module_name )
113188 return module_name , module_id
114189
115190
@@ -118,8 +193,7 @@ def add_custom_module(org_id: str):
118193)
119194def test_get_effective_security_health_analytics_custom_module ():
120195 """Tests getting an effective SHA custom module."""
121- module_name , module_id = add_custom_module (ORGANIZATION_ID )
122- created_modules .append (module_name )
196+ module_id = get_random_shared_module ()
123197 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
124198
125199 # Retrieve the custom module
@@ -129,68 +203,39 @@ def test_get_effective_security_health_analytics_custom_module():
129203 # Verify that the custom module was created
130204 assert response .display_name .startswith (PREFIX )
131205 assert response .enablement_state == securitycentermanagement_v1 .EffectiveSecurityHealthAnalyticsCustomModule .EnablementState .ENABLED
132- print (f"Retrieved Custom Module: { response .name } " )
133206
134207
135208@backoff .on_exception (
136209 backoff .expo , (InternalServerError , ServiceUnavailable , NotFound ), max_tries = 3
137210)
138211def test_list_descendant_security_health_analytics_custom_module ():
139212 """Tests listing descendant SHA custom modules."""
140- module_name , module_id = add_custom_module (ORGANIZATION_ID )
141- created_modules .append (module_name )
142213 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
143214 # Retrieve the list descendant custom modules
144215 custom_modules = security_health_analytics_custom_modules .list_descendant_security_health_analytics_custom_module (parent )
145216
146217 assert custom_modules is not None , "Failed to retrieve the custom modules."
147218 assert len (custom_modules ) > 0 , "No custom modules were retrieved."
148219
149- # Verify the created module is in the list
150- created_module = next (
151- (module for module in custom_modules if module .name == module_name ), None
152- )
153- assert created_module is not None , "Created custom module not found in the list."
154- assert created_module .display_name .startswith (PREFIX )
155- assert (
156- created_module .enablement_state
157- == securitycentermanagement_v1 .SecurityHealthAnalyticsCustomModule .EnablementState .ENABLED
158- )
159-
160220
161221@backoff .on_exception (
162222 backoff .expo , (InternalServerError , ServiceUnavailable , NotFound ), max_tries = 3
163223)
164224def test_list_effective_security_health_analytics_custom_module ():
165225 """Tests listing effective SHA custom modules."""
166- module_name , module_id = add_custom_module (ORGANIZATION_ID )
167- created_modules .append (module_name )
168226 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
169227 # Retrieve the list of custom modules
170228 custom_modules = security_health_analytics_custom_modules .list_effective_security_health_analytics_custom_module (parent )
171229
172230 assert custom_modules is not None , "Failed to retrieve the custom modules."
173231 assert len (custom_modules ) > 0 , "No custom modules were retrieved."
174232
175- # Verify the created module is in the list
176- created_module = next (
177- (module for module in custom_modules if (module .name .split ("/" )[- 1 ]) == module_id ), None
178- )
179- assert created_module is not None , "Created custom module not found in the list."
180- assert created_module .display_name .startswith (PREFIX )
181- assert (
182- created_module .enablement_state
183- == securitycentermanagement_v1 .EffectiveSecurityHealthAnalyticsCustomModule .EnablementState .ENABLED
184- )
185-
186233
187234@backoff .on_exception (
188235 backoff .expo , (InternalServerError , ServiceUnavailable , NotFound ), max_tries = 3
189236)
190237def test_simulate_security_health_analytics_custom_module ():
191238 """Tests simulating an SHA custom module."""
192- module_name , module_id = add_custom_module (ORGANIZATION_ID )
193- created_modules .append (module_name )
194239 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
195240
196241 simulated_custom_module = security_health_analytics_custom_modules .simulate_security_health_analytics_custom_module (parent )
@@ -199,47 +244,3 @@ def test_simulate_security_health_analytics_custom_module():
199244 assert simulated_custom_module .result .no_violation is not None , (
200245 f"Expected no_violation to be present, got { simulated_custom_module .result } ."
201246 )
202-
203-
204- def print_all_created_modules ():
205- """Print all created custom modules."""
206- if not created_modules :
207- print ("No custom modules were created." )
208- else :
209- for module in created_modules :
210- print (module )
211-
212-
213- def cleanup_created_custom_modules ():
214- """
215- Deletes all created custom modules in this test session.
216- """
217- client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
218-
219- for module in list (created_modules ):
220- if not custom_module_exists (module ):
221- print (f"Module not found (already deleted): { module } " )
222- created_modules .remove (module )
223- continue
224- try :
225- client .delete_security_health_analytics_custom_module (
226- request = {"name" : module }
227- )
228- print (f"Deleted custom module: { module } " )
229- created_modules .remove (module )
230- except Exception as e :
231- print (f"Failed to delete module { module } : { e } " )
232- raise
233-
234-
235- def custom_module_exists (module_name ):
236- client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
237- try :
238- client .get_security_health_analytics_custom_module (
239- request = {"name" : module_name }
240- )
241- return True
242- except Exception as e :
243- if "404" in str (e ):
244- return False
245- raise
0 commit comments