|
22 | 22 | from simpleflow.history import History |
23 | 23 | from simpleflow.marker import Marker |
24 | 24 | from simpleflow.signal import WaitForSignal |
| 25 | +from simpleflow.swf import constants |
25 | 26 | from simpleflow.swf.helpers import swf_identity |
26 | 27 | from simpleflow.swf.task import ActivityTask, WorkflowTask, SignalTask, MarkerTask, SwfTask |
27 | 28 | from simpleflow.task import ( |
28 | 29 | ActivityTask as BaseActivityTask, |
29 | 30 | WorkflowTask as BaseWorkflowTask, |
30 | 31 | SignalTask as BaseSignalTask, |
31 | 32 | MarkerTask as BaseMarkerTask, |
| 33 | +from simpleflow.utils import ( |
| 34 | + hex_hash, |
| 35 | + issubclass_, |
| 36 | + json_dumps, |
| 37 | + json_loads_or_raw, |
| 38 | + retry, |
32 | 39 | ) |
33 | | -from simpleflow.utils import issubclass_, json_dumps, hex_hash |
34 | | -from simpleflow.swf import constants |
35 | | -from simpleflow.utils import retry |
36 | 40 | from simpleflow.workflow import Workflow |
37 | 41 | from swf.core import ConnectedSWFObject |
38 | 42 |
|
@@ -264,7 +268,7 @@ def _get_future_from_activity_event(self, event): |
264 | 268 | future.set_running() |
265 | 269 | elif state == 'completed': |
266 | 270 | result = event['result'] |
267 | | - future.set_finished(json.loads(result) if result else None) |
| 271 | + future.set_finished(json_loads_or_raw(result)) |
268 | 272 | elif state == 'canceled': |
269 | 273 | future.set_cancelled() |
270 | 274 | elif state == 'failed': |
@@ -318,7 +322,7 @@ def _get_future_from_child_workflow_event(self, event): |
318 | 322 | elif state == 'started': |
319 | 323 | future.set_running() |
320 | 324 | elif state == 'completed': |
321 | | - future.set_finished(json.loads(event['result'])) |
| 325 | + future.set_finished(json_loads_or_raw(event['result'])) |
322 | 326 | elif state == 'failed': |
323 | 327 | future.set_exception(exceptions.TaskFailed( |
324 | 328 | name=event['id'], |
@@ -1056,10 +1060,13 @@ def record_marker(self, name, details=None): |
1056 | 1060 |
|
1057 | 1061 | def list_markers(self, all=False): |
1058 | 1062 | if all: |
1059 | | - return [Marker(m['name'], json.loads(m['details'])) for ml in self._history.markers.values() for m in ml] |
| 1063 | + return [ |
| 1064 | + Marker(m['name'], json_loads_or_raw(m['details'])) |
| 1065 | + for ml in self._history.markers.values() for m in ml |
| 1066 | + ] |
1060 | 1067 | rc = [] |
1061 | 1068 | for ml in self._history.markers.values(): |
1062 | 1069 | m = ml[-1] |
1063 | 1070 | if m['state'] == 'recorded': |
1064 | | - rc.append(Marker(m['name'], json.loads(m['details']))) |
| 1071 | + rc.append(Marker(m['name'], json_loads_or_raw(m['details']))) |
1065 | 1072 | return rc |
0 commit comments