Skip to content

Commit 3409ea7

Browse files
authored
Support consuming N message in a single command (#198)
Co-authored-by: Kaili Zhu <kzhu@indeed.com>
1 parent 757eed4 commit 3409ea7

12 files changed

+477
-29
lines changed

src/main/java/io/iworkflow/core/command/CommandRequest.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public abstract class CommandRequest {
2727
* @return the command request
2828
*/
2929
public static CommandRequest forAllCommandCompleted(final BaseCommand... commands) {
30-
return ImmutableCommandRequest.builder().addAllCommands(Arrays.asList(commands)).commandWaitingType(CommandWaitingType.ALL_COMPLETED).build();
30+
final List<BaseCommand> allSingleCommands = getAllSingleCommands(commands);
31+
return ImmutableCommandRequest.builder().addAllCommands(allSingleCommands).commandWaitingType(CommandWaitingType.ALL_COMPLETED).build();
3132
}
3233

3334
/**
@@ -50,12 +51,32 @@ public static CommandRequest forAnyCommandCompleted(final BaseCommand... command
5051
* @return the command request
5152
*/
5253
public static CommandRequest forAnyCommandCombinationCompleted(final List<List<String>> commandCombinationLists, final BaseCommand... commands) {
54+
final List<BaseCommand> allSingleCommands = getAllSingleCommands(commands);
55+
5356
final List<CommandCombination> combinations = new ArrayList<>();
5457
commandCombinationLists.forEach(commandIds -> combinations.add(new CommandCombination().commandIds(commandIds)));
5558
return ImmutableCommandRequest.builder()
5659
.commandCombinations(combinations)
57-
.addAllCommands(Arrays.asList(commands))
60+
.addAllCommands(allSingleCommands)
5861
.commandWaitingType(CommandWaitingType.ANY_COMBINATION_COMPLETED)
5962
.build();
6063
}
64+
65+
private static List<BaseCommand> getAllSingleCommands(final BaseCommand... commands) {
66+
final ArrayList<BaseCommand> allSingleCommands = new ArrayList<>();
67+
Arrays.stream(commands).forEach(
68+
command -> {
69+
if (command instanceof SuperCommand) {
70+
allSingleCommands.addAll(
71+
SuperCommand.toList((SuperCommand) command)
72+
);
73+
return;
74+
}
75+
76+
allSingleCommands.add(command);
77+
}
78+
);
79+
80+
return allSingleCommands;
81+
}
6182
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.iworkflow.core.command;
2+
3+
import io.iworkflow.core.communication.InternalChannelCommand;
4+
import io.iworkflow.core.communication.SignalCommand;
5+
import org.immutables.value.Value;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
10+
@Value.Immutable
11+
public abstract class SuperCommand implements BaseCommand {
12+
13+
public enum Type {
14+
SIGNAL_CHANNEL,
15+
INTERNAL_CHANNEL,
16+
}
17+
18+
public abstract String getName();
19+
public abstract int getCount();
20+
public abstract Type getType();
21+
22+
public static List<BaseCommand> toList(final SuperCommand superCommand) {
23+
final List<BaseCommand> commands = new ArrayList<>();
24+
25+
final boolean hasCommandId = superCommand.getCommandId().isPresent();
26+
for (int i = 0; i < superCommand.getCount(); i ++) {
27+
final BaseCommand command;
28+
if (superCommand.getType() == Type.INTERNAL_CHANNEL) {
29+
if (hasCommandId) {
30+
command = InternalChannelCommand.create(superCommand.getCommandId().get(), superCommand.getName());
31+
} else {
32+
command = InternalChannelCommand.create(superCommand.getName());
33+
}
34+
} else {
35+
if (hasCommandId) {
36+
command = SignalCommand.create(superCommand.getCommandId().get(), superCommand.getName());
37+
} else {
38+
command = SignalCommand.create(superCommand.getName());
39+
}
40+
}
41+
42+
commands.add(command);
43+
}
44+
45+
return commands;
46+
}
47+
}

src/main/java/io/iworkflow/core/communication/InternalChannelCommand.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,68 @@
11
package io.iworkflow.core.communication;
22

33
import io.iworkflow.core.command.BaseCommand;
4+
import io.iworkflow.core.command.ImmutableSuperCommand;
5+
import io.iworkflow.core.command.SuperCommand;
46
import org.immutables.value.Value;
57

68
@Value.Immutable
79
public abstract class InternalChannelCommand implements BaseCommand {
810

911
public abstract String getChannelName();
1012

13+
/**
14+
* Create a super command that represents one or many internal channel commands.
15+
*
16+
* @param commandId required. All the internal channel commands created here will share the same commandId.
17+
* @param channelName required.
18+
* @param count required. It represents the number of internal channel commands to create.
19+
* @return super command
20+
*/
21+
public static SuperCommand create(final String commandId, final String channelName, final int count) {
22+
return ImmutableSuperCommand.builder()
23+
.commandId(commandId)
24+
.name(channelName)
25+
.count(Math.max(1, count))
26+
.type(SuperCommand.Type.INTERNAL_CHANNEL)
27+
.build();
28+
}
29+
30+
/**
31+
* Create one internal channel command.
32+
*
33+
* @param commandId required.
34+
* @param channelName required.
35+
* @return internal channel command
36+
*/
1137
public static InternalChannelCommand create(final String commandId, final String channelName) {
1238
return ImmutableInternalChannelCommand.builder()
13-
.channelName(channelName)
1439
.commandId(commandId)
40+
.channelName(channelName)
41+
.build();
42+
}
43+
44+
/**
45+
* Create a super command that represents one or many internal channel commands.
46+
*
47+
* @param channelName required.
48+
* @param count required. It represents the number of internal channel commands to create.
49+
* @return super command
50+
*/
51+
public static SuperCommand create(final String channelName, final int count) {
52+
return ImmutableSuperCommand.builder()
53+
.name(channelName)
54+
.count(Math.max(1, count))
55+
.type(SuperCommand.Type.INTERNAL_CHANNEL)
1556
.build();
1657
}
1758

18-
public static InternalChannelCommand create(String channelName) {
59+
/**
60+
* Create one internal channel command.
61+
*
62+
* @param channelName required.
63+
* @return internal channel command
64+
*/
65+
public static InternalChannelCommand create(final String channelName) {
1966
return ImmutableInternalChannelCommand.builder()
2067
.channelName(channelName)
2168
.build();

src/main/java/io/iworkflow/core/communication/SignalCommand.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,68 @@
11
package io.iworkflow.core.communication;
22

33
import io.iworkflow.core.command.BaseCommand;
4+
import io.iworkflow.core.command.ImmutableSuperCommand;
5+
import io.iworkflow.core.command.SuperCommand;
46
import org.immutables.value.Value;
57

68
@Value.Immutable
79
public abstract class SignalCommand implements BaseCommand {
810

911
public abstract String getSignalChannelName();
1012

11-
public static SignalCommand create(final String commandId, final String channelName) {
13+
/**
14+
* Create a super command that represents one or many signal commands.
15+
*
16+
* @param commandId required. All the signal commands created here will share the same commandId.
17+
* @param signalName required.
18+
* @param count required. It represents the number of signal commands to create.
19+
* @return super command
20+
*/
21+
public static SuperCommand create(final String commandId, final String signalName, final int count) {
22+
return ImmutableSuperCommand.builder()
23+
.commandId(commandId)
24+
.name(signalName)
25+
.count(Math.max(1, count))
26+
.type(SuperCommand.Type.SIGNAL_CHANNEL)
27+
.build();
28+
}
29+
30+
/**
31+
* Create one signal command.
32+
*
33+
* @param commandId required.
34+
* @param signalName required.
35+
* @return signal command
36+
*/
37+
public static SignalCommand create(final String commandId, final String signalName) {
1238
return ImmutableSignalCommand.builder()
13-
.signalChannelName(channelName)
1439
.commandId(commandId)
40+
.signalChannelName(signalName)
41+
.build();
42+
}
43+
44+
/**
45+
* Create a super command that represents one or many signal commands.
46+
*
47+
* @param signalName required.
48+
* @param count required. It represents the number of signal commands to create.
49+
* @return super command
50+
*/
51+
public static SuperCommand create(final String signalName, final int count) {
52+
return ImmutableSuperCommand.builder()
53+
.name(signalName)
54+
.count(Math.max(1, count))
55+
.type(SuperCommand.Type.SIGNAL_CHANNEL)
1556
.build();
1657
}
1758

18-
public static SignalCommand create(String signalName) {
59+
/**
60+
* Create one signal command.
61+
*
62+
* @param signalName required.
63+
* @return signal command
64+
*/
65+
public static SignalCommand create(final String signalName) {
1966
return ImmutableSignalCommand.builder()
2067
.signalChannelName(signalName)
2168
.build();

src/test/java/io/iworkflow/integ/InternalChannelTest.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
import io.iworkflow.core.Client;
44
import io.iworkflow.core.ClientOptions;
5-
import io.iworkflow.integ.interstatechannel.BasicInterStateChannelWorkflow;
5+
import io.iworkflow.integ.internalchannel.BasicInternalChannelWorkflow;
6+
import io.iworkflow.integ.internalchannel.MultipleSameInternalChannelWorkflow;
67
import io.iworkflow.spring.TestSingletonWorkerService;
78
import io.iworkflow.spring.controller.WorkflowRegistry;
89
import org.junit.jupiter.api.Assertions;
@@ -19,13 +20,24 @@ public void setup() throws ExecutionException, InterruptedException {
1920
}
2021

2122
@Test
22-
public void testBasicInterStateWorkflow() throws InterruptedException {
23+
public void testBasicInternalWorkflow() throws InterruptedException {
2324
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
24-
final String wfId = "basic-inter-state-test-id" + System.currentTimeMillis() / 1000;
25+
final String wfId = "basic-internal-test-id" + System.currentTimeMillis() / 1000;
2526
final Integer input = 1;
2627
final String runId = client.startWorkflow(
27-
BasicInterStateChannelWorkflow.class, wfId, 10, input);
28+
BasicInternalChannelWorkflow.class, wfId, 10, input);
2829
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
2930
Assertions.assertEquals(3, output);
3031
}
32+
33+
@Test
34+
public void testMultipleSameInternalWorkflow() throws InterruptedException {
35+
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
36+
final String wfId = "multiple-same-internal-test-id" + System.currentTimeMillis() / 1000;
37+
final Integer input = 1;
38+
final String runId = client.startWorkflow(
39+
MultipleSameInternalChannelWorkflow.class, wfId, 10, input);
40+
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
41+
Assertions.assertEquals(5, output);
42+
}
3143
}

src/test/java/io/iworkflow/integ/SignalTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.iworkflow.gen.models.ErrorSubStatus;
77
import io.iworkflow.integ.signal.BasicSignalWorkflow;
88
import io.iworkflow.integ.signal.BasicSignalWorkflowState2;
9+
import io.iworkflow.integ.signal.MultipleSameSignalWorkflow;
10+
import static io.iworkflow.integ.signal.MultipleSameSignalWorkflow.SIGNAL_NAME_1;
911
import io.iworkflow.spring.TestSingletonWorkerService;
1012
import io.iworkflow.spring.controller.WorkflowRegistry;
1113
import org.junit.jupiter.api.Assertions;
@@ -67,4 +69,21 @@ public void testBasicSignalWorkflow() throws InterruptedException {
6769
}
6870
Assertions.fail("signal closed workflow should fail");
6971
}
72+
73+
@Test
74+
public void testMultipleSameSignalWorkflow() {
75+
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
76+
final String wfId = "multiple-same-signal-test-id" + System.currentTimeMillis() / 1000;
77+
final Integer input = 1;
78+
final String runId = client.startWorkflow(
79+
MultipleSameSignalWorkflow.class, wfId, 10, input);
80+
client.signalWorkflow(
81+
MultipleSameSignalWorkflow.class, wfId, runId, SIGNAL_NAME_1, Integer.valueOf(2));
82+
83+
client.signalWorkflow(
84+
MultipleSameSignalWorkflow.class, wfId, runId, SIGNAL_NAME_1, Integer.valueOf(3));
85+
86+
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
87+
Assertions.assertEquals(5, output);
88+
}
7089
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.iworkflow.integ.interstatechannel;
1+
package io.iworkflow.integ.internalchannel;
22

33
import io.iworkflow.core.ObjectWorkflow;
44
import io.iworkflow.core.StateDef;
@@ -10,7 +10,7 @@
1010
import java.util.List;
1111

1212
@Component
13-
public class BasicInterStateChannelWorkflow implements ObjectWorkflow {
13+
public class BasicInternalChannelWorkflow implements ObjectWorkflow {
1414
public static final String INTER_STATE_CHANNEL_NAME_1 = "test-inter-state-channel-1";
1515

1616
public static final String INTER_STATE_CHANNEL_NAME_2 = "test-inter-state-channel-2";
@@ -28,9 +28,9 @@ public List<CommunicationMethodDef> getCommunicationSchema() {
2828
@Override
2929
public List<StateDef> getWorkflowStates() {
3030
return Arrays.asList(
31-
StateDef.startingState(new BasicInterStateChannelWorkflowState0()),
32-
StateDef.nonStartingState(new BasicInterStateChannelWorkflowState1()),
33-
StateDef.nonStartingState(new BasicInterStateChannelWorkflowState2())
31+
StateDef.startingState(new BasicInternalChannelWorkflowState0()),
32+
StateDef.nonStartingState(new BasicInternalChannelWorkflowState1()),
33+
StateDef.nonStartingState(new BasicInternalChannelWorkflowState2())
3434
);
3535
}
3636
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.iworkflow.integ.interstatechannel;
1+
package io.iworkflow.integ.internalchannel;
22

33
import io.iworkflow.core.Context;
44
import io.iworkflow.core.StateDecision;
@@ -9,7 +9,7 @@
99
import io.iworkflow.core.communication.Communication;
1010
import io.iworkflow.core.persistence.Persistence;
1111

12-
public class BasicInterStateChannelWorkflowState0 implements WorkflowState<Integer> {
12+
public class BasicInternalChannelWorkflowState0 implements WorkflowState<Integer> {
1313

1414
@Override
1515
public Class<Integer> getInputType() {
@@ -31,8 +31,8 @@ public StateDecision execute(
3131
CommandResults commandResults,
3232
Persistence persistence, final Communication communication) {
3333
return StateDecision.multiNextStates(
34-
StateMovement.create(BasicInterStateChannelWorkflowState1.class, input),
35-
StateMovement.create(BasicInterStateChannelWorkflowState2.class, input)
34+
StateMovement.create(BasicInternalChannelWorkflowState1.class, input),
35+
StateMovement.create(BasicInternalChannelWorkflowState2.class, input)
3636
);
3737
}
3838
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.iworkflow.integ.interstatechannel;
1+
package io.iworkflow.integ.internalchannel;
22

33
import io.iworkflow.core.Context;
44
import io.iworkflow.core.StateDecision;
@@ -13,7 +13,7 @@
1313

1414
import java.util.Arrays;
1515

16-
public class BasicInterStateChannelWorkflowState1 implements WorkflowState<Integer> {
16+
public class BasicInternalChannelWorkflowState1 implements WorkflowState<Integer> {
1717
public static final String COMMAND_ID = "test-cmd-id";
1818
public static final String COMMAND_ID_2 = "test-cmd-id-2";
1919

@@ -32,9 +32,9 @@ public CommandRequest waitUntil(
3232
Arrays.asList(
3333
Arrays.asList(COMMAND_ID, COMMAND_ID_2)
3434
),
35-
InternalChannelCommand.create(COMMAND_ID, BasicInterStateChannelWorkflow.INTER_STATE_CHANNEL_NAME_1),
36-
InternalChannelCommand.create(COMMAND_ID, BasicInterStateChannelWorkflow.INTER_STATE_CHANNEL_NAME_2),
37-
InternalChannelCommand.create(COMMAND_ID_2, BasicInterStateChannelWorkflow.INTER_STATE_CHANNEL_PREFIX_1 + "1")
35+
InternalChannelCommand.create(COMMAND_ID, BasicInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME_1),
36+
InternalChannelCommand.create(COMMAND_ID, BasicInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME_2),
37+
InternalChannelCommand.create(COMMAND_ID_2, BasicInternalChannelWorkflow.INTER_STATE_CHANNEL_PREFIX_1 + "1")
3838
);
3939
}
4040

0 commit comments

Comments
 (0)