2 changes: 2 additions & 0 deletions .github/workflows/pypi-build-artifacts.yml
@@ -74,6 +74,8 @@ jobs:
# Ignore tests for pypy since not all dependencies are compiled for it
# and would require a local rust build chain
CIBW_TEST_SKIP: "pp*"
# Skip free-threaded (PEP 703) builds until we evaluate decoder_fast support
CIBW_SKIP: "cp3*t-*"

- name: Add source distribution
if: startsWith(matrix.os, 'ubuntu')
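Note: `CIBW_SKIP: "cp3*t-*"` matches cibuildwheel's free-threaded build identifiers (e.g. `cp313t-manylinux_x86_64`). A minimal sketch of how one could check whether a given interpreter is a free-threaded (PEP 703) build when re-evaluating `decoder_fast` support later; the `Py_GIL_DISABLED` config variable is standard CPython, everything else here is illustrative:

```python
import sys
import sysconfig

# Py_GIL_DISABLED is 1 on free-threaded (PEP 703) CPython builds, 0 or None otherwise.
is_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

print(f"Python {sys.version.split()[0]} free-threaded build: {is_free_threaded}")
```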
2 changes: 2 additions & 0 deletions .github/workflows/svn-build-artifacts.yml
@@ -69,6 +69,8 @@ jobs:
# Ignore tests for pypy since not all dependencies are compiled for it
# and would require a local rust build chain
CIBW_TEST_SKIP: "pp*"
# Skip free-threaded (PEP 703) builds until we evaluate decoder_fast support
CIBW_SKIP: "cp3*t-*"

- name: Add source distribution
if: startsWith(matrix.os, 'ubuntu')
25 changes: 14 additions & 11 deletions dev/hive/Dockerfile
@@ -13,21 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM openjdk:8-jre-slim AS build

RUN apt-get update -qq && apt-get -qq -y install curl
FROM apache/hive:4.0.0

ENV HADOOP_VERSION=3.3.6
ENV AWS_SDK_BUNDLE=1.12.753

RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_BUNDLE}/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar -Lo /tmp/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar
USER root

FROM apache/hive:4.0.0
# Install curl and download AWS JARs needed for S3 support
RUN apt-get update -qq && \
apt-get install -y --no-install-recommends curl && \
curl -fsSL https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar \
-o /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar && \
curl -fsSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_BUNDLE}/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar \
-o /opt/hive/lib/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar && \
apt-get remove -y curl && \
apt-get autoremove -y && \
rm -rf /var/lib/apt/lists/*

ENV HADOOP_VERSION=3.3.6
ENV AWS_SDK_BUNDLE=1.12.753

COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
COPY --from=build /tmp/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar /opt/hive/lib/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar
COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml

USER hive
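Note: the hadoop-aws and AWS SDK bundle jars are what let this dev Hive image serve tables whose warehouse lives on S3-compatible storage. A hedged sketch of pointing a PyIceberg Hive catalog at such a local setup; the thrift URI, endpoint, and credentials below are placeholders for whatever the local docker-compose exposes, not values taken from this change:

```python
from pyiceberg.catalog import load_catalog

# Placeholder connection details for a local Hive metastore + MinIO dev stack.
catalog = load_catalog(
    "hive",
    **{
        "type": "hive",
        "uri": "thrift://localhost:9083",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

print(catalog.list_namespaces())
```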
152 changes: 81 additions & 71 deletions poetry.lock

Large diffs are not rendered by default.

53 changes: 26 additions & 27 deletions pyiceberg/io/pyarrow.py
@@ -1492,14 +1492,18 @@ def _field_id(self, field: pa.Field) -> int:


def _get_column_projection_values(
file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
file: DataFile,
projected_schema: Schema,
table_schema: Schema,
partition_spec: Optional[PartitionSpec],
file_project_field_ids: Set[int],
) -> Dict[int, Any]:
"""Apply Column Projection rules to File Schema."""
project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
if len(project_schema_diff) == 0 or partition_spec is None:
return EMPTY_DICT

partition_schema = partition_spec.partition_type(projected_schema)
partition_schema = partition_spec.partition_type(table_schema)
accessors = build_position_accessors(partition_schema)

projected_missing_fields = {}
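Note: the substantive fix here is that the partition struct is now derived from the full table schema rather than from the projected schema, which after schema evolution (or a narrow column selection) may no longer contain the partition source field. A minimal sketch of the difference, reusing the same public PyIceberg classes the tests below use; the exact exception raised for a missing source id is an assumption:

```python
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DateType, IntegerType, NestedField

table_schema = Schema(
    NestedField(1, "partition_date", DateType(), required=False),
    NestedField(2, "id", IntegerType(), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_date"),
)

# Resolving the partition struct against the table schema always finds source_id=1.
print(spec.partition_type(table_schema))

# A projection that drops the source column cannot resolve source_id=1 at all.
projected_schema = Schema(NestedField(2, "id", IntegerType(), required=False))
try:
    spec.partition_type(projected_schema)
except Exception as exc:  # assumed to raise; pyiceberg reports the missing field id
    print(f"projection cannot resolve the partition source field: {exc}")
```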
@@ -1517,6 +1521,7 @@ def _task_to_record_batches(
task: FileScanTask,
bound_row_filter: BooleanExpression,
projected_schema: Schema,
table_schema: Schema,
projected_field_ids: Set[int],
positional_deletes: Optional[List[ChunkedArray]],
case_sensitive: bool,
@@ -1541,7 +1546,7 @@

# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
projected_missing_fields = _get_column_projection_values(
task.file, projected_schema, partition_spec, file_schema.field_ids
task.file, projected_schema, table_schema, partition_spec, file_schema.field_ids
)

pyarrow_filter = None
@@ -1763,6 +1768,7 @@ def _record_batches_from_scan_tasks_and_deletes(
task,
self._bound_row_filter,
self._projected_schema,
self._table_metadata.schema(),
self._projected_field_ids,
deletes_per_file.get(task.file.file_path),
self._case_sensitive,
@@ -2790,30 +2796,26 @@ def _dataframe_to_data_files(
yield from write_file(
io=io,
table_metadata=table_metadata,
tasks=iter(
[
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
for batches in bin_pack_arrow_table(df, target_file_size)
]
tasks=(
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
for batches in bin_pack_arrow_table(df, target_file_size)
),
)
else:
partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
yield from write_file(
io=io,
table_metadata=table_metadata,
tasks=iter(
[
WriteTask(
write_uuid=write_uuid,
task_id=next(counter),
record_batches=batches,
partition_key=partition.partition_key,
schema=task_schema,
)
for partition in partitions
for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
]
tasks=(
WriteTask(
write_uuid=write_uuid,
task_id=next(counter),
record_batches=batches,
partition_key=partition.partition_key,
schema=task_schema,
)
for partition in partitions
for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
),
)
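Note: replacing `iter([...])` with a generator expression means `WriteTask`s are created only as `write_file` consumes them, instead of materializing every task before the first file is written. A generic illustration of the difference, not pyiceberg-specific:

```python
def make_task(i: int) -> str:
    print(f"building task {i}")
    return f"task-{i}"

# Eager: every task is built before the consumer sees the first one.
eager = iter([make_task(i) for i in range(3)])  # prints tasks 0, 1, 2 immediately

# Lazy: a task is built only when the consumer asks for it.
lazy = (make_task(i) for i in range(3))
next(lazy)  # prints "building task 0" only now
```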

@@ -2824,7 +2826,7 @@ class _TablePartition:
arrow_table_partition: pa.Table


def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]:
def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> Iterable[_TablePartition]:
"""Based on the iceberg table partition spec, filter the arrow table into partitions with their keys.

Example:
@@ -2852,8 +2854,6 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T

unique_partition_fields = arrow_table.select(partition_fields).group_by(partition_fields).aggregate([])

table_partitions = []
# TODO: As a next step, we could also play around with yielding instead of materializing the full list
for unique_partition in unique_partition_fields.to_pylist():
partition_key = PartitionKey(
field_values=[
@@ -2880,12 +2880,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T

# The combine_chunks seems to be counter-intuitive to do, but it actually returns
# fresh buffers that don't interfere with each other when it is written out to file
table_partitions.append(
_TablePartition(partition_key=partition_key, arrow_table_partition=filtered_table.combine_chunks())
yield _TablePartition(
partition_key=partition_key,
arrow_table_partition=filtered_table.combine_chunks(),
)

return table_partitions


def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Array:
"""Get a field from an Arrow table, supporting both literal field names and nested field paths.
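Note: since `_determine_partitions` now returns a generator instead of a list, callers get the partitions lazily but can iterate them only once; the updated tests below wrap the call in `list(...)` for exactly that reason. A small generic reminder of the single-pass behaviour:

```python
def produce():
    yield from (1, 2, 3)

gen = produce()
assert sum(1 for _ in gen) == 3
assert list(gen) == []  # already exhausted: generators are single-pass

# Callers that need len() or several passes materialize first,
# as the tests do with list(_determine_partitions(...)).
parts = list(produce())
assert len(parts) == 3
```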
17 changes: 9 additions & 8 deletions pyiceberg/table/update/snapshot.py
@@ -240,8 +240,11 @@ def _write_delete_manifest() -> List[ManifestFile]:
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties

# avoid copying metadata for each data file
table_metadata = self._transaction.table_metadata

partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
@@ -250,23 +253,21 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
for data_file in self._added_data_files:
ssc.add_file(
data_file=data_file,
partition_spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
partition_spec=table_metadata.spec(),
schema=table_metadata.schema(),
)

if len(self._deleted_data_files) > 0:
specs = self._transaction.table_metadata.specs()
specs = table_metadata.specs()
for data_file in self._deleted_data_files:
ssc.remove_file(
data_file=data_file,
partition_spec=specs[data_file.spec_id],
schema=self._transaction.table_metadata.schema(),
schema=table_metadata.schema(),
)

previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if self._parent_snapshot_id is not None
else None
table_metadata.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None
)

return update_snapshot_summaries(
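Note: hoisting `self._transaction.table_metadata` into a local helps because the attribute is a property, and the in-diff comment notes that each access copies the metadata. A simplified sketch of the pattern; the `Transaction` class below is a stand-in, not pyiceberg's:

```python
class Transaction:
    @property
    def table_metadata(self) -> dict:
        # Stand-in for a property that rebuilds (copies) metadata on every access.
        print("rebuilding metadata")
        return {"schema": "...", "spec": "..."}


tx = Transaction()

# Read the property once, outside the per-data-file loop.
table_metadata = tx.table_metadata  # prints "rebuilding metadata" a single time
for data_file in ("a.parquet", "b.parquet", "c.parquet"):
    schema = table_metadata["schema"]  # no further rebuilds inside the loop
```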
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -115,7 +115,7 @@ mkdocstrings-python = "1.18.2"
mkdocs-literate-nav = "0.6.2"
mkdocs-autorefs = "1.4.3"
mkdocs-gen-files = "0.5.0"
mkdocs-material = "9.6.22"
mkdocs-material = "9.6.23"
mkdocs-material-extensions = "1.3.1"
mkdocs-section-index = "0.3.10"

78 changes: 74 additions & 4 deletions tests/io/test_pyarrow.py
@@ -2479,7 +2479,7 @@ def test_partition_for_demo() -> None:
PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="n_legs_identity"),
PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="year_identity"),
)
result = _determine_partitions(partition_spec, test_schema, arrow_table)
result = list(_determine_partitions(partition_spec, test_schema, arrow_table))
assert {table_partition.partition_key.partition for table_partition in result} == {
Record(2, 2020),
Record(100, 2021),
@@ -2518,7 +2518,7 @@ def test_partition_for_nested_field() -> None:
]

arrow_table = pa.Table.from_pylist(test_data, schema=schema.as_arrow())
partitions = _determine_partitions(spec, schema, arrow_table)
partitions = list(_determine_partitions(spec, schema, arrow_table))
partition_values = {p.partition_key.partition[0] for p in partitions}

assert partition_values == {486729, 486730}
@@ -2550,7 +2550,7 @@ def test_partition_for_deep_nested_field() -> None:
]

arrow_table = pa.Table.from_pylist(test_data, schema=schema.as_arrow())
partitions = _determine_partitions(spec, schema, arrow_table)
partitions = list(_determine_partitions(spec, schema, arrow_table))

assert len(partitions) == 2 # 2 unique partitions
partition_values = {p.partition_key.partition[0] for p in partitions}
@@ -2621,7 +2621,7 @@ def test_identity_partition_on_multi_columns() -> None:
}
arrow_table = pa.Table.from_pydict(test_data, schema=test_pa_schema)

result = _determine_partitions(partition_spec, test_schema, arrow_table)
result = list(_determine_partitions(partition_spec, test_schema, arrow_table))

assert {table_partition.partition_key.partition for table_partition in result} == expected
concatenated_arrow_table = pa.concat_tables([table_partition.arrow_table_partition for table_partition in result])
@@ -2846,6 +2846,7 @@ def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str)
FileScanTask(data_file),
bound_row_filter=AlwaysTrue(),
projected_schema=table_schema,
table_schema=table_schema,
projected_field_ids={1},
positional_deletes=None,
case_sensitive=True,
@@ -4590,3 +4591,72 @@ def test_orc_stripe_based_batching(tmp_path: Path) -> None:
# Verify total rows
total_rows = sum(batch.num_rows for batch in batches)
assert total_rows == 10000, f"Expected 10000 total rows, got {total_rows}"


def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCatalog) -> None:
"""Test column projection on partitioned table after schema evolution (https://github.com/apache/iceberg-python/issues/2672)."""
initial_schema = Schema(
NestedField(1, "partition_date", DateType(), required=False),
NestedField(2, "id", IntegerType(), required=False),
NestedField(3, "name", StringType(), required=False),
NestedField(4, "value", IntegerType(), required=False),
)

partition_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_date"),
)

catalog.create_namespace("default")
table = catalog.create_table(
"default.test_schema_evolution_projection",
schema=initial_schema,
partition_spec=partition_spec,
)

data_v1 = pa.Table.from_pylist(
[
{"partition_date": date(2024, 1, 1), "id": 1, "name": "Alice", "value": 100},
{"partition_date": date(2024, 1, 1), "id": 2, "name": "Bob", "value": 200},
],
schema=pa.schema(
[
("partition_date", pa.date32()),
("id", pa.int32()),
("name", pa.string()),
("value", pa.int32()),
]
),
)

table.append(data_v1)

with table.update_schema() as update:
update.add_column("new_column", StringType())

table = catalog.load_table("default.test_schema_evolution_projection")

data_v2 = pa.Table.from_pylist(
[
{"partition_date": date(2024, 1, 2), "id": 3, "name": "Charlie", "value": 300, "new_column": "new1"},
{"partition_date": date(2024, 1, 2), "id": 4, "name": "David", "value": 400, "new_column": "new2"},
],
schema=pa.schema(
[
("partition_date", pa.date32()),
("id", pa.int32()),
("name", pa.string()),
("value", pa.int32()),
("new_column", pa.string()),
]
),
)

table.append(data_v2)

result = table.scan(selected_fields=("id", "name", "value", "new_column")).to_arrow()

assert set(result.schema.names) == {"id", "name", "value", "new_column"}
assert result.num_rows == 4
result_sorted = result.sort_by("name")
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]