Skip to content

Commit b3556ec

Browse files
authored
Merge pull request #273 from botify-labs/enhancement/272/Implement-workflow-cancelation
Enhancement/272/implement workflow cancelation
2 parents 5885f4d + 1352b0e commit b3556ec

File tree

8 files changed

+372
-3
lines changed

8 files changed

+372
-3
lines changed

simpleflow/history.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ def __init__(self, history):
4040
self._markers = collections.OrderedDict()
4141
self._timers = {}
4242
self._tasks = []
43+
self._cancel_requested = None
44+
self._cancel_failed = None
45+
self.started_decision_id = None
46+
self.completed_decision_id = None
4347

4448
@property
4549
def swf_history(self):
@@ -82,6 +86,38 @@ def signals(self):
8286
"""
8387
return self._signals
8488

89+
@property
90+
def cancel_requested(self):
91+
"""
92+
:return: Last cancel requested event, if any.
93+
:rtype: Optional[dict]
94+
"""
95+
return self._cancel_requested
96+
97+
@property
98+
def cancel_failed(self):
99+
"""
100+
:return: Last cancel failed event, if any.
101+
:rtype: Optional[dict]
102+
"""
103+
return self._cancel_failed
104+
105+
@property
106+
def cancel_requested_id(self):
107+
"""
108+
:return: ID of last cancel requested event, if any.
109+
:rtype: Optional[int]
110+
"""
111+
return self._cancel_requested['event_id'] if self._cancel_requested else None
112+
113+
@property
114+
def cancel_failed_decision_task_completed_event_id(self):
115+
"""
116+
:return: ID of last cancel failed event, if any.
117+
:rtype: Optional[int]
118+
"""
119+
return self._cancel_failed['decision_task_completed_event_id'] if self._cancel_failed else None
120+
85121
@property
86122
def signaled_workflows(self):
87123
"""
@@ -222,6 +258,18 @@ def get_activity():
222258
activity['state'] = event.state
223259
activity['details'] = getattr(event, 'details', '')
224260
activity['cancelled_timestamp'] = event.timestamp
261+
elif event.state == 'cancel_requested':
262+
activity = {
263+
'type': 'activity',
264+
'id': event.activity_id,
265+
'state': event.state,
266+
'cancel_requested_timestamp': event.timestamp,
267+
}
268+
if event.activity_id not in self._activities:
269+
self._activities[event.activity_id] = activity
270+
self._tasks.append(activity)
271+
else:
272+
self._activities[event.activity_id].update(activity)
225273

