Commit 5ce5749

add use_content_defined_chunking=True
1 parent dadbd16 commit 5ce5749

1 file changed: +13 -3 lines changed

pyspark_huggingface/huggingface_sink.py

Lines changed: 13 additions & 3 deletions
@@ -21,6 +21,7 @@
 
 logger = logging.getLogger(__name__)
 
+
 class HuggingFaceSink(DataSource):
     """
     A DataSource for writing Spark DataFrames to HuggingFace Datasets.
@@ -125,8 +126,9 @@ def __init__(
         token: str,
         endpoint: Optional[str] = None,
         row_group_size: Optional[int] = None,
-        max_bytes_per_file=500_000_000,
-        max_operations_per_commit=100,
+        max_bytes_per_file: int = 500_000_000,
+        max_operations_per_commit: int = 100,
+        use_content_defined_chunking: bool = True,
         **kwargs,
     ):
         import uuid
@@ -144,6 +146,7 @@ def __init__(
         self.row_group_size = row_group_size
         self.max_bytes_per_file = max_bytes_per_file
         self.max_operations_per_commit = max_operations_per_commit
+        self.use_content_defined_chunking = use_content_defined_chunking
         self.kwargs = kwargs
 
         # Use a unique filename prefix to avoid conflicts with existing files
@@ -232,7 +235,14 @@ def flush(writer: pq.ParquetWriter):
         Limiting the size is necessary because we are writing them in memory.
         """
         while True:
-            with pq.ParquetWriter(parquet, schema, **self.kwargs) as writer:
+            with pq.ParquetWriter(
+                parquet,
+                schema=schema,
+                **{
+                    "use_content_defined_chunking": self.use_content_defined_chunking,
+                    **self.kwargs
+                }
+            ) as writer:
                 num_batches = 0
                 for batch in iterator:  # Start iterating from where we left off
                     writer.write_batch(batch, row_group_size=self.row_group_size)
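
The writer kwargs are merged so the sink-level default comes first and the caller's **kwargs come last; since later keys win in a Python dict literal, a caller can still pass use_content_defined_chunking=False through the existing kwargs passthrough to turn the feature off. Below is a minimal sketch of the same pattern outside the sink, assuming a pyarrow release whose pq.ParquetWriter accepts the use_content_defined_chunking keyword; the table, buffer, and caller kwargs are illustrative only.

    import io

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Illustrative data; the sink writes Spark record batches instead.
    table = pa.table({"id": list(range(1_000)), "text": [f"row {i}" for i in range(1_000)]})

    sink_default = {"use_content_defined_chunking": True}  # default added by this commit
    caller_kwargs = {"compression": "zstd"}                 # stands in for **self.kwargs

    buffer = io.BytesIO()
    # Later keys win in the merged dict, so caller_kwargs could also set
    # use_content_defined_chunking=False to opt out.
    with pq.ParquetWriter(buffer, schema=table.schema, **{**sink_default, **caller_kwargs}) as writer:
        writer.write_table(table)

Keeping the default at True means every Parquet file the sink produces is written with content-defined chunking unless the caller explicitly opts out.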

0 commit comments
