Skip to content

Commit ec0fe2e

Browse files
authored
Merge pull request #217 from botify-labs/enhancement/216/Implement-markers
Implement markers (#216)
2 parents 56faf29 + d3d5ba0 commit ec0fe2e

File tree

17 files changed

+392
-74
lines changed

17 files changed

+392
-74
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ output/*/index.html
4343
# Sphinx
4444
docs/_build
4545
docs/api/*
46+
!docs/api/.gitkeep
4647

4748
# Coverage
4849
htmlcov/

examples/marker.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from simpleflow import Workflow, futures
2+
3+
4+
class MarkerWorkflow(Workflow):
5+
name = 'example'
6+
version = 'example'
7+
task_list = 'example'
8+
9+
def run(self):
10+
m = self.submit(self.record_marker('marker 1'))
11+
m = self.submit(self.record_marker('marker 1', 'some details'))
12+
self.submit(self.record_marker('marker 2', "2nd marker's details"))
13+
futures.wait(m)
14+
print('Markers: {}'.format(self.list_markers()))
15+
print('Markers, all: {}'.format(self.list_markers(all=True)))

simpleflow/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from .activity import Activity # NOQA
66
from .workflow import Workflow # NOQA
7+
from .signal import WaitForSignal # NOQA
78

89
from . import settings
910

simpleflow/command.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,11 +426,11 @@ def create_unique_task_list(workflow_id=''):
426426
required=False,
427427
default=60,
428428
help='Heartbeat interval in seconds (0 to disable heartbeating).')
429-
@click.option('--nb-workers',
429+
@click.option('--nb-workers', '-W',
430430
type=int,
431431
required=False,
432432
help='Number of parallel processes handling activity tasks.')
433-
@click.option('--nb-deciders',
433+
@click.option('--nb-deciders', '-D',
434434
type=int,
435435
required=False,
436436
help='Number of parallel processes handling decision tasks.')

simpleflow/executor.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,18 @@ def get_execution_context(self):
161161
"""
162162
return {}
163163

164+
@abc.abstractmethod
164165
def signal(self, name, *args, **kwargs):
165166
raise NotImplementedError
166167

168+
@abc.abstractmethod
167169
def wait_signal(self, name):
168170
raise NotImplementedError
169171

170-
def get_future_from_signal(self, name):
172+
@abc.abstractmethod
173+
def record_marker(self, name, details=None):
174+
raise NotImplementedError
175+
176+
@abc.abstractmethod
177+
def list_markers(self, all=False):
171178
raise NotImplementedError

simpleflow/history.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ class History(object):
2121
:type _external_workflows_canceling: collections.OrderedDict[str, dict[str, Any]]
2222
:ivar _signals: activity events
2323
:type _signals: collections.OrderedDict[str, dict[str, Any]]
24+
:ivar _markers: marker events
25+
:type _markers: collections.OrderedDict[str, list[dict[str, Any]]]
2426
:ivar _tasks: ordered list of tasks/etc
2527
:type _tasks: list[dict[str, Any]]
2628
"""
@@ -33,8 +35,18 @@ def __init__(self, history):
3335
self._external_workflows_canceling = collections.OrderedDict()
3436
self._signals = collections.OrderedDict()
3537
self._signaled_workflows = collections.defaultdict(list)
38+
self._markers = collections.OrderedDict()
3639
self._tasks = []
3740

41+
@property
42+
def swf_history(self):
43+
"""
44+
45+
:return: SWF history
46+
:rtype: swf.models.history.History
47+
"""
48+
return self._history
49+
3850
@property
3951
def activities(self):
4052
"""
@@ -75,8 +87,21 @@ def signaled_workflows(self):
7587
"""
7688
return self._signaled_workflows
7789

90+
@property
91+
def markers(self):
92+
"""
93+
94+
:return: Markers
95+
:rtype: collections.OrderedDict[str, list[dict[str, Any]]]
96+
"""
97+
return self._markers
98+
7899
@property
79100
def tasks(self):
101+
"""
102+
:return:
103+
:rtype: list[dict[str, Any]]
104+
"""
80105
return self._tasks
81106

82107
@property
@@ -422,11 +447,34 @@ def get_workflow(workflows):
422447
workflow['workflow_id'] = event.workflow_execution['workflowId']
423448
workflow['cancel_requested_timestamp'] = event.timestamp
424449

450+
def parse_marker_event(self, events, event):
451+
if event.state == 'recorded':
452+
marker = {
453+
'type': 'marker',
454+
'name': event.marker_name,
455+
'state': event.state,
456+
'details': getattr(event, 'details', None),
457+
'recorded_event_id': event.id,
458+
'recorded_event_timestamp': event.timestamp,
459+
}
460+
self._markers.setdefault(event.marker_name, []).append(marker)
461+
elif event.state == 'failed':
462+
marker = {
463+
'type': 'marker',
464+
'name': event.marker_name,
465+
'state': event.state,
466+
'cause': event.cause,
467+
'record_failed_event_id': event.id,
468+
'record_failed_event_timestamp': event.timestamp,
469+
}
470+
self._markers.setdefault(event.marker_name, []).append(marker)
471+
425472
TYPE_TO_PARSER = {
426473
'ActivityTask': parse_activity_event,
427474
'ChildWorkflowExecution': parse_child_workflow_event,
428475
'WorkflowExecution': parse_workflow_event,
429476
'ExternalWorkflowExecution': parse_external_workflow_event,
477+
'Marker': parse_marker_event,
430478
}
431479

432480
def parse(self):

simpleflow/local/executor.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import collections
12
import logging
23

34
from simpleflow import (
@@ -6,8 +7,9 @@
67
futures,
78
)
89
from simpleflow.base import Submittable
10+
from simpleflow.marker import Marker
911
from simpleflow.signal import WaitForSignal
10-
from simpleflow.task import ActivityTask, WorkflowTask, SignalTask
12+
from simpleflow.task import ActivityTask, WorkflowTask, SignalTask, MarkerTask
1113
from simpleflow.activity import Activity
1214
from simpleflow.workflow import Workflow
1315
from swf.models.history import builder
@@ -22,11 +24,13 @@ class Executor(executor.Executor):
2224
Executes all tasks synchronously in a single local process.
2325
2426
"""
27+
2528
def __init__(self, workflow_class):
2629
super(Executor, self).__init__(workflow_class)
2730
self.update_workflow_class()
2831
self.nb_activities = 0
2932
self.signals_sent = set()
33+
self._markers = collections.OrderedDict()
3034

3135
def update_workflow_class(self):
3236
"""
@@ -65,6 +69,8 @@ def submit(self, func, *args, **kwargs):
6569
raise NotImplementedError(
6670
'wait_signal({}) before signal was sent: unsupported by the local executor'.format(signal_name)
6771
)
72+
elif isinstance(func, MarkerTask):
73+
self._markers.setdefault(func.name, []).append(Marker(func.name, func.details))
6874

6975
if isinstance(func, Submittable):
7076
task = func # *args, **kwargs already resolved.
@@ -139,3 +145,11 @@ def signal(self, name, *args, **kwargs):
139145

140146
def wait_signal(self, name):
141147
return WaitForSignal(name)
148+
149+
def record_marker(self, name, details=None):
150+
return MarkerTask(name, details)
151+
152+
def list_markers(self, all=False):
153+
if all:
154+
return [m for ml in self._markers.values() for m in ml]
155+
return [m[-1] for m in self._markers.values()]

simpleflow/marker.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
class Marker(object):
2+
def __init__(self, name, details):
3+
self.name = name
4+
self.details = details
5+
6+
def __repr__(self):
7+
return '<{klass} {name!r} details={details!r}>'.format(
8+
klass=self.__class__.__name__,
9+
name=self.name,
10+
details=self.details,
11+
)

0 commit comments

Comments
 (0)