@@ -896,10 +896,7 @@ async fn workflow_failures_only_reported_once() {
896896#[ tokio:: test]
897897async fn max_wft_respected ( ) {
898898 let total_wfs = 100 ;
899- let wf_ids: Vec < _ > = ( 0 ..total_wfs)
900- . into_iter ( )
901- . map ( |i| format ! ( "fake-wf-{i}" ) )
902- . collect ( ) ;
899+ let wf_ids: Vec < _ > = ( 0 ..total_wfs) . map ( |i| format ! ( "fake-wf-{i}" ) ) . collect ( ) ;
903900 let hists = wf_ids. iter ( ) . map ( |wf_id| {
904901 let hist = canned_histories:: single_timer ( "1" ) ;
905902 FakeWfResponses {
@@ -1027,7 +1024,7 @@ async fn activity_not_canceled_when_also_completed_repro(hist_batches: &'static
10271024#[ tokio:: test]
10281025async fn lots_of_workflows ( ) {
10291026 let total_wfs = 500 ;
1030- let hists = ( 0 ..total_wfs) . into_iter ( ) . map ( |i| {
1027+ let hists = ( 0 ..total_wfs) . map ( |i| {
10311028 let wf_id = format ! ( "fake-wf-{i}" ) ;
10321029 let hist = canned_histories:: single_timer ( "1" ) ;
10331030 FakeWfResponses {
@@ -1705,9 +1702,7 @@ async fn pagination_works_with_tasks_from_completion() {
17051702 t. add_by_type ( EventType :: WorkflowExecutionStarted ) ;
17061703 t. add_full_wf_task ( ) ;
17071704 t. add_we_signaled ( "sig" , vec ! [ ] ) ;
1708- t. add_full_wf_task ( ) ;
1709- t. add_workflow_execution_completed ( ) ;
1710- let get_exec_resp: GetWorkflowExecutionHistoryResponse = t. get_history_info ( 2 ) . unwrap ( ) . into ( ) ;
1705+ t. add_workflow_task_scheduled_and_started ( ) ;
17111706
17121707 let mut mock = mock_workflow_client ( ) ;
17131708 let mut needs_pag_resp = hist_to_poll_resp ( & t, wfid. to_owned ( ) , ResponseType :: OneTask ( 2 ) ) . resp ;
@@ -1722,9 +1717,13 @@ async fn pagination_works_with_tasks_from_completion() {
17221717 mock. expect_complete_workflow_task ( )
17231718 . times ( 1 )
17241719 . returning ( |_| Ok ( Default :: default ( ) ) ) ;
1720+
1721+ let get_exec_resp: GetWorkflowExecutionHistoryResponse =
1722+ t. get_full_history_info ( ) . unwrap ( ) . into ( ) ;
17251723 mock. expect_get_workflow_execution_history ( )
17261724 . returning ( move |_, _, _| Ok ( get_exec_resp. clone ( ) ) )
17271725 . times ( 1 ) ;
1726+
17281727 let mut mock = single_hist_mock_sg ( wfid, t, [ 1 ] , mock, true ) ;
17291728 mock. worker_cfg ( |wc| wc. max_cached_workflows = 2 ) ;
17301729 let core = mock_worker ( mock) ;
@@ -2162,23 +2161,15 @@ async fn fetching_to_continue_replay_works() {
21622161 t. add_full_wf_task ( ) ; // end 14
21632162 let mut fetch_resp: GetWorkflowExecutionHistoryResponse =
21642163 t. get_full_history_info ( ) . unwrap ( ) . into ( ) ;
2165- // Should only contain events after 7
2166- if let Some ( ref mut h) = fetch_resp. history {
2167- h. events . retain ( |e| e. event_id >= 8 ) ;
2168- }
21692164 // And indicate that even *more* needs to be fetched after this, so we see a request for the
21702165 // next page happen.
21712166 fetch_resp. next_page_token = vec ! [ 2 ] ;
21722167
21732168 let timer_started_event_id = t. add_by_type ( EventType :: TimerStarted ) ;
21742169 t. add_timer_fired ( timer_started_event_id, "1" . to_string ( ) ) ;
21752170 t. add_full_wf_task ( ) ;
2176- let mut final_fetch_resp: GetWorkflowExecutionHistoryResponse =
2171+ let final_fetch_resp: GetWorkflowExecutionHistoryResponse =
21772172 t. get_full_history_info ( ) . unwrap ( ) . into ( ) ;
2178- // Should have only the final event
2179- if let Some ( ref mut h) = final_fetch_resp. history {
2180- h. events . retain ( |e| e. event_id >= 15 ) ;
2181- }
21822173
21832174 let tasks = vec ! [
21842175 ResponseType :: ToTaskNum ( 1 ) ,
@@ -2273,15 +2264,25 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() {
22732264 t. add_full_wf_task ( ) ; // started 3
22742265 t. add_we_signaled ( "sig1" , vec ! [ ] ) ;
22752266 t. add_full_wf_task ( ) ; // started 7
2276- t. add_we_signaled ( "sig2" , vec ! [ ] ) ;
2267+
2268+ // Need a command event after here so the paginator will know it has two complete WFTs and
2269+ // processing can begin before needing to fetch again
2270+ t. add_by_type ( EventType :: TimerStarted ) ;
22772271 t. add_full_wf_task ( ) ; // started 11
22782272 t. add_workflow_execution_completed ( ) ;
22792273
2280- let mut first_poll = hist_to_poll_resp ( & t, wfid, ResponseType :: ToTaskNum ( 1 ) ) . resp ;
2281- first_poll. next_page_token = vec ! [ 1 ] ;
2282- first_poll. previous_started_event_id = 3 ;
2274+ let mut first_poll = hist_to_poll_resp ( & t, wfid, ResponseType :: OneTask ( 4 ) ) . resp ;
2275+ // History is partial so fetch will happen. We have to lie here and make up a previous started
2276+ // which really makes no sense, otherwise the paginator eagerly fetches and will fail before we
2277+ // ever start anything -- which is good -- but this test wants to make sure a fetching failure
2278+ // during a completion is handled correctly. That may no longer actually be a thing that can
2279+ // happen.
2280+ first_poll. previous_started_event_id = 0 ;
2281+ first_poll. started_event_id = 11 ;
22832282
2284- let mut next_page: GetWorkflowExecutionHistoryResponse = t. get_history_info ( 2 ) . unwrap ( ) . into ( ) ;
2283+ let mut next_page: GetWorkflowExecutionHistoryResponse =
2284+ t. get_full_history_info ( ) . unwrap ( ) . into ( ) ;
2285+ next_page. history . as_mut ( ) . unwrap ( ) . events . truncate ( 9 ) ;
22852286 next_page. next_page_token = vec ! [ 2 ] ;
22862287
22872288 let mut mock = mock_workflow_client ( ) ;
@@ -2291,9 +2292,6 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() {
22912292 Ok ( next_page. clone ( ) )
22922293 } )
22932294 . times ( 1 ) ;
2294- let mut really_empty_fetch_resp: GetWorkflowExecutionHistoryResponse =
2295- t. get_history_info ( 1 ) . unwrap ( ) . into ( ) ;
2296- really_empty_fetch_resp. history = Some ( Default :: default ( ) ) ;
22972295 mock. expect_get_workflow_execution_history ( )
22982296 . returning ( move |_, _, _| {
22992297 error ! ( "Called fetch second time!" ) ;
@@ -2314,24 +2312,13 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() {
23142312 . await
23152313 . unwrap ( ) ;
23162314
2317- let wf_task = core. poll_workflow_activation ( ) . await . unwrap ( ) ;
2318- assert_matches ! (
2319- wf_task. jobs. as_slice( ) ,
2320- [ WorkflowActivationJob {
2321- variant: Some ( workflow_activation_job:: Variant :: SignalWorkflow ( _) ) ,
2322- } , ]
2323- ) ;
2324- core. complete_workflow_activation ( WorkflowActivationCompletion :: empty ( wf_task. run_id ) )
2325- . await
2326- . unwrap ( ) ;
2327-
23282315 // Expect to see eviction b/c of history fetching error here.
23292316 let wf_task = core. poll_workflow_activation ( ) . await . unwrap ( ) ;
23302317 assert_matches ! (
23312318 wf_task. jobs. as_slice( ) ,
23322319 [ WorkflowActivationJob {
2333- variant: Some ( workflow_activation_job:: Variant :: RemoveFromCache ( _ ) ) ,
2334- } , ]
2320+ variant: Some ( workflow_activation_job:: Variant :: RemoveFromCache ( c ) ) ,
2321+ } ] if c . message . contains ( "Fetching history" )
23352322 ) ;
23362323
23372324 core. shutdown ( ) . await ;
@@ -2401,7 +2388,6 @@ async fn core_internal_flags() {
24012388 . copied( )
24022389 . collect:: <HashSet <_>>( ) ,
24032390 CoreInternalFlags :: all_except_too_high( )
2404- . into_iter( )
24052391 . map( |f| f as u32 )
24062392 . collect( )
24072393 ) ;
0 commit comments