4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -2,7 +2,7 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v4.6.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
@@ -13,7 +13,7 @@ repos:

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.5.0
rev: v0.5.1
hooks:
# Run the linter.
- id: ruff
1 change: 1 addition & 0 deletions docs/mkdocs.yml
@@ -76,6 +76,7 @@ nav:
- Error Handling: features/error_handling.md
- Continue As New: features/continue_as_new.md
- Middleware: features/middleware.md
- Proxy: features/proxy.md
- Development: development.md
- Contributing: contributing.md
- License: license.md
14 changes: 10 additions & 4 deletions docs/src/features/canvas.md
@@ -139,12 +139,12 @@ from simpleflow.canvas import Chain, FuncGroup, Group
@activity.with_attributes(task_list="quickstart", version="example")
def partition_data(data_location):
# Partition a list of things to do into parallelizable sub-parts
pass
...


@activity.with_attributes(task_list="quickstart", version="example")
def execute_on_sub_part(sub_part):
pass
...


class AWorkflow(Workflow):
@@ -169,5 +169,11 @@ return an empty Group. Since this has been a long-standing policy, a new
`_allow_none` argument relaxes this constraint.

!!! warning
This is a new experimental option: a better one might be to enforce
that nothing is returned.
This is a new experimental option: a better one may be to allow
a `None` return value.

### Overriding future classes

Both `Group` and `Chain` instances delegate their work to a
`GroupFuture` (resp. `ChainFuture`) instance by default.
Passing a `future_class` argument allows overriding this.
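
As a rough illustration of this delegation pattern, the sketch below uses stand-in classes rather than simpleflow's own. `ToyGroup`, `ToyGroupFuture` and `LoggingGroupFuture` are hypothetical names, and the real `Group` and `GroupFuture` signatures may differ.

```python
class ToyGroupFuture:
    """Stand-in for the default future a group delegates to (hypothetical)."""

    def __init__(self, activities):
        self.activities = list(activities)

    def describe(self):
        return f"{type(self).__name__} over {len(self.activities)} activities"


class LoggingGroupFuture(ToyGroupFuture):
    """Custom future: same work, plus a record of what was submitted."""

    def __init__(self, activities):
        super().__init__(activities)
        self.log = [f"submitted {name}" for name in self.activities]


class ToyGroup:
    """Stand-in for a canvas group; only the delegation is modelled."""

    def __init__(self, *activities, future_class=ToyGroupFuture):
        self.activities = activities
        # The class to instantiate is a parameter, so callers can swap it.
        self.future_class = future_class

    def submit(self):
        return self.future_class(self.activities)


default_future = ToyGroup("a", "b").submit()
custom_future = ToyGroup("a", "b", future_class=LoggingGroupFuture).submit()
```

Passing the class instead of an instance lets the group decide when the future is created, while the caller still controls its behavior.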
11 changes: 6 additions & 5 deletions docs/src/features/command_line.md
@@ -9,13 +9,14 @@ List Workflow Executions
------------------------

$ simpleflow workflow.list TestDomain
Workflow ID Workflow Type Status
basic-example-1438722273 basic OPEN


Workflow Execution Status
-------------------------

$ simpleflow --header workflow.info TestDomain basic-example-1438722273
$ simpleflow workflow.info TestDomain basic-example-1438722273
domain workflow_type.name workflow_type.version task_list workflow_id run_id tag_list execution_time input
TestDomain basic example basic-example-1438722273 22QFVi362TnCh6BdoFgkQFlocunh24zEOemo1L12Yl5Go= 1.70 {'args': [1], 'kwargs': {}}

@@ -25,11 +26,11 @@ Tasks Status

You can check the status of the workflow execution with:

$ simpleflow --header workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3
$ simpleflow --header workflow.tasks TestDomain basic-example-1438722273
$ simpleflow workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3
$ simpleflow workflow.tasks TestDomain basic-example-1438722273
Tasks Last State Last State Time Scheduled Time
examples.basic.increment scheduled 2015-08-04 23:04:34.510000 2015-08-04 23:04:34.510000
$ simpleflow --header workflow.tasks TestDomain basic-example-1438722273
$ simpleflow workflow.tasks TestDomain basic-example-1438722273
Tasks Last State Last State Time Scheduled Time
examples.basic.double completed 2015-08-04 23:06:19.200000 2015-08-04 23:06:17.738000
examples.basic.delay completed 2015-08-04 23:08:18.402000 2015-08-04 23:06:17.738000
@@ -41,7 +42,7 @@ Profiling

You can profile the execution of the workflow with:

$ simpleflow --header workflow.profile TestDomain basic-example-1438722273
$ simpleflow workflow.profile TestDomain basic-example-1438722273
Task Last State Scheduled Time Scheduled Start Time Running End Percentage of total time
activity-examples.basic.double-1 completed 2015-08-04 23:06 0.07 2015-08-04 23:06 1.39 2015-08-04 23:06 1.15
activity-examples.basic.increment-1 completed 2015-08-04 23:04 102.20 2015-08-04 23:06 0.79 2015-08-04 23:06 0.65
5 changes: 3 additions & 2 deletions docs/src/features/continue_as_new.md
@@ -1,7 +1,8 @@
# Continue As New

In long-running workflow executions, the history can hit the 25,000-events hard SWF limit. This causes execution
termination. To prevent this, the workflow can itself close the current execution and start another one by submitting
In long-running workflow executions, the history can hit the 25,000-events hard SWF limit.
This causes execution termination.
To prevent this, the workflow can itself close the current execution and start another one by submitting
`self.continue_as_new(*args, **kwargs)`: it is then restarted with a new run ID and an empty history.

See `examples/continue_as_new.py` for a demonstration of this pattern:
2 changes: 1 addition & 1 deletion docs/src/features/program_tasks.md
@@ -1,7 +1,7 @@
Execution of Tasks as Programs
==============================

The `simpleflow.execute` module allows to define functions that will be
The `simpleflow.execute` module allows defining functions that will be
executed as a program.

There are two modes:
34 changes: 34 additions & 0 deletions docs/src/features/proxy.md
@@ -0,0 +1,34 @@
# Worker Proxy Support

When a fleet of workers is deployed on a single instance, their number
may exhaust the maximum number of connections to SWF,
leaving some of these workers unable to fetch activities.

A proxy can be used to prevent this. Simpleflow can now use one,
and provides a simplistic single-threaded proxy.

## Starting the proxy

The `simpleflow proxy.start` command starts an HTTP proxy server.
It handles the HTTP `CONNECT` method: the tunneled connection
stays encrypted end to end, as before (the proxy does not terminate
HTTPS, so it does not act as a man-in-the-middle).
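
To make the `CONNECT` exchange concrete, here is a minimal sketch of the request a client sends to open a tunnel and how a proxy might read the target from it. The helper names and the endpoint are assumptions chosen for illustration; this is not the code of the bundled proxy.

```python
def build_connect_request(host: str, port: int) -> bytes:
    """Return the request a client sends to open a tunnel through the proxy."""
    target = f"{host}:{port}"
    lines = [
        f"CONNECT {target} HTTP/1.1",
        f"Host: {target}",
        "",
        "",
    ]
    return "\r\n".join(lines).encode("ascii")


def parse_connect_target(request: bytes) -> tuple[str, int]:
    """Extract (host, port) from the first line of a CONNECT request."""
    request_line = request.split(b"\r\n", 1)[0].decode("ascii")
    method, target, _version = request_line.split(" ")
    if method != "CONNECT":
        raise ValueError(f"unsupported method: {method}")
    host, port = target.rsplit(":", 1)
    return host, int(port)


# The endpoint below is only an example target, not taken from simpleflow.
request = build_connect_request("swf.us-east-1.amazonaws.com", 443)
host, port = parse_connect_target(request)
```

Once the proxy has answered with a success status, it only relays bytes in both directions; the TLS handshake happens between the worker and the remote endpoint, inside the tunnel.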

## Using the proxy

The `simpleflow worker.start` command accepts a new `-x, --proxy` argument.

## Example

```shell
# Running the `pirate` example with multiple processes.
# Starting the decider, then the workflow, are left as exercises.
[screen 1] $ simpleflow proxy.start
Serving HTTP Proxy on ::1:4242
[screen 2] $ SWF_PROXY=localhost:4242 PYTHONPATH=$PWD SWF_DOMAIN=TestDomain \
simpleflow worker.start -N 1 -t pirate
```

## Environment setting

Both commands honor a new `SWF_PROXY` environment variable.
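
The `proxy.start` command in this change splits `SWF_PROXY` on its last colon to obtain the bind address and port. The sketch below mirrors that logic in a standalone function; `resolve_bind_address` and its `environ` parameter are hypothetical names introduced here so the behavior can be tried without simpleflow.

```python
import os


def resolve_bind_address(default_address="::1", default_port=4242, environ=None):
    """Return (address, port), letting SWF_PROXY override the defaults."""
    environ = os.environ if environ is None else environ
    proxy = environ.get("SWF_PROXY")
    if not proxy:
        return default_address, default_port
    # Split on the last colon only, so an IPv6 address such as "::1:4242"
    # keeps its own colons in the address part.
    address, port = proxy.rsplit(":", 1)
    return address, int(port)
```

For example, `resolve_bind_address(environ={"SWF_PROXY": "localhost:4242"})` yields `("localhost", 4242)`, and an empty environment falls back to the defaults.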
8 changes: 4 additions & 4 deletions docs/src/features/swf_layer.md
@@ -22,7 +22,7 @@ Settings
--------

!!! bug
The informations in this "Settings" section may be outdated, they need some love.
The information in this "Settings" section may be outdated and needs some love.

Optional:

@@ -84,7 +84,7 @@ local and remote objects.
retention_period=60
)

# a Domain model local instance has been created, but nothing has been
# A Domain model local instance has been created, but nothing has been
# sent to amazon. To do so, you have to save it.
>>> D.save()
```
@@ -133,8 +133,8 @@ ModelDiff()

### QuerySets

Models can be retrieved and instantiated via querysets. To continue over the django comparison,
they’re behaving like django managers.
Models can be retrieved and instantiated via querysets. To continue the Django comparison,
they behave like Django managers.

```python
# As querying for models needs a valid connection to amazon service,
5 changes: 2 additions & 3 deletions docs/src/features/task_lists.md
@@ -1,7 +1,6 @@
# Task Lists

Task lists are often used to route different tasks to specific groups
of workers.
Task lists are used to route different tasks to specific groups of workers.
The decider and activity task lists are distinct, even if they have the same name.

For SWF activities, the task list is typically specified with `@activity.with_attributes`:
@@ -56,7 +55,7 @@ class MyWorkflow(Workflow):
...

@classmethod
def get_task_list(cls, task_list, *args, **kwargs):
def get_task_list(cls, *args, task_list, **kwargs):
return task_list

def run(self, x, task_list, *args, **kwargs):
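
The signature change in this hunk moves `task_list` after `*args`, which makes it keyword-only. A small sketch with a stand-in class (`ToyWorkflow` is hypothetical, not simpleflow's `Workflow`) shows the practical effect:

```python
class ToyWorkflow:
    """Stand-in class; not simpleflow's Workflow."""

    @classmethod
    def get_task_list(cls, *args, task_list, **kwargs):
        # Parameters after *args are keyword-only, so positional
        # workflow inputs can never be mistaken for the task list.
        return task_list


chosen = ToyWorkflow.get_task_list(1, 2, task_list="high-priority")

try:
    ToyWorkflow.get_task_list("high-priority")
except TypeError:
    positional_rejected = True
else:
    positional_rejected = False
```

With the earlier `(cls, task_list, *args, **kwargs)` form, the first positional input would have been bound to `task_list` instead.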
2 changes: 1 addition & 1 deletion examples/pirate/README.md
@@ -7,7 +7,7 @@ It can be executed like this:
```
export AWS_DEFAULT_REGION=eu-west-1
export SWF_DOMAIN=TestDomain
simpleflow standalone \
PYTHONPATH="$PWD" simpleflow standalone \
--nb-deciders 1 \
--nb-workers 2 \
--input '{"kwargs":{"money_needed": 120}}' \
29 changes: 28 additions & 1 deletion simpleflow/boto3_utils.py
@@ -5,18 +5,45 @@
from typing import Any

import boto3
from botocore import config

from simpleflow.utils import json_dumps

_client_var: ContextVar[dict] = ContextVar("boto3_clients")


def clean_config(cfg: config.Config) -> dict[str, Any]:
    rc = {}
    for k in vars(cfg):
        if k.startswith("_"):
            continue
        v = getattr(cfg, k)
        if callable(v) or v is None:
            continue
        rc[k] = v
    return rc


def clean_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """
    We don't know how (or want) to serialize botocore.config.Config instances;
    they currently only contain POD.
    """
    rc = {}
    for k, v in kwargs.items():
        if isinstance(v, config.Config):
            v = clean_config(v)
        rc[k] = v
    return rc


def get_or_create_boto3_client(*, region_name: str | None, service_name: str, **kwargs: Any):
d = {
"region_name": region_name,
"service_name": service_name,
}
d.update(kwargs)
cleaned_kwargs = clean_kwargs(kwargs)
d.update(cleaned_kwargs)
key = hashlib.sha1(json_dumps(d).encode()).hexdigest()
boto3_clients = _client_var.get({})
if not boto3_clients:
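
The reason for cleaning the keyword arguments is that the client cache key is a hash of their JSON form, and a config object cannot be dumped as-is. The sketch below reproduces the idea without botocore: `ToyConfig`, `to_plain_dict` and `cache_key` are hypothetical stand-ins, and `json.dumps(..., sort_keys=True)` is an assumption standing in for simpleflow's own `json_dumps`.

```python
import hashlib
import json


class ToyConfig:
    """Stand-in for a config object that json cannot serialize directly."""

    def __init__(self, proxies=None, read_timeout=None):
        self.proxies = proxies
        self.read_timeout = read_timeout
        self._internal = object()


def to_plain_dict(cfg):
    """Keep public, non-callable, non-None attributes only."""
    return {
        k: v
        for k, v in vars(cfg).items()
        if not k.startswith("_") and v is not None and not callable(v)
    }


def cache_key(**kwargs):
    """Hash the kwargs after replacing config objects with plain dicts."""
    cleaned = {
        k: to_plain_dict(v) if isinstance(v, ToyConfig) else v
        for k, v in kwargs.items()
    }
    return hashlib.sha1(json.dumps(cleaned, sort_keys=True).encode()).hexdigest()


key_a = cache_key(service_name="swf", config=ToyConfig(proxies={"https": "localhost:4242"}))
key_b = cache_key(service_name="swf", config=ToyConfig(proxies={"https": "localhost:4242"}))
key_c = cache_key(service_name="swf", config=ToyConfig())
```

Two distinct config objects with the same settings map to the same key, so a client created for one is reused for the other, while a different proxy setting yields a separate client.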
29 changes: 19 additions & 10 deletions simpleflow/command.py
@@ -21,6 +21,7 @@
from simpleflow.settings import print_settings
from simpleflow.swf import helpers
from simpleflow.swf.process import decider, worker
from simpleflow.swf.process.proxy.command import start_proxy
from simpleflow.swf.stats import pretty
from simpleflow.swf.task import ActivityTask
from simpleflow.swf.utils import get_workflow_execution, set_workflow_class_name
@@ -42,7 +43,7 @@ def comma_separated_list(value):

@click.group()
@click.option("--format")
@click.option("--header/--no-header", default=False)
@click.option("--header/--no-header", default=True)
@click.option(
"--color",
type=click.Choice([log.ColorModes.AUTO, log.ColorModes.ALWAYS, log.ColorModes.NEVER]),
@@ -394,14 +395,11 @@ def task_info(ctx, domain, workflow_id, task_id, details):


@click.option("--nb-processes", "-N", type=int)
@click.option("--log-level", "-l")
@click.option("--task-list", "-t")
@click.option("--domain", "-d", envvar="SWF_DOMAIN", required=True, help="SWF Domain")
@click.argument("workflows", nargs=-1, required=False)
@cli.command("decider.start", help="Start a decider process to manage workflow executions.")
def start_decider(workflows, domain, task_list, log_level, nb_processes):
if log_level:
logger.warning("Deprecated: --log-level will be removed, use LOG_LEVEL environment variable instead")
def start_decider(workflows, domain, task_list, nb_processes):
decider.command.start(
workflows,
domain,
@@ -411,6 +409,7 @@ def start_decider(workflows, domain, task_list, log_level, nb_processes):
)


@click.option("--proxy", "-x", envvar="SWF_PROXY", required=False, help="Proxy URL.")
@click.option("--middleware-pre-execution", required=False, multiple=True)
@click.option("--middleware-post-execution", required=False, multiple=True)
@click.option(
@@ -426,26 +425,24 @@ def start_decider(workflows, domain, task_list, log_level, nb_processes):
help="Heartbeat interval in seconds (0 to disable heartbeating).",
)
@click.option("--nb-processes", "-N", type=int)
@click.option("--log-level", "-l")
@click.option("--task-list", "-t")
@click.option("--domain", "-d", envvar="SWF_DOMAIN", required=True, help="SWF Domain")
@cli.command("worker.start", help="Start a worker process to handle activity tasks.")
def start_worker(
domain,
task_list,
log_level,
nb_processes,
heartbeat,
one_task,
poll_data,
middleware_pre_execution,
middleware_post_execution,
proxy,
):
if log_level:
logger.warning("Deprecated: --log-level will be removed, use LOG_LEVEL environment variable instead")

if not task_list and not poll_data:
raise ValueError("Please provide a --task-list or some data via --poll-data")
if poll_data and proxy:
raise ValueError("Please provide either --poll-data or --proxy, not both")

middlewares = {
"pre": middleware_pre_execution,
@@ -460,6 +457,7 @@ def start_worker(
heartbeat=heartbeat,
one_task=one_task,
poll_data=poll_data,
proxy=proxy,
)


@@ -784,6 +782,17 @@ def section(title):
print(f"{key}={value}")


@click.option("--address", "-a", required=False, default="::1", help="Address to bind.")
@click.option("--port", "-p", required=False, type=int, default=4242, help="Port to bind.")
@cli.command("proxy.start", help="Start a proxy process to handle worker tasks.")
def proxy_start(address: str, port: int):
    proxy = os.environ.get("SWF_PROXY")
    if proxy:
        address, sport = proxy.rsplit(":", 1)
        port = int(sport)
    start_proxy(address=address, port=port)


@click.argument("locations", nargs=-1)
@cli.command(
"binaries.download",
4 changes: 2 additions & 2 deletions simpleflow/swf/mapper/actors/core.py
@@ -18,8 +18,8 @@ class Actor(ConnectedSWFObject):
:ivar task_list: task list the Actor should watch for tasks on
"""

def __init__(self, domain: Domain, task_list: str) -> None:
super().__init__()
def __init__(self, domain: Domain, task_list: str, **kwargs) -> None:
super().__init__(**kwargs)

self._set_domain(domain)
self.task_list = task_list
4 changes: 2 additions & 2 deletions simpleflow/swf/mapper/actors/worker.py
@@ -41,8 +41,8 @@ class ActivityWorker(Actor):
The form of this identity is user defined.
"""

def __init__(self, domain: Domain, task_list: str, identity: str | None = None):
super().__init__(domain, task_list)
def __init__(self, domain: Domain, task_list: str, identity: str | None = None, **kwargs):
super().__init__(domain, task_list, **kwargs)

self._identity = identity
