diff --git a/mkdocs/docs/multi-part-namespace.md b/mkdocs/docs/multi-part-namespace.md new file mode 100644 index 0000000000..e5f7e14f93 --- /dev/null +++ b/mkdocs/docs/multi-part-namespace.md @@ -0,0 +1,31 @@ +# Multi-Part Namespace Support + +Some catalog implementations support multi-part namespaces, which allows for hierarchical organization of tables. The following table summarizes the support for multi-part namespaces across different catalog implementations in Iceberg Python. + +| Catalog Implementation | Multi-Part Namespace Support | Notes | +|------------------------|------------------------------|-------| +| REST Catalog | ✅ Yes | Fully supports multi-part namespace as defined by the REST catalog specification. | +| Hive Catalog | ❌ No | Spark does not support multi-part namespace. | +| DynamoDB Catalog | ✅ Yes | Namespace is represented as a composite key in DynamoDB. | +| Glue Catalog | ❌ No | Uses AWS Glue databases which don't support multi-part namespace. | +| File Catalog | ✅ Yes | Namespace parts are represented as directory hierarchies in the file system. | +| In-Memory Catalog | ✅ Yes | Supports multi-part namespace for testing purposes. | + +## Usage Example + +```python +from pyiceberg.catalog import load_catalog + +# Using a catalog with multi-part namespace support +catalog = load_catalog("my_catalog") + +# Creating a table with a multi-part namespace +catalog.create_table("default.multi.table_name", schema, spec) + +# Listing tables in a multi-part namespace +tables = catalog.list_tables("default.multi") +``` + +## Configuration + +When using catalogs that support multi-part namespaces, make sure to use the appropriate delimiter (typically `.`) when referencing namespaces in your code. diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index b13d9294a0..38665f2b4c 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -31,6 +31,7 @@ cast, ) +from pyiceberg.catalog._meta import CatalogABCMeta from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NoSuchNamespaceError, @@ -331,7 +332,7 @@ class PropertiesUpdateSummary: missing: list[str] -class Catalog(ABC): +class Catalog(ABC, metaclass=CatalogABCMeta): """Base Catalog for table operations like - create, drop, load, list and others. The catalog table APIs accept a table identifier, which is fully classified table name. The identifier can be a string or @@ -723,25 +724,44 @@ def namespace_to_string(identifier: str | Identifier, err: type[ValueError] | ty return ".".join(segment.strip() for segment in tuple_identifier) @staticmethod + def namespace_level(identifier: str | Identifier) -> int: + """Get the level of a namespace identifier. + + Args: + identifier (Union[str, Identifier]): a namespace identifier. + + Returns: + int: The level of the namespace. + """ + if not identifier: + return 1 + tuple_identifier = Catalog.identifier_to_tuple(identifier) + return len(tuple_identifier) + 1 + + @classmethod def identifier_to_database( - identifier: str | Identifier, err: type[ValueError] | type[NoSuchNamespaceError] = ValueError + cls, identifier: str | Identifier, err: type[ValueError] | type[NoSuchNamespaceError] = ValueError ) -> str: tuple_identifier = Catalog.identifier_to_tuple(identifier) - if len(tuple_identifier) != 1: - raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}") - return tuple_identifier[0] + if not cls._support_namespaces: + if len(tuple_identifier) != 1: + raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}") + else: + return tuple_identifier[0] + + return ".".join(tuple_identifier) - @staticmethod + @classmethod def identifier_to_database_and_table( + cls, identifier: str | Identifier, err: type[ValueError] | type[NoSuchTableError] | type[NoSuchNamespaceError] = ValueError, ) -> tuple[str, str]: tuple_identifier = Catalog.identifier_to_tuple(identifier) - if len(tuple_identifier) != 2: + if not cls._support_namespaces and len(tuple_identifier) != 2: raise err(f"Invalid path, hierarchical namespaces are not supported: {identifier}") - - return tuple_identifier[0], tuple_identifier[1] + return ".".join(tuple_identifier[:-1]), tuple_identifier[-1] def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO: return load_file_io({**self.properties, **properties}, location) diff --git a/pyiceberg/catalog/_meta.py b/pyiceberg/catalog/_meta.py new file mode 100644 index 0000000000..0ed87ccf66 --- /dev/null +++ b/pyiceberg/catalog/_meta.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from abc import ABCMeta +from typing import TYPE_CHECKING, Any, Callable, TypeVar + +if TYPE_CHECKING: + from pyiceberg.catalog import Catalog + +_T = TypeVar("_T", bound="Catalog") + + +class NamespaceMeta(type): + """Metaclass for managing namespace support configuration in catalog implementations. + + This metaclass automatically handles the inheritance and initialization of namespace-related + attributes for catalog classes. It ensures that namespace support configuration is properly + propagated through class inheritance hierarchies. + + Attributes: + _support_namespaces (bool): Indicates whether the catalog supports nested namespaces. + Defaults to False. When True, the catalog can handle hierarchical namespace + structures beyond simple flat namespaces. + _max_namespace_depth (int): Maximum depth allowed for nested namespaces. + Defaults to -1 (unlimited depth). When set to a positive integer, + restricts the nesting level of namespaces that can be created. + """ + + _support_namespaces: bool = False + _max_namespace_depth: int = -1 + + def __new__(mcls, name: str, bases: tuple[type, ...], atrrs: dict[str, Any], /, **kwargs: Any) -> type: + cls = super().__new__(mcls, name, bases, atrrs, **kwargs) + if "_support_namespaces" in atrrs: + pass # Already defined in the class + elif hasattr(bases[0], "_support_namespaces"): + cls._support_namespaces = bases[0]._support_namespaces + else: + cls._support_namespaces = NamespaceMeta._support_namespaces + return cls + + +class CatalogABCMeta(NamespaceMeta, ABCMeta): + """Metaclass for catalog implementations that combines namespace and abstract base class functionality. + + This metaclass inherits from both NamespaceMeta and ABCMeta. + """ + + +def multiple_namespaces( + _cls: type[_T] | None = None, /, disabled: bool = False, max_depth: int = -1 +) -> type[_T] | Callable[[type[_T]], type[_T]]: + def wrapper(cls: type[_T]) -> type[_T]: + if not hasattr(cls, "_support_namespaces"): + raise ValueError(f"{cls.__name__} must inherit Catalog with CatalogABCMeta and NamespaceMeta to use this decorator") + if max_depth >= 0 and max_depth <= 1 or disabled: + cls._support_namespaces = False + else: + cls._support_namespaces = True + cls._max_namespace_depth = max_depth + return cls + + return wrapper if _cls is None else wrapper(_cls) diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 2d35b2c5e2..6500ec307e 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -92,6 +92,8 @@ class DynamoDbCatalog(MetastoreCatalog): + _support_namespaces: bool = True + def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **properties: str): """Dynamodb catalog. @@ -441,16 +443,23 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: return table_identifiers def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: - """List top-level namespaces from the catalog. - - We do not support hierarchical namespace. + """List namespaces from the catalog. Returns: List[Identifier]: a List of namespace identifiers. """ - # Hierarchical namespace is not supported. Return an empty list + level = self.namespace_level(namespace) + conditions = f"{DYNAMODB_COL_IDENTIFIER} = :identifier" + expression_attribute_values = { + ":identifier": { + "S": DYNAMODB_NAMESPACE, + } + } if namespace: - return [] + conditions += f" AND begins_with({DYNAMODB_COL_NAMESPACE},:ns)" + expression_attribute_values[":ns"] = { + "S": self.namespace_to_string(namespace) + ".", + } paginator = self.dynamodb.get_paginator("query") @@ -458,12 +467,8 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: page_iterator = paginator.paginate( TableName=self.dynamodb_table_name, ConsistentRead=True, - KeyConditionExpression=f"{DYNAMODB_COL_IDENTIFIER} = :identifier", - ExpressionAttributeValues={ - ":identifier": { - "S": DYNAMODB_NAMESPACE, - } - }, + KeyConditionExpression=conditions, + ExpressionAttributeValues=expression_attribute_values, ) except ( self.dynamodb.exceptions.ProvisionedThroughputExceededException, @@ -473,14 +478,14 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: ) as e: raise GenericDynamoDbError(e.message) from e - database_identifiers = [] + database_identifiers = set() for page in page_iterator: for item in page["Items"]: _dict = _convert_dynamo_item_to_regular_dict(item) namespace_col = _dict[DYNAMODB_COL_NAMESPACE] - database_identifiers.append(self.identifier_to_tuple(namespace_col)) + database_identifiers.add(self.identifier_to_tuple(namespace_col)[:level]) - return database_identifiers + return list(database_identifiers) def load_namespace_properties(self, namespace: str | Identifier) -> Properties: """ diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 3b77fd47f0..9a3b089a0f 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -214,6 +214,7 @@ class ListViewsResponse(IcebergBaseModel): class RestCatalog(Catalog): uri: str _session: Session + _support_namespaces: bool = True def __init__(self, name: str, **properties: str): """Rest Catalog. diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 2b6fa74517..5edac0888b 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -113,6 +113,8 @@ class SqlCatalog(MetastoreCatalog): The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`. """ + _support_namespaces: bool = True + def __init__(self, name: str, **properties: str): super().__init__(name, **properties) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 56cab7618f..dcc7e9def9 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -29,8 +29,10 @@ TYPE_CHECKING, Any, Callable, + ClassVar, Iterable, Iterator, + Sequence, TypeVar, ) @@ -1020,20 +1022,70 @@ def commit_transaction(self) -> Table: return self._table -class Namespace(IcebergRootModel[list[str]]): +class Namespace(IcebergRootModel[tuple[str, ...]]): """Reference to one or more levels of a namespace.""" - root: list[str] = Field( + root: tuple[str, ...] = Field( ..., description="Reference to one or more levels of a namespace", ) + def __len__(self) -> int: + """Fetch the size of Namespace.""" + return len(self.root) + + def __getitem__(self, index: int) -> str: + """Fetch a value from a Namespace.""" + return self.root[index] + + def __iter__(self) -> Iterator[str]: + """Return an iterator over the elements in the root of the table.""" + return iter(self.root) + + def levels(self) -> int: + """Return the number of levels in this namespace.""" + return len(self.root) + + def __repr__(self) -> str: + """Return a string representation of the namespace.""" + return f"Namespace({self.root})" + class TableIdentifier(IcebergBaseModel): """Fully Qualified identifier to a table.""" namespace: Namespace name: str + _separator: ClassVar[str] = "." + + @classmethod + def from_string(cls, identifier: str) -> TableIdentifier: + """Create a TableIdentifier from a separator. + + Args: + identifier: A separator representing the table identifier, e.g., "db.schema.table". + + Returns: + A TableIdentifier instance. + """ + parts = identifier.split(cls._separator) + return cls.from_tuple(parts) + + @classmethod + def from_tuple(cls, identifier: Sequence[str]) -> TableIdentifier: + """Create a TableIdentifier from a tuple. + + Args: + identifier: A tuple representing the table identifier, e.g., ("db", "schema", "table"). + + Returns: + A TableIdentifier instance. + """ + if len(identifier) < 2: + raise ValueError("Identifier must include at least a namespace and a table name.") + namespace = Namespace(root=tuple(identifier[:-1])) + name = identifier[-1] + return cls(namespace=namespace, name=name) class CommitTableRequest(IcebergBaseModel): diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index efc89c7c7e..82a1be4bc7 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -544,6 +544,23 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None: ] +def test_list_multipart_namespaces_200(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces", + json={"namespaces": [["default"], ["multipart"]]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() == [("default",), ("multipart",)] + + rest_mock.get( + f"{TEST_URI}v1/namespaces?parent=multipart", + json={"namespaces": [["multipart", "namespace1"], ["multipart", "namespace2"]]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/namespaces?parent=accounting",