Skip to content

Commit 3fa8313

Browse files
skrydalpedro93
authored andcommitted
fix(ingest/iceberg): Extend iceberg source to allow role assumption (#15288)
Co-authored-by: pedro93 <pedro.cls93@gmail.com>
1 parent f9ce4a5 commit 3fa8313

File tree

3 files changed

+507
-5
lines changed

3 files changed

+507
-5
lines changed

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,13 @@ def _try_processing_dataset(
340340
context=dataset_name,
341341
exc=e,
342342
)
343+
except OSError as e:
344+
self.report.warning(
345+
title="Can't read manifest",
346+
message="Provided manifest path appeared impossible to read",
347+
context=dataset_name,
348+
exc=e,
349+
)
343350
except ValueError as e:
344351
if "Could not initialize FileIO" not in str(e):
345352
raise

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,26 @@
33
from dataclasses import dataclass, field
44
from typing import Any, Dict, Optional
55

6+
import boto3
67
from humanfriendly import format_timespan
78
from pydantic import Field, field_validator
8-
from pyiceberg.catalog import Catalog, load_catalog
9+
from pyiceberg.catalog import BOTOCORE_SESSION, Catalog, load_catalog
10+
from pyiceberg.catalog.glue import (
11+
GLUE_ACCESS_KEY_ID,
12+
GLUE_PROFILE_NAME,
13+
GLUE_REGION,
14+
GLUE_SECRET_ACCESS_KEY,
15+
GLUE_SESSION_TOKEN,
16+
)
917
from pyiceberg.catalog.rest import RestCatalog
18+
from pyiceberg.io import (
19+
AWS_ACCESS_KEY_ID,
20+
AWS_REGION,
21+
AWS_ROLE_ARN,
22+
AWS_SECRET_ACCESS_KEY,
23+
AWS_SESSION_TOKEN,
24+
)
25+
from pyiceberg.utils.properties import get_first_property_value
1026
from requests.adapters import HTTPAdapter
1127
from sortedcontainers import SortedList
1228
from urllib3.util import Retry
@@ -32,6 +48,8 @@
3248
DEFAULT_REST_TIMEOUT = 120
3349
DEFAULT_REST_RETRY_POLICY = {"total": 3, "backoff_factor": 0.1}
3450

51+
GLUE_ROLE_ARN = "glue.role-arn"
52+
3553

3654
class TimeoutHTTPAdapter(HTTPAdapter):
3755
def __init__(self, *args, **kwargs):
@@ -154,6 +172,76 @@ def is_profiling_enabled(self) -> bool:
154172
self.profiling.operation_config
155173
)
156174

175+
def _custom_glue_catalog_handling(self, catalog_config: Dict[str, Any]) -> None:
176+
role_to_assume = get_first_property_value(
177+
catalog_config, GLUE_ROLE_ARN, AWS_ROLE_ARN
178+
)
179+
if role_to_assume:
180+
logger.debug(
181+
"Recognized role ARN in glue catalog config, attempting to workaround pyiceberg limitation in role assumption for the glue client"
182+
)
183+
session = boto3.Session(
184+
profile_name=catalog_config.get(GLUE_PROFILE_NAME),
185+
region_name=get_first_property_value(
186+
catalog_config, GLUE_REGION, AWS_REGION
187+
),
188+
botocore_session=catalog_config.get(BOTOCORE_SESSION),
189+
aws_access_key_id=get_first_property_value(
190+
catalog_config, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID
191+
),
192+
aws_secret_access_key=get_first_property_value(
193+
catalog_config, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY
194+
),
195+
aws_session_token=get_first_property_value(
196+
catalog_config, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN
197+
),
198+
)
199+
200+
sts_client = session.client("sts")
201+
identity = sts_client.get_caller_identity()
202+
logger.debug(
203+
f"Authenticated as {identity['Arn']}, attempting to assume a role: {role_to_assume}"
204+
)
205+
206+
current_role_arn = None
207+
try:
208+
if ":assumed-role/" in identity["Arn"]:
209+
current_role_arn = (
210+
"/".join(identity["Arn"].split("/")[0:-1])
211+
.replace(":assumed-role/", ":role/")
212+
.replace("arn:aws:sts", "arn:aws:iam")
213+
)
214+
logger.debug(f"Deducted current role: {current_role_arn}")
215+
except Exception as e:
216+
logger.warning(
217+
"We couldn't convert currently assumed role to 'role' format so that we could compare "
218+
f"it with the target role, will try to assume the target role nonetheless, exception: {e}"
219+
)
220+
221+
if current_role_arn == role_to_assume:
222+
logger.debug(
223+
"Current role and the role we wanted to assume are the same, continuing without further assumption steps"
224+
)
225+
else:
226+
logger.debug(f"Assuming the role {role_to_assume}")
227+
# below might fail if such duration is not allowed per policies
228+
try:
229+
response = sts_client.assume_role(
230+
RoleArn=role_to_assume,
231+
RoleSessionName="session",
232+
DurationSeconds=43200,
233+
)
234+
except sts_client.exceptions.ClientError:
235+
# Fallback to default duration
236+
response = sts_client.assume_role(
237+
RoleArn=role_to_assume, RoleSessionName="session"
238+
)
239+
logger.debug(f"Assumed role: {response['AssumedRoleUser']}")
240+
creds = response["Credentials"]
241+
catalog_config[GLUE_ACCESS_KEY_ID] = creds["AccessKeyId"]
242+
catalog_config[GLUE_SECRET_ACCESS_KEY] = creds["SecretAccessKey"]
243+
catalog_config[GLUE_SESSION_TOKEN] = creds["SessionToken"]
244+
157245
def get_catalog(self) -> Catalog:
158246
"""Returns the Iceberg catalog instance as configured by the `catalog` dictionary.
159247
@@ -165,9 +253,13 @@ def get_catalog(self) -> Catalog:
165253

166254
# Retrieve the dict associated with the one catalog entry
167255
catalog_name, catalog_config = next(iter(self.catalog.items()))
168-
logger.debug(
169-
"Initializing the catalog %s with config: %s", catalog_name, catalog_config
170-
)
256+
logger.debug("Initializing the catalog %s", catalog_name)
257+
258+
# workaround pyiceberg 0.10.0 issue with ignoring role assumption for glue catalog,
259+
# remove this code once pyiceberg is fixed, raised issue: https://github.com/apache/iceberg-python/issues/2747
260+
if catalog_config.get("type") == "glue":
261+
self._custom_glue_catalog_handling(catalog_config)
262+
171263
catalog = load_catalog(name=catalog_name, **catalog_config)
172264
if isinstance(catalog, RestCatalog):
173265
logger.debug(

0 commit comments

Comments
 (0)