@@ -176,11 +176,14 @@ impl HistoryPaginator {
             run_id: req.original_wft.work.execution.run_id.clone(),
             previous_wft_started_id: req.original_wft.work.update.previous_wft_started_id,
             wft_started_event_id: req.original_wft.work.update.wft_started_id,
-            id_of_last_event_in_last_extracted_update: None,
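+            // Carry over the last-extracted-event id from the WFT's existing paginator so the
+            // `already_sent_update_with_enough_events` check in `extract_next_update` stays accurate.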
+            id_of_last_event_in_last_extracted_update: req
+                .original_wft
+                .paginator
+                .id_of_last_event_in_last_extracted_update,
             client,
             event_queue: Default::default(),
             next_page_token: NextPageToken::FetchFromStart,
-            final_events: vec![],
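+            // Seed with the events already attached to the polled WFT instead of dropping them.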
+            final_events: req.original_wft.work.update.events,
         };
         let first_update = paginator.extract_next_update().await?;
         req.original_wft.work.update = first_update;
@@ -244,7 +247,7 @@ impl HistoryPaginator {
     /// we have two, or until we are at the end of history.
     pub(crate) async fn extract_next_update(&mut self) -> Result<HistoryUpdate, tonic::Status> {
         loop {
-            let fetch_happened = !self.get_next_page().await?;
+            let no_next_page = !self.get_next_page().await?;
             let current_events = mem::take(&mut self.event_queue);
             let seen_enough_events = current_events
                 .back()
@@ -267,10 +270,7 @@ impl HistoryPaginator {
                 .id_of_last_event_in_last_extracted_update
                 .unwrap_or_default()
                 >= self.wft_started_event_id;
-            if current_events.is_empty()
-                && !fetch_happened
-                && already_sent_update_with_enough_events
-            {
+            if current_events.is_empty() && no_next_page && already_sent_update_with_enough_events {
                 // We must return an empty update which also says it contains the final WFT so we
                 // know we're done with replay.
                 return Ok(HistoryUpdate::from_events(
@@ -282,12 +282,15 @@ impl HistoryPaginator {
                 .0);
             }

-            if current_events.is_empty() || (fetch_happened && !seen_enough_events) {
+            if current_events.is_empty() || (no_next_page && !seen_enough_events) {
                // If we have no events, or there is no next page and we still don't have enough
                // events, something is wrong. We expect there to be more events available to
                // extract this update, but the server isn't giving us any. We have no choice
                // except to give up and evict.
                 error!(
+                    current_events=?current_events,
+                    no_next_page,
+                    seen_enough_events,
                     "We expected to be able to fetch more events but server says there are none"
                 );
                 return Err(EMPTY_FETCH_ERR.clone());
@@ -319,13 +322,13 @@ impl HistoryPaginator {
         }
     }

-    /// Fetches the next page and adds it to the internal queue. Returns true if a fetch was
-    /// performed, false if there is no next page.
+    /// Fetches the next page and adds it to the internal queue.
+    /// Returns true if we still have a next page token after fetching.
     async fn get_next_page(&mut self) -> Result<bool, tonic::Status> {
         let history = loop {
             let npt = match mem::replace(&mut self.next_page_token, NextPageToken::Done) {
                 // If the last page token we got was empty, we're done.
-                NextPageToken::Done => return Ok(false),
+                NextPageToken::Done => break None,
                 NextPageToken::FetchFromStart => vec![],
                 NextPageToken::Next(v) => v,
             };
@@ -366,7 +369,7 @@ impl HistoryPaginator {
                );
            }
         };
-        Ok(true)
+        Ok(!matches!(&self.next_page_token, NextPageToken::Done))
     }
 }

@@ -715,11 +718,21 @@ pub mod tests {
     use super::*;
     use crate::{
         replay::{HistoryInfo, TestHistoryBuilder},
-        test_help::canned_histories,
+        test_help::{canned_histories, mock_sdk_cfg, MockPollCfg, ResponseType},
         worker::client::mocks::mock_workflow_client,
     };
+    use futures::StreamExt;
     use futures_util::TryStreamExt;
-    use temporal_sdk_core_protos::temporal::api::workflowservice::v1::GetWorkflowExecutionHistoryResponse;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use temporal_client::WorkflowOptions;
+    use temporal_sdk::WfContext;
+    use temporal_sdk_core_protos::{
+        temporal::api::{
+            common::v1::WorkflowExecution, enums::v1::WorkflowTaskFailedCause,
+            workflowservice::v1::GetWorkflowExecutionHistoryResponse,
+        },
+        DEFAULT_WORKFLOW_TYPE,
+    };

     impl From<HistoryInfo> for HistoryUpdate {
         fn from(v: HistoryInfo) -> Self {
@@ -1252,4 +1265,123 @@ pub mod tests {
         let mut update = paginator.extract_next_update().await.unwrap();
         assert_matches!(update.take_next_wft_sequence(8), NextWFT::ReplayOver);
     }
+
+    #[tokio::test]
+    async fn weird_pagination_doesnt_drop_wft_events() {
+        crate::telemetry::test_telem_console();
+        let wf_id = "fakeid";
+        // 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+        // 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        // 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        // 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+        // empty page
+        // 5: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
+        // 6: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        // 7: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        // 8: EVENT_TYPE_WORKFLOW_TASK_FAILED
+        // empty page
+        // 9: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
+        // 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        // 11: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        // empty page
+        let mut t = TestHistoryBuilder::default();
+        t.add_by_type(EventType::WorkflowExecutionStarted);
+        t.add_full_wf_task();
+
+        t.add_we_signaled("hi", vec![]);
+        t.add_workflow_task_scheduled_and_started();
+        t.add_workflow_task_failed_with_failure(
+            WorkflowTaskFailedCause::UnhandledCommand,
+            Default::default(),
+        );
+
+        t.add_we_signaled("hi", vec![]);
+        t.add_workflow_task_scheduled_and_started();
+
+        let workflow_task = t.get_full_history_info().unwrap();
+        let mut wft_resp = workflow_task.as_poll_wft_response();
+        wft_resp.workflow_execution = Some(WorkflowExecution {
+            workflow_id: wf_id.to_string(),
+            run_id: t.get_orig_run_id().to_string(),
+        });
+        // Just 9/10/11 in WFT
+        wft_resp.history.as_mut().unwrap().events.drain(0..8);
+
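+        // First page the paginator fetches: events 1-4, with a nonempty next page token.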
+        let mut resp_1: GetWorkflowExecutionHistoryResponse =
+            t.get_full_history_info().unwrap().into();
+        resp_1.next_page_token = vec![1];
+        resp_1.history.as_mut().unwrap().events.truncate(4);
+
+        let mut mock_client = mock_workflow_client();
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| Ok(resp_1.clone()))
+            .times(1);
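+        // Second fetch: an empty page that still carries a next page token.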
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| {
+                Ok(GetWorkflowExecutionHistoryResponse {
+                    history: Some(History { events: vec![] }),
+                    raw_history: vec![],
+                    next_page_token: vec![2],
+                    archived: false,
+                })
+            })
+            .times(1);
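+        // Third fetch: events 5-8 (ending in the failed WFT), again with a next page token.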
+        let mut resp_2: GetWorkflowExecutionHistoryResponse =
+            t.get_full_history_info().unwrap().into();
+        resp_2.next_page_token = vec![3];
+        resp_2.history.as_mut().unwrap().events.drain(0..4);
+        resp_2.history.as_mut().unwrap().events.truncate(4);
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| Ok(resp_2.clone()))
+            .times(1);
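+        // Final fetch: an empty page with an empty token, meaning there is no more history.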
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| {
+                Ok(GetWorkflowExecutionHistoryResponse {
+                    history: Some(History { events: vec![] }),
+                    raw_history: vec![],
+                    next_page_token: vec![],
+                    archived: false,
+                })
+            })
+            .times(1);
+
+        let wf_type = DEFAULT_WORKFLOW_TYPE;
+        let mh =
+            MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::Raw(wft_resp)], mock_client);
+        let mut worker = mock_sdk_cfg(mh, |cfg| {
+            cfg.max_cached_workflows = 2;
+            cfg.ignore_evicts_on_shutdown = false;
+        });
+
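+        // The workflow completes after seeing two signals, so both signal events (5 and 9) must
+        // survive the odd pagination for the test to pass.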
+        let sig_ctr = Arc::new(AtomicUsize::new(0));
+        let sig_ctr_clone = sig_ctr.clone();
+        worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| {
+            let sig_ctr_clone = sig_ctr_clone.clone();
+            async move {
+                let mut sigchan = ctx.make_signal_channel("hi");
+                while sigchan.next().await.is_some() {
+                    if sig_ctr_clone.fetch_add(1, Ordering::AcqRel) == 1 {
+                        break;
+                    }
+                }
+                Ok(().into())
+            }
+        });
+
+        worker
+            .submit_wf(
+                wf_id.to_owned(),
+                wf_type.to_owned(),
+                vec![],
+                WorkflowOptions::default(),
+            )
+            .await
+            .unwrap();
+        worker.run_until_done().await.unwrap();
+        assert_eq!(sig_ctr.load(Ordering::Acquire), 2);
+    }
 }