Skip to content

Commit cff1b24

Browse files
committed
take latest changes from upstream
1 parent a5081a9 commit cff1b24

28 files changed

+1091
-133
lines changed

README.md

Lines changed: 86 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,46 @@
1-
# Python framework for Cadence Workflow Service
1+
# Intro: Fault-Oblivious Stateful Python Code
22

3-
[Cadence](https://github.com/uber/cadence) is a workflow engine developed at Uber Engineering. With this framework, workflows and activities managed by Cadence can be implemented in Python code.
3+
cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off.
44

5-
## Status / TODO
5+
This programming model is useful whenever you need to ensure that a function runs to completion. For example:
66

7-
cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production
8-
version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ March 2020.
7+
- Business logic involving multiple micro services
8+
- CI/CD pipelines
9+
- Data pipelines
10+
- RPA
11+
- ETL
12+
- Marketing automation / Customer journeys / Customer engagement
13+
- Zapier/IFTTT like end user automation.
14+
- Chat bots
15+
- Multi-step forms
16+
- Scheduler/Cron jobs
917

10-
1.0
11-
- [x] Tchannel implementation
12-
- [x] Python-friendly wrapper around Cadence's Thrift API
13-
- [x] Author activities in Python
14-
- [x] Start workflows (synchronously)
15-
- [x] Create workflows
16-
- [x] Workflow execution in coroutines
17-
- [x] Invoke activities from workflows
18-
- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally
19-
- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn
20-
- [x] Activity retry
21-
- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution()
22-
- [x] Signals
23-
- [x] Queries
24-
- [x] Async workflow execution
25-
- [x] await
26-
- [x] now (currentTimeMillis)
27-
- [x] Sleep
28-
- [x] Loggers
29-
- [x] newRandom
30-
- [x] UUID
31-
- [x] Workflow Versioning
32-
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);
18+
Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence) as its backend.
3319

