Skip to content

Commit 64e37f2

Browse files
authored
Support weak typed data attributes, and rename related methods (#189)
Co-authored-by: Kaili Zhu <kzhu@indeed.com>
1 parent 54cdb47 commit 64e37f2

File tree

11 files changed

+203
-69
lines changed

11 files changed

+203
-69
lines changed

src/main/java/io/iworkflow/core/Client.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.iworkflow.core;
22

33
import io.iworkflow.core.persistence.PersistenceOptions;
4+
import io.iworkflow.core.utils.DataAttributeUtils;
45
import io.iworkflow.gen.models.KeyValue;
56
import io.iworkflow.gen.models.SearchAttribute;
67
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
@@ -438,16 +439,16 @@ public void stopWorkflow(final String workflowId) {
438439
* @param keys required, cannot be empty or null
439440
* @return the data attributes
440441
*/
441-
public Map<String, Object> getWorkflowDataObjects(
442+
public Map<String, Object> getWorkflowDataAttributes(
442443
final Class<? extends ObjectWorkflow> workflowClass,
443444
final String workflowId,
444445
final String workflowRunId,
445-
List<String> keys) {
446+
final List<String> keys) {
446447

447448
if (keys == null || keys.isEmpty()) {
448-
throw new IllegalArgumentException("keys must contain at least one entry, or use getAllDataObjects API to get all");
449+
throw new IllegalArgumentException("keys must contain at least one entry, or use getAllDataAttributes API to get all");
449450
}
450-
return doGetWorkflowDataObjects(workflowClass, workflowId, workflowRunId, keys);
451+
return doGetWorkflowDataAttributes(workflowClass, workflowId, workflowRunId, keys);
451452
}
452453

453454
/**
@@ -458,11 +459,11 @@ public Map<String, Object> getWorkflowDataObjects(
458459
* @param keys required, cannot be empty or null
459460
* @return the data attributes
460461
*/
461-
public Map<String, Object> getWorkflowDataObjects(
462+
public Map<String, Object> getWorkflowDataAttributes(
462463
final Class<? extends ObjectWorkflow> workflowClass,
463464
final String workflowId,
464-
List<String> keys) {
465-
return getWorkflowDataObjects(workflowClass, workflowId, "", keys);
465+
final List<String> keys) {
466+
return getWorkflowDataAttributes(workflowClass, workflowId, "", keys);
466467
}
467468

468469
/**
@@ -473,11 +474,11 @@ public Map<String, Object> getWorkflowDataObjects(
473474
* @param workflowRunId optional, can be empty
474475
* @return the data attributes
475476
*/
476-
public Map<String, Object> getAllDataObjects(
477+
public Map<String, Object> getAllDataAttributes(
477478
final Class<? extends ObjectWorkflow> workflowClass,
478479
final String workflowId,
479480
final String workflowRunId) {
480-
return doGetWorkflowDataObjects(workflowClass, workflowId, workflowRunId, null);
481+
return doGetWorkflowDataAttributes(workflowClass, workflowId, workflowRunId, null);
481482
}
482483

483484
/**
@@ -487,32 +488,33 @@ public Map<String, Object> getAllDataObjects(
487488
* @param workflowId required
488489
* @return the data attributes
489490
*/
490-
public Map<String, Object> getAllDataObjects(
491+
public Map<String, Object> getAllDataAttributes(
491492
final Class<? extends ObjectWorkflow> workflowClass,
492493
final String workflowId) {
493-
return getAllDataObjects(workflowClass, workflowId, "");
494+
return getAllDataAttributes(workflowClass, workflowId, "");
494495
}
495496

496-
private Map<String, Object> doGetWorkflowDataObjects(
497+
private Map<String, Object> doGetWorkflowDataAttributes(
497498
final Class<? extends ObjectWorkflow> workflowClass,
498499
final String workflowId,
499500
final String workflowRunId,
500-
List<String> keys) {
501+
final List<String> keys) {
501502
final String wfType = workflowClass.getSimpleName();
502503
checkWorkflowTypeExists(wfType);
503504

504-
Map<String, Class<?>> queryDataObjectKeyToTypeMap = registry.getDataAttributeKeyToTypeMap(wfType);
505+
final Map<String, Class<?>> dataAttributeKeyToTypeMap = registry.getDataAttributeKeyToTypeMap(wfType);
506+
final Map<String, Class<?>> dataAttributePrefixToTypeMap = registry.getDataAttributePrefixToTypeMap(wfType);
505507

506508
// if attribute keys is null or empty, iwf server will return all data attributes
507509
if (keys != null && !keys.isEmpty()) {
508-
List<String> nonExistingDataObjectKeyList = keys.stream()
509-
.filter(s -> !queryDataObjectKeyToTypeMap.containsKey(s))
510-
.collect(Collectors.toList());
511-
if (!nonExistingDataObjectKeyList.isEmpty()) {
510+
final Optional<String> first = keys.stream()
511+
.filter(key -> !DataAttributeUtils.isValidDataAttributeKey(key, dataAttributeKeyToTypeMap, dataAttributePrefixToTypeMap))
512+
.findFirst();
513+
if (first.isPresent()) {
512514
throw new IllegalArgumentException(
513515
String.format(
514-
"data attributes not registered: %s",
515-
String.join(", ", nonExistingDataObjectKeyList)
516+
"data attribute not registered: %s",
517+
first.get()
516518
)
517519
);
518520
}
@@ -525,12 +527,15 @@ private Map<String, Object> doGetWorkflowDataObjects(
525527
if (response.getObjects() == null) {
526528
throw new IllegalStateException("data attributes not returned");
527529
}
528-
Map<String, Object> result = new HashMap<>();
529-
for (KeyValue keyValue : response.getObjects()) {
530+
531+
final Map<String, Object> result = new HashMap<>();
532+
for (final KeyValue keyValue : response.getObjects()) {
530533
if (keyValue.getValue() != null) {
531534
result.put(
532535
keyValue.getKey(),
533-
clientOptions.getObjectEncoder().decode(keyValue.getValue(), queryDataObjectKeyToTypeMap.get(keyValue.getKey()))
536+
clientOptions.getObjectEncoder().decode(
537+
keyValue.getValue(),
538+
DataAttributeUtils.getDataAttributeType(keyValue.getKey(), dataAttributeKeyToTypeMap, dataAttributePrefixToTypeMap))
534539
);
535540
}
536541
}

src/main/java/io/iworkflow/core/Registry.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ public class Registry {
2626
private final Map<String, Map<String, Class<?>>> signalTypeStore = new HashMap<>();
2727

2828
private final Map<String, Map<String, Class<?>>> internalChannelTypeStore = new HashMap<>();
29-
private final Map<String, Map<String, Class<?>>> dataAttributeTypeStore = new HashMap<>();
29+
private final Map<String, Map<String, Class<?>>> dataAttributeKeyToTypeStore = new HashMap<>();
30+
private final Map<String, Map<String, Class<?>>> dataAttributePrefixToTypeStore = new HashMap<>();
3031

3132
private final Map<String, Map<String, SearchAttributeValueType>> searchAttributeTypeStore = new HashMap<>();
3233

@@ -150,28 +151,42 @@ private void registerWorkflowInternalChannel(final ObjectWorkflow wf) {
150151
}
151152

152153
private void registerWorkflowDataAttributes(final ObjectWorkflow wf) {
153-
String workflowType = getWorkflowType(wf);
154+
final String workflowType = getWorkflowType(wf);
154155
final List<DataAttributeDef> fields = getDataAttributeFields(wf);
156+
157+
dataAttributeKeyToTypeStore.put(workflowType, new HashMap<>());
158+
dataAttributePrefixToTypeStore.put(workflowType, new HashMap<>());
159+
155160
if (fields == null || fields.isEmpty()) {
156-
dataAttributeTypeStore.put(workflowType, new HashMap<>());
157161
return;
158162
}
159163

160-
for (DataAttributeDef dataAttributeField : fields) {
161-
Map<String, Class<?>> dataAttributeKeyToTypeMap =
162-
dataAttributeTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
163-
if (dataAttributeKeyToTypeMap.containsKey(dataAttributeField.getKey())) {
164-
throw new WorkflowDefinitionException(
165-
String.format(
166-
"data attribute key %s already exists",
167-
dataAttributeField.getDataAttributeType())
168-
);
164+
for (final DataAttributeDef dataAttributeField : fields) {
165+
if (dataAttributeField.isPrefix()) {
166+
addDataAttributeToStore(dataAttributeField, workflowType, dataAttributePrefixToTypeStore);
167+
} else {
168+
addDataAttributeToStore(dataAttributeField, workflowType, dataAttributeKeyToTypeStore);
169169
}
170-
dataAttributeKeyToTypeMap.put(
171-
dataAttributeField.getKey(),
172-
dataAttributeField.getDataAttributeType()
170+
}
171+
}
172+
173+
private void addDataAttributeToStore(
174+
final DataAttributeDef dataAttributeField,
175+
final String workflowType,
176+
final Map<String, Map<String, Class<?>>> dataAttributeStore) {
177+
final Map<String, Class<?>> dataAttributeMap =
178+
dataAttributeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
179+
if (dataAttributeMap.containsKey(dataAttributeField.getKey())) {
180+
throw new WorkflowDefinitionException(
181+
String.format(
182+
"data attribute key/prefix %s already exists",
183+
dataAttributeField.getDataAttributeType())
173184
);
174185
}
186+
dataAttributeMap.put(
187+
dataAttributeField.getKey(),
188+
dataAttributeField.getDataAttributeType()
189+
);
175190
}
176191

177192
private void registerPersistenceOptions(final ObjectWorkflow wf) {
@@ -180,19 +195,24 @@ private void registerPersistenceOptions(final ObjectWorkflow wf) {
180195
}
181196

182197
private List<DataAttributeDef> getDataAttributeFields(final ObjectWorkflow wf) {
183-
final Set<String> keySet = wf.getPersistenceSchema().stream().map(PersistenceFieldDef::getKey).collect(Collectors.toSet());
184-
if (keySet.size() != wf.getPersistenceSchema().size()) {
185-
throw new WorkflowDefinitionException("cannot have conflict key definition in persistence schema");
186-
}
187-
return wf.getPersistenceSchema().stream().filter((f) -> f instanceof DataAttributeDef).map(f -> (DataAttributeDef) f).collect(Collectors.toList());
198+
return getAttributeFields(wf, DataAttributeDef.class);
188199
}
189200

190201
private List<SearchAttributeDef> getSearchAttributeFields(final ObjectWorkflow wf) {
202+
return getAttributeFields(wf, SearchAttributeDef.class);
203+
}
204+
205+
private <T extends PersistenceFieldDef> List<T> getAttributeFields(final ObjectWorkflow wf, final Class<T> attributeDef) {
206+
// All the search attributes, data attributes, and data attribute prefixes can not have the same `key`
191207
final Set<String> keySet = wf.getPersistenceSchema().stream().map(PersistenceFieldDef::getKey).collect(Collectors.toSet());
192208
if (keySet.size() != wf.getPersistenceSchema().size()) {
193-
throw new WorkflowDefinitionException("cannot have conflict key definition in persistence schema");
209+
throw new WorkflowDefinitionException("cannot have conflict key/prefix definition in persistence schema");
194210
}
195-
return wf.getPersistenceSchema().stream().filter((f) -> f instanceof SearchAttributeDef).map(f -> (SearchAttributeDef) f).collect(Collectors.toList());
211+
212+
return wf.getPersistenceSchema().stream()
213+
.filter(attributeDef::isInstance)
214+
.map(attributeDef::cast)
215+
.collect(Collectors.toList());
196216
}
197217

198218
private List<InternalChannelDef> getInternalChannels(final ObjectWorkflow wf) {
@@ -259,7 +279,11 @@ public Map<String, Class<?>> getInternalChannelNameToTypeMap(final String workfl
259279
}
260280

261281
public Map<String, Class<?>> getDataAttributeKeyToTypeMap(final String workflowType) {
262-
return dataAttributeTypeStore.get(workflowType);
282+
return dataAttributeKeyToTypeStore.get(workflowType);
283+
}
284+
285+
public Map<String, Class<?>> getDataAttributePrefixToTypeMap(final String workflowType) {
286+
return dataAttributePrefixToTypeStore.get(workflowType);
263287
}
264288

265289
public Map<String, SearchAttributeValueType> getSearchAttributeKeyToTypeMap(final String workflowType) {

src/main/java/io/iworkflow/core/WorkerService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,13 @@ private List<InterStateChannelPublishing> toInterStateChannelPublishing(final Ma
268268
return results;
269269
}
270270

271-
private DataAttributesRWImpl createDataObjectsRW(String workflowType, List<KeyValue> keyValues) {
271+
private DataAttributesRWImpl createDataObjectsRW(final String workflowType, final List<KeyValue> keyValues) {
272272
final Map<String, EncodedObject> map = toMap(keyValues);
273-
return new DataAttributesRWImpl(registry.getDataAttributeKeyToTypeMap(workflowType), map, workerOptions.getObjectEncoder());
273+
return new DataAttributesRWImpl(
274+
registry.getDataAttributeKeyToTypeMap(workflowType),
275+
registry.getDataAttributePrefixToTypeMap(workflowType),
276+
map,
277+
workerOptions.getObjectEncoder());
274278
}
275279

276280
private Map<String, EncodedObject> toMap(final List<KeyValue> keyValues) {

src/main/java/io/iworkflow/core/persistence/DataAttributeDef.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,39 @@
55
@Value.Immutable
66
public abstract class DataAttributeDef implements PersistenceFieldDef {
77
public abstract Class getDataAttributeType();
8+
public abstract Boolean isPrefix();
89

9-
public static DataAttributeDef create(Class dataType, String key) {
10+
/**
11+
* iWF will verify if the key has been registered for the data attribute created using this method,
12+
* allowing users to create only one data attribute with the same key and data type.
13+
*
14+
* @param dataType required.
15+
* @param key required. The unique key.
16+
* @return a data attribute definition
17+
*/
18+
public static DataAttributeDef create(final Class dataType, final String key) {
1019
return ImmutableDataAttributeDef.builder()
1120
.key(key)
1221
.dataAttributeType(dataType)
22+
.isPrefix(false)
23+
.build();
24+
}
25+
26+
/**
27+
* iWF now supports dynamically created data attributes with a shared prefix and the same data type.
28+
* (E.g., dynamically created data attributes of type String can be named with a common prefix like: data_attribute_prefix_1: "one", data_attribute_prefix_2: "two")
29+
* iWF will verify if the prefix has been registered for data attributes created using this method,
30+
* allowing users to create multiple data attributes with the same prefix and data type.
31+
*
32+
* @param dataType required.
33+
* @param keyPrefix required. The common prefix of a set of keys to be created later.
34+
* @return a data attribute definition
35+
*/
36+
public static DataAttributeDef createByPrefix(final Class dataType, final String keyPrefix) {
37+
return ImmutableDataAttributeDef.builder()
38+
.key(keyPrefix)
39+
.dataAttributeType(dataType)
40+
.isPrefix(true)
1341
.build();
1442
}
1543
}

src/main/java/io/iworkflow/core/persistence/DataAttributesRWImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.iworkflow.core.persistence;
22

33
import io.iworkflow.core.ObjectEncoder;
4+
import io.iworkflow.core.utils.DataAttributeUtils;
45
import io.iworkflow.gen.models.EncodedObject;
56
import io.iworkflow.gen.models.KeyValue;
67

@@ -11,30 +12,33 @@
1112

1213
public class DataAttributesRWImpl implements DataAttributesRW {
1314
private final Map<String, Class<?>> keyToTypeMap;
15+
private final Map<String, Class<?>> prefixToTypeMap;
1416
private final Map<String, EncodedObject> keyToEncodedObjectMap;
1517
private final Map<String, EncodedObject> toReturnToServer;
1618
private final ObjectEncoder objectEncoder;
1719

1820
public DataAttributesRWImpl(
1921
final Map<String, Class<?>> keyToTypeMap,
22+
final Map<String, Class<?>> prefixToTypeMap,
2023
final Map<String, EncodedObject> keyToValueMap,
2124
final ObjectEncoder objectEncoder) {
2225
this.keyToTypeMap = keyToTypeMap;
26+
this.prefixToTypeMap = prefixToTypeMap;
2327
this.keyToEncodedObjectMap = keyToValueMap;
2428
this.toReturnToServer = new HashMap<>();
2529
this.objectEncoder = objectEncoder;
2630
}
2731

2832
@Override
2933
public <T> T getDataAttribute(String key, Class<T> type) {
30-
if (!keyToTypeMap.containsKey(key)) {
34+
if (!DataAttributeUtils.isValidDataAttributeKey(key, keyToTypeMap, prefixToTypeMap)) {
3135
throw new IllegalArgumentException(String.format("data attribute %s is not registered", key));
3236
}
3337
if (!keyToEncodedObjectMap.containsKey(key)) {
3438
return null;
3539
}
3640

37-
Class<?> registeredType = keyToTypeMap.get(key);
41+
final Class<?> registeredType = DataAttributeUtils.getDataAttributeType(key, keyToTypeMap, prefixToTypeMap);
3842
if (!type.isAssignableFrom(registeredType)) {
3943
throw new IllegalArgumentException(
4044
String.format(
@@ -49,11 +53,11 @@ public <T> T getDataAttribute(String key, Class<T> type) {
4953

5054
@Override
5155
public void setDataAttribute(String key, Object value) {
52-
if (!keyToTypeMap.containsKey(key)) {
56+
if (!DataAttributeUtils.isValidDataAttributeKey(key, keyToTypeMap, prefixToTypeMap)) {
5357
throw new IllegalArgumentException(String.format("data attribute %s is not registered", key));
5458
}
5559

56-
Class<?> registeredType = keyToTypeMap.get(key);
60+
final Class<?> registeredType = DataAttributeUtils.getDataAttributeType(key, keyToTypeMap, prefixToTypeMap);
5761
if (value != null && !registeredType.isAssignableFrom(value.getClass())) {
5862
throw new IllegalArgumentException(String.format("Input is not an instance of class %s", registeredType.getName()));
5963
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.iworkflow.core.utils;
2+
3+
import java.util.Map;
4+
import java.util.Optional;
5+
6+
public class DataAttributeUtils {
7+
public static Boolean isValidDataAttributeKey(
8+
final String key,
9+
final Map<String, Class<?>> dataAttributeKeyToTypeMap,
10+
final Map<String, Class<?>> dataAttributePrefixToTypeMap) {
11+
12+
if (dataAttributeKeyToTypeMap.containsKey(key)) {
13+
return true;
14+
}
15+
16+
final Optional<String> first = dataAttributePrefixToTypeMap.keySet().stream()
17+
.filter(key::startsWith)
18+
.findFirst();
19+
20+
return first.isPresent();
21+
}
22+
23+
public static Class<?> getDataAttributeType(
24+
final String key,
25+
final Map<String, Class<?>> dataAttributeKeyToTypeMap,
26+
final Map<String, Class<?>> dataAttributePrefixToTypeMap) {
27+
28+
if (dataAttributeKeyToTypeMap.containsKey(key)) {
29+
return dataAttributeKeyToTypeMap.get(key);
30+
}
31+
32+
final Optional<String> first = dataAttributePrefixToTypeMap.keySet().stream()
33+
.filter(key::startsWith)
34+
.findFirst();
35+
36+
if (first.isPresent()) {
37+
return dataAttributePrefixToTypeMap.get(first.get());
38+
}
39+
40+
throw new IllegalArgumentException(
41+
String.format(
42+
"Data attribute not registered: %s",
43+
key
44+
)
45+
);
46+
}
47+
}

0 commit comments

Comments
 (0)