2020logger = logging .getLogger (__name__ )
2121SNOWFLAKE_AUDIENCE = "snowflakecomputing.com"
2222DEFAULT_ENTRA_SNOWFLAKE_RESOURCE = "api://fd3f753b-eed3-462c-b6a7-a4b5bb650aad"
23+ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL = (
24+ "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default"
25+ )
2326
2427
2528@unique
@@ -184,29 +187,103 @@ def create_aws_attestation(
184187 )
185188
186189
187- def create_gcp_attestation (
188- session_manager : SessionManager | None = None ,
189- ) -> WorkloadIdentityAttestation :
190- """Tries to create a workload identity attestation for GCP.
190+ def get_gcp_access_token (session_manager : SessionManager ) -> str :
191+ """Gets a GCP access token from the metadata server.
192+
193+ If the application isn't running on GCP or no credentials were found, raises an error.
194+ """
195+ try :
196+ res = session_manager .request (
197+ method = "GET" ,
198+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /token" ,
199+ headers = {
200+ "Metadata-Flavor" : "Google" ,
201+ },
202+ )
203+ res .raise_for_status ()
204+ return res .json ()["access_token" ]
205+ except Exception as e :
206+ raise ProgrammingError (
207+ msg = f"Error fetching GCP access token: { e } . Ensure the application is running on GCP." ,
208+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
209+ )
210+
211+
212+ def get_gcp_identity_token_via_impersonation (
213+ impersonation_path : list [str ], session_manager : SessionManager
214+ ) -> str :
215+ """Gets a GCP identity token from the metadata server.
216+
217+ If the application isn't running on GCP or no credentials were found, raises an error.
218+ """
219+ if not impersonation_path :
220+ raise ProgrammingError (
221+ msg = "Error: impersonation_path cannot be empty." ,
222+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
223+ )
224+
225+ current_sa_token = get_gcp_access_token (session_manager )
226+ impersonation_path = [
227+ f"projects/-/serviceAccounts/{ client_id } " for client_id in impersonation_path
228+ ]
229+ try :
230+ res = session_manager .post (
231+ url = f"https://iamcredentials.googleapis.com/v1/{ impersonation_path [- 1 ]} :generateIdToken" ,
232+ headers = {
233+ "Authorization" : f"Bearer { current_sa_token } " ,
234+ "Content-Type" : "application/json" ,
235+ },
236+ json = {
237+ "delegates" : impersonation_path [:- 1 ],
238+ "audience" : SNOWFLAKE_AUDIENCE ,
239+ },
240+ )
241+ res .raise_for_status ()
242+ return res .json ()["token" ]
243+ except Exception as e :
244+ raise ProgrammingError (
245+ msg = f"Error fetching GCP identity token for impersonated GCP service account '{ impersonation_path [- 1 ]} ': { e } . Ensure the application is running on GCP." ,
246+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
247+ )
248+
249+
250+ def get_gcp_identity_token (session_manager : SessionManager ) -> str :
251+ """Gets a GCP identity token from the metadata server.
191252
192253 If the application isn't running on GCP or no credentials were found, raises an error.
193254 """
194255 try :
195256 res = session_manager .request (
196257 method = "GET" ,
197- url = f"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
258+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
198259 headers = {
199260 "Metadata-Flavor" : "Google" ,
200261 },
201262 )
202263 res .raise_for_status ()
264+ return res .content .decode ("utf-8" )
203265 except Exception as e :
204266 raise ProgrammingError (
205- msg = f"Error fetching GCP metadata : { e } . Ensure the application is running on GCP." ,
267+ msg = f"Error fetching GCP identity token : { e } . Ensure the application is running on GCP." ,
206268 errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
207269 )
208270
209- jwt_str = res .content .decode ("utf-8" )
271+
272+ def create_gcp_attestation (
273+ session_manager : SessionManager ,
274+ impersonation_path : list [str ] | None = None ,
275+ ) -> WorkloadIdentityAttestation :
276+ """Tries to create a workload identity attestation for GCP.
277+
278+ If the application isn't running on GCP or no credentials were found, raises an error.
279+ """
280+ if impersonation_path :
281+ jwt_str = get_gcp_identity_token_via_impersonation (
282+ impersonation_path , session_manager
283+ )
284+ else :
285+ jwt_str = get_gcp_identity_token (session_manager )
286+
210287 _ , subject = extract_iss_and_sub_without_signature_verification (jwt_str )
211288 return WorkloadIdentityAttestation (
212289 AttestationProvider .GCP , jwt_str , {"sub" : subject }
@@ -295,6 +372,7 @@ def create_attestation(
295372 provider : AttestationProvider ,
296373 entra_resource : str | None = None ,
297374 token : str | None = None ,
375+ impersonation_path : list [str ] | None = None ,
298376 session_manager : SessionManager | None = None ,
299377) -> WorkloadIdentityAttestation :
300378 """Entry point to create an attestation using the given provider.
@@ -313,7 +391,7 @@ def create_attestation(
313391 elif provider == AttestationProvider .AZURE :
314392 return create_azure_attestation (entra_resource , session_manager )
315393 elif provider == AttestationProvider .GCP :
316- return create_gcp_attestation (session_manager )
394+ return create_gcp_attestation (session_manager , impersonation_path )
317395 elif provider == AttestationProvider .OIDC :
318396 return create_oidc_attestation (token )
319397 else :
0 commit comments