Skip to content

Commit 183d8bd

Browse files
Add new constructor for WorkflowStub to target correct execution
1 parent e7c0c2a commit 183d8bd

File tree

10 files changed

+334
-34
lines changed

10 files changed

+334
-34
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,16 @@ static WorkflowClient newInstance(WorkflowServiceStubs service, WorkflowClientOp
150150
*/
151151
<T> T newWorkflowStub(Class<T> workflowInterface, String workflowId);
152152

153+
/**
154+
* Creates workflow client stub for a known execution. Use it to send signals or queries to a
155+
* running workflow. Do not call methods annotated with @WorkflowMethod.
156+
*
157+
* @param workflowInterface interface that given workflow implements.
158+
* @param workflowTargetOptions options that specify target workflow execution.
159+
* @return Stub that implements workflowInterface and can be used to signal or query it.
160+
*/
161+
<T> T newWorkflowStub(Class<T> workflowInterface, WorkflowTargetOptions workflowTargetOptions);
162+
153163
/**
154164
* Creates workflow client stub for a known execution. Use it to send signals, updates, or queries
155165
* to a running workflow. Do not call methods annotated with @WorkflowMethod.
@@ -205,6 +215,17 @@ WorkflowStub newUntypedWorkflowStub(
205215
*/
206216
WorkflowStub newUntypedWorkflowStub(WorkflowExecution execution, Optional<String> workflowType);
207217

218+
/**
219+
* Creates workflow untyped client stub for a known execution. Use it to send signals or queries
220+
* to a running workflow. Do not call methods annotated with @WorkflowMethod.
221+
*
222+
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
223+
* @param workflowTargetOptions options that specify target workflow execution.
224+
* @return Stub that can be used to start workflow and later to signal or query it.
225+
*/
226+
WorkflowStub newUntypedWorkflowStub(
227+
Optional<String> workflowType, WorkflowTargetOptions workflowTargetOptions);
228+
208229
/**
209230
* Creates new {@link ActivityCompletionClient} that can be used to complete activities
210231
* asynchronously. Only relevant for activity implementations that called {@link

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,24 +149,49 @@ public <T> T newWorkflowStub(Class<T> workflowInterface, String workflowId) {
149149
return newWorkflowStub(workflowInterface, workflowId, Optional.empty());
150150
}
151151

152+
public <T> T newWorkflowStub(
153+
Class<T> workflowInterface, WorkflowTargetOptions workflowTargetOptions) {
154+
return newWorkflowStub(workflowInterface, workflowTargetOptions, true);
155+
}
156+
152157
@Override
153158
public <T> T newWorkflowStub(
154159
Class<T> workflowInterface, String workflowId, Optional<String> runId) {
160+
return newWorkflowStub(
161+
workflowInterface,
162+
WorkflowTargetOptions.newBuilder()
163+
.setWorkflowId(workflowId)
164+
.setRunId(runId.orElse(null))
165+
.build());
166+
}
167+
168+
public <T> T newWorkflowStub(
169+
Class<T> workflowInterface,
170+
WorkflowTargetOptions workflowTargetOptions,
171+
boolean legacyTargeting) {
155172
checkAnnotation(
156173
workflowInterface,
157174
WorkflowMethod.class,
158175
QueryMethod.class,
159176
SignalMethod.class,
160177
UpdateMethod.class);
161-
if (Strings.isNullOrEmpty(workflowId)) {
178+
if (Strings.isNullOrEmpty(workflowTargetOptions.getWorkflowId())) {
162179
throw new IllegalArgumentException("workflowId is null or empty");
163180
}
164-
WorkflowExecution execution =
165-
WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(runId.orElse("")).build();
181+
WorkflowExecution.Builder execution =
182+
WorkflowExecution.newBuilder().setWorkflowId(workflowTargetOptions.getWorkflowId());
183+
if (!Strings.isNullOrEmpty(workflowTargetOptions.getRunId())) {
184+
execution.setRunId(workflowTargetOptions.getRunId());
185+
}
166186

167187
WorkflowInvocationHandler invocationHandler =
168188
new WorkflowInvocationHandler(
169-
workflowInterface, this.getOptions(), workflowClientCallsInvoker, execution);
189+
workflowInterface,
190+
this.getOptions(),
191+
workflowClientCallsInvoker,
192+
execution.build(),
193+
legacyTargeting,
194+
workflowTargetOptions.getFirstExecutionRunId());
170195
@SuppressWarnings("unchecked")
171196
T result =
172197
(T)
@@ -202,13 +227,43 @@ public WorkflowStub newUntypedWorkflowStub(
202227
}
203228

204229
@Override
205-
@SuppressWarnings("deprecation")
206230
public WorkflowStub newUntypedWorkflowStub(
207231
WorkflowExecution execution, Optional<String> workflowType) {
232+
return newUntypedWorkflowStub(
233+
workflowType,
234+
true,
235+
WorkflowTargetOptions.newBuilder()
236+
.setWorkflowId(execution.getWorkflowId())
237+
.setRunId(execution.getRunId())
238+
.build());
239+
}
240+
241+
@Override
242+
public WorkflowStub newUntypedWorkflowStub(
243+
Optional<String> workflowType, WorkflowTargetOptions workflowTargetOptions) {
244+
return newUntypedWorkflowStub(workflowType, false, workflowTargetOptions);
245+
}
246+
247+
@SuppressWarnings("deprecation")
248+
WorkflowStub newUntypedWorkflowStub(
249+
Optional<String> workflowType,
250+
boolean legacyTargeting,
251+
WorkflowTargetOptions workflowTargetOptions) {
252+
WorkflowExecution.Builder execution =
253+
WorkflowExecution.newBuilder().setWorkflowId(workflowTargetOptions.getWorkflowId());
254+
if (!Strings.isNullOrEmpty(workflowTargetOptions.getRunId())) {
255+
execution.setRunId(workflowTargetOptions.getRunId());
256+
}
208257
WorkflowStub result =
209-
new WorkflowStubImpl(options, workflowClientCallsInvoker, workflowType, execution);
258+
new WorkflowStubImpl(
259+
options,
260+
workflowClientCallsInvoker,
261+
workflowType,
262+
execution.build(),
263+
legacyTargeting,
264+
workflowTargetOptions.getFirstExecutionRunId());
210265
for (WorkflowClientInterceptor i : interceptors) {
211-
result = i.newUntypedWorkflowStub(execution, workflowType, result);
266+
result = i.newUntypedWorkflowStub(execution.build(), workflowType, result);
212267
}
213268
return result;
214269
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.lang.reflect.InvocationHandler;
2121
import java.lang.reflect.Method;
2222
import java.util.*;
23+
import javax.annotation.Nullable;
2324

2425
/**
2526
* Dynamic implementation of a strongly typed workflow interface that can be used to start, signal
@@ -107,11 +108,19 @@ static void closeAsyncInvocation() {
107108
Class<?> workflowInterface,
108109
WorkflowClientOptions clientOptions,
109110
WorkflowClientCallsInterceptor workflowClientCallsInvoker,
110-
WorkflowExecution execution) {
111+
WorkflowExecution execution,
112+
boolean legacyTargeting,
113+
@Nullable String firstExecutionRunId) {
111114
workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface, false);
112115
Optional<String> workflowType = workflowMetadata.getWorkflowType();
113116
WorkflowStub stub =
114-
new WorkflowStubImpl(clientOptions, workflowClientCallsInvoker, workflowType, execution);
117+
new WorkflowStubImpl(
118+
clientOptions,
119+
workflowClientCallsInvoker,
120+
workflowType,
121+
execution,
122+
legacyTargeting,
123+
firstExecutionRunId);
115124
for (WorkflowClientInterceptor i : clientOptions.getInterceptors()) {
116125
stub = i.newUntypedWorkflowStub(execution, workflowType, stub);
117126
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,25 @@ <R> WorkflowUpdateHandle<R> startUpdateWithStart(
160160
<R> R executeUpdateWithStart(
161161
UpdateOptions<R> updateOptions, Object[] updateArgs, Object[] startArgs);
162162

163+
/**
164+
* Sends a signal to a workflow, starting the workflow if it is not already running.
165+
*
166+
* @param signalName name of the signal handler. Usually it is a method name.
167+
* @param signalArgs signal method arguments
168+
* @param startArgs workflow start arguments
169+
* @return workflow execution
170+
*/
163171
WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);
164172

