Skip to content

Commit 4fb1d17

Browse files
Support conditional complete on signal or internal empty (#178)
1 parent 75c73c8 commit 4fb1d17

File tree

5 files changed

+307
-15
lines changed

5 files changed

+307
-15
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.iworkflow.core;
2+
3+
import io.iworkflow.gen.models.WorkflowConditionalCloseType;
4+
import org.immutables.value.Value;
5+
6+
import java.util.Optional;
7+
8+
@Value.Immutable
9+
public abstract class InternalConditionalClose {
10+
public abstract WorkflowConditionalCloseType getWorkflowConditionalCloseType();
11+
12+
public abstract String getChannelName();
13+
14+
public abstract Optional<Object> getCloseInput();
15+
}

src/main/java/io/iworkflow/core/StateDecision.java

Lines changed: 94 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package io.iworkflow.core;
22

3+
import io.iworkflow.gen.models.WorkflowConditionalCloseType;
34
import org.immutables.value.Value;
45

56
import java.util.ArrayList;
67
import java.util.Arrays;
78
import java.util.List;
9+
import java.util.Optional;
810

911
@Value.Immutable
1012
public abstract class StateDecision {
1113

14+
public abstract Optional<InternalConditionalClose> getWorkflowConditionalClose();
15+
1216
public abstract List<StateMovement> getNextStates();
1317

1418
// a dead end will just complete its thread, without triggering any closing workflow
15-
public static StateDecision deadEnd(){
19+
public static StateDecision deadEnd() {
1620
return ImmutableStateDecision.builder()
1721
.nextStates(Arrays.asList(StateMovement.DEAD_END_WORKFLOW_MOVEMENT))
1822
.build();
@@ -22,9 +26,9 @@ public static ImmutableStateDecision.Builder builder() {
2226
return ImmutableStateDecision.builder();
2327
}
2428

25-
public static StateDecision gracefulCompleteWorkflow(final Object output) {
29+
public static StateDecision gracefulCompleteWorkflow(final Object completionOutput) {
2630
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
27-
StateMovement.gracefulCompleteWorkflow(output)
31+
StateMovement.gracefulCompleteWorkflow(completionOutput)
2832
)).build();
2933
}
3034

@@ -34,9 +38,9 @@ public static StateDecision gracefulCompleteWorkflow() {
3438
)).build();
3539
}
3640

37-
public static StateDecision forceCompleteWorkflow(final Object output) {
41+
public static StateDecision forceCompleteWorkflow(final Object completionOutput) {
3842
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
39-
StateMovement.forceCompleteWorkflow(output)
43+
StateMovement.forceCompleteWorkflow(completionOutput)
4044
)).build();
4145
}
4246

@@ -46,9 +50,9 @@ public static StateDecision forceCompleteWorkflow() {
4650
)).build();
4751
}
4852

49-
public static StateDecision forceFailWorkflow(final Object output) {
53+
public static StateDecision forceFailWorkflow(final Object completionOutput) {
5054
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
51-
StateMovement.forceFailWorkflow(output)
55+
StateMovement.forceFailWorkflow(completionOutput)
5256
)).build();
5357
}
5458

@@ -58,6 +62,89 @@ public static StateDecision forceFailWorkflow() {
5862
.build();
5963
}
6064