226274
def parse_child_workflow_event(self, events, event):
227275
"""Aggregate all the attributes of a workflow in a single entry.
@@ -371,13 +419,34 @@ def parse_workflow_event(self, events, event):
371419
}
372420
self._signals[event.signal_name] = signal
373421
self._tasks.append(signal)
422+
elif event.state == 'cancel_requested':
423+
cancel_requested = {
424+
'type': event.state,
425+
'cause': getattr(event, 'cause', None),
426+
'external_initiated_event_id': getattr(event, 'external_initiated_event_id', None),
427+
'external_run_id': getattr(event, 'external_workflow_execution', {}).get('runId'),
428+
'external_workflow_id': getattr(event, 'external_workflow_execution', {}).get('workflowId'),
429+
'event_id': event.id,
430+
'timestamp': event.timestamp,
431+
}
432+
self._cancel_requested = cancel_requested
433+
elif event.state == 'cancel_failed':
434+
cancel_failed = {
435+
'type': event.state,
436+
'cause': getattr(event, 'cause', None),
437+
'event_id': event.id,
438+
'decision_task_completed_event_id': event.decision_task_completed_event_id,
439+
'timestamp': event.timestamp,
440+
}
441+
self._cancel_failed = cancel_failed
374442

375443
def parse_external_workflow_event(self, events, event):
376444
"""
377445
Parse an external workflow event.
378446
:param events:
379447
:param event:
380448
"""
449+
381450
def get_workflow(workflows):
382451
initiated_event = events[event.initiated_event_id - 1]
383452
return workflows[initiated_event.workflow_id]
@@ -452,6 +521,7 @@ def get_workflow(workflows):
452521
workflow = get_workflow(self._external_workflows_canceling)
453522
workflow['run_id'] = event.workflow_execution['runId']
454523
workflow['workflow_id'] = event.workflow_execution['workflowId']
524+
workflow['cancel_requested_event_id'] = event.id
455525
workflow['cancel_requested_timestamp'] = event.timestamp
456526

457527
def parse_marker_event(self, events, event):
@@ -522,8 +592,15 @@ def parse_timer_event(self, events, event):
522592
timer['cancel_failed_event_id'] = event.id
523593
timer['cancel_failed_event_timestamp'] = event.timestamp
524594

595+
def parse_decision_event(self, events, event):
596+
if event.state == 'started':
597+
self.started_decision_id = event.id
598+
if event.state == 'completed':
599+
self.completed_decision_id = event.id
600+
525601
TYPE_TO_PARSER = {
526602
'ActivityTask': parse_activity_event,
603+
'DecisionTask': parse_decision_event,
527604
'ChildWorkflowExecution': parse_child_workflow_event,
528605
'WorkflowExecution': parse_workflow_event,
529606
'ExternalWorkflowExecution': parse_external_workflow_event,

simpleflow/local/executor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from simpleflow.signal import WaitForSignal
1212
from simpleflow.task import ActivityTask, WorkflowTask, SignalTask, MarkerTask
1313
from simpleflow.activity import Activity
14+
from simpleflow.utils import format_exc
1415
from simpleflow.workflow import Workflow
1516
from swf.models.history import builder
1617
from simpleflow.history import History
@@ -89,9 +90,9 @@ def submit(self, func, *args, **kwargs):
8990
state = 'completed'
9091
except Exception as err:
9192
future._exception = err
92-
logger.info('rescuing exception: {}'.format(err))
93+
logger.exception('rescuing exception: {}'.format(err))
9394
if isinstance(func, Activity) and func.raises_on_failure:
94-
message = err.args[0] if err.args else ''
95+
message = format_exc(err)
9596
raise exceptions.TaskFailed(func.name, message)
9697
state = 'failed'
9798
finally:

simpleflow/swf/executor.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,14 @@ def __init__(self, domain, workflow_class, task_list=None, repair_with=None,
185185
self.force_activities = re.compile(force_activities)
186186
else:
187187
self.force_activities = None
188+
self._open_activity_count = 0
189+
self._decisions = []
190+
self._append_timer = False # Append an immediate timer decision
191+
self._tasks = TaskRegistry()
192+
self._idempotent_tasks_to_submit = set()
193+
self._execution = None
194+
self.current_priority = None
188195

189-
# noinspection PyAttributeOutsideInit
190196
def reset(self):
191197
"""
192198
Clears the state of the execution.
@@ -892,10 +898,12 @@ def replay(self, decision_response, decref_workflow=True):
892898
"""
893899
self.reset()
894900

901+
# noinspection PyUnresolvedReferences
895902
history = decision_response.history
896903
self._history = History(history)
897904
self._history.parse()
898905
self.build_execution_context(decision_response)
906+
# noinspection PyUnresolvedReferences
899907
self._execution = decision_response.execution
900908

901909
workflow_started_event = history[0]
@@ -906,8 +914,17 @@ def replay(self, decision_response, decref_workflow=True):
906914
kwargs = input.get('kwargs', {})
907915

908916
self.before_replay()
917+
909918
try:
910919
self.propagate_signals()
920+
if self._history.cancel_requested:
921+
decisions = self.handle_cancel_requested()
922+
if decisions is not None:
923+
self.after_replay()
924+
self.after_closed()
925+
if decref_workflow:
926+
self.decref_workflow()
927+
return decisions
911928
result = self.run_workflow(*args, **kwargs)
912929
except exceptions.ExecutionBlocked:
913930
logger.info('{} open activities ({} decisions)'.format(
@@ -997,6 +1014,9 @@ def on_completed(self):
9971014
except NotImplementedError:
9981015
pass
9991016

1017+
def on_canceled(self):
1018+
self._workflow.on_canceled(self._history)
1019+
10001020
def fail(self, reason, details=None):
10011021
self.on_failure(reason, details)
10021022

@@ -1022,11 +1042,13 @@ def build_execution_context(self, decision_response):
10221042
:param decision_response:
10231043
:type decision_response: swf.responses.Response
10241044
"""
1045+
# noinspection PyUnresolvedReferences
10251046
execution = decision_response.execution
10261047
if not execution:
10271048
# For tests that don't provide an execution object.
10281049
return
10291050