173+
/**
174+
* @return workflow type name if it was provided when the stub was created.
175+
*/
165176
Optional<String> getWorkflowType();
166177

178+
/**
179+
* @return current workflow execution. Returns null if the workflow has not been started yet.
180+
*/
181+
@Nullable
167182
WorkflowExecution getExecution();
168183

169184
/**
@@ -406,6 +421,9 @@ <R> CompletableFuture<R> getResultAsync(
406421
*/
407422
WorkflowExecutionDescription describe();
408423

424+
/**
425+
* @return workflow options if they were provided when the stub was created.
426+
*/
409427
Optional<WorkflowOptions> getOptions();
410428

411429
/**

temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,25 @@ class WorkflowStubImpl implements WorkflowStub {
3535
// if null, this stub is created to bound to an existing execution.
3636
// This stub is created to bound to an existing execution otherwise.
3737
private final @Nullable WorkflowOptions options;
38+
private final boolean legacyTargeting;
39+
private final @Nullable String firstExecutionRunId;
3840

3941
WorkflowStubImpl(
4042
WorkflowClientOptions clientOptions,
4143
WorkflowClientCallsInterceptor workflowClientInvoker,
4244
Optional<String> workflowType,
43-
WorkflowExecution execution) {
45+
WorkflowExecution execution,
46+
boolean legacyTargeting,
47+
@Nullable String firstExecutionRunId) {
4448
this.clientOptions = clientOptions;
4549
this.workflowClientInvoker = workflowClientInvoker;
4650
this.workflowType = workflowType;
4751
if (execution == null || execution.getWorkflowId().isEmpty()) {
4852
throw new IllegalArgumentException("null or empty workflowId");
4953
}
5054
this.execution.set(execution);
55+
this.legacyTargeting = legacyTargeting;
56+
this.firstExecutionRunId = firstExecutionRunId;
5157
this.options = null;
5258
}
5359

@@ -60,12 +66,14 @@ class WorkflowStubImpl implements WorkflowStub {
6066
this.workflowClientInvoker = workflowClientInvoker;
6167
this.workflowType = Optional.of(workflowType);
6268
this.options = options;
69+
this.legacyTargeting = false;
70+
this.firstExecutionRunId = null;
6371
}
6472

6573
@Override
6674
public void signal(String signalName, Object... args) {
6775
checkStarted();
68-
WorkflowExecution targetExecution = currentExecutionWithoutRunId();
76+
WorkflowExecution targetExecution = currentExecutionCheckLegacy();
6977
try {
7078
workflowClientInvoker.signal(
7179
new WorkflowClientCallsInterceptor.WorkflowSignalInput(
@@ -338,6 +346,7 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {
338346
.setUpdateName(updateName)
339347
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
340348
.setResultClass(resultClass)
349+
.setFirstExecutionRunId(firstExecutionRunId)
341350
.build();
342351
return startUpdate(options, args).getResultAsync().get();
343352
} catch (InterruptedException e) {
@@ -385,21 +394,17 @@ private <R> WorkflowClientCallsInterceptor.StartUpdateInput<R> startUpdateInput(
385394
Strings.isNullOrEmpty(options.getUpdateId())
386395
? UUID.randomUUID().toString()
387396
: options.getUpdateId();
388-
WorkflowClientCallsInterceptor.StartUpdateInput<R> input =
389-
new WorkflowClientCallsInterceptor.StartUpdateInput<>(
390-
targetExecution,
391-
workflowType,
392-
options.getUpdateName(),
393-
Header.empty(),
394-
updateId,
395-
args,
396-
options.getResultClass(),
397-
options.getResultType(),
398-
options.getFirstExecutionRunId(),
399-
WaitPolicy.newBuilder()
400-
.setLifecycleStage(options.getWaitForStage().getProto())
401-
.build());
402-
return input;
397+
return new WorkflowClientCallsInterceptor.StartUpdateInput<>(
398+
targetExecution,
399+
workflowType,
400+
options.getUpdateName(),
401+
Header.empty(),
402+
updateId,
403+
args,
404+
options.getResultClass(),
405+
options.getResultType(),
406+
options.getFirstExecutionRunId(),
407+
WaitPolicy.newBuilder().setLifecycleStage(options.getWaitForStage().getProto()).build());
403408
}
404409

405410
@Override
@@ -435,10 +440,11 @@ public void cancel() {
435440
@Override
436441
public void cancel(@Nullable String reason) {
437442
checkStarted();
438-
WorkflowExecution targetExecution = currentExecutionWithoutRunId();
443+
WorkflowExecution targetExecution = currentExecutionCheckLegacy();
439444
try {
440445
workflowClientInvoker.cancel(
441-
new WorkflowClientCallsInterceptor.CancelInput(targetExecution, reason));
446+
new WorkflowClientCallsInterceptor.CancelInput(
447+
targetExecution, firstExecutionRunId, reason));
442448
} catch (Exception e) {
443449
Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
444450
throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
@@ -448,10 +454,11 @@ public void cancel(@Nullable String reason) {
448454
@Override
449455
public void terminate(@Nullable String reason, Object... details) {
450456
checkStarted();
451-
WorkflowExecution targetExecution = currentExecutionWithoutRunId();
457+
WorkflowExecution targetExecution = currentExecutionCheckLegacy();
452458
try {
453459
workflowClientInvoker.terminate(
454-
new WorkflowClientCallsInterceptor.TerminateInput(targetExecution, reason, details));
460+
new WorkflowClientCallsInterceptor.TerminateInput(
461+
targetExecution, firstExecutionRunId, reason, details));
455462
} catch (Exception e) {
456463
Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
457464
throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
@@ -532,6 +539,14 @@ private WorkflowExecution currentExecutionWithoutRunId() {
532539
}
533540
}
534541

542+
private WorkflowExecution currentExecutionCheckLegacy() {
543+
if (legacyTargeting) {
544+
return currentExecutionWithoutRunId();
545+
} else {
546+
return execution.get();
547+
}
548+
}
549+
535550
private <R> R throwAsWorkflowFailureExceptionForQuery(
536551
Throwable failure,
537552
@SuppressWarnings("unused") Class<R> returnType,
@@ -589,6 +604,7 @@ private Throwable throwAsWorkflowFailureException(
589604

590605
private void populateExecutionAfterStart(WorkflowExecution startedExecution) {
591606
this.startedExecution.set(startedExecution);
607+
// this.firstExecutionRunId.set(startedExecution.getRunId());
592608
// bind to an execution without a runId, so queries follow runId chains by default
593609
this.execution.set(WorkflowExecution.newBuilder(startedExecution).setRunId("").build());
594610
}

0 commit comments

Comments
 (0)