Skip to content

Commit 38668f5

Browse files
authored
Merge pull request #227 from botify-labs/bugfix/set-markers-many-times
Use details in addition to name to find markers
2 parents 4561c14 + 3f5b33e commit 38668f5

File tree

10 files changed

+352
-25
lines changed

10 files changed

+352
-25
lines changed

examples/marker.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from simpleflow import Workflow, futures
2+
from simpleflow.canvas import Chain
23

34

45
class MarkerWorkflow(Workflow):
@@ -11,5 +12,29 @@ def run(self):
1112
m = self.submit(self.record_marker('marker 1', 'some details'))
1213
self.submit(self.record_marker('marker 2', "2nd marker's details"))
1314
futures.wait(m)
14-
print('Markers: {}'.format(self.list_markers()))
15-
print('Markers, all: {}'.format(self.list_markers(all=True)))
15+
markers = self.list_markers()
16+
assert 2 == len(markers)
17+
print('Markers: {}'.format(markers))
18+
markers = self.list_markers(all=True)
19+
assert 3 == len(markers)
20+
print('Markers, all: {}'.format(markers))
21+
22+
23+
class MarkerInChainWorkflow(Workflow):
24+
name = 'example'
25+
version = 'example'
26+
task_list = 'example'
27+
28+
def run(self):
29+
chain = Chain(
30+
self.record_marker('marker 1'),
31+
self.record_marker('marker 1', 'some details'),
32+
self.record_marker('marker 2', "2nd marker's details"),
33+
)
34+
futures.wait(self.submit(chain))
35+
markers = self.list_markers()
36+
assert 2 == len(markers)
37+
print('Markers: {}'.format(markers))
38+
markers = self.list_markers(all=True)
39+
assert 3 == len(markers)
40+
print('Markers, all: {}'.format(markers))

setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def read(fname):
8080
'click',
8181
'psutil>=3.2.1',
8282
'pytz',
83+
'typing',
8384
]
8485
if PY2:
8586
DEPS += [
@@ -112,10 +113,12 @@ def read(fname):
112113
'License :: OSI Approved :: MIT License',
113114
'Natural Language :: English',
114115
"Programming Language :: Python :: 2",
115-
'Programming Language :: Python :: 2.6',
116116
'Programming Language :: Python :: 2.7',
117117
'Programming Language :: Python :: 3',
118118
'Programming Language :: Python :: 3.3',
119+
'Programming Language :: Python :: 3.4',
120+
'Programming Language :: Python :: 3.5',
121+
'Programming Language :: Python :: 3.6',
119122
],
120123
test_suite='tests',
121124
tests_require=[

simpleflow/swf/executor.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,13 +475,20 @@ def find_marker_event(self, a_task, history):
475475
Get the event corresponding to a activity task, if any.
476476
477477
:param a_task:
478-
:type a_task: Marker
478+
:type a_task: MarkerTask
479479
:param history:
480480
:type history: simpleflow.history.History
481481
:return:
482482
:rtype: Optional[dict[str, Any]]
483483
"""
484+
json_details = a_task.get_json_details()
484485
marker_list = history.markers.get(a_task.name)
486+
if not marker_list:
487+
return None
488+
marker_list = filter(
489+
lambda m: m['state'] == 'recorded' and m['details'] == json_details,
490+
marker_list
491+
)
485492
return marker_list[-1] if marker_list else None
486493

487494
TASK_TYPE_TO_EVENT_FINDER = {
@@ -725,7 +732,7 @@ def _compute_priority(self, priority_set_on_submit, a_task):
725732
if priority_set_on_submit is not PRIORITY_NOT_SET:
726733
return priority_set_on_submit
727734
elif (isinstance(a_task, ActivityTask) and
728-
a_task.activity.task_priority is not PRIORITY_NOT_SET):
735+
a_task.activity.task_priority is not PRIORITY_NOT_SET):
729736
return a_task.activity.task_priority
730737
elif self._workflow.task_priority is not PRIORITY_NOT_SET:
731738
return self._workflow.task_priority

simpleflow/swf/task.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,9 @@ def schedule(self, *args, **kwargs):
242242
decision = swf.models.decision.MarkerDecision()
243243
decision.record(
244244
self.name,
245-
json_dumps(self.details) if self.details is not None else None,
245+
self.get_json_details(),
246246
)
247247
return [decision]
248+
249+
def get_json_details(self):
250+
return json_dumps(self.details) if self.details is not None else None

tests/integration/__init__.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@
22
import os
33

44
import boto.swf
5+
from click.testing import CliRunner
6+
from sure import expect
57
from vcr import VCR
68

79
import simpleflow.command # NOQA
810
from tests.utils import IntegrationTestCase
911

12+
from simpleflow.utils import json_dumps
13+
14+
if False:
15+
from typing import List, Union
16+
from click.testing import Result
17+
18+
1019
# Default SWF parameters
1120
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
1221
os.environ["SWF_DOMAIN"] = "TestDomain"
@@ -50,3 +59,57 @@ def conn(self):
5059
if not hasattr(self, "_conn"):
5160
self._conn = boto.swf.connect_to_region(self.region)
5261
return self._conn
62+
63+
def get_events(self, run_id):
64+
response = self.conn.get_workflow_execution_history(
65+
self.domain,
66+
run_id,
67+
self.workflow_id,
68+
)
69+
events = response['events']
70+
next_page = response.get('nextPageToken')
71+
while next_page is not None:
72+
response = self.conn.get_workflow_execution_history(
73+
self.domain,
74+
run_id,
75+
self.workflow_id,
76+
next_page_token=next_page,
77+
)
78+
79+
events.extend(response['events'])
80+
next_page = response.get('nextPageToken')
81+
return events
82+
83+
def invoke(self, command, arguments):
84+
# type: (str, Union(str, List[str])) -> Result
85+
if not hasattr(self, "runner"):
86+
self.runner = CliRunner()
87+
if isinstance(arguments, str):
88+
arguments = arguments.split(" ")
89+
print('simpleflow {} {}'.format(command, ' '.join(arguments)))
90+
return self.runner.invoke(command, arguments, catch_exceptions=False)
91+
92+
def run_standalone(self, workflow_name, *args, **kwargs):
93+
input = json_dumps(dict(args=args, kwargs=kwargs))
94+
result = self.invoke(
95+
simpleflow.command.cli,
96+
[
97+
"standalone",
98+
"--workflow-id",
99+
str(self.workflow_id),
100+
"--input",
101+
input,
102+
"--nb-deciders",
103+
"2",
104+
"--nb-workers",
105+
"2",
106+
workflow_name,
107+
],
108+
)
109+
expect(result.exit_code).to.equal(0)
110+
lines = result.output.split("\n")
111+
start_line = [line for line in lines if line.startswith(self.workflow_id)][0]
112+
_, run_id = start_line.split(" ", 1)
113+
114+
events = self.get_events(run_id)
115+
return events
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
interactions:
2+
- request:
3+
body: !!python/unicode '{"domain": "TestDomain", "workflowType": {"version": "example",
4+
"name": "example"}}'
5+
headers:
6+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
7+
Content-Encoding: [amz-1.0]
8+
Content-Length: ['83']
9+
Content-Type: [application/json; charset=UTF-8]
10+
Host: [swf.us-east-1.amazonaws.com]
11+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
12+
X-Amz-Date: [20170303T103000Z]
13+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowType]
14+
method: POST
15+
uri: https://swf.us-east-1.amazonaws.com/
16+
response:
17+
body: {string: !!python/unicode '{"configuration":{"defaultChildPolicy":"TERMINATE","defaultExecutionStartToCloseTimeout":"300","defaultTaskList":{"name":"None"},"defaultTaskStartToCloseTimeout":"300"},"typeInfo":{"creationDate":1.487851963534E9,"status":"REGISTERED","workflowType":{"name":"example","version":"example"}}}'}
18+
headers:
19+
content-length: ['290']
20+
content-type: [application/json]
21+
x-amzn-requestid: [5b64c228-fffc-11e6-bc39-2f26f01a61e7]
22+
status: {code: 200, message: OK}
23+
- request:
24+
body: !!python/unicode '{"domain": "TestDomain", "taskList": {"name": "test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},
25+
"childPolicy": "TERMINATE", "input": "{\"args\":[true],\"kwargs\":{}}", "workflowType":
26+
{"version": "example", "name": "example"}, "workflowId": "test-simpleflow-workflow"}'
27+
headers:
28+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
29+
Content-Encoding: [amz-1.0]
30+
Content-Length: ['280']
31+
Content-Type: [application/json; charset=UTF-8]
32+
Host: [swf.us-east-1.amazonaws.com]
33+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
34+
X-Amz-Date: [20170303T103001Z]
35+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.StartWorkflowExecution]
36+
method: POST
37+
uri: https://swf.us-east-1.amazonaws.com/
38+
response:
39+
body: {string: !!python/unicode '{"runId":"22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}'}
40+
headers:
41+
content-length: ['58']
42+
content-type: [application/json]
43+
x-amzn-requestid: [5b9ebdef-fffc-11e6-a472-65992fd150a3]
44+
status: {code: 200, message: OK}
45+
- request:
46+
body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow",
47+
"runId": "22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}}'
48+
headers:
49+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
50+
Content-Encoding: [amz-1.0]
51+
Content-Length: ['140']
52+
Content-Type: [application/json; charset=UTF-8]
53+
Host: [swf.us-east-1.amazonaws.com]
54+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
55+
X-Amz-Date: [20170303T103003Z]
56+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowExecution]
57+
method: POST
58+
uri: https://swf.us-east-1.amazonaws.com/
59+
response:
60+
body: {string: !!python/unicode '{"executionConfiguration":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},"taskStartToCloseTimeout":"300"},"executionInfo":{"cancelRequested":false,"closeStatus":"COMPLETED","closeTimestamp":1.488537003712E9,"execution":{"runId":"22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0=","workflowId":"test-simpleflow-workflow"},"executionStatus":"CLOSED","startTimestamp":1.488537001561E9,"workflowType":{"name":"example","version":"example"}},"openCounts":{"openActivityTasks":0,"openChildWorkflowExecutions":0,"openDecisionTasks":0,"openLambdaFunctions":0,"openTimers":0}}'}
61+
headers:
62+
content-length: ['661']
63+
content-type: [application/json]
64+
x-amzn-requestid: [5d13f8a2-fffc-11e6-8d43-f7a87b88066c]
65+
status: {code: 200, message: OK}
66+
- request:
67+
body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow",
68+
"runId": "22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}}'
69+
headers:
70+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
71+
Content-Encoding: [amz-1.0]
72+
Content-Length: ['140']
73+
Content-Type: [application/json; charset=UTF-8]
74+
Host: [swf.us-east-1.amazonaws.com]
75+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
76+
X-Amz-Date: [20170303T103104Z]
77+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.GetWorkflowExecutionHistory]
78+
method: POST
79+
uri: https://swf.us-east-1.amazonaws.com/
80+
response:
81+
body: {string: !!python/unicode '{"events":[{"eventId":1,"eventTimestamp":1.488537001561E9,"eventType":"WorkflowExecutionStarted","workflowExecutionStartedEventAttributes":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","input":"{\"args\":[true],\"kwargs\":{}}","parentInitiatedEventId":0,"taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},"taskStartToCloseTimeout":"300","workflowType":{"name":"example","version":"example"}}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":2,"eventTimestamp":1.488537001561E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7418,\"user\":\"zeb\"}","scheduledEventId":2},"eventId":3,"eventTimestamp":1.488537001621E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3},"eventId":4,"eventTimestamp":1.488537002044E9,"eventType":"DecisionTaskCompleted"},{"eventId":5,"eventTimestamp":1.488537002044E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":4,"markerName":"marker
82+
1"}},{"eventId":6,"eventTimestamp":1.488537002044E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":4,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":7,"eventTimestamp":1.488537002167E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":6,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":8,"eventTimestamp":1.488537002167E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7416,\"user\":\"zeb\"}","scheduledEventId":8},"eventId":9,"eventTimestamp":1.48853700224E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":8,"startedEventId":9},"eventId":10,"eventTimestamp":1.488537002693E9,"eventType":"DecisionTaskCompleted"},{"eventId":11,"eventTimestamp":1.488537002693E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":10,"details":"\"some
83+
details\"","markerName":"marker 1"}},{"eventId":12,"eventTimestamp":1.488537002693E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":10,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":13,"eventTimestamp":1.48853700271E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":12,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":14,"eventTimestamp":1.48853700271E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7418,\"user\":\"zeb\"}","scheduledEventId":14},"eventId":15,"eventTimestamp":1.488537002771E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":14,"startedEventId":15},"eventId":16,"eventTimestamp":1.488537003206E9,"eventType":"DecisionTaskCompleted"},{"eventId":17,"eventTimestamp":1.488537003206E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":16,"details":"\"2nd
84+
marker''s details\"","markerName":"marker 2"}},{"eventId":18,"eventTimestamp":1.488537003206E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":16,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":19,"eventTimestamp":1.488537003233E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":18,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":20,"eventTimestamp":1.488537003233E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7416,\"user\":\"zeb\"}","scheduledEventId":20},"eventId":21,"eventTimestamp":1.488537003291E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":20,"startedEventId":21},"eventId":22,"eventTimestamp":1.488537003712E9,"eventType":"DecisionTaskCompleted"},{"eventId":23,"eventTimestamp":1.488537003712E9,"eventType":"WorkflowExecutionCompleted","workflowExecutionCompletedEventAttributes":{"decisionTaskCompletedEventId":22,"result":"null"}}]}'}
85+
headers:
86+
content-length: ['4796']
87+
content-type: [application/json]
88+
x-amzn-requestid: [816a56b1-fffc-11e6-ad40-e15570700f4f]
89+
status: {code: 200, message: OK}
90+
version: 1

0 commit comments

Comments
 (0)