
Commit bdf52de

a bit more
1 parent af121b2

1 file changed (+11 -2)


python/pyspark/sql/pandas/serializers.py

Lines changed: 11 additions & 2 deletions
@@ -2028,6 +2028,7 @@ def row_stream():
                     batch_key = tuple(row[s] for s in self.init_key_offsets)
                     yield (batch_key, None, row)
 
+            EMPTY_DATAFRAME = pd.DataFrame()
             for batch_key, group_rows in groupby(row_stream(), key=lambda x: x[0]):
                 rows = []
                 init_state_rows = []
@@ -2042,11 +2043,19 @@ def row_stream():
                         total_len >= self.arrow_max_records_per_batch
                         or total_len * self.average_arrow_row_size >= self.arrow_max_bytes_per_batch
                     ):
-                        yield (batch_key, pd.DataFrame(rows), pd.DataFrame(init_state_rows))
+                        yield (
+                            batch_key,
+                            pd.DataFrame(rows) if len(rows) > 0 else EMPTY_DATAFRAME.copy(),
+                            pd.DataFrame(init_state_rows) if len(init_state_rows) > 0 else EMPTY_DATAFRAME.copy()
+                        )
                         rows = []
                         init_state_rows = []
                 if rows or init_state_rows:
-                    yield (batch_key, pd.DataFrame(rows), pd.DataFrame(init_state_rows))
+                    yield (
+                        batch_key,
+                        pd.DataFrame(rows) if len(rows) > 0 else EMPTY_DATAFRAME.copy(),
+                        pd.DataFrame(init_state_rows) if len(init_state_rows) > 0 else EMPTY_DATAFRAME.copy()
+                    )
 
         _batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         data_batches = generate_data_batches(_batches)
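
For context, the change replaces the unconditional pd.DataFrame(rows) / pd.DataFrame(init_state_rows) constructions with a fallback to a copy of one pre-built empty DataFrame whenever a list is empty at flush time. The sketch below reproduces that pattern outside PySpark; chunked_groups, the (key, is_init, row) record layout, and the sample data are illustrative inventions rather than PySpark APIs, and only the EMPTY_DATAFRAME fallback and the flush structure mirror the diff.

from itertools import groupby

import pandas as pd

# Built once; each consumer gets a cheap .copy() instead of constructing a
# fresh pd.DataFrame() per empty group (mirrors EMPTY_DATAFRAME in the diff).
EMPTY_DATAFRAME = pd.DataFrame()


def chunked_groups(records, max_records_per_batch=4):
    # records: iterable of (key, is_init, row_dict), pre-sorted by key.
    # Yields (key, data_df, init_df); either list can be empty when a chunk
    # is flushed, which is exactly where the EMPTY_DATAFRAME fallback fires.
    for key, group in groupby(records, key=lambda r: r[0]):
        rows, init_state_rows = [], []
        for _, is_init, row in group:
            (init_state_rows if is_init else rows).append(row)
            if len(rows) + len(init_state_rows) >= max_records_per_batch:
                yield (
                    key,
                    pd.DataFrame(rows) if len(rows) > 0 else EMPTY_DATAFRAME.copy(),
                    pd.DataFrame(init_state_rows) if len(init_state_rows) > 0 else EMPTY_DATAFRAME.copy()
                )
                rows, init_state_rows = [], []
        if rows or init_state_rows:
            yield (
                key,
                pd.DataFrame(rows) if len(rows) > 0 else EMPTY_DATAFRAME.copy(),
                pd.DataFrame(init_state_rows) if len(init_state_rows) > 0 else EMPTY_DATAFRAME.copy()
            )


records = [("a", False, {"v": i}) for i in range(5)] + [("b", True, {"s": 0})]
for key, data_df, init_df in chunked_groups(records):
    print(key, len(data_df), len(init_df))

Copying the shared empty frame, rather than handing out the same object, keeps callers from mutating one another's result, while still skipping the per-group construction cost for the common empty case.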
