2323logger = logging .getLogger (__name__ )
2424SNOWFLAKE_AUDIENCE = "snowflakecomputing.com"
2525DEFAULT_ENTRA_SNOWFLAKE_RESOURCE = "api://fd3f753b-eed3-462c-b6a7-a4b5bb650aad"
26+ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL = (
27+ "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default"
28+ )
2629
2730
2831@unique
@@ -193,29 +196,103 @@ def create_aws_attestation(
193196 )
194197
195198
196- def create_gcp_attestation (
197- session_manager : SessionManager | None = None ,
198- ) -> WorkloadIdentityAttestation :
199- """Tries to create a workload identity attestation for GCP.
199+ def get_gcp_access_token (session_manager : SessionManager ) -> str :
200+ """Gets a GCP access token from the metadata server.
201+
202+ If the application isn't running on GCP or no credentials were found, raises an error.
203+ """
204+ try :
205+ res = session_manager .request (
206+ method = "GET" ,
207+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /token" ,
208+ headers = {
209+ "Metadata-Flavor" : "Google" ,
210+ },
211+ )
212+ res .raise_for_status ()
213+ return res .json ()["access_token" ]
214+ except Exception as e :
215+ raise ProgrammingError (
216+ msg = f"Error fetching GCP access token: { e } . Ensure the application is running on GCP." ,
217+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
218+ )
219+
220+
221+ def get_gcp_identity_token_via_impersonation (
222+ impersonation_path : list [str ], session_manager : SessionManager
223+ ) -> str :
224+ """Gets a GCP identity token from the metadata server.
225+
226+ If the application isn't running on GCP or no credentials were found, raises an error.
227+ """
228+ if not impersonation_path :
229+ raise ProgrammingError (
230+ msg = "Error: impersonation_path cannot be empty." ,
231+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
232+ )
233+
234+ current_sa_token = get_gcp_access_token (session_manager )
235+ impersonation_path = [
236+ f"projects/-/serviceAccounts/{ client_id } " for client_id in impersonation_path
237+ ]
238+ try :
239+ res = session_manager .post (
240+ url = f"https://iamcredentials.googleapis.com/v1/{ impersonation_path [- 1 ]} :generateIdToken" ,
241+ headers = {
242+ "Authorization" : f"Bearer { current_sa_token } " ,
243+ "Content-Type" : "application/json" ,
244+ },
245+ json = {
246+ "delegates" : impersonation_path [:- 1 ],
247+ "audience" : SNOWFLAKE_AUDIENCE ,
248+ },
249+ )
250+ res .raise_for_status ()
251+ return res .json ()["token" ]
252+ except Exception as e :
253+ raise ProgrammingError (
254+ msg = f"Error fetching GCP identity token for impersonated GCP service account '{ impersonation_path [- 1 ]} ': { e } . Ensure the application is running on GCP." ,
255+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
256+ )
257+
258+
259+ def get_gcp_identity_token (session_manager : SessionManager ) -> str :
260+ """Gets a GCP identity token from the metadata server.
200261
201262 If the application isn't running on GCP or no credentials were found, raises an error.
202263 """
203264 try :
204265 res = session_manager .request (
205266 method = "GET" ,
206- url = f"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
267+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
207268 headers = {
208269 "Metadata-Flavor" : "Google" ,
209270 },
210271 )
211272 res .raise_for_status ()
273+ return res .content .decode ("utf-8" )
212274 except Exception as e :
213275 raise ProgrammingError (
214- msg = f"Error fetching GCP metadata : { e } . Ensure the application is running on GCP." ,
276+ msg = f"Error fetching GCP identity token : { e } . Ensure the application is running on GCP." ,
215277 errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
216278 )
217279
218- jwt_str = res .content .decode ("utf-8" )
280+
281+ def create_gcp_attestation (
282+ session_manager : SessionManager ,
283+ impersonation_path : list [str ] | None = None ,
284+ ) -> WorkloadIdentityAttestation :
285+ """Tries to create a workload identity attestation for GCP.
286+
287+ If the application isn't running on GCP or no credentials were found, raises an error.
288+ """
289+ if impersonation_path :
290+ jwt_str = get_gcp_identity_token_via_impersonation (
291+ impersonation_path , session_manager
292+ )
293+ else :
294+ jwt_str = get_gcp_identity_token (session_manager )
295+
219296 _ , subject = extract_iss_and_sub_without_signature_verification (jwt_str )
220297 return WorkloadIdentityAttestation (
221298 AttestationProvider .GCP , jwt_str , {"sub" : subject }
@@ -304,6 +381,7 @@ def create_attestation(
304381 provider : AttestationProvider ,
305382 entra_resource : str | None = None ,
306383 token : str | None = None ,
384+ impersonation_path : list [str ] | None = None ,
307385 session_manager : SessionManager | None = None ,
308386) -> WorkloadIdentityAttestation :
309387 """Entry point to create an attestation using the given provider.
@@ -322,7 +400,7 @@ def create_attestation(
322400 elif provider == AttestationProvider .AZURE :
323401 return create_azure_attestation (entra_resource , session_manager )
324402 elif provider == AttestationProvider .GCP :
325- return create_gcp_attestation (session_manager )
403+ return create_gcp_attestation (session_manager , impersonation_path )
326404 elif provider == AttestationProvider .OIDC :
327405 return create_oidc_attestation (token )
328406 else :
0 commit comments