From 436389a67bd2cbfd97deb0d87c6e4cb0feef3e4c Mon Sep 17 00:00:00 2001 From: tom Date: Fri, 31 Oct 2025 16:48:27 -0600 Subject: [PATCH 1/2] do not materialize entire files in a batch reader --- pyiceberg/io/pyarrow.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e42c130779..78c6a0c50d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1726,16 +1726,11 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record deletes_per_file = _read_all_delete_files(self._io, tasks) total_row_count = 0 - executor = ExecutorFactory.get_or_create() - def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]: - # Materialize the iterator here to ensure execution happens within the executor. - # Otherwise, the iterator would be lazily consumed later (in the main thread), - # defeating the purpose of using executor.map. - return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)) limit_reached = False - for batches in executor.map(batches_for_task, tasks): + for task in tasks: + batches = self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file) for batch in batches: current_batch_size = len(batch) if self._limit is not None and total_row_count + current_batch_size >= self._limit: From c8b76875bad3b81dc8e9b4ee9ec9c459b3babd8a Mon Sep 17 00:00:00 2001 From: tom Date: Fri, 31 Oct 2025 17:33:19 -0600 Subject: [PATCH 2/2] ruff reformat --- pyiceberg/io/pyarrow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 78c6a0c50d..d5524ae2e8 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1727,7 +1727,6 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record total_row_count = 0 - limit_reached = False for task in tasks: batches = self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)