2020
2121package io .temporal .workflow .activityTests ;
2222
23+ import io .temporal .activity .LocalActivityOptions ;
2324import io .temporal .client .WorkflowOptions ;
2425import io .temporal .testing .internal .SDKTestOptions ;
2526import io .temporal .testing .internal .SDKTestWorkflowRule ;
3839import org .junit .Assert ;
3940import org .junit .Rule ;
4041import org .junit .Test ;
42+ import org .junit .runner .RunWith ;
43+ import org .junit .runners .Parameterized ;
4144
45+ @ RunWith (Parameterized .class )
4246public class ParallelLocalActivitiesTest {
4347
4448 private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl ();
49+ static final int TOTAL_LOCAL_ACT_COUNT = 100 ;
50+ @ Parameterized .Parameter public int maxLocalActivityExecutionSize ;
51+
52+ @ Parameterized .Parameters
53+ public static Object [] data () {
54+ return new Object [] {50 , TOTAL_LOCAL_ACT_COUNT };
55+ }
4556
4657 @ Rule
4758 public SDKTestWorkflowRule testWorkflowRule =
@@ -52,7 +63,9 @@ public class ParallelLocalActivitiesTest {
5263 // Use a number lower than the number of concurrent activities to ensure that the
5364 // queueing of LAs when task executor is full works
5465 .setWorkerOptions (
55- WorkerOptions .newBuilder ().setMaxConcurrentLocalActivityExecutionSize (50 ).build ())
66+ WorkerOptions .newBuilder ()
67+ .setMaxConcurrentLocalActivityExecutionSize (maxLocalActivityExecutionSize )
68+ .build ())
5669 .build ();
5770
5871 @ Test
@@ -66,16 +79,18 @@ public void testParallelLocalActivities() {
6679
6780 TestWorkflow1 workflowStub =
6881 testWorkflowRule .getWorkflowClient ().newWorkflowStub (TestWorkflow1 .class , options );
69- String result = workflowStub .execute (testWorkflowRule .getTaskQueue ());
82+ String willQueue = maxLocalActivityExecutionSize < TOTAL_LOCAL_ACT_COUNT ? "yes" : "" ;
83+ String result = workflowStub .execute (willQueue );
7084 Assert .assertEquals ("done" , result );
71- Assert .assertEquals (activitiesImpl .toString (), 100 , activitiesImpl .invocations .size ());
72- List <String > expected = new ArrayList <String >();
85+ Assert .assertEquals (
86+ activitiesImpl .toString (), TOTAL_LOCAL_ACT_COUNT , activitiesImpl .invocations .size ());
87+ List <String > expected = new ArrayList <>();
7388 expected .add ("interceptExecuteWorkflow " + SDKTestWorkflowRule .UUID_REGEXP );
7489 expected .add ("newThread workflow-method" );
75- for (int i = 0 ; i < TestParallelLocalActivitiesWorkflowImpl . COUNT ; i ++) {
90+ for (int i = 0 ; i < TOTAL_LOCAL_ACT_COUNT ; i ++) {
7691 expected .add ("executeLocalActivity SleepActivity" );
7792 }
78- for (int i = 0 ; i < TestParallelLocalActivitiesWorkflowImpl . COUNT ; i ++) {
93+ for (int i = 0 ; i < TOTAL_LOCAL_ACT_COUNT ; i ++) {
7994 expected .add ("local activity SleepActivity" );
8095 }
8196 testWorkflowRule
@@ -84,16 +99,21 @@ public void testParallelLocalActivities() {
8499 }
85100
86101 public static class TestParallelLocalActivitiesWorkflowImpl implements TestWorkflow1 {
87- static final int COUNT = 100 ;
88102
89103 @ Override
90- public String execute (String taskQueue ) {
104+ public String execute (String willQueue ) {
105+ LocalActivityOptions laOptions = SDKTestOptions .newLocalActivityOptions ();
106+ // For the case where LAs will be forced to queue, we want to use start-to-close rather
107+ // than schedule-to-start timeouts.
108+ if (!willQueue .isEmpty ()) {
109+ laOptions =
110+ LocalActivityOptions .newBuilder ().setStartToCloseTimeout (Duration .ofSeconds (5 )).build ();
111+ }
91112 VariousTestActivities localActivities =
92- Workflow .newLocalActivityStub (
93- VariousTestActivities .class , SDKTestOptions .newLocalActivityOptions ());
113+ Workflow .newLocalActivityStub (VariousTestActivities .class , laOptions );
94114 List <Promise <String >> laResults = new ArrayList <>();
95115 Random r = Workflow .newRandom ();
96- for (int i = 0 ; i < COUNT ; i ++) {
116+ for (int i = 0 ; i < TOTAL_LOCAL_ACT_COUNT ; i ++) {
97117 laResults .add (Async .function (localActivities ::sleepActivity , (long ) r .nextInt (3000 ), i ));
98118 }
99119 Promise .allOf (laResults ).get ();
0 commit comments