34-
1.1
35-
- [ ] ActivityStub and Workflow.newUntypedActivityStub
36-
- [ ] Classes as arguments and return values to/from activity and workflow methods
37-
- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub
38-
- [ ] Custom workflow ids through start() and new_workflow_stub()
39-
- [ ] ContinueAsNew
40-
- [ ] Compatibility with Java client
41-
- [ ] Compatibility with Golang client
20+
For more information about the fault-oblivious programming model refer to the Cadence documentation [here](https://cadenceworkflow.io/docs/03_concepts/01_workflows)
4221

43-
2.0
44-
- [ ] Sticky workflows
22+
## Install Cadencce
4523

46-
Post 2.0:
47-
- [ ] sideEffect/mutableSideEffect
48-
- [ ] Parallel activity execution
49-
- [ ] Timers
50-
- [ ] Cancellation Scopes
51-
- [ ] Child Workflows
52-
- [ ] Explicit activity ids for activity invocations
24+
```
25+
wget https://raw.githubusercontent.com/uber/cadence/master/docker/docker-compose.yml
26+
docker-compose up
27+
```
5328

54-
## Installation
29+
## Register `sample` domain
5530

5631
```
57-
pip install cadence-client
32+
docker run --network=host --rm ubercadence/cli:master --do sample domain register -rd 1
5833
```
5934

60-
## Hello World Sample
35+
## Installation cadence-python
6136

6237
```
38+
pip install cadence-client==1.0.0b3
39+
```
40+
41+
## Hello World Sample
42+
43+
```python
6344
import sys
6445
import logging
6546
from cadence.activity_method import activity_method
@@ -82,7 +63,7 @@ class GreetingActivities:
8263
# Activities Implementation
8364
class GreetingActivitiesImpl:
8465
def compose_greeting(self, greeting: str, name: str):
85-
return greeting + " " + name + "!"
66+
return f"{greeting} {name}!"
8667

8768

8869
# Workflow Interface
@@ -99,6 +80,9 @@ class GreetingWorkflowImpl(GreetingWorkflow):
9980
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
10081

10182
async def get_greeting(self, name):
83+
# Place any Python code here that you want to ensure is executed to completion.
84+
# Note: code in workflow functions must be deterministic so that the same code paths
85+
# are ran during replay.
10286
return await self.greeting_activities.compose_greeting("Hello", name)
10387

10488

@@ -118,4 +102,56 @@ if __name__ == '__main__':
118102
worker.stop()
119103
print("Workers stopped...")
120104
sys.exit(0)
121-
```
105+
```
106+
107+
## Status / TODO
108+
109+
cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production
110+
version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ ~~March 2020~~ April 2020.
111+
112+
1.0
113+
- [x] Tchannel implementation
114+
- [x] Python-friendly wrapper around Cadence's Thrift API
115+
- [x] Author activities in Python
116+
- [x] Start workflows (synchronously)
117+
- [x] Create workflows
118+
- [x] Workflow execution in coroutines
119+
- [x] Invoke activities from workflows
120+
- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally
121+
- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn
122+
- [x] Activity retry
123+
- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution()
124+
- [x] Signals
125+
- [x] Queries
126+
- [x] Async workflow execution
127+
- [x] await
128+
- [x] now (currentTimeMillis)
129+
- [x] Sleep
130+
- [x] Loggers
131+
- [x] newRandom
132+
- [x] UUID
133+
- [x] Workflow Versioning
134+
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);
135+
136+
1.1
137+
- [ ] ActivityStub and Workflow.newUntypedActivityStub
138+
- [ ] Classes as arguments and return values to/from activity and workflow methods
139+
- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub
140+
- [ ] Custom workflow ids through start() and new_workflow_stub()
141+
- [ ] ContinueAsNew
142+
- [ ] Compatibility with Java client
143+
- [ ] Compatibility with Golang client
144+
145+
2.0
146+
- [ ] Sticky workflows
147+
148+
Post 2.0:
149+
- [ ] sideEffect/mutableSideEffect
150+
- [ ] Local activity
151+
- [ ] Parallel activity execution
152+
- [ ] Timers
153+
- [ ] Cancellation Scopes
154+
- [ ] Child Workflows
155+
- [ ] Explicit activity ids for activity invocations
156+
157+

cadence/activity_loop.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,22 @@
66
from cadence.cadence_types import PollForActivityTaskRequest, TaskListMetadata, TaskList, PollForActivityTaskResponse
77
from cadence.conversions import json_to_args
88
from cadence.workflowservice import WorkflowService
9-
from cadence.worker import Worker
9+
from cadence.worker import Worker, StopRequestedException
1010

1111
logger = logging.getLogger(__name__)
1212

1313

1414
def activity_task_loop(worker: Worker):
15-
service: WorkflowService = WorkflowService.create(worker.host, worker.port)
15+
service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout())
1616
worker.manage_service(service)
1717
logger.info(f"Activity task worker started: {WorkflowService.get_identity()}")
1818
try:
1919
while True:
2020
if worker.is_stop_requested():
2121
return
2222
try:
23+
service.set_next_timeout_cb(worker.raise_if_stop_requested)
24+
2325
polling_start = datetime.datetime.now()
2426
polling_request = PollForActivityTaskRequest()
2527
polling_request.task_list_metadata = TaskListMetadata()
@@ -32,6 +34,8 @@ def activity_task_loop(worker: Worker):
3234
task, err = service.poll_for_activity_task(polling_request)
3335
polling_end = datetime.datetime.now()
3436
logger.debug("PollForActivityTask: %dms", (polling_end - polling_start).total_seconds() * 1000)
37+
except StopRequestedException:
38+
return
3539
except Exception as ex:
3640
logger.error("PollForActivityTask error: %s", ex)
3741
continue
@@ -75,4 +79,8 @@ def activity_task_loop(worker: Worker):
7579
process_end = datetime.datetime.now()
7680
logger.info("Process ActivityTask: %dms", (process_end - process_start).total_seconds() * 1000)
7781
finally:
82+
try:
83+
service.close()
84+
except:
85+
logger.warning("service.close() failed", exc_info=1)
7886
worker.notify_thread_stopped()

cadence/activity_method.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ async def stub_activity_fn(self, *args):
5454
assert self._decision_context
5555
assert stub_activity_fn._execute_parameters
5656
parameters = copy.deepcopy(stub_activity_fn._execute_parameters)
57+
if hasattr(self, "_activity_options") and self._activity_options:
58+
self._activity_options.fill_execute_activity_parameters(parameters)
5759
if self._retry_parameters:
5860
parameters.retry_parameters = self._retry_parameters
5961
parameters.input = args_to_json(args).encode("utf-8")
@@ -80,3 +82,24 @@ async def stub_activity_fn(self, *args):
8082
raise Exception("activity_method must be called with arguments")
8183
else:
8284
return wrapper
85+
86+
87+
@dataclass
88+
class ActivityOptions:
89+
schedule_to_close_timeout_seconds: int = None
90+
schedule_to_start_timeout_seconds: int = None
91+
start_to_close_timeout_seconds: int = None
92+
heartbeat_timeout_seconds: int = None
93+
task_list: str = None
94+
95+
def fill_execute_activity_parameters(self, execute_parameters: ExecuteActivityParameters):
96+
if self.schedule_to_close_timeout_seconds is not None:
97+
execute_parameters.schedule_to_close_timeout_seconds = self.schedule_to_close_timeout_seconds
98+
if self.schedule_to_start_timeout_seconds is not None:
99+
execute_parameters.schedule_to_start_timeout_seconds = self.schedule_to_start_timeout_seconds
100+
if self.start_to_close_timeout_seconds is not None:
101+
execute_parameters.start_to_close_timeout_seconds = self.start_to_close_timeout_seconds
102+
if self.heartbeat_timeout_seconds is not None:
103+
execute_parameters.heartbeat_timeout_seconds = self.heartbeat_timeout_seconds
104+
if self.task_list is not None:
105+
execute_parameters.task_list = self.task_list

cadence/connection.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
from __future__ import annotations
22

33
import getpass
4-
import os
54
import socket
65
from dataclasses import dataclass
76
from io import BytesIO
8-
from typing import IO, List, Union, Optional, Dict
7+
from typing import IO, List, Union, Optional, Dict, Callable
98

109
from cadence.frames import InitReqFrame, Frame, Arg, CallReqFrame, CallReqContinueFrame, CallResFrame, \
1110
CallResContinueFrame, FrameWithArgs, CallFlags, ErrorFrame
@@ -300,19 +299,23 @@ class TChannelConnection:
300299
s: socket.socket
301300

302301
@classmethod
303-
def open(cls, host: object, port: object) -> TChannelConnection:
302+
def open(cls, host: object, port: object, timeout: int = None) -> TChannelConnection:
304303
s: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
304+
s.settimeout(timeout)
305305
s.connect((host, port))
306306
return cls(s)
307307

308308
def __init__(self, s: socket):
309309
self.s = s
310310
self.file = self.s.makefile("rwb")
311-
self.wrapper = IOWrapper(self.file)
311+
self.wrapper = IOWrapper(self.file, socket_=s)
312312
self.current_id = -1
313313

314314
self.handshake()
315315

316+
def set_next_timeout_cb(self, cb: Callable):
317+
self.wrapper.set_next_timeout_cb(cb)
318+
316319
def new_id(self):
317320
self.current_id += 1
318321
return self.current_id
@@ -340,6 +343,7 @@ def read_frame(self):
340343

341344
def close(self):
342345
self.s.close()
346+
self.wrapper.close()
343347

344348
def call_function(self, call: ThriftFunctionCall) -> ThriftFunctionResponse:
345349
frames = call.build_frames(self.new_id())

cadence/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11

22
CODE_OK = 0x00
33
CODE_ERROR = 0x01
4+
5+
# This should be at least 60 seconds because Cadence will reply after 60 seconds when polling
6+
# if there is nothing pending
7+
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120

0 commit comments

Comments
 (0)