1051+
# noinspection PyUnresolvedReferences
10301052
history = decision_response.history
10311053
workflow_started_event = history[0]
10321054
self._execution_context = dict(
@@ -1179,3 +1201,30 @@ def get_event_details(self, event_type, event_name):
11791201
raise ValueError('Unimplemented type {!r} for get_event_details'.format(
11801202
event_type
11811203
))
1204+
1205+
def handle_cancel_requested(self):
1206+
decision = swf.models.decision.WorkflowExecutionDecision()
1207+
is_current_decision = self._history.completed_decision_id < self._history.cancel_requested_id
1208+
should_cancel = self._workflow.should_cancel(self._history)
1209+
if not should_cancel:
1210+
return None # ignore cancel
1211+
if is_current_decision:
1212+
self.on_canceled()
1213+
decision.cancel()
1214+
return [decision]
1215+
if self._history.cancel_failed:
1216+
logger.warning('failed: %s', self._history.cancel_failed)
1217+
if self._history.cancel_failed and self._history.cancel_failed_decision_task_completed_event_id == self._history.completed_decision_id:
1218+
# Per http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html,
1219+
# we should call RespondDecisionTaskCompleted without any decisions; however this hangs the workflow...
1220+
1221+
# <1 WorkflowExecution : started>, <2 DecisionTask : scheduled>, <3 DecisionTask : started>,
1222+
# <4 DecisionTask : completed>, <5 ActivityTask : scheduled>, <6 ActivityTask : started>,
1223+
# <7 WorkflowExecution : cancel_requested>, <8 DecisionTask : scheduled>, <9 DecisionTask : started>,
1224+
# <10 ActivityTask : completed>, <11 DecisionTask : scheduled>, <12 DecisionTask : completed>,
1225+
# <13 WorkflowExecution : cancel_failed>, <14 DecisionTask : started>
1226+
1227+
# return []
1228+
pass
1229+
decision.cancel()
1230+
return [decision]

simpleflow/workflow.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ def on_completed(self, history):
167167
"""
168168
pass
169169

170+
def on_canceled(self, history):
171+
"""
172+
Method called on canceling the execution.
173+
174+
:param history:
175+
:type history: simpleflow.history.History
176+
"""
177+
pass
178+
170179
def get_execution_context(self):
171180
"""
172181
Get an execution context from the executor.
@@ -208,3 +217,11 @@ def start_timer(self, timer_id, timeout, control=None):
208217

209218
def cancel_timer(self, timer_id):
210219
return CancelTimerTask(timer_id)
220+
221+
def should_cancel(self, history):
222+
"""
223+
Called by the executor if cancel requested.
224+
:param history:
225+
:return:
226+
"""
227+
return True
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
interactions:
2+
- request:
3+
body: !!python/unicode '{"domain": "TestDomain", "workflowType": {"version": "example",
4+
"name": "example"}}'
5+
headers:
6+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
7+
Content-Encoding: [amz-1.0]
8+
Content-Length: ['83']
9+
Content-Type: [application/json; charset=UTF-8]
10+
Host: [swf.us-east-1.amazonaws.com]
11+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-53-generic]
12+
X-Amz-Date: [20170613T163306Z]
13+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowType]
14+
method: POST
15+
uri: https://swf.us-east-1.amazonaws.com/
16+
response:
17+
body: {string: !!python/unicode '{"configuration":{"defaultChildPolicy":"TERMINATE","defaultExecutionStartToCloseTimeout":"300","defaultTaskList":{"name":"None"},"defaultTaskStartToCloseTimeout":"300"},"typeInfo":{"creationDate":1.487851963534E9,"status":"REGISTERED","workflowType":{"name":"example","version":"example"}}}'}
18+
headers:
19+
content-length: ['290']
20+
content-type: [application/json]
21+
x-amzn-requestid: [fab29807-5055-11e7-b293-fb22db0b9b82]
22+
status: {code: 200, message: OK}
23+
- request:
24+
body: !!python/unicode '{"domain": "TestDomain", "taskList": {"name": "test-simpleflow-workflow-5f7b987ba10e4b4db22192c044052319"},
25+
"childPolicy": "TERMINATE", "input": "{\"args\":[true],\"kwargs\":{}}", "workflowType":
26+
{"version": "example", "name": "example"}, "workflowId": "test-simpleflow-workflow"}'
27+
headers:
28+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
29+
Content-Encoding: [amz-1.0]
30+
Content-Length: ['280']
31+
Content-Type: [application/json; charset=UTF-8]
32+
Host: [swf.us-east-1.amazonaws.com]
33+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-53-generic]
34+
X-Amz-Date: [20170613T163306Z]
35+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.StartWorkflowExecution]
36+
method: POST
37+
uri: https://swf.us-east-1.amazonaws.com/
38+
response:
39+
body: {string: !!python/unicode '{"runId":"22X56OQhwfy+AtGx1iSo3HX51AK2DdtKzfUnU+RQElbms="}'}
40+
headers:
41+
content-length: ['58']
42+
content-type: [application/json]
43+
x-amzn-requestid: [faeeb73a-5055-11e7-970d-a3e28b0572da]
44+
status: {code: 200, message: OK}
45+
- request:
46+
body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow",
47+
"runId": "22X56OQhwfy+AtGx1iSo3HX51AK2DdtKzfUnU+RQElbms="}}'
48+
headers:
49+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
50+
Content-Encoding: [amz-1.0]
51+
Content-Length: ['140']
52+
Content-Type: [application/json; charset=UTF-8]
53+
Host: [swf.us-east-1.amazonaws.com]
54+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-53-generic]
55+
X-Amz-Date: [20170613T163309Z]
56+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowExecution]
57+
method: POST
58+
uri: https://swf.us-east-1.amazonaws.com/
59+
response:
60+
body: {string: !!python/unicode '{"executionConfiguration":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-5f7b987ba10e4b4db22192c044052319"},"taskStartToCloseTimeout":"300"},"executionInfo":{"cancelRequested":false,"closeStatus":"CANCELED","closeTimestamp":1.49737158914E9,"execution":{"runId":"22X56OQhwfy+AtGx1iSo3HX51AK2DdtKzfUnU+RQElbms=","workflowId":"test-simpleflow-workflow"},"executionStatus":"CLOSED","startTimestamp":1.497371587073E9,"workflowType":{"name":"example","version":"example"}},"latestActivityTaskTimestamp":1.49737158784E9,"openCounts":{"openActivityTasks":1,"openChildWorkflowExecutions":0,"openDecisionTasks":0,"openLambdaFunctions":0,"openTimers":0}}'}
61+
headers:
62+
content-length: ['705']
63+
content-type: [application/json]
64+
x-amzn-requestid: [fc630789-5055-11e7-8d87-ed8dd5432c37]
65+
status: {code: 200, message: OK}
66+
- request:
67+
body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow",
68+
"runId": "22X56OQhwfy+AtGx1iSo3HX51AK2DdtKzfUnU+RQElbms="}}'
69+
headers:
70+
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
71+
Content-Encoding: [amz-1.0]
72+
Content-Length: ['140']
73+
Content-Type: [application/json; charset=UTF-8]
74+
Host: [swf.us-east-1.amazonaws.com]
75+
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-53-generic]
76+
X-Amz-Date: [20170613T163510Z]
77+
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.GetWorkflowExecutionHistory]
78+
method: POST
79+
uri: https://swf.us-east-1.amazonaws.com/
80+
response:
81+
body: {string: !!python/unicode '{"events":[{"eventId":1,"eventTimestamp":1.497371587073E9,"eventType":"WorkflowExecutionStarted","workflowExecutionStartedEventAttributes":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","input":"{\"args\":[true],\"kwargs\":{}}","parentInitiatedEventId":0,"taskList":{"name":"test-simpleflow-workflow-5f7b987ba10e4b4db22192c044052319"},"taskStartToCloseTimeout":"300","workflowType":{"name":"example","version":"example"}}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-5f7b987ba10e4b4db22192c044052319"}},"eventId":2,"eventTimestamp":1.497371587073E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":13202,\"user\":\"zeb\"}","scheduledEventId":2},"eventId":3,"eventTimestamp":1.497371587123E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3},"eventId":4,"eventTimestamp":1.497371587661E9,"eventType":"DecisionTaskCompleted"},{"activityTaskScheduledEventAttributes":{"activityId":"activity-tests.integration.workflow.cancel_workflow-1","activityType":{"name":"tests.integration.workflow.cancel_workflow","version":"example"},"decisionTaskCompletedEventId":4,"heartbeatTimeout":"300","input":"{\"args\":[],\"kwargs\":{}}","scheduleToCloseTimeout":"300","scheduleToStartTimeout":"300","startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-5f7b987ba10e4b4db22192c044052319"}},"eventId":5,"eventTimestamp":1.497371587661E9,"eventType":"ActivityTaskScheduled"},{"activityTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":13203,\"user\":\"zeb\"}","scheduledEventId":5},"eventId":6,"eventTimestamp":1.49737158784E9,"eventType":"ActivityTaskStarted"},{"eventId":7,"eventTimestamp":1.497371588667E9,"eventType":"WorkflowExecutionCancelRequested","workflowExecutionCancelRequestedEventAttributes":{"externalInitiatedEventId":0}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-5f7b987ba10e4b4db22192c044052319"}},"eventId":8,"eventTimestamp":1.497371588667E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":13204,\"user\":\"zeb\"}","scheduledEventId":8},"eventId":9,"eventTimestamp":1.497371588696E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":8,"startedEventId":9},"eventId":10,"eventTimestamp":1.49737158914E9,"eventType":"DecisionTaskCompleted"},{"eventId":11,"eventTimestamp":1.49737158914E9,"eventType":"WorkflowExecutionCanceled","workflowExecutionCanceledEventAttributes":{"decisionTaskCompletedEventId":10}}]}'}
82+
headers:
83+
content-length: ['2803']
84+
content-type: [application/json]
85+
x-amzn-requestid: [44f5d3ac-5056-11e7-8887-8d61a3f8196e]
86+
status: {code: 200, message: OK}
87+
version: 1

0 commit comments

Comments
 (0)