Skip to content

Commit f0b901c

Browse files
committed
Use details in addition to name to find markers
Signed-off-by: Yves Bastide <yves@botify.com>
1 parent ec0fe2e commit f0b901c

File tree

3 files changed

+40
-5
lines changed

3 files changed

+40
-5
lines changed

examples/marker.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from simpleflow import Workflow, futures
2+
from simpleflow.canvas import Chain
23

34

45
class MarkerWorkflow(Workflow):
@@ -11,5 +12,29 @@ def run(self):
1112
m = self.submit(self.record_marker('marker 1', 'some details'))
1213
self.submit(self.record_marker('marker 2', "2nd marker's details"))
1314
futures.wait(m)
14-
print('Markers: {}'.format(self.list_markers()))
15-
print('Markers, all: {}'.format(self.list_markers(all=True)))
15+
markers = self.list_markers()
16+
assert 2 == len(markers)
17+
print('Markers: {}'.format(markers))
18+
markers = self.list_markers(all=True)
19+
assert 3 == len(markers)
20+
print('Markers, all: {}'.format(markers))
21+
22+
23+
class MarkerInChainWorkflow(Workflow):
24+
name = 'example'
25+
version = 'example'
26+
task_list = 'example'
27+
28+
def run(self):
29+
chain = Chain(
30+
self.record_marker('marker 1'),
31+
self.record_marker('marker 1', 'some details'),
32+
self.record_marker('marker 2', "2nd marker's details"),
33+
)
34+
futures.wait(self.submit(chain))
35+
markers = self.list_markers()
36+
assert 2 == len(markers)
37+
print('Markers: {}'.format(markers))
38+
markers = self.list_markers(all=True)
39+
assert 3 == len(markers)
40+
print('Markers, all: {}'.format(markers))

simpleflow/swf/executor.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,13 +475,20 @@ def find_marker_event(self, a_task, history):
475475
Get the event corresponding to a activity task, if any.
476476
477477
:param a_task:
478-
:type a_task: Marker
478+
:type a_task: MarkerTask
479479
:param history:
480480
:type history: simpleflow.history.History
481481
:return:
482482
:rtype: Optional[dict[str, Any]]
483483
"""
484+
json_details = a_task.get_json_details()
484485
marker_list = history.markers.get(a_task.name)
486+
if not marker_list:
487+
return None
488+
marker_list = filter(
489+
lambda m: m['state'] == 'recorded' and m['details'] == json_details,
490+
marker_list
491+
)
485492
return marker_list[-1] if marker_list else None
486493

487494
TASK_TYPE_TO_EVENT_FINDER = {
@@ -725,7 +732,7 @@ def _compute_priority(self, priority_set_on_submit, a_task):
725732
if priority_set_on_submit is not PRIORITY_NOT_SET:
726733
return priority_set_on_submit
727734
elif (isinstance(a_task, ActivityTask) and
728-
a_task.activity.task_priority is not PRIORITY_NOT_SET):
735+
a_task.activity.task_priority is not PRIORITY_NOT_SET):
729736
return a_task.activity.task_priority
730737
elif self._workflow.task_priority is not PRIORITY_NOT_SET:
731738
return self._workflow.task_priority

simpleflow/swf/task.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,9 @@ def schedule(self, *args, **kwargs):
242242
decision = swf.models.decision.MarkerDecision()
243243
decision.record(
244244
self.name,
245-
json_dumps(self.details) if self.details is not None else None,
245+
self.get_json_details(),
246246
)
247247
return [decision]
248+
249+
def get_json_details(self):
250+
return json_dumps(self.details) if self.details is not None else None

0 commit comments

Comments
 (0)