65+
66+
public static StateDecision forceCompleteIfInternalChannelEmptyOrElse(final String internalChannelName, final Class<? extends WorkflowState> orElseStateClass) {
67+
return forceCompleteIfInternalChannelEmptyOrElse(internalChannelName, orElseStateClass, null);
68+
}
69+
70+
public static StateDecision forceCompleteIfInternalChannelEmptyOrElse(final String internalChannelName, final Class<? extends WorkflowState> orElseStateClass, final Object stateInput) {
71+
return forceCompleteIfInternalChannelEmptyOrElse(null, internalChannelName, StateMovement.create(orElseStateClass, stateInput));
72+
}
73+
74+
public static StateDecision forceCompleteIfInternalChannelEmptyOrElse(final Object completionOutput, final String internalChannelName, final Class<? extends WorkflowState> orElseStateClass) {
75+
return forceCompleteIfInternalChannelEmptyOrElse(completionOutput, internalChannelName, orElseStateClass, null);
76+
}
77+
78+
public static StateDecision forceCompleteIfInternalChannelEmptyOrElse(final Object completionOutput, final String internalChannelName, final Class<? extends WorkflowState> orElseStateClass, final Object stateInput) {
79+
return forceCompleteIfInternalChannelEmptyOrElse(completionOutput, internalChannelName, StateMovement.create(orElseStateClass, stateInput));
80+
}
81+
82+
/**
83+
* Atomically force complete the workflow if internal channel is empty, otherwise trigger the state movements from the current thread
84+
* This is important for use case that needs to ensure all the messages in the channel are processed before completing the workflow, otherwise messages will be lost.
85+
* Without this atomic API, if user just check the channel emptiness in the State APIs, the channel may receive new messages during the execution of state APIs
86+
* <br>
87+
* Note that today this doesn't cover the case that internal messages are published from other State APIs yet. It's only for internal messages published from RPCs.
88+
* If you do want to use other State APIs to publish messages to the channel at the same time, you can use persistence locking to ensure only the State APIs are not executed
89+
* in parallel. See more in TODO https://github.com/indeedeng/iwf/issues/289
90+
*
91+
* @param completionOutput the output of workflow completion
92+
* @param internalChannelName the internal channel name for checking emptiness
93+
* @param orElseStateMovements the state movements if channel is not empty
94+
* @return the decision
95+
*/
96+
public static StateDecision forceCompleteIfInternalChannelEmptyOrElse(final Object completionOutput, final String internalChannelName, final StateMovement... orElseStateMovements) {
97+
return ImmutableStateDecision.builder()
98+
.workflowConditionalClose(
99+
ImmutableInternalConditionalClose.builder()
100+
.workflowConditionalCloseType(WorkflowConditionalCloseType.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY)
101+
.channelName(internalChannelName)
102+
.closeInput(Optional.ofNullable(completionOutput))
103+
.build()
104+
)
105+
.nextStates(Arrays.asList(orElseStateMovements))
106+
.build();
107+
}
108+
109+
public static StateDecision forceCompleteIfSignalChannelEmptyOrElse(final String signalChannelName, final Class<? extends WorkflowState> orElseStateClass) {
110+
return forceCompleteIfSignalChannelEmptyOrElse(signalChannelName, orElseStateClass, null);
111+
}
112+
113+
public static StateDecision forceCompleteIfSignalChannelEmptyOrElse(final String signalChannelName, final Class<? extends WorkflowState> orElseStateClass, final Object stateInput) {
114+
return forceCompleteIfSignalChannelEmptyOrElse(null, signalChannelName, StateMovement.create(orElseStateClass, stateInput));
115+
}
116+
117+
public static StateDecision forceCompleteIfSignalChannelEmptyOrElse(final Object completionOutput, final String signalChannelName, final Class<? extends WorkflowState> orElseStateClass) {
118+
return forceCompleteIfSignalChannelEmptyOrElse(completionOutput, signalChannelName, orElseStateClass, null);
119+
}
120+
121+
public static StateDecision forceCompleteIfSignalChannelEmptyOrElse(final Object completionOutput, final String signalChannelName, final Class<? extends WorkflowState> orElseStateClass, final Object stateInput) {
122+
return forceCompleteIfSignalChannelEmptyOrElse(completionOutput, signalChannelName, StateMovement.create(orElseStateClass, stateInput));
123+
}
124+
125+
/**
126+
* Atomically force complete the workflow if signal channel is empty, otherwise trigger the state movements from the current thread
127+
* This is important for use case that needs to ensure all the messages in the channel are processed before completing the workflow, otherwise messages will be lost.
128+
* Without this atomic API, if user just check the channel emptiness in the State APIs, the channel may receive new messages during the execution of state APIs
129+
*
130+
* @param completionOutput the output of workflow completion
131+
* @param signalChannelName the signal channel name for checking emptiness
132+
* @param orElseStateMovements the state movements if channel is not empty
133+
* @return the decision
134+
*/
135+
public static StateDecision forceCompleteIfSignalChannelEmptyOrElse(final Object completionOutput, final String signalChannelName, final StateMovement... orElseStateMovements) {
136+
return ImmutableStateDecision.builder()
137+
.workflowConditionalClose(
138+
ImmutableInternalConditionalClose.builder()
139+
.workflowConditionalCloseType(WorkflowConditionalCloseType.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY)
140+
.channelName(signalChannelName)
141+
.closeInput(Optional.ofNullable(completionOutput))
142+
.build()
143+
)
144+
.nextStates(Arrays.asList(orElseStateMovements))
145+
.build();
146+
}
147+
61148
public static StateDecision singleNextState(final Class<? extends WorkflowState> stateClass) {
62149
return singleNextState(stateClass.getSimpleName());
63150
}
Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,43 @@
11
package io.iworkflow.core.mapper;
22

