Skip to content

Commit ae50fa9

Browse files
Refactor
1 parent ef5848e commit ae50fa9

File tree

15 files changed

+111
-58
lines changed

15 files changed

+111
-58
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ static WorkflowClient newInstance(WorkflowServiceStubs service, WorkflowClientOp
203203
* workflowId is assumed.
204204
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
205205
* @return Stub that can be used to start workflow and later to signal or query it.
206-
* @deprecated Use {@link #newUntypedWorkflowStub(Optional, WorkflowTargetOptions)} instead.
206+
* @deprecated Use {@link #newUntypedWorkflowStub(WorkflowTargetOptions, Optional)} instead.
207207
*/
208208
@Deprecated
209209
WorkflowStub newUntypedWorkflowStub(
@@ -216,20 +216,29 @@ WorkflowStub newUntypedWorkflowStub(
216216
* @param execution workflow id and optional run id for execution
217217
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
218218
* @return Stub that can be used to start workflow and later to signal or query it.
219-
* @deprecated Use {@link #newUntypedWorkflowStub(Optional, WorkflowTargetOptions)} instead.
219+
* @deprecated Use {@link #newUntypedWorkflowStub(WorkflowTargetOptions, Optional)} instead.
220220
*/
221221
WorkflowStub newUntypedWorkflowStub(WorkflowExecution execution, Optional<String> workflowType);
222222

223223
/**
224224
* Creates workflow untyped client stub for a known execution. Use it to send signals or queries
225225
* to a running workflow. Do not call methods annotated with @WorkflowMethod.
226226
*
227-
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
228227
* @param workflowTargetOptions options that specify target workflow execution.
229228
* @return Stub that can be used to start workflow and later to signal or query it.
230229
*/
230+
WorkflowStub newUntypedWorkflowStub(WorkflowTargetOptions workflowTargetOptions);
231+
232+
/**
233+
* Creates workflow untyped client stub for a known execution. Use it to send signals or queries
234+
* to a running workflow. Do not call methods annotated with @WorkflowMethod.
235+
*
236+
* @param workflowTargetOptions options that specify target workflow execution.
237+
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
238+
* @return Stub that can be used to start workflow and later to signal or query it.
239+
*/
231240
WorkflowStub newUntypedWorkflowStub(
232-
Optional<String> workflowType, WorkflowTargetOptions workflowTargetOptions);
241+
WorkflowTargetOptions workflowTargetOptions, Optional<String> workflowType);
233242

234243
/**
235244
* Creates new {@link ActivityCompletionClient} that can be used to complete activities

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,14 @@ public WorkflowStub newUntypedWorkflowStub(
241241
.build());
242242
}
243243

244+
@Override
245+
public WorkflowStub newUntypedWorkflowStub(WorkflowTargetOptions workflowTargetOptions) {
246+
return newUntypedWorkflowStub(Optional.empty(), false, workflowTargetOptions);
247+
}
248+
244249
@Override
245250
public WorkflowStub newUntypedWorkflowStub(
246-
Optional<String> workflowType, WorkflowTargetOptions workflowTargetOptions) {
251+
WorkflowTargetOptions workflowTargetOptions, Optional<String> workflowType) {
247252
return newUntypedWorkflowStub(workflowType, false, workflowTargetOptions);
248253
}
249254

temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*;
1818
import java.time.Duration;
1919
import java.util.List;
20-
import java.util.Optional;
2120
import java.util.stream.Collectors;
2221
import org.junit.Assert;
2322
import org.junit.Rule;
@@ -202,7 +201,6 @@ private void assertResult(String expected, WorkflowExecution execution) {
202201
testWorkflowRule
203202
.getWorkflowClient()
204203
.newUntypedWorkflowStub(
205-
Optional.empty(),
206204
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
207205
.getResult(String.class);
208206
assertEquals(expected, result);
@@ -213,7 +211,6 @@ private void assertResult(int expected, WorkflowExecution execution) {
213211
testWorkflowRule
214212
.getWorkflowClient()
215213
.newUntypedWorkflowStub(
216-
Optional.empty(),
217214
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
218215
.getResult(int.class);
219216
assertEquals(expected, result);
@@ -223,7 +220,6 @@ private void waitForProc(WorkflowExecution execution) {
223220
testWorkflowRule
224221
.getWorkflowClient()
225222
.newUntypedWorkflowStub(
226-
Optional.empty(),
227223
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
228224
.getResult(Void.class);
229225
}

temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowIdConflictPolicyTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ public void policyTerminateExisting() {
5858
testWorkflowRule
5959
.getWorkflowClient()
6060
.newUntypedWorkflowStub(
61-
Optional.of(TestWorkflows.TestSignaledWorkflow.class.toString()),
6261
WorkflowTargetOptions.newBuilder()
6362
.setWorkflowExecution(workflowExecution1)
64-
.build())
63+
.build(),
64+
Optional.of(TestWorkflows.TestSignaledWorkflow.class.toString()))
6565
.getResult(String.class));
6666
}
6767

Original file line numberDiff line numberDiff line change
@@ -1,44 +1,42 @@
11
package io.temporal.client.functional;
22

3+
import io.temporal.api.common.v1.WorkflowExecution;
34
import io.temporal.api.enums.v1.EventType;
45
import io.temporal.client.*;
56
import io.temporal.testing.internal.SDKTestWorkflowRule;
7+
import io.temporal.workflow.QueryMethod;
68
import io.temporal.workflow.Workflow;
7-
import io.temporal.workflow.shared.TestWorkflows;
8-
import java.util.Optional;
9+
import io.temporal.workflow.WorkflowInterface;
10+
import io.temporal.workflow.WorkflowMethod;
911
import org.junit.Assert;
1012
import org.junit.Rule;
1113
import org.junit.Test;
1214

13-
public class WorkflowStubFirstExecutionRunIdTest {
15+
public class WorkflowStubRespectRunIdTest {
1416
@Rule
1517
public SDKTestWorkflowRule testWorkflowRule =
1618
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(AwaitingWorkflow.class).build();
1719

1820
@Test
1921
public void terminateFollowFirstRunId() throws InterruptedException {
20-
TestWorkflows.TestWorkflow1 workflow =
21-
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflow1.class);
22+
TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class);
2223
WorkflowClient.start(workflow::execute, "input1");
2324
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
24-
// TODO wait for the continue as new to be visible
25-
Thread.sleep(1000);
25+
waitForContinueAsNew(untyped.getExecution());
2626
Assert.assertThrows(
2727
"If the workflow continued as new, terminating by execution without firstExecutionRunId should fail",
2828
WorkflowNotFoundException.class,
2929
() ->
3030
testWorkflowRule
3131
.getWorkflowClient()
3232
.newUntypedWorkflowStub(
33-
Optional.empty(),
3433
WorkflowTargetOptions.newBuilder()
3534
.setWorkflowExecution(untyped.getExecution())
3635
.build())
3736
.terminate("termination"));
3837
testWorkflowRule
3938
.getWorkflowClient()
4039
.newUntypedWorkflowStub(
41-
Optional.empty(),
4240
WorkflowTargetOptions.newBuilder()
4341
.setWorkflowExecution(untyped.getExecution())
4442
.setFirstExecutionRunId(untyped.getExecution().getRunId())
@@ -52,16 +50,13 @@ public void terminateFollowFirstRunId() throws InterruptedException {
5250

5351
@Test
5452
public void cancelFollowFirstRunId() throws InterruptedException {
55-
TestWorkflows.TestWorkflow1 workflow =
56-
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflow1.class);
53+
TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class);
5754
WorkflowClient.start(workflow::execute, "input1");
5855
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
59-
// TODO wait for the continue as new to be visible
60-
Thread.sleep(1000);
56+
waitForContinueAsNew(untyped.getExecution());
6157
testWorkflowRule
6258
.getWorkflowClient()
6359
.newUntypedWorkflowStub(
64-
Optional.empty(),
6560
WorkflowTargetOptions.newBuilder().setWorkflowExecution(untyped.getExecution()).build())
6661
.cancel();
6762
testWorkflowRule.assertNoHistoryEvent(
@@ -72,7 +67,6 @@ public void cancelFollowFirstRunId() throws InterruptedException {
7267
testWorkflowRule
7368
.getWorkflowClient()
7469
.newUntypedWorkflowStub(
75-
Optional.empty(),
7670
WorkflowTargetOptions.newBuilder()
7771
.setWorkflowExecution(untyped.getExecution())
7872
.setFirstExecutionRunId(untyped.getExecution().getRunId())
@@ -89,19 +83,16 @@ public void cancelFollowFirstRunId() throws InterruptedException {
8983

9084
@Test
9185
public void signalRespectRunId() throws InterruptedException {
92-
TestWorkflows.TestWorkflow1 workflow =
93-
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflow1.class);
86+
TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class);
9487
WorkflowClient.start(workflow::execute, "input1");
9588
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
96-
// TODO wait for the continue as new to be visible
97-
Thread.sleep(1000);
89+
waitForContinueAsNew(untyped.getExecution());
9890
Assert.assertThrows(
9991
WorkflowNotFoundException.class,
10092
() ->
10193
testWorkflowRule
10294
.getWorkflowClient()
10395
.newUntypedWorkflowStub(
104-
Optional.empty(),
10596
WorkflowTargetOptions.newBuilder()
10697
.setWorkflowExecution(untyped.getExecution())
10798
.build())
@@ -110,14 +101,78 @@ public void signalRespectRunId() throws InterruptedException {
110101
testWorkflowRule
111102
.getWorkflowClient()
112103
.newUntypedWorkflowStub(
113-
Optional.empty(),
114104
WorkflowTargetOptions.newBuilder()
115105
.setWorkflowId(untyped.getExecution().getWorkflowId())
116106
.build())
117107
.signal("signal");
118108
}
119109

120-
public static class AwaitingWorkflow implements TestWorkflows.TestWorkflow1 {
110+
private void waitForContinueAsNew(WorkflowExecution execution) throws InterruptedException {
111+
final int maxAttempts = 5; // 100 * 100ms = 10s
112+
final long sleepMs = 1000L;
113+
int attempts = 0;
114+
115+
WorkflowStub latestStub =
116+
testWorkflowRule
117+
.getWorkflowClient()
118+
.newUntypedWorkflowStub(
119+
WorkflowTargetOptions.newBuilder()
120+
.setWorkflowId(execution.getWorkflowId())
121+
.build());
122+
123+
while (attempts++ < maxAttempts) {
124+
try {
125+
String currentRunId = latestStub.describe().getExecution().getRunId();
126+
if (!execution.getRunId().equals(currentRunId)) {
127+
return;
128+
}
129+
} catch (Exception e) {
130+
// Ignore and retry until timeout (query may fail while continue-as-new is in progress)
131+
}
132+
Thread.sleep(sleepMs);
133+
}
134+
135+
throw new AssertionError(
136+
"ContinueAsNew event was not observed for workflowId: " + execution.getWorkflowId());
137+
}
138+
139+
@Test
140+
public void queryRespectRunId() throws InterruptedException {
141+
TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class);
142+
WorkflowClient.start(workflow::execute, "input1");
143+
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
144+
waitForContinueAsNew(untyped.getExecution());
145+
String actualRunId =
146+
testWorkflowRule
147+
.getWorkflowClient()
148+
.newUntypedWorkflowStub(
149+
WorkflowTargetOptions.newBuilder()
150+
.setWorkflowExecution(untyped.getExecution())
151+
.build())
152+
.query("getRunId", String.class);
153+
Assert.assertEquals(untyped.getExecution().getRunId(), actualRunId);
154+
155+
actualRunId =
156+
testWorkflowRule
157+
.getWorkflowClient()
158+
.newUntypedWorkflowStub(
159+
WorkflowTargetOptions.newBuilder()
160+
.setWorkflowId(untyped.getExecution().getWorkflowId())
161+
.build())
162+
.query("getRunId", String.class);
163+
Assert.assertNotEquals(untyped.getExecution().getRunId(), actualRunId);
164+
}
165+
166+
@WorkflowInterface
167+
public interface TestWorkflowWithQuery {
168+
@WorkflowMethod()
169+
String execute(String arg);
170+
171+
@QueryMethod()
172+
String getRunId();
173+
}
174+
175+
public static class AwaitingWorkflow implements TestWorkflowWithQuery {
121176

122177
@Override
123178
public String execute(String arg) {
@@ -127,5 +182,10 @@ public String execute(String arg) {
127182
Workflow.await(() -> false);
128183
return "done";
129184
}
185+
186+
@Override
187+
public String getRunId() {
188+
return Workflow.getInfo().getRunId();
189+
}
130190
}
131191
}

temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotMaxConcurrentTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.time.Duration;
2222
import java.util.ArrayList;
2323
import java.util.List;
24-
import java.util.Optional;
2524
import java.util.concurrent.atomic.AtomicInteger;
2625
import org.junit.After;
2726
import org.junit.Before;
@@ -167,7 +166,6 @@ public void TestSlotsNotExceeded() {
167166
for (WorkflowExecution execution : executions) {
168167
WorkflowStub workflowStub =
169168
client.newUntypedWorkflowStub(
170-
Optional.empty(),
171169
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build());
172170
workflowStub.getResult(String.class);
173171
}

temporal-sdk/src/test/java/io/temporal/workflow/SyncTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public void testSyncUntypedAndStackTrace() {
7878
testWorkflowRule
7979
.getWorkflowClient()
8080
.newUntypedWorkflowStub(
81-
workflowStub.getWorkflowType(),
82-
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build());
81+
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build(),
82+
workflowStub.getWorkflowType());
8383
stackTrace = workflowStub.query(QUERY_TYPE_STACK_TRACE, String.class);
8484
assertTrue(stackTrace, stackTrace.contains("TestSyncWorkflowImpl.execute"));
8585
assertTrue(stackTrace, stackTrace.contains("activityWithDelay"));

temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalDuringLastWorkflowTaskTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import io.temporal.worker.WorkerOptions;
1111
import io.temporal.workflow.shared.TestWorkflows.TestSignaledWorkflow;
1212
import java.time.Duration;
13-
import java.util.Optional;
1413
import java.util.concurrent.CompletableFuture;
1514
import java.util.concurrent.ExecutionException;
1615
import java.util.concurrent.TimeUnit;
@@ -58,7 +57,6 @@ public void testSignalDuringLastWorkflowTask() throws ExecutionException, Interr
5857
testWorkflowRule
5958
.getWorkflowClient()
6059
.newUntypedWorkflowStub(
61-
Optional.empty(),
6260
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
6361
.getResult(String.class));
6462
assertCompleted.complete(true);

temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public void testSignal() {
7171
"Hello World!",
7272
workflowClient
7373
.newUntypedWorkflowStub(
74-
Optional.empty(),
7574
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
7675
.getResult(String.class));
7776
client2.execute();
@@ -109,7 +108,6 @@ public void testSignalWithStart() {
109108
"Hello World!",
110109
workflowClient
111110
.newUntypedWorkflowStub(
112-
Optional.empty(),
113111
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
114112
.getResult(String.class));
115113

@@ -170,8 +168,8 @@ public void testSignalUntyped() {
170168
"Hello World!",
171169
workflowClient
172170
.newUntypedWorkflowStub(
173-
Optional.of(workflowType),
174-
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
171+
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build(),
172+
Optional.of(workflowType))
175173
.getResult(String.class));
176174
assertEquals("Hello World!", workflowStub.getResult(String.class));
177175
assertEquals("World!", workflowStub.query("getState", String.class));
@@ -185,8 +183,8 @@ public void testSignalUntyped() {
185183
.build());
186184
WorkflowStub workflowStubNotOptionRejectCondition =
187185
client.newUntypedWorkflowStub(
188-
Optional.of(workflowType),
189-
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build());
186+
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build(),
187+
Optional.of(workflowType));
190188
try {
191189
workflowStubNotOptionRejectCondition.query("getState", String.class);
192190
fail("unreachable");

temporal-sdk/src/test/java/io/temporal/workflow/updateTest/DynamicUpdateTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.temporal.workflow.shared.TestWorkflows;
1414
import java.util.ArrayList;
1515
import java.util.List;
16-
import java.util.Optional;
1716
import java.util.UUID;
1817
import java.util.concurrent.ExecutionException;
1918
import org.junit.Assert;
@@ -60,7 +59,6 @@ public void dynamicUpdate() throws ExecutionException, InterruptedException {
6059
testWorkflowRule
6160
.getWorkflowClient()
6261
.newUntypedWorkflowStub(
63-
Optional.empty(),
6462
WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build())
6563
.getResult(String.class);
6664
assertEquals(" update complete", result);

0 commit comments

Comments
 (0)