@@ -1454,12 +1454,35 @@ def _wait_until_data_rows_are_processed(
14541454 """ Wait until all the specified data rows are processed"""
14551455 start_time = datetime .now ()
14561456
1457+ max_data_rows_per_poll = 100_000
1458+ if data_row_ids is not None :
1459+ for i in range (0 , len (data_row_ids ), max_data_rows_per_poll ):
1460+ chunk = data_row_ids [i :i + max_data_rows_per_poll ]
1461+ self ._poll_data_row_processing_status (
1462+ chunk , [], start_time , wait_processing_max_seconds ,
1463+ sleep_interval )
1464+
1465+ if global_keys is not None :
1466+ for i in range (0 , len (global_keys ), max_data_rows_per_poll ):
1467+ chunk = global_keys [i :i + max_data_rows_per_poll ]
1468+ self ._poll_data_row_processing_status (
1469+ [], chunk , start_time , wait_processing_max_seconds ,
1470+ sleep_interval )
1471+
1472+ def _poll_data_row_processing_status (
1473+ self ,
1474+ data_row_ids : List [str ],
1475+ global_keys : List [str ],
1476+ start_time : datetime ,
1477+ wait_processing_max_seconds : int = _wait_processing_max_seconds ,
1478+ sleep_interval = 30 ):
1479+
14571480 while True :
14581481 if (datetime .now () -
14591482 start_time ).total_seconds () >= wait_processing_max_seconds :
14601483 raise ProcessingWaitTimeout (
1461- "Maximum wait time exceeded while waiting for data rows to be processed. Try creating a batch a bit later"
1462- )
1484+ """ Maximum wait time exceeded while waiting for data rows to be processed.
1485+ Try creating a batch a bit later""" )
14631486
14641487 all_good = self .__check_data_rows_have_been_processed (
14651488 data_row_ids , global_keys )
0 commit comments