3+
import io.iworkflow.core.InternalConditionalClose;
34
import io.iworkflow.core.ObjectEncoder;
45
import io.iworkflow.core.Registry;
6+
import io.iworkflow.gen.models.EncodedObject;
57
import io.iworkflow.gen.models.StateDecision;
8+
import io.iworkflow.gen.models.WorkflowConditionalClose;
69

710
import java.util.stream.Collectors;
811

912
public class StateDecisionMapper {
1013
public static StateDecision toGenerated(io.iworkflow.core.StateDecision stateDecision, final String workflowType, final Registry registry, final ObjectEncoder objectEncoder) {
11-
if (stateDecision.getNextStates() == null) {
14+
if (stateDecision.getNextStates() == null && !stateDecision.getWorkflowConditionalClose().isPresent()) {
1215
return null;
1316
}
14-
return new StateDecision()
15-
.nextStates(stateDecision.getNextStates()
16-
.stream()
17-
.map(movement -> {
18-
return StateMovementMapper.toGenerated(movement, workflowType, registry, objectEncoder);
19-
})
20-
.collect(Collectors.toList()));
17+
18+
StateDecision decision = new StateDecision();
19+
20+
if (stateDecision.getNextStates() != null) {
21+
decision.nextStates(
22+
stateDecision.getNextStates()
23+
.stream()
24+
.map(movement -> StateMovementMapper.toGenerated(movement, workflowType, registry, objectEncoder))
25+
.collect(Collectors.toList())
26+
);
27+
}
28+
29+
if (!stateDecision.getWorkflowConditionalClose().isPresent()) {
30+
return decision;
31+
}
32+
33+
InternalConditionalClose conditionalClose = stateDecision.getWorkflowConditionalClose().get();
34+
EncodedObject closeInput = objectEncoder.encode(conditionalClose.getCloseInput());
35+
decision.conditionalClose(
36+
new WorkflowConditionalClose()
37+
.conditionalCloseType(conditionalClose.getWorkflowConditionalCloseType())
38+
.closeInput(closeInput)
39+
.channelName(conditionalClose.getChannelName())
40+
);
41+
return decision;
2142
}
2243
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.iworkflow.integ;
2+
3+
import io.iworkflow.core.Client;
4+
import io.iworkflow.core.ClientOptions;
5+
import io.iworkflow.integ.conditional.ConditionalCompleteWorkflow;
6+
import io.iworkflow.spring.TestSingletonWorkerService;
7+
import io.iworkflow.spring.controller.WorkflowRegistry;
8+
import org.junit.jupiter.api.Assertions;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
12+
import java.util.concurrent.ExecutionException;
13+
14+
public class ConditionalCompleteTest {
15+
16+
@BeforeEach
17+
public void setup() throws ExecutionException, InterruptedException {
18+
TestSingletonWorkerService.startWorkerIfNotUp();
19+
}
20+
21+
@Test
22+
public void testCompleteIfInternalChannelEmpty() throws InterruptedException {
23+
testCompleteIfChannelEmpty(false);
24+
}
25+
26+
@Test
27+
public void testCompleteIfSignalChannelEmpty() throws InterruptedException {
28+
testCompleteIfChannelEmpty(true);
29+
}
30+
31+
public void testCompleteIfChannelEmpty(boolean useSignal) throws InterruptedException {
32+
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
33+
String namePart;
34+
if (useSignal) {
35+
namePart = "Signal";
36+
} else {
37+
namePart = "Internal";
38+
}
39+
final String wfId = "testCompleteIf" + namePart + "ChannelEmpty" + System.currentTimeMillis() / 1000;
40+
final String runId = client.startWorkflow(
41+
ConditionalCompleteWorkflow.class, wfId, 10, useSignal);
42+
43+
Thread.sleep(1000);
44+
45+
for (int i = 0; i < 3; i++) {
46+
if (useSignal) {
47+
client.signalWorkflow(ConditionalCompleteWorkflow.class, wfId, "", ConditionalCompleteWorkflow.SIGNAL_CHANNEL_NAME, null);
48+
} else {
49+
final ConditionalCompleteWorkflow rpcStub = client.newRpcStub(ConditionalCompleteWorkflow.class, wfId, "");
50+
client.invokeRPC(rpcStub::publishToInternalChannel);
51+
}
52+
if (i == 0) {
53+
// wait for a second so that the workflow is in execute state
54+
Thread.sleep(1000);
55+
}
56+
}
57+
58+
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
59+
Assertions.assertEquals(3, output);
60+
61+
}
62+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package io.iworkflow.integ.conditional;
2+
3+
import io.iworkflow.core.*;
4+
import io.iworkflow.core.command.CommandRequest;
5+
import io.iworkflow.core.command.CommandResults;
6+
import io.iworkflow.core.communication.*;
7+
import io.iworkflow.core.persistence.DataAttributeDef;
8+
import io.iworkflow.core.persistence.Persistence;
9+
import io.iworkflow.core.persistence.PersistenceFieldDef;
10+
import org.springframework.stereotype.Component;
11+
12+
import java.util.Arrays;
13+
import java.util.List;
14+
15+
16+
@Component
17+
public class ConditionalCompleteWorkflow implements ObjectWorkflow {
18+
19+
public static final String SIGNAL_CHANNEL_NAME = "test-signal-channel";
20+
21+
public static final String INTERNAL_CHANNEL_NAME = "test-internal-channel";
22+
public static final String DA_COUNTER = "counter";
23+
24+
25+
@Override
26+
public List<CommunicationMethodDef> getCommunicationSchema() {
27+
return Arrays.asList(
28+
SignalChannelDef.create(Void.class, SIGNAL_CHANNEL_NAME),
29+
InternalChannelDef.create(Void.class, INTERNAL_CHANNEL_NAME)
30+
);
31+
}
32+
33+
@Override
34+
public List<PersistenceFieldDef> getPersistenceSchema() {
35+
return Arrays.asList(
36+
DataAttributeDef.create(Integer.class, DA_COUNTER)
37+
);
38+
}
39+
40+
@Override
41+
public List<StateDef> getWorkflowStates() {
42+
return Arrays.asList(
43+
StateDef.startingState(new WorkflowState1())
44+
);
45+
}
46+
47+
@RPC
48+
public void publishToInternalChannel(Context context, Persistence persistence, Communication communication) {
49+
communication.publishInternalChannel(INTERNAL_CHANNEL_NAME, null);
50+
}
51+
}
52+
53+
54+
class WorkflowState1 implements WorkflowState<Boolean> {
55+
56+
@Override
57+
public Class<Boolean> getInputType() {
58+
return Boolean.class;
59+
}
60+
61+
@Override
62+
public CommandRequest waitUntil(
63+
Context context,
64+
Boolean useSignal,
65+
Persistence persistence,
66+
final Communication communication) {
67+
68+
if (useSignal) {
69+
return CommandRequest.forAnyCommandCompleted(
70+
SignalCommand.create(ConditionalCompleteWorkflow.SIGNAL_CHANNEL_NAME)
71+
);
72+
} else {
73+
return CommandRequest.forAnyCommandCompleted(
74+
InternalChannelCommand.create(ConditionalCompleteWorkflow.INTERNAL_CHANNEL_NAME)
75+
);
76+
}
77+
}
78+
79+
@Override
80+
public StateDecision execute(
81+
Context context,
82+
Boolean useSignal,
83+
CommandResults commandResults,
84+
Persistence persistence,
85+
final Communication communication) {
86+
Integer counter = persistence.getDataAttribute(ConditionalCompleteWorkflow.DA_COUNTER, Integer.class);
87+
if (counter == null) {
88+
counter = 0;
89+
}
90+
counter++;
91+
persistence.setDataAttribute(ConditionalCompleteWorkflow.DA_COUNTER, counter);
92+
93+
if (context.getStateExecutionId().get().equals("WorkflowState1-1")) {
94+
// wait for 3 seconds so that the channel can have a new message
95+
try {
96+
Thread.sleep(3000);
97+
} catch (InterruptedException e) {
98+
throw new RuntimeException(e);
99+
}
100+
}
101+
if (useSignal) {
102+
return StateDecision.forceCompleteIfSignalChannelEmptyOrElse(counter, ConditionalCompleteWorkflow.SIGNAL_CHANNEL_NAME, WorkflowState1.class, useSignal);
103+
} else {
104+
return StateDecision.forceCompleteIfInternalChannelEmptyOrElse(counter, ConditionalCompleteWorkflow.INTERNAL_CHANNEL_NAME, WorkflowState1.class, useSignal);
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)