Skip to content

Commit b12ee45

Browse files
Add support for FirstExecutionRunId to cancel and terminate (#2692)
Add support for FirstExecutionRunId to cancel and terminate
1 parent c46eb46 commit b12ee45

File tree

4 files changed

+426
-7
lines changed

4 files changed

+426
-7
lines changed

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ interface TestWorkflowMutableState {
2626

2727
ExecutionId getExecutionId();
2828

29+
String getFirstExecutionRunId();
30+
2931
WorkflowExecutionStatus getWorkflowExecutionStatus();
3032

3133
StartWorkflowExecutionRequest getStartRequest();

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,11 @@ public ExecutionId getExecutionId() {
373373
return executionId;
374374
}
375375

376+
@Override
377+
public String getFirstExecutionRunId() {
378+
return workflow.getData().firstExecutionRunId;
379+
}
380+
376381
@Override
377382
public WorkflowExecutionStatus getWorkflowExecutionStatus() {
378383
switch (workflow.getState()) {

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 98 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,15 @@
6969
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
7070
implements Closeable {
7171
private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
72-
private static final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
7372
private static final JsonFormat.Parser JSON_PARSER = JsonFormat.parser();
7473

7574
private static final String FAILURE_TYPE_STRING = Failure.getDescriptor().getFullName();
7675

7776
private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
7877
// key->WorkflowId
7978
private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
79+
private final Map<WorkflowChainId, TestWorkflowMutableState> executionsByFirstExecutionRunId =
80+
new HashMap<>();
8081
private final ExecutorService executor = Executors.newCachedThreadPool();
8182
private final Lock lock = new ReentrantLock();
8283

@@ -166,6 +167,52 @@ private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
166167
return getMutableState(executionId, true);
167168
}
168169

170+
private static final class WorkflowChainId {
171+
private final String namespace;
172+
private final String workflowId;
173+
private final String firstExecutionRunId;
174+
175+
private WorkflowChainId(String namespace, String workflowId, String firstExecutionRunId) {
176+
this.namespace = Objects.requireNonNull(namespace);
177+
this.workflowId = Objects.requireNonNull(workflowId);
178+
this.firstExecutionRunId = Objects.requireNonNull(firstExecutionRunId);
179+
}
180+
181+
@Override
182+
public boolean equals(Object o) {
183+
if (this == o) {
184+
return true;
185+
}
186+
if (o == null || getClass() != o.getClass()) {
187+
return false;
188+
}
189+
WorkflowChainId that = (WorkflowChainId) o;
190+
return namespace.equals(that.namespace)
191+
&& workflowId.equals(that.workflowId)
192+
&& firstExecutionRunId.equals(that.firstExecutionRunId);
193+
}
194+
195+
@Override
196+
public int hashCode() {
197+
return Objects.hash(namespace, workflowId, firstExecutionRunId);
198+
}
199+
200+
@Override
201+
public String toString() {
202+
return "WorkflowChainId{"
203+
+ "namespace='"
204+
+ namespace
205+
+ '\''
206+
+ ", workflowId='"
207+
+ workflowId
208+
+ '\''
209+
+ ", firstExecutionRunId='"
210+
+ firstExecutionRunId
211+
+ '\''
212+
+ '}';
213+
}
214+
}
215+
169216
private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
170217
lock.lock();
171218
try {
@@ -205,6 +252,39 @@ private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean
205252
}
206253
}
207254

255+
private TestWorkflowMutableState getMutableState(
256+
WorkflowChainId workflowChainId, boolean failNotExists) {
257+
lock.lock();
258+
try {
259+
TestWorkflowMutableState mutableState = executionsByFirstExecutionRunId.get(workflowChainId);
260+
if (mutableState == null && failNotExists) {
261+
throw Status.NOT_FOUND
262+
.withDescription("Execution not found in mutable state: " + workflowChainId)
263+
.asRuntimeException();
264+
}
265+
return mutableState;
266+
} finally {
267+
lock.unlock();
268+
}
269+
}
270+
271+
private TestWorkflowMutableState getMutableState(
272+
String namespace,
273+
WorkflowExecution execution,
274+
String firstExecutionRunId,
275+
boolean failNotExists) {
276+
ExecutionId executionId = new ExecutionId(namespace, execution);
277+
WorkflowChainId workflowChainId =
278+
firstExecutionRunId.isEmpty()
279+
? null
280+
: new WorkflowChainId(namespace, execution.getWorkflowId(), firstExecutionRunId);
281+
282+
if (workflowChainId != null) {
283+
return getMutableState(workflowChainId, failNotExists);
284+
}
285+
return getMutableState(executionId, failNotExists);
286+
}
287+
208288
@Override
209289
public void startWorkflowExecution(
210290
StartWorkflowExecutionRequest request,
@@ -454,6 +534,11 @@ private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocke
454534
WorkflowExecution execution = mutableState.getExecutionId().getExecution();
455535
ExecutionId executionId = new ExecutionId(namespace, execution);
456536
executionsByWorkflowId.put(workflowId, mutableState);
537+
if (!firstExecutionRunId.isEmpty()) {
538+
executionsByFirstExecutionRunId.put(
539+
new WorkflowChainId(namespace, workflowId.getWorkflowId(), firstExecutionRunId),
540+
mutableState);
541+
}
457542
executions.put(executionId, mutableState);
458543

459544
PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
@@ -1094,9 +1179,12 @@ public void requestCancelWorkflowExecution(
10941179
void requestCancelWorkflowExecution(
10951180
RequestCancelWorkflowExecutionRequest cancelRequest,
10961181
Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
1097-
ExecutionId executionId =
1098-
new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1099-
TestWorkflowMutableState mutableState = getMutableState(executionId);
1182+
TestWorkflowMutableState mutableState =
1183+
getMutableState(
1184+
cancelRequest.getNamespace(),
1185+
cancelRequest.getWorkflowExecution(),
1186+
cancelRequest.getFirstExecutionRunId(),
1187+
true);
11001188
mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
11011189
}
11021190

@@ -1114,9 +1202,12 @@ public void terminateWorkflowExecution(
11141202
}
11151203

11161204
private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
1117-
ExecutionId executionId =
1118-
new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1119-
TestWorkflowMutableState mutableState = getMutableState(executionId);
1205+
TestWorkflowMutableState mutableState =
1206+
getMutableState(
1207+
request.getNamespace(),
1208+
request.getWorkflowExecution(),
1209+
request.getFirstExecutionRunId(),
1210+
true);
11201211
mutableState.terminateWorkflowExecution(request);
11211212
}
11221213

0 commit comments

Comments
 (0)