2727
2828logger = logging .getLogger (__name__ )
2929
30+ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL = (
31+ "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default"
32+ )
33+
3034
3135async def get_aws_region () -> str :
3236 """Get the current AWS workload's region."""
@@ -91,30 +95,108 @@ async def create_aws_attestation() -> WorkloadIdentityAttestation:
9195 )
9296
9397
94- async def create_gcp_attestation (
95- session_manager : SessionManager | None = None ,
96- ) -> WorkloadIdentityAttestation :
97- """Tries to create a workload identity attestation for GCP.
98+ async def get_gcp_access_token (session_manager : SessionManager ) -> str :
99+ """Gets a GCP access token from the metadata server.
100+
101+ If the application isn't running on GCP or no credentials were found, raises an error.
102+ """
103+ try :
104+ res = await session_manager .request (
105+ method = "GET" ,
106+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /token" ,
107+ headers = {
108+ "Metadata-Flavor" : "Google" ,
109+ },
110+ )
111+
112+ content = await res .content .read ()
113+ response_text = content .decode ("utf-8" )
114+ return json .loads (response_text )["access_token" ]
115+ except Exception as e :
116+ raise ProgrammingError (
117+ msg = f"Error fetching GCP access token: { e } . Ensure the application is running on GCP." ,
118+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
119+ )
120+
121+
122+ async def get_gcp_identity_token_via_impersonation (
123+ impersonation_path : list [str ], session_manager : SessionManager
124+ ) -> str :
125+ """Gets a GCP identity token from the metadata server.
126+
127+ If the application isn't running on GCP or no credentials were found, raises an error.
128+ """
129+ if not impersonation_path :
130+ raise ProgrammingError (
131+ msg = "Error: impersonation_path cannot be empty." ,
132+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
133+ )
134+
135+ current_sa_token = await get_gcp_access_token (session_manager )
136+ impersonation_path = [
137+ f"projects/-/serviceAccounts/{ client_id } " for client_id in impersonation_path
138+ ]
139+ try :
140+ res = await session_manager .post (
141+ url = f"https://iamcredentials.googleapis.com/v1/{ impersonation_path [- 1 ]} :generateIdToken" ,
142+ headers = {
143+ "Authorization" : f"Bearer { current_sa_token } " ,
144+ "Content-Type" : "application/json" ,
145+ },
146+ json = {
147+ "delegates" : impersonation_path [:- 1 ],
148+ "audience" : SNOWFLAKE_AUDIENCE ,
149+ },
150+ )
151+
152+ content = await res .content .read ()
153+ response_text = content .decode ("utf-8" )
154+ return json .loads (response_text )["token" ]
155+ except Exception as e :
156+ raise ProgrammingError (
157+ msg = f"Error fetching GCP identity token for impersonated GCP service account '{ impersonation_path [- 1 ]} ': { e } . Ensure the application is running on GCP." ,
158+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
159+ )
160+
161+
162+ async def get_gcp_identity_token (session_manager : SessionManager ) -> str :
163+ """Gets a GCP identity token from the metadata server.
98164
99165 If the application isn't running on GCP or no credentials were found, raises an error.
100166 """
101167 try :
102168 res = await session_manager .request (
103169 method = "GET" ,
104- url = f"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
170+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
105171 headers = {
106172 "Metadata-Flavor" : "Google" ,
107173 },
108174 )
109175
110176 content = await res .content .read ()
111- jwt_str = content .decode ("utf-8" )
177+ return content .decode ("utf-8" )
112178 except Exception as e :
113179 raise ProgrammingError (
114- msg = f"Error fetching GCP metadata : { e } . Ensure the application is running on GCP." ,
180+ msg = f"Error fetching GCP identity token : { e } . Ensure the application is running on GCP." ,
115181 errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
116182 )
117183
184+
185+ async def create_gcp_attestation (
186+ session_manager : SessionManager ,
187+ impersonation_path : list [str ] | None = None ,
188+ ) -> WorkloadIdentityAttestation :
189+ """Tries to create a workload identity attestation for GCP.
190+
191+ If the application isn't running on GCP or no credentials were found, raises an error.
192+ """
193+ if impersonation_path :
194+ jwt_str = await get_gcp_identity_token_via_impersonation (
195+ impersonation_path , session_manager
196+ )
197+ else :
198+ jwt_str = await get_gcp_identity_token (session_manager )
199+
118200 _ , subject = extract_iss_and_sub_without_signature_verification (jwt_str )
119201 return WorkloadIdentityAttestation (
120202 AttestationProvider .GCP , jwt_str , {"sub" : subject }
@@ -189,6 +271,7 @@ async def create_attestation(
189271 provider : AttestationProvider | None ,
190272 entra_resource : str | None = None ,
191273 token : str | None = None ,
274+ impersonation_path : list [str ] | None = None ,
192275 session_manager : SessionManager | None = None ,
193276) -> WorkloadIdentityAttestation :
194277 """Entry point to create an attestation using the given provider.
@@ -207,7 +290,7 @@ async def create_attestation(
207290 elif provider == AttestationProvider .AZURE :
208291 return await create_azure_attestation (entra_resource , session_manager )
209292 elif provider == AttestationProvider .GCP :
210- return await create_gcp_attestation (session_manager )
293+ return await create_gcp_attestation (session_manager , impersonation_path )
211294 elif provider == AttestationProvider .OIDC :
212295 return create_oidc_attestation (token )
213296 else :
0 commit comments