@@ -18,7 +18,7 @@ use rstest::{fixture, rstest};
1818use std:: {
1919 collections:: { HashMap , HashSet , VecDeque } ,
2020 sync:: {
21- atomic:: { AtomicU64 , Ordering } ,
21+ atomic:: { AtomicU64 , AtomicUsize , Ordering } ,
2222 Arc ,
2323 } ,
2424 time:: Duration ,
@@ -60,6 +60,7 @@ use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, WorkerTestHelp
6060use tokio:: {
6161 join,
6262 sync:: { Barrier , Semaphore } ,
63+ time,
6364} ;
6465
6566#[ fixture( hist_batches = & [ ] ) ]
@@ -1535,8 +1536,6 @@ async fn failing_wft_doesnt_eat_permit_forever() {
15351536 . unwrap ( ) ;
15361537 }
15371538 assert_eq ! ( worker. outstanding_workflow_tasks( ) . await , 0 ) ;
1538- // 1 permit is in use because the next task is buffered and has re-used the permit
1539- assert_eq ! ( worker. available_wft_permits( ) . await , 1 ) ;
15401539 // We should be "out of work" because the mock service thinks we didn't complete the last task,
15411540 // which we didn't, because we don't spam failures. The real server would eventually time out
15421541 // the task. Mock doesn't understand that, so the WFT permit is released because eventually a
@@ -1545,7 +1544,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
15451544 outstanding_mock_tasks. unwrap ( ) . release_run ( & run_id) ;
15461545 let activation = worker. poll_workflow_activation ( ) . await . unwrap ( ) ;
15471546 // There should be no change in permits, since this just unbuffered the buffered task
1548- assert_eq ! ( worker. available_wft_permits( ) . await , 1 ) ;
1547+ assert_eq ! ( worker. available_wft_permits( ) , 1 ) ;
15491548 worker
15501549 . complete_workflow_activation ( WorkflowActivationCompletion :: from_cmd (
15511550 activation. run_id ,
@@ -1554,7 +1553,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
15541553 . await
15551554 . unwrap ( ) ;
15561555 worker. shutdown ( ) . await ;
1557- assert_eq ! ( worker. available_wft_permits( ) . await , 2 ) ;
1556+ assert_eq ! ( worker. available_wft_permits( ) , 2 ) ;
15581557}
15591558
15601559#[ tokio:: test]
@@ -1875,7 +1874,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() {
18751874
18761875 join ! ( blocking_poll, complete_evict) ;
18771876 // p5 outstanding and final poll outstanding -- hence one permit available
1878- assert_eq ! ( core. available_wft_permits( ) . await , 1 ) ;
1877+ assert_eq ! ( core. available_wft_permits( ) , 1 ) ;
18791878 assert_eq ! ( core. cached_workflows( ) . await , 3 ) ;
18801879}
18811880
@@ -2588,3 +2587,72 @@ async fn history_length_with_fail_and_timeout(
25882587 . unwrap ( ) ;
25892588 worker. run_until_done ( ) . await . unwrap ( ) ;
25902589}
2590+
2591+ #[ tokio:: test]
2592+ async fn poller_wont_run_ahead_of_task_slots ( ) {
2593+ let popped_tasks = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
2594+ let ptc = popped_tasks. clone ( ) ;
2595+ let mut bunch_of_first_tasks = ( 1 ..50 ) . map ( move |i| {
2596+ ptc. fetch_add ( 1 , Ordering :: Relaxed ) ;
2597+ hist_to_poll_resp (
2598+ & canned_histories:: single_timer ( & format ! ( "{i}" ) ) ,
2599+ format ! ( "wf-{i}" ) ,
2600+ 1 . into ( ) ,
2601+ )
2602+ . resp
2603+ } ) ;
2604+ let mut mock_client = mock_workflow_client ( ) ;
2605+ mock_client
2606+ . expect_poll_workflow_task ( )
2607+ . returning ( move |_, _| Ok ( bunch_of_first_tasks. next ( ) . unwrap ( ) ) ) ;
2608+ mock_client
2609+ . expect_complete_workflow_task ( )
2610+ . returning ( |_| Ok ( Default :: default ( ) ) ) ;
2611+
2612+ let worker = Worker :: new_test (
2613+ test_worker_cfg ( )
2614+ . max_cached_workflows ( 10_usize )
2615+ . max_outstanding_workflow_tasks ( 10_usize )
2616+ . max_concurrent_wft_polls ( 10_usize )
2617+ . no_remote_activities ( true )
2618+ . build ( )
2619+ . unwrap ( ) ,
2620+ mock_client,
2621+ ) ;
2622+
2623+ // Should be able to get up to 10 tasks
2624+ let mut tasks = vec ! [ ] ;
2625+ for _ in 0 ..10 {
2626+ tasks. push ( worker. poll_workflow_activation ( ) . await . unwrap ( ) ) ;
2627+ }
2628+
2629+ assert_eq ! ( worker. outstanding_workflow_tasks( ) . await , 10 ) ;
2630+ assert_eq ! ( worker. available_wft_permits( ) , 0 ) ;
2631+ assert_eq ! ( worker. unused_wft_permits( ) , 0 ) ;
2632+
2633+ // This one should hang until we complete some tasks since we're at the limit
2634+ let hung_poll = async {
2635+ // This should end up getting shut down after the other routine finishes tasks
2636+ assert_matches ! (
2637+ worker. poll_workflow_activation( ) . await . unwrap_err( ) ,
2638+ PollWfError :: ShutDown
2639+ ) ;
2640+ } ;
2641+ // Wait for a bit concurrently with above, verify no extra tasks got taken, shutdown
2642+ let ender = async {
2643+ time:: sleep ( Duration :: from_millis ( 300 ) ) . await ;
2644+ // initiate shutdown, then complete open tasks
2645+ worker. initiate_shutdown ( ) ;
2646+ for t in tasks {
2647+ worker
2648+ . complete_workflow_activation ( WorkflowActivationCompletion :: empty ( t. run_id ) )
2649+ . await
2650+ . unwrap ( ) ;
2651+ }
2652+ worker. shutdown ( ) . await ;
2653+ } ;
2654+ join ! ( hung_poll, ender) ;
2655+ // We shouldn't have got more than the 10 tasks from the poller -- verifying that the concurrent
2656+ // polling is not exceeding the task limit
2657+ assert_eq ! ( popped_tasks. load( Ordering :: Relaxed ) , 10 ) ;
2658+ }
0 commit comments