Skip to content

Commit 1b29853

Browse files
finish wiring up cancel cause to Workflow cancellation. (#2695)
Finish wiring up cancel cause to Workflow cancellation.
1 parent 9c7a5d1 commit 1b29853

File tree

11 files changed

+278
-10
lines changed

11 files changed

+278
-10
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,14 +378,27 @@ public Promise<Void> getResult() {
378378

379379
final class CancelWorkflowInput {
380380
private final WorkflowExecution execution;
381+
private final @Nullable String reason;
381382

383+
// Kept for backward compatibility
384+
@Deprecated
382385
public CancelWorkflowInput(WorkflowExecution execution) {
386+
this(execution, null);
387+
}
388+
389+
public CancelWorkflowInput(WorkflowExecution execution, @Nullable String reason) {
383390
this.execution = execution;
391+
this.reason = reason;
384392
}
385393

386394
public WorkflowExecution getExecution() {
387395
return execution;
388396
}
397+
398+
@Nullable
399+
public String getReason() {
400+
return reason;
401+
}
389402
}
390403

391404
final class CancelWorkflowOutput {

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,13 @@ Functions.Proc1<Exception> signalExternalWorkflowExecution(
185185
* Request cancellation of a workflow execution by WorkflowId and optionally RunId.
186186
*
187187
* @param execution contains WorkflowId and optional RunId of the workflow to send request to.
188+
* @param reason optional reason for cancellation.
188189
* @param callback callback notified about the operation result
189190
*/
190191
void requestCancelExternalWorkflowExecution(
191-
WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback);
192+
WorkflowExecution execution,
193+
@Nullable String reason,
194+
Functions.Proc2<Void, RuntimeException> callback);
192195

193196
/**
194197
* @return time of the {@link PollWorkflowTaskQueueResponse} start event of the workflow task

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,17 @@ public Functions.Proc1<Exception> signalExternalWorkflowExecution(
231231

232232
@Override
233233
public void requestCancelExternalWorkflowExecution(
234-
WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback) {
235-
RequestCancelExternalWorkflowExecutionCommandAttributes attributes =
234+
WorkflowExecution execution,
235+
@Nullable String reason,
236+
Functions.Proc2<Void, RuntimeException> callback) {
237+
RequestCancelExternalWorkflowExecutionCommandAttributes.Builder attributes =
236238
RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
237239
.setWorkflowId(execution.getWorkflowId())
238-
.setRunId(execution.getRunId())
239-
.build();
240-
workflowStateMachines.requestCancelExternalWorkflowExecution(attributes, callback);
240+
.setRunId(execution.getRunId());
241+
if (reason != null) {
242+
attributes.setReason(reason);
243+
}
244+
workflowStateMachines.requestCancelExternalWorkflowExecution(attributes.build(), callback);
241245
}
242246

243247
@Override

temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
66
import io.temporal.workflow.*;
77
import java.util.Objects;
8+
import javax.annotation.Nullable;
89

910
/** Dynamic implementation of a strongly typed child workflow interface. */
1011
class ExternalWorkflowStubImpl implements ExternalWorkflowStub {
@@ -52,10 +53,26 @@ public void signal(String signalName, Object... args) {
5253

5354
@Override
5455
public void cancel() {
56+
cancel(null);
57+
}
58+
59+
@Override
60+
public void cancel(@Nullable String reason) {
5561
assertReadOnly.apply("cancel external workflow");
62+
if (reason == null) {
63+
try {
64+
CancellationScope currentScope = CancellationScope.current();
65+
if (currentScope.isCancelRequested()) {
66+
reason = currentScope.getCancellationReason();
67+
}
68+
} catch (IllegalStateException ignored) {
69+
// Outside of workflow thread; leave reason as null.
70+
}
71+
}
5672
Promise<Void> cancelRequested =
5773
outboundCallsInterceptor
58-
.cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
74+
.cancelWorkflow(
75+
new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution, reason))
5976
.getResult();
6077
if (AsyncInternal.isAsync()) {
6178
AsyncInternal.setAsyncResult(cancelRequested);

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,7 @@ public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
13741374
CompletablePromise<Void> result = Workflow.newPromise();
13751375
replayContext.requestCancelExternalWorkflowExecution(
13761376
input.getExecution(),
1377+
input.getReason(),
13771378
(r, exception) -> {
13781379
if (exception == null) {
13791380
result.complete(null);

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,9 +627,13 @@ public static void continueAsNew(
627627
}
628628

629629
public static Promise<Void> cancelWorkflow(WorkflowExecution execution) {
630+
return cancelWorkflow(execution, null);
631+
}
632+
633+
public static Promise<Void> cancelWorkflow(WorkflowExecution execution, @Nullable String reason) {
630634
assertNotReadOnly("cancel workflow");
631635
return getWorkflowOutboundInterceptor()
632-
.cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
636+
.cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution, reason))
633637
.getResult();
634638
}
635639

temporal-sdk/src/main/java/io/temporal/workflow/CancellationScope.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public interface CancellationScope extends Runnable {
2323
/**
2424
* Cancels the scope as well as all its children.
2525
*
26-
* @param reason human readable reason for the cancellation. Becomes message of the
26+
* @param reason human-readable reason for the cancellation. Becomes message of the
2727
* CanceledException thrown.
2828
*/
2929
void cancel(String reason);

temporal-sdk/src/main/java/io/temporal/workflow/ExternalWorkflowStub.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.temporal.api.common.v1.WorkflowExecution;
44
import io.temporal.internal.sync.StubMarker;
5+
import javax.annotation.Nullable;
56

67
/**
78
* Supports signalling and cancelling any workflows by the workflow type and their id. This is
@@ -49,5 +50,13 @@ static <T> ExternalWorkflowStub fromTyped(T typed) {
4950
*/
5051
void signal(String signalName, Object... args);
5152

53+
/** Request cancellation of the workflow execution. */
5254
void cancel();
55+
56+
/**
57+
* Request cancellation of the workflow execution with a reason.
58+
*
59+
* @param reason optional reason for cancellation.
60+
*/
61+
void cancel(@Nullable String reason);
5362
}
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package io.temporal.workflow.cancellationTests;
2+
3+
import static org.junit.Assert.*;
4+
import static org.junit.Assert.assertEquals;
5+
6+
import io.temporal.client.WorkflowClient;
7+
import io.temporal.client.WorkflowFailedException;
8+
import io.temporal.client.WorkflowOptions;
9+
import io.temporal.client.WorkflowStub;
10+
import io.temporal.failure.CanceledFailure;
11+
import io.temporal.testing.internal.SDKTestWorkflowRule;
12+
import io.temporal.workflow.CancellationScope;
13+
import io.temporal.workflow.ExternalWorkflowStub;
14+
import io.temporal.workflow.Promise;
15+
import io.temporal.workflow.QueryMethod;
16+
import io.temporal.workflow.Workflow;
17+
import io.temporal.workflow.WorkflowInterface;
18+
import io.temporal.workflow.WorkflowMethod;
19+
import java.util.UUID;
20+
import org.junit.Rule;
21+
import org.junit.Test;
22+
23+
public class WorkflowCancelReasonTest {
24+
25+
@Rule
26+
public SDKTestWorkflowRule testWorkflowRule =
27+
SDKTestWorkflowRule.newBuilder()
28+
.setUseTimeskipping(false)
29+
.setWorkflowTypes(
30+
CancellationAwareWorkflow.class,
31+
CancelExternalWorkflowImpl.class,
32+
CascadingCancelWorkflowImpl.class)
33+
.build();
34+
35+
@Test
36+
public void testCancellationReasonFromClient() {
37+
String workflowId = "client-cancel-" + UUID.randomUUID();
38+
CancellationReasonWorkflow workflow =
39+
testWorkflowRule
40+
.getWorkflowClient()
41+
.newWorkflowStub(
42+
CancellationReasonWorkflow.class,
43+
WorkflowOptions.newBuilder()
44+
.setWorkflowId(workflowId)
45+
.setTaskQueue(testWorkflowRule.getTaskQueue())
46+
.build());
47+
48+
WorkflowClient.start(workflow::execute);
49+
50+
WorkflowStub stub = WorkflowStub.fromTyped(workflow);
51+
String reason = "client-cancel-reason";
52+
stub.cancel(reason);
53+
54+
WorkflowFailedException exception =
55+
assertThrows(WorkflowFailedException.class, () -> stub.getResult(String.class));
56+
assertEquals(CanceledFailure.class, exception.getCause().getClass());
57+
58+
assertEquals(expectedResult(reason), workflow.getRecordedReason());
59+
}
60+
61+
@Test
62+
public void testCancellationReasonFromCommand() {
63+
String targetWorkflowId = "command-cancel-" + UUID.randomUUID();
64+
CancellationReasonWorkflow targetWorkflow =
65+
testWorkflowRule
66+
.getWorkflowClient()
67+
.newWorkflowStub(
68+
CancellationReasonWorkflow.class,
69+
WorkflowOptions.newBuilder()
70+
.setWorkflowId(targetWorkflowId)
71+
.setTaskQueue(testWorkflowRule.getTaskQueue())
72+
.build());
73+
74+
WorkflowClient.start(targetWorkflow::execute);
75+
76+
CancelExternalWorkflow canceller =
77+
testWorkflowRule.newWorkflowStub(CancelExternalWorkflow.class);
78+
String reason = "command-cancel-reason";
79+
WorkflowClient.start(canceller::execute, targetWorkflowId, reason);
80+
81+
WorkflowStub targetStub = WorkflowStub.fromTyped(targetWorkflow);
82+
WorkflowFailedException exception =
83+
assertThrows(WorkflowFailedException.class, () -> targetStub.getResult(String.class));
84+
assertEquals(CanceledFailure.class, exception.getCause().getClass());
85+
86+
assertEquals(expectedResult(reason), targetWorkflow.getRecordedReason());
87+
}
88+
89+
@Test
90+
public void testCancellationReasonAbsent() {
91+
String workflowId = "client-cancel-null-" + UUID.randomUUID();
92+
CancellationReasonWorkflow workflow =
93+
testWorkflowRule
94+
.getWorkflowClient()
95+
.newWorkflowStub(
96+
CancellationReasonWorkflow.class,
97+
WorkflowOptions.newBuilder()
98+
.setWorkflowId(workflowId)
99+
.setTaskQueue(testWorkflowRule.getTaskQueue())
100+
.build());
101+
102+
WorkflowClient.start(workflow::execute);
103+
104+
WorkflowStub stub = WorkflowStub.fromTyped(workflow);
105+
stub.cancel();
106+
107+
WorkflowFailedException exception =
108+
assertThrows(WorkflowFailedException.class, () -> stub.getResult(String.class));
109+
assertEquals(CanceledFailure.class, exception.getCause().getClass());
110+
111+
assertEquals(expectedResult(""), workflow.getRecordedReason());
112+
}
113+
114+
@Test
115+
public void testCancellationReasonDerivedFromContext() {
116+
String targetWorkflowId = "context-derived-" + UUID.randomUUID();
117+
CancellationReasonWorkflow targetWorkflow =
118+
testWorkflowRule
119+
.getWorkflowClient()
120+
.newWorkflowStub(
121+
CancellationReasonWorkflow.class,
122+
WorkflowOptions.newBuilder()
123+
.setWorkflowId(targetWorkflowId)
124+
.setTaskQueue(testWorkflowRule.getTaskQueue())
125+
.build());
126+
127+
WorkflowClient.start(targetWorkflow::execute);
128+
129+
CascadingCancelWorkflow cascaderWorkflow =
130+
testWorkflowRule.newWorkflowStub(CascadingCancelWorkflow.class);
131+
132+
WorkflowClient.start(cascaderWorkflow::execute, targetWorkflowId);
133+
134+
WorkflowStub cascaderStub = WorkflowStub.fromTyped(cascaderWorkflow);
135+
String reason = "context-derived-reason";
136+
cascaderStub.cancel(reason);
137+
138+
WorkflowStub targetStub = WorkflowStub.fromTyped(targetWorkflow);
139+
WorkflowFailedException exception =
140+
assertThrows(WorkflowFailedException.class, () -> targetStub.getResult(String.class));
141+
assertEquals(CanceledFailure.class, exception.getCause().getClass());
142+
143+
assertEquals(expectedResult(reason), targetWorkflow.getRecordedReason());
144+
}
145+
146+
private static String expectedResult(String reason) {
147+
String part = String.valueOf(reason);
148+
return part + "|" + part;
149+
}
150+
151+
@WorkflowInterface
152+
public interface CancellationReasonWorkflow {
153+
@WorkflowMethod
154+
String execute();
155+
156+
@QueryMethod
157+
String getRecordedReason();
158+
}
159+
160+
public static class CancellationAwareWorkflow implements CancellationReasonWorkflow {
161+
162+
private String recordedReason;
163+
164+
@Override
165+
public String execute() {
166+
Promise<String> cancellationRequest = CancellationScope.current().getCancellationRequest();
167+
String requestedReason = cancellationRequest.get();
168+
String scopeReason = CancellationScope.current().getCancellationReason();
169+
recordedReason = expectedResultFromWorkflow(requestedReason, scopeReason);
170+
System.out.println(recordedReason);
171+
return recordedReason;
172+
}
173+
174+
@Override
175+
public String getRecordedReason() {
176+
return recordedReason;
177+
}
178+
179+
private String expectedResultFromWorkflow(String requestedReason, String scopeReason) {
180+
return requestedReason + "|" + scopeReason;
181+
}
182+
}
183+
184+
@WorkflowInterface
185+
public interface CancelExternalWorkflow {
186+
@WorkflowMethod
187+
void execute(String workflowId, String reason);
188+
}
189+
190+
public static class CancelExternalWorkflowImpl implements CancelExternalWorkflow {
191+
192+
@Override
193+
public void execute(String workflowId, String reason) {
194+
ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(workflowId);
195+
externalWorkflow.cancel(reason);
196+
}
197+
}
198+
199+
@WorkflowInterface
200+
public interface CascadingCancelWorkflow {
201+
@WorkflowMethod
202+
void execute(String workflowId);
203+
}
204+
205+
public static class CascadingCancelWorkflowImpl implements CascadingCancelWorkflow {
206+
207+
@Override
208+
public void execute(String workflowId) {
209+
ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(workflowId);
210+
CancellationScope.current().getCancellationRequest().get();
211+
externalWorkflow.cancel();
212+
}
213+
}
214+
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,7 @@ private void processRequestCancelExternalWorkflowExecution(
917917
.setWorkflowExecution(
918918
WorkflowExecution.newBuilder().setWorkflowId(attr.getWorkflowId()))
919919
.setNamespace(ctx.getNamespace())
920+
.setReason(attr.getReason())
920921
.build();
921922
CancelExternalWorkflowExecutionCallerInfo info =
922923
new CancelExternalWorkflowExecutionCallerInfo(

0 commit comments

Comments
 (0)