@@ -84,6 +84,7 @@ pub struct HistoryPaginator {
8484 pub ( crate ) run_id : String ,
8585 pub ( crate ) previous_wft_started_id : i64 ,
8686 pub ( crate ) wft_started_event_id : i64 ,
87+ id_of_last_event_in_last_extracted_update : Option < i64 > ,
8788
8889 #[ cfg_attr( feature = "save_wf_inputs" , serde( skip) ) ]
8990 client : Arc < dyn WorkerClient > ,
@@ -175,6 +176,7 @@ impl HistoryPaginator {
175176 run_id : req. original_wft . work . execution . run_id . clone ( ) ,
176177 previous_wft_started_id : req. original_wft . work . update . previous_wft_started_id ,
177178 wft_started_event_id : req. original_wft . work . update . wft_started_id ,
179+ id_of_last_event_in_last_extracted_update : None ,
178180 client,
179181 event_queue : Default :: default ( ) ,
180182 next_page_token : NextPageToken :: FetchFromStart ,
@@ -211,6 +213,7 @@ impl HistoryPaginator {
211213 final_events,
212214 previous_wft_started_id,
213215 wft_started_event_id,
216+ id_of_last_event_in_last_extracted_update : None ,
214217 }
215218 }
216219
@@ -226,6 +229,7 @@ impl HistoryPaginator {
226229 final_events : vec ! [ ] ,
227230 previous_wft_started_id : -2 ,
228231 wft_started_event_id : -2 ,
232+ id_of_last_event_in_last_extracted_update : None ,
229233 }
230234 }
231235
@@ -240,14 +244,45 @@ impl HistoryPaginator {
240244 /// we have two, or until we are at the end of history.
241245 pub ( crate ) async fn extract_next_update ( & mut self ) -> Result < HistoryUpdate , tonic:: Status > {
242246 loop {
243- let no_next_page = !self . get_next_page ( ) . await ?;
247+ let fetch_happened = !self . get_next_page ( ) . await ?;
244248 let current_events = mem:: take ( & mut self . event_queue ) ;
245249 let seen_enough_events = current_events
246250 . back ( )
247251 . map ( |e| e. event_id )
248252 . unwrap_or_default ( )
249253 >= self . wft_started_event_id ;
250- if current_events. is_empty ( ) || ( no_next_page && !seen_enough_events) {
254+
255+ // This handles a special case where the server might send us a page token along with
256+ // a real page which ends at the current end of history. The page token then points to
257+ // en empty page. We need to detect this, and consider it the end of history.
258+ //
259+ // This case unfortunately cannot be handled earlier, because we might fetch a page
260+ // from the server which contains two complete WFTs, and thus we are happy to return
261+ // an update at that time. But, if the page has a next page token, we *cannot* conclude
262+ // we are done with replay until we fetch that page. So, we have to wait until the next
263+ // extraction to determine (after fetching the next page and finding it to be empty)
264+ // that we are done. Fetching the page eagerly is another option, but would be wasteful
265+ // the overwhelming majority of the time.
266+ let already_sent_update_with_enough_events = self
267+ . id_of_last_event_in_last_extracted_update
268+ . unwrap_or_default ( )
269+ >= self . wft_started_event_id ;
270+ if current_events. is_empty ( )
271+ && !fetch_happened
272+ && already_sent_update_with_enough_events
273+ {
274+ // We must return an empty update which also says is contains the final WFT so we
275+ // know we're done with replay.
276+ return Ok ( HistoryUpdate :: from_events (
277+ [ ] ,
278+ self . previous_wft_started_id ,
279+ self . wft_started_event_id ,
280+ true ,
281+ )
282+ . 0 ) ;
283+ }
284+
285+ if current_events. is_empty ( ) || ( fetch_happened && !seen_enough_events) {
251286 // If next page fetching happened, and we still ended up with no or insufficient
252287 // events, something is wrong. We're expecting there to be more events to be able to
253288 // extract this update, but server isn't giving us any. We have no choice except to
@@ -278,6 +313,8 @@ impl HistoryPaginator {
278313 // There was not a meaningful WFT in the whole page. We must fetch more.
279314 continue ;
280315 }
316+ self . id_of_last_event_in_last_extracted_update =
317+ update. events . last ( ) . map ( |e| e. event_id ) ;
281318 return Ok ( update) ;
282319 }
283320 }
@@ -1168,4 +1205,51 @@ pub mod tests {
11681205
11691206 // TODO: Test we dont re-feed pointless updates if fetching returns <= events we already
11701207 // processed
1208+
1209+ #[ tokio:: test]
1210+ async fn handles_fetching_page_with_complete_wft_and_page_token_to_empty_page ( ) {
1211+ let timer_hist = canned_histories:: single_timer ( "t" ) ;
1212+ let workflow_task = timer_hist. get_full_history_info ( ) . unwrap ( ) ;
1213+ let prev_started_wft_id = workflow_task. previous_started_event_id ( ) ;
1214+ let wft_started_id = workflow_task. workflow_task_started_event_id ( ) ;
1215+
1216+ let mut full_resp_with_npt: GetWorkflowExecutionHistoryResponse =
1217+ timer_hist. get_full_history_info ( ) . unwrap ( ) . into ( ) ;
1218+ full_resp_with_npt. next_page_token = vec ! [ 1 ] ;
1219+
1220+ let mut mock_client = mock_workflow_client ( ) ;
1221+ mock_client
1222+ . expect_get_workflow_execution_history ( )
1223+ . returning ( move |_, _, _| Ok ( full_resp_with_npt. clone ( ) ) )
1224+ . times ( 1 ) ;
1225+ mock_client
1226+ . expect_get_workflow_execution_history ( )
1227+ . returning ( move |_, _, _| {
1228+ Ok ( GetWorkflowExecutionHistoryResponse {
1229+ history : Some ( History { events : vec ! [ ] } ) ,
1230+ raw_history : vec ! [ ] ,
1231+ next_page_token : vec ! [ ] ,
1232+ archived : false ,
1233+ } )
1234+ } )
1235+ . times ( 1 ) ;
1236+
1237+ let mut paginator = HistoryPaginator :: new (
1238+ workflow_task. into ( ) ,
1239+ prev_started_wft_id,
1240+ wft_started_id,
1241+ "wfid" . to_string ( ) ,
1242+ "runid" . to_string ( ) ,
1243+ NextPageToken :: FetchFromStart ,
1244+ Arc :: new ( mock_client) ,
1245+ ) ;
1246+ let mut update = paginator. extract_next_update ( ) . await . unwrap ( ) ;
1247+ let seq = update. take_next_wft_sequence ( 0 ) . unwrap_events ( ) ;
1248+ assert_eq ! ( seq. last( ) . unwrap( ) . event_id, 3 ) ;
1249+ let seq = update. take_next_wft_sequence ( 3 ) . unwrap_events ( ) ;
1250+ assert_eq ! ( seq. last( ) . unwrap( ) . event_id, 8 ) ;
1251+ assert_matches ! ( update. take_next_wft_sequence( 8 ) , NextWFT :: NeedFetch ) ;
1252+ let mut update = paginator. extract_next_update ( ) . await . unwrap ( ) ;
1253+ assert_matches ! ( update. take_next_wft_sequence( 8 ) , NextWFT :: ReplayOver ) ;
1254+ }
11711255}
0 commit comments