-
Notifications
You must be signed in to change notification settings - Fork 3.3k
fix(ingest/iceberg): Extend iceberg source to allow role assumption #15288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
53e6771
efab896
d5172c1
1c2d21c
06a3366
6a2e9c3
ab68f1a
f3479bc
f56fc8d
2a9f1c8
c5a60ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,10 +3,26 @@ | |
| from dataclasses import dataclass, field | ||
| from typing import Any, Dict, Optional | ||
|
|
||
| import boto3 | ||
| from humanfriendly import format_timespan | ||
| from pydantic import Field, field_validator | ||
| from pyiceberg.catalog import Catalog, load_catalog | ||
| from pyiceberg.catalog import BOTOCORE_SESSION, Catalog, load_catalog | ||
| from pyiceberg.catalog.glue import ( | ||
| GLUE_ACCESS_KEY_ID, | ||
| GLUE_PROFILE_NAME, | ||
| GLUE_REGION, | ||
| GLUE_SECRET_ACCESS_KEY, | ||
| GLUE_SESSION_TOKEN, | ||
| ) | ||
| from pyiceberg.catalog.rest import RestCatalog | ||
| from pyiceberg.io import ( | ||
| AWS_ACCESS_KEY_ID, | ||
| AWS_REGION, | ||
| AWS_ROLE_ARN, | ||
| AWS_SECRET_ACCESS_KEY, | ||
| AWS_SESSION_TOKEN, | ||
| ) | ||
| from pyiceberg.utils.properties import get_first_property_value | ||
| from requests.adapters import HTTPAdapter | ||
| from sortedcontainers import SortedList | ||
| from urllib3.util import Retry | ||
|
|
@@ -32,6 +48,8 @@ | |
| DEFAULT_REST_TIMEOUT = 120 | ||
| DEFAULT_REST_RETRY_POLICY = {"total": 3, "backoff_factor": 0.1} | ||
|
|
||
| GLUE_ROLE_ARN = "glue.role-arn" | ||
|
|
||
|
|
||
| class TimeoutHTTPAdapter(HTTPAdapter): | ||
| def __init__(self, *args, **kwargs): | ||
|
|
@@ -154,6 +172,76 @@ def is_profiling_enabled(self) -> bool: | |
| self.profiling.operation_config | ||
| ) | ||
|
|
||
def _custom_glue_catalog_handling(self, catalog_config: Dict[str, Any]) -> None:
    """Assume the role configured for a Glue catalog and store its credentials.

    Workaround for pyiceberg ignoring role assumption for the Glue client:
    when the catalog config names a role (``glue.role-arn`` first, then the
    generic AWS role-ARN property), the role is assumed through STS here and
    the resulting temporary credentials are written into ``catalog_config``
    under the Glue access-key, secret-key and session-token properties.

    Args:
        catalog_config: Properties of the configured catalog. Mutated in
            place when a role is assumed; left untouched when no role is
            configured or the caller already runs as the target role.

    Raises:
        Any error raised by the STS calls (``get_caller_identity`` or the
        final ``assume_role`` attempt) propagates to the caller.
    """
    # 12 hours is the maximum session duration that can be requested from
    # STS AssumeRole. The request may be rejected if such a duration is not
    # allowed per policies, hence the retry with the default duration below.
    max_session_duration_seconds = 43200
    # Descriptive session name so the session is recognisable in audit logs.
    role_session_name = "datahub-iceberg-glue-ingestion"

    role_to_assume = get_first_property_value(
        catalog_config, GLUE_ROLE_ARN, AWS_ROLE_ARN
    )
    if not role_to_assume:
        return

    logger.debug(
        "Recognized role ARN in glue catalog config, attempting to workaround pyiceberg limitation in role assumption for the glue client"
    )
    session = boto3.Session(
        profile_name=catalog_config.get(GLUE_PROFILE_NAME),
        region_name=get_first_property_value(
            catalog_config, GLUE_REGION, AWS_REGION
        ),
        botocore_session=catalog_config.get(BOTOCORE_SESSION),
        aws_access_key_id=get_first_property_value(
            catalog_config, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID
        ),
        aws_secret_access_key=get_first_property_value(
            catalog_config, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY
        ),
        aws_session_token=get_first_property_value(
            catalog_config, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN
        ),
    )

    sts_client = session.client("sts")
    caller_arn = sts_client.get_caller_identity()["Arn"]
    # Info level: which identity the ingestion runs as is worth seeing in
    # regular logs, not only when debugging.
    logger.info(
        "Authenticated as %s, attempting to assume a role: %s",
        caller_arn,
        role_to_assume,
    )

    # An assumed-role identity has the form
    #   arn:<partition>:sts::<account>:assumed-role/<role-name>/<session-name>
    # and is compared against the role form
    #   arn:<partition>:iam::<account>:role/<role-name>
    # The partition is carried over rather than hard-coded to "aws" so the
    # comparison also works in other partitions (e.g. aws-cn, aws-us-gov).
    current_role_arn = None
    if ":assumed-role/" in caller_arn:
        arn_fields = caller_arn.split(":", 5)
        resource_parts = arn_fields[-1].split("/")
        if (
            len(arn_fields) == 6
            and len(resource_parts) >= 3
            and resource_parts[0] == "assumed-role"
        ):
            partition, account_id = arn_fields[1], arn_fields[4]
            # Drop the trailing session name, keep everything in between.
            role_name = "/".join(resource_parts[1:-1])
            current_role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}"
            logger.debug("Deduced current role: %s", current_role_arn)
        else:
            # Best effort only: an unparseable ARN must not block assumption.
            logger.warning(
                "We couldn't convert currently assumed role to 'role' format so that we could compare "
                "it with the target role, will try to assume the target role nonetheless, ARN: %s",
                caller_arn,
            )

    if current_role_arn == role_to_assume:
        logger.debug(
            "Current role and the role we wanted to assume are the same, continuing without further assumption steps"
        )
        return

    logger.info("Assuming the role %s", role_to_assume)
    try:
        response = sts_client.assume_role(
            RoleArn=role_to_assume,
            RoleSessionName=role_session_name,
            DurationSeconds=max_session_duration_seconds,
        )
    except sts_client.exceptions.ClientError as e:
        # Fallback to the default duration; a second failure propagates.
        logger.info(
            "Assuming the role with a %s second session failed, retrying with the default duration: %s",
            max_session_duration_seconds,
            e,
        )
        response = sts_client.assume_role(
            RoleArn=role_to_assume, RoleSessionName=role_session_name
        )
    logger.debug("Assumed role: %s", response["AssumedRoleUser"])

    # Hand the temporary credentials to pyiceberg through the Glue-specific
    # properties of the catalog config.
    creds = response["Credentials"]
    catalog_config[GLUE_ACCESS_KEY_ID] = creds["AccessKeyId"]
    catalog_config[GLUE_SECRET_ACCESS_KEY] = creds["SecretAccessKey"]
    catalog_config[GLUE_SESSION_TOKEN] = creds["SessionToken"]
|
|
||
| def get_catalog(self) -> Catalog: | ||
| """Returns the Iceberg catalog instance as configured by the `catalog` dictionary. | ||
|
|
||
|
|
@@ -165,9 +253,13 @@ def get_catalog(self) -> Catalog: | |
|
|
||
| # Retrieve the dict associated with the one catalog entry | ||
| catalog_name, catalog_config = next(iter(self.catalog.items())) | ||
| logger.debug( | ||
| "Initializing the catalog %s with config: %s", catalog_name, catalog_config | ||
| ) | ||
| logger.debug("Initializing the catalog %s", catalog_name) | ||
|
|
||
| # workaround pyiceberg 0.10.0 issue with ignoring role assumption for glue catalog, | ||
| # remove this code once pyiceberg is fixed, raised issue: https://github.com/apache/iceberg-python/issues/2747 | ||
| if catalog_config.get("type") == "glue": | ||
| self._custom_glue_catalog_handling(catalog_config) | ||
|
|
||
| catalog = load_catalog(name=catalog_name, **catalog_config) | ||
| if isinstance(catalog, RestCatalog): | ||
| logger.debug( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should probably be info level