Skip to content

Commit b2c3988

Browse files
authored
Support weak typed signal and internal channel (#192)
Co-authored-by: Kaili Zhu <kzhu@indeed.com>
1 parent 64e37f2 commit b2c3988

17 files changed

+274
-165
lines changed

src/main/java/io/iworkflow/core/Client.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.iworkflow.core;
22

33
import io.iworkflow.core.persistence.PersistenceOptions;
4-
import io.iworkflow.core.utils.DataAttributeUtils;
54
import io.iworkflow.gen.models.KeyValue;
65
import io.iworkflow.gen.models.SearchAttribute;
76
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
@@ -339,12 +338,8 @@ public void signalWorkflow(
339338

340339
checkWorkflowTypeExists(wfType);
341340

342-
Map<String, Class<?>> nameToTypeMap = registry.getSignalChannelNameToSignalTypeMap(wfType);
341+
final Class<?> signalType = registry.getSignalChannelTypeStore(wfType).getType(signalChannelName);
343342

344-
if (!nameToTypeMap.containsKey(signalChannelName)) {
345-
throw new IllegalArgumentException(String.format("Workflow %s doesn't have signal %s", wfType, signalChannelName));
346-
}
347-
Class<?> signalType = nameToTypeMap.get(signalChannelName);
348343
if (signalValue != null && !signalType.isInstance(signalValue)) {
349344
throw new IllegalArgumentException(String.format("Signal value is not of type %s", signalType.getName()));
350345
}
@@ -502,13 +497,12 @@ private Map<String, Object> doGetWorkflowDataAttributes(
502497
final String wfType = workflowClass.getSimpleName();
503498
checkWorkflowTypeExists(wfType);
504499

505-
final Map<String, Class<?>> dataAttributeKeyToTypeMap = registry.getDataAttributeKeyToTypeMap(wfType);
506-
final Map<String, Class<?>> dataAttributePrefixToTypeMap = registry.getDataAttributePrefixToTypeMap(wfType);
500+
final TypeStore dataAttributeTypeStore = registry.getDataAttributeTypeStore(wfType);
507501

508502
// if attribute keys is null or empty, iwf server will return all data attributes
509503
if (keys != null && !keys.isEmpty()) {
510504
final Optional<String> first = keys.stream()
511-
.filter(key -> !DataAttributeUtils.isValidDataAttributeKey(key, dataAttributeKeyToTypeMap, dataAttributePrefixToTypeMap))
505+
.filter(key -> !dataAttributeTypeStore.isValidNameOrPrefix(key))
512506
.findFirst();
513507
if (first.isPresent()) {
514508
throw new IllegalArgumentException(
@@ -535,7 +529,8 @@ private Map<String, Object> doGetWorkflowDataAttributes(
535529
keyValue.getKey(),
536530
clientOptions.getObjectEncoder().decode(
537531
keyValue.getValue(),
538-
DataAttributeUtils.getDataAttributeType(keyValue.getKey(), dataAttributeKeyToTypeMap, dataAttributePrefixToTypeMap))
532+
dataAttributeTypeStore.getType(keyValue.getKey())
533+
)
539534
);
540535
}
541536
}

src/main/java/io/iworkflow/core/Registry.java

Lines changed: 24 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@ public class Registry {
2323
private final Map<String, StateDef> workflowStateStore = new HashMap<>(); // TODO refactor to use Map<String, Map<String, StateDef>> to be more clear
2424

2525
private final Map<String, StateDef> workflowStartStateStore = new HashMap<>();
26-
private final Map<String, Map<String, Class<?>>> signalTypeStore = new HashMap<>();
2726

28-
private final Map<String, Map<String, Class<?>>> internalChannelTypeStore = new HashMap<>();
29-
private final Map<String, Map<String, Class<?>>> dataAttributeKeyToTypeStore = new HashMap<>();
30-
private final Map<String, Map<String, Class<?>>> dataAttributePrefixToTypeStore = new HashMap<>();
27+
private final Map<String, TypeStore> signalTypeStore = new HashMap<>();
28+
private final Map<String, TypeStore> internalChannelTypeStore = new HashMap<>();
29+
private final Map<String, TypeStore> dataAttributeTypeStore = new HashMap<>();
3130

3231
private final Map<String, Map<String, SearchAttributeValueType>> searchAttributeTypeStore = new HashMap<>();
3332

@@ -113,81 +112,53 @@ private void registerWorkflowRPCs(final ObjectWorkflow wf) {
113112
}
114113

115114
private void registerWorkflowSignal(final ObjectWorkflow wf) {
116-
String workflowType = getWorkflowType(wf);
115+
final String workflowType = getWorkflowType(wf);
117116
final List<SignalChannelDef> channels = getSignalChannels(wf);
117+
118+
final TypeStore typeStore = signalTypeStore.computeIfAbsent(workflowType,
119+
s -> TypeStore.defaultBuilder(TypeStore.Type.SIGNAL_CHANNEL));
120+
118121
if (channels == null || channels.isEmpty()) {
119-
signalTypeStore.put(workflowType, new HashMap<>());
120122
return;
121123
}
122124

123-
for (SignalChannelDef signalChannelDef : channels) {
124-
Map<String, Class<?>> signalNameToTypeMap =
125-
signalTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
126-
if (signalNameToTypeMap.containsKey(signalChannelDef.getSignalChannelName())) {
127-
throw new WorkflowDefinitionException(
128-
String.format("Signal channel name %s already exists", signalChannelDef.getSignalChannelName()));
129-
}
130-
signalNameToTypeMap.put(signalChannelDef.getSignalChannelName(), signalChannelDef.getSignalValueType());
125+
for (final SignalChannelDef signalChannelDef : channels) {
126+
typeStore.addToStore(signalChannelDef);
131127
}
132128
}
133129

134130
private void registerWorkflowInternalChannel(final ObjectWorkflow wf) {
135-
String workflowType = getWorkflowType(wf);
131+
final String workflowType = getWorkflowType(wf);
136132
final List<InternalChannelDef> channels = getInternalChannels(wf);
133+
134+
final TypeStore typeStore = internalChannelTypeStore.computeIfAbsent(workflowType,
135+
s -> TypeStore.defaultBuilder(TypeStore.Type.INTERNAL_CHANNEL));
136+
137137
if (channels == null || channels.isEmpty()) {
138-
internalChannelTypeStore.put(workflowType, new HashMap<>());
139138
return;
140139
}
141140

142-
for (InternalChannelDef internalChannelDef : channels) {
143-
Map<String, Class<?>> nameToTypeMap =
144-
internalChannelTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
145-
if (nameToTypeMap.containsKey(internalChannelDef.getChannelName())) {
146-
throw new WorkflowDefinitionException(
147-
String.format("InternalChannel name %s already exists", internalChannelDef.getChannelName()));
148-
}
149-
nameToTypeMap.put(internalChannelDef.getChannelName(), internalChannelDef.getValueType());
141+
for (final InternalChannelDef internalChannelDef : channels) {
142+
typeStore.addToStore(internalChannelDef);
150143
}
151144
}
152145

153146
private void registerWorkflowDataAttributes(final ObjectWorkflow wf) {
154147
final String workflowType = getWorkflowType(wf);
155148
final List<DataAttributeDef> fields = getDataAttributeFields(wf);
156149

157-
dataAttributeKeyToTypeStore.put(workflowType, new HashMap<>());
158-
dataAttributePrefixToTypeStore.put(workflowType, new HashMap<>());
150+
final TypeStore typeStore = dataAttributeTypeStore.computeIfAbsent(workflowType,
151+
s -> TypeStore.defaultBuilder(TypeStore.Type.DATA_ATTRIBUTE));
159152

160153
if (fields == null || fields.isEmpty()) {
161154
return;
162155
}
163156

164157
for (final DataAttributeDef dataAttributeField : fields) {
165-
if (dataAttributeField.isPrefix()) {
166-
addDataAttributeToStore(dataAttributeField, workflowType, dataAttributePrefixToTypeStore);
167-
} else {
168-
addDataAttributeToStore(dataAttributeField, workflowType, dataAttributeKeyToTypeStore);
169-
}
158+
typeStore.addToStore(dataAttributeField);
170159
}
171160
}
172161

173-
private void addDataAttributeToStore(
174-
final DataAttributeDef dataAttributeField,
175-
final String workflowType,
176-
final Map<String, Map<String, Class<?>>> dataAttributeStore) {
177-
final Map<String, Class<?>> dataAttributeMap =
178-
dataAttributeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
179-
if (dataAttributeMap.containsKey(dataAttributeField.getKey())) {
180-
throw new WorkflowDefinitionException(
181-
String.format(
182-
"data attribute key/prefix %s already exists",
183-
dataAttributeField.getDataAttributeType())
184-
);
185-
}
186-
dataAttributeMap.put(
187-
dataAttributeField.getKey(),
188-
dataAttributeField.getDataAttributeType()
189-
);
190-
}
191162

192163
private void registerPersistenceOptions(final ObjectWorkflow wf) {
193164
String workflowType = getWorkflowType(wf);
@@ -270,20 +241,16 @@ public Optional<StateDef> getWorkflowStartingState(final String workflowType) {
270241
return Optional.ofNullable(state);
271242
}
272243

273-
public Map<String, Class<?>> getSignalChannelNameToSignalTypeMap(final String workflowType) {
244+
public TypeStore getSignalChannelTypeStore(final String workflowType) {
274245
return signalTypeStore.get(workflowType);
275246
}
276247

277-
public Map<String, Class<?>> getInternalChannelNameToTypeMap(final String workflowType) {
248+
public TypeStore getInternalChannelTypeStore(final String workflowType) {
278249
return internalChannelTypeStore.get(workflowType);
279250
}
280251

281-
public Map<String, Class<?>> getDataAttributeKeyToTypeMap(final String workflowType) {
282-
return dataAttributeKeyToTypeStore.get(workflowType);
283-
}
284-
285-
public Map<String, Class<?>> getDataAttributePrefixToTypeMap(final String workflowType) {
286-
return dataAttributePrefixToTypeStore.get(workflowType);
252+
public TypeStore getDataAttributeTypeStore(final String workflowType) {
253+
return dataAttributeTypeStore.get(workflowType);
287254
}
288255

289256
public Map<String, SearchAttributeValueType> getSearchAttributeKeyToTypeMap(final String workflowType) {
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package io.iworkflow.core;
2+
3+
import io.iworkflow.core.communication.CommunicationMethodDef;
4+
import io.iworkflow.core.persistence.DataAttributeDef;
5+
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.Optional;
9+
10+
/**
11+
* This class is used to store both the nameToTypeStore and prefixToTypeStore for the
12+
* data attribute, signal, and internal channel
13+
* in registry.
14+
*/
15+
public class TypeStore {
16+
public enum Type {
17+
DATA_ATTRIBUTE,
18+
SIGNAL_CHANNEL,
19+
INTERNAL_CHANNEL,
20+
}
21+
22+
public final Type classType;
23+
public final Map<String, Class<?>> nameToTypeStore;
24+
public final Map<String, Class<?>> prefixToTypeStore;
25+
26+
private TypeStore(final Type classType, final Map<String, Class<?>> nameToTypeStore, final Map<String, Class<?>> prefixToTypeStore) {
27+
this.classType = classType;
28+
this.nameToTypeStore = nameToTypeStore;
29+
this.prefixToTypeStore = prefixToTypeStore;
30+
}
31+
32+
public static TypeStore defaultBuilder(final Type classType) {
33+
return new TypeStore(classType, new HashMap<>(), new HashMap<>());
34+
}
35+
36+
public Boolean isValidNameOrPrefix(final String name) {
37+
final Class<?> type = doGetType(name);
38+
return type != null;
39+
}
40+
41+
public Class<?> getType(final String name) {
42+
final Class<?> type = doGetType(name);
43+
44+
if (type == null) {
45+
throw new IllegalArgumentException(
46+
String.format(
47+
"%s not registered: %s",
48+
classType,
49+
name
50+
)
51+
);
52+
}
53+
54+
return type;
55+
}
56+
57+
public void addToStore(final Object def) {
58+
final boolean isPrefix;
59+
final String name;
60+
final Class<?> type;
61+
if (classType.equals(Type.DATA_ATTRIBUTE)) {
62+
final DataAttributeDef attributeDef = (DataAttributeDef) def;
63+
isPrefix = attributeDef.isPrefix();
64+
name = attributeDef.getKey();
65+
type = attributeDef.getDataAttributeType();
66+
} else {
67+
final CommunicationMethodDef channelDef = (CommunicationMethodDef) def;
68+
isPrefix = channelDef.isPrefix();
69+
name = channelDef.getName();
70+
type = channelDef.getValueType();
71+
}
72+
73+
doAddToStore(isPrefix, name, type);
74+
}
75+
76+
private Class<?> doGetType(final String name) {
77+
if (nameToTypeStore.containsKey(name)) {
78+
return nameToTypeStore.get(name);
79+
}
80+
81+
final Optional<String> first = prefixToTypeStore.keySet().stream()
82+
.filter(name::startsWith)
83+
.findFirst();
84+
85+
return first.map(prefixToTypeStore::get).orElse(null);
86+
}
87+
88+
private void doAddToStore(final boolean isPrefix, final String name, final Class<?> type) {
89+
final Map<String, Class<?>> store;
90+
if (isPrefix) {
91+
store = prefixToTypeStore;
92+
} else {
93+
store = nameToTypeStore;
94+
}
95+
96+
if (store.containsKey(name)) {
97+
throw new WorkflowDefinitionException(
98+
String.format(
99+
"%s name/prefix %s already exists",
100+
classType,
101+
name
102+
)
103+
);
104+
}
105+
store.put(name, type);
106+
}
107+
}

src/main/java/io/iworkflow/core/WorkerService.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ public WorkflowWorkerRpcResponse handleWorkflowWorkerRpc(final WorkflowWorkerRpc
6868
final Map<String, SearchAttributeValueType> searchAttrsTypeMap = registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType());
6969
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(searchAttrsTypeMap, req.getSearchAttributes());
7070
final CommunicationImpl communication = new CommunicationImpl(
71-
registry.getInternalChannelNameToTypeMap(req.getWorkflowType()), workerOptions.getObjectEncoder(), true);
71+
registry.getInternalChannelTypeStore(req.getWorkflowType()),
72+
workerOptions.getObjectEncoder(),
73+
true
74+
);
7275

7376
final StateExecutionLocalsImpl stateExeLocals = new StateExecutionLocalsImpl(toMap(null), workerOptions.getObjectEncoder());
7477
Persistence persistence = new PersistenceImpl(dataObjectsRW, searchAttributeRW, stateExeLocals);
@@ -146,7 +149,10 @@ public WorkflowStateWaitUntilResponse handleWorkflowStateWaitUntil(final Workflo
146149
final Map<String, SearchAttributeValueType> searchAttrsTypeMap = registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType());
147150
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(searchAttrsTypeMap, req.getSearchAttributes());
148151
final CommunicationImpl communication = new CommunicationImpl(
149-
registry.getInternalChannelNameToTypeMap(req.getWorkflowType()), workerOptions.getObjectEncoder(), false);
152+
registry.getInternalChannelTypeStore(req.getWorkflowType()),
153+
workerOptions.getObjectEncoder(),
154+
false
155+
);
150156

151157
final StateExecutionLocalsImpl stateExeLocals = new StateExecutionLocalsImpl(toMap(null), workerOptions.getObjectEncoder());
152158
Persistence persistence = new PersistenceImpl(dataObjectsRW, searchAttributeRW, stateExeLocals);
@@ -208,7 +214,10 @@ public WorkflowStateExecuteResponse handleWorkflowStateExecute(final WorkflowSta
208214
final Map<String, SearchAttributeValueType> saTypeMap = registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType());
209215
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(saTypeMap, req.getSearchAttributes());
210216
final CommunicationImpl communication = new CommunicationImpl(
211-
registry.getInternalChannelNameToTypeMap(req.getWorkflowType()), workerOptions.getObjectEncoder(), false);
217+
registry.getInternalChannelTypeStore(req.getWorkflowType()),
218+
workerOptions.getObjectEncoder(),
219+
false
220+
);
212221

213222
Persistence persistence = new PersistenceImpl(dataObjectsRW, searchAttributeRW, stateExeLocals);
214223

@@ -217,8 +226,8 @@ public WorkflowStateExecuteResponse handleWorkflowStateExecute(final WorkflowSta
217226
input,
218227
CommandResultsMapper.fromGenerated(
219228
req.getCommandResults(),
220-
registry.getSignalChannelNameToSignalTypeMap(req.getWorkflowType()),
221-
registry.getInternalChannelNameToTypeMap(req.getWorkflowType()),
229+
registry.getSignalChannelTypeStore(req.getWorkflowType()),
230+
registry.getInternalChannelTypeStore(req.getWorkflowType()),
222231
workerOptions.getObjectEncoder()),
223232
persistence,
224233
communication);
@@ -271,8 +280,7 @@ private List<InterStateChannelPublishing> toInterStateChannelPublishing(final Ma
271280
private DataAttributesRWImpl createDataObjectsRW(final String workflowType, final List<KeyValue> keyValues) {
272281
final Map<String, EncodedObject> map = toMap(keyValues);
273282
return new DataAttributesRWImpl(
274-
registry.getDataAttributeKeyToTypeMap(workflowType),
275-
registry.getDataAttributePrefixToTypeMap(workflowType),
283+
registry.getDataAttributeTypeStore(workflowType),
276284
map,
277285
workerOptions.getObjectEncoder());
278286
}

src/main/java/io/iworkflow/core/communication/CommunicationImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.iworkflow.core.ObjectEncoder;
44
import io.iworkflow.core.StateMovement;
5+
import io.iworkflow.core.TypeStore;
56
import io.iworkflow.core.WorkflowDefinitionException;
67
import io.iworkflow.gen.models.EncodedObject;
78

@@ -13,7 +14,7 @@
1314

1415
public class CommunicationImpl implements Communication {
1516

16-
final Map<String, Class<?>> nameToTypeMap;
17+
final TypeStore typeStore;
1718
final Map<String, List<EncodedObject>> toPublish = new HashMap<>();
1819

1920
final List<StateMovement> stateMovements = new ArrayList<>();
@@ -22,17 +23,18 @@ public class CommunicationImpl implements Communication {
2223
final ObjectEncoder objectEncoder;
2324

2425
public CommunicationImpl(
25-
final Map<String, Class<?>> nameToTypeMap,
26+
final TypeStore typeStore,
2627
final ObjectEncoder objectEncoder,
2728
final boolean allowTriggerStateMovements) {
28-
this.nameToTypeMap = nameToTypeMap;
29+
this.typeStore = typeStore;
2930
this.objectEncoder = objectEncoder;
3031
this.allowTriggerStateMovements = allowTriggerStateMovements;
3132
}
3233

3334
@Override
3435
public void publishInternalChannel(final String channelName, final Object value) {
35-
final Class<?> type = nameToTypeMap.get(channelName);
36+
final Class<?> type = typeStore.getType(channelName);
37+
3638
if (value != null && !type.isInstance(value)) {
3739
throw new WorkflowDefinitionException(String.format("InternalChannel value is not of type %s", type.getName()));
3840
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
package io.iworkflow.core.communication;
22

33
public interface CommunicationMethodDef {
4+
String getName();
5+
Class getValueType();
6+
Boolean isPrefix();
47
}

0 commit comments

Comments
 (0)