Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,13 @@ def _try_processing_dataset(
context=dataset_name,
exc=e,
)
except OSError as e:
self.report.warning(
title="Can't read manifest",
message="Provided manifest path appeared impossible to read",
context=dataset_name,
exc=e,
)
except ValueError as e:
if "Could not initialize FileIO" not in str(e):
raise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

import boto3
from humanfriendly import format_timespan
from pydantic import Field, field_validator
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog import BOTOCORE_SESSION, Catalog, load_catalog
from pyiceberg.catalog.glue import (
GLUE_ACCESS_KEY_ID,
GLUE_PROFILE_NAME,
GLUE_REGION,
GLUE_SECRET_ACCESS_KEY,
GLUE_SESSION_TOKEN,
)
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.io import (
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_ROLE_ARN,
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN,
)
from pyiceberg.utils.properties import get_first_property_value
from requests.adapters import HTTPAdapter
from sortedcontainers import SortedList
from urllib3.util import Retry
Expand All @@ -32,6 +48,8 @@
# Default timeout applied to REST catalog requests, judging by the name
# (presumably seconds — TODO confirm where TimeoutHTTPAdapter consumes it).
DEFAULT_REST_TIMEOUT = 120
# Default retry settings; the keys look like urllib3 `Retry` keyword arguments
# (NOTE(review): verify at the usage site, which is not visible here).
DEFAULT_REST_RETRY_POLICY = {"total": 3, "backoff_factor": 0.1}

# Glue-specific catalog property naming the role to assume. It is passed ahead of
# the generic AWS_ROLE_ARN property when the role is resolved from the catalog config.
GLUE_ROLE_ARN = "glue.role-arn"


class TimeoutHTTPAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -154,6 +172,76 @@ def is_profiling_enabled(self) -> bool:
self.profiling.operation_config
)

def _custom_glue_catalog_handling(self, catalog_config: Dict[str, Any]) -> None:
    """Assume the role configured for a glue catalog and store its credentials.

    Works around pyiceberg not performing role assumption for the glue client:
    when a role ARN is present in the catalog config, the role is assumed here
    through STS and the resulting temporary credentials are written back into
    ``catalog_config`` under the glue access key / secret key / session token
    properties.

    Nothing is changed when no role ARN is configured, or when the caller is
    already running as the target role.

    Args:
        catalog_config: Catalog properties. Mutated in place when a role is
            assumed.

    Raises:
        Exceptions from building the boto3 session, from
        ``get_caller_identity``, or from the fallback ``assume_role`` call
        propagate to the caller. Only a ``ClientError`` from the first
        (long-duration) ``assume_role`` attempt is handled.
    """
    role_to_assume = get_first_property_value(
        catalog_config, GLUE_ROLE_ARN, AWS_ROLE_ARN
    )
    if not role_to_assume:
        return

    # 12 hours is the longest session STS AssumeRole accepts. Roles with a
    # shorter configured maximum, and role chaining (capped at 1 hour), reject
    # it; the fallback below handles that.
    max_session_duration_seconds = 12 * 60 * 60
    role_session_name = "iceberg-glue-catalog-session"

    logger.info(
        "Recognized role ARN in glue catalog config, attempting to workaround pyiceberg limitation in role assumption for the glue client"
    )
    session = boto3.Session(
        profile_name=catalog_config.get(GLUE_PROFILE_NAME),
        region_name=get_first_property_value(
            catalog_config, GLUE_REGION, AWS_REGION
        ),
        botocore_session=catalog_config.get(BOTOCORE_SESSION),
        aws_access_key_id=get_first_property_value(
            catalog_config, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID
        ),
        aws_secret_access_key=get_first_property_value(
            catalog_config, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY
        ),
        aws_session_token=get_first_property_value(
            catalog_config, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN
        ),
    )

    sts_client = session.client("sts")
    identity = sts_client.get_caller_identity()
    caller_arn = identity["Arn"]
    logger.info(
        "Authenticated as %s, attempting to assume a role: %s",
        caller_arn,
        role_to_assume,
    )

    current_role_arn = None
    try:
        # Expected shape:
        #   arn:<partition>:sts::<account>:assumed-role/<role-name>/<session-name>
        # Parsing the fields (instead of replacing the literal "arn:aws:sts")
        # keeps the comparison working in non-default partitions.
        arn_fields = caller_arn.split(":", 5)
        if (
            len(arn_fields) == 6
            and arn_fields[2] == "sts"
            and arn_fields[5].startswith("assumed-role/")
        ):
            # Drop the leading "assumed-role" and the trailing session name.
            role_name = "/".join(arn_fields[5].split("/")[1:-1])
            current_role_arn = (
                f"arn:{arn_fields[1]}:iam::{arn_fields[4]}:role/{role_name}"
            )
            logger.debug("Deducted current role: %s", current_role_arn)
    except Exception as e:
        # Best effort only: failing to normalise the ARN must not block the
        # assumption attempt below.
        logger.warning(
            "We couldn't convert currently assumed role to 'role' format so that we could compare "
            f"it with the target role, will try to assume the target role nonetheless, exception: {e}"
        )

    if current_role_arn == role_to_assume:
        logger.debug(
            "Current role and the role we wanted to assume are the same, continuing without further assumption steps"
        )
        return

    logger.info("Assuming the role %s", role_to_assume)
    try:
        response = sts_client.assume_role(
            RoleArn=role_to_assume,
            RoleSessionName=role_session_name,
            DurationSeconds=max_session_duration_seconds,
        )
    except sts_client.exceptions.ClientError as exc:
        # The requested duration may not be allowed per policies; retry with
        # the STS default duration and surface the reason for the first failure.
        logger.info(
            "Assuming the role with a session duration of %s seconds failed, retrying with the default duration: %s",
            max_session_duration_seconds,
            exc,
        )
        response = sts_client.assume_role(
            RoleArn=role_to_assume, RoleSessionName=role_session_name
        )
    logger.debug("Assumed role: %s", response["AssumedRoleUser"])

    creds = response["Credentials"]
    catalog_config[GLUE_ACCESS_KEY_ID] = creds["AccessKeyId"]
    catalog_config[GLUE_SECRET_ACCESS_KEY] = creds["SecretAccessKey"]
    catalog_config[GLUE_SESSION_TOKEN] = creds["SessionToken"]

def get_catalog(self) -> Catalog:
"""Returns the Iceberg catalog instance as configured by the `catalog` dictionary.

Expand All @@ -165,9 +253,13 @@ def get_catalog(self) -> Catalog:

# Retrieve the dict associated with the one catalog entry
catalog_name, catalog_config = next(iter(self.catalog.items()))
logger.debug(
"Initializing the catalog %s with config: %s", catalog_name, catalog_config
)
logger.debug("Initializing the catalog %s", catalog_name)

# Work around pyiceberg 0.10.0 ignoring role assumption for the glue catalog.
# Remove this code once pyiceberg is fixed; tracked in https://github.com/apache/iceberg-python/issues/2747
if catalog_config.get("type") == "glue":
self._custom_glue_catalog_handling(catalog_config)

catalog = load_catalog(name=catalog_name, **catalog_config)
if isinstance(catalog, RestCatalog):
logger.debug(
Expand Down
Loading
Loading