diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 89ccd73d4..d495c09bc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v4.6.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -13,7 +13,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.5.0 + rev: v0.5.1 hooks: # Run the linter. - id: ruff diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 5ed5fdaf5..fd7c548ee 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -76,6 +76,7 @@ nav: - Error Handling: features/error_handling.md - Continue As New: features/continue_as_new.md - Middleware: features/middleware.md + - Proxy: features/proxy.md - Development: development.md - Contributing: contributing.md - License: license.md diff --git a/docs/src/features/canvas.md b/docs/src/features/canvas.md index 6865d91aa..baf5c1778 100644 --- a/docs/src/features/canvas.md +++ b/docs/src/features/canvas.md @@ -139,12 +139,12 @@ from simpleflow.canvas import Chain, FuncGroup, Group @activity.with_attributes(task_list="quickstart", version="example") def partition_data(data_location): # Partition a list of things to do into parallelizable sub-parts - pass + ... @activity.with_attributes(task_list="quickstart", version="example") def execute_on_sub_part(sub_part): - pass + ... class AWorkflow(Workflow): @@ -169,5 +169,11 @@ return an empty Group. Since this has been a long-standing policy, a new `_allow_none` argument relaxes this constraint. !!! warning - This is a new experimental option: a better one might be to enforce - that nothing is returned. + This is a new experimental option: a better one may be to allow + a `None` return value. + +### Overriding future classes + +Both the `Group` and `Canvas` instances delegate their work to a +`GroupFuture` (resp. `ChainFuture`) instance by default. 
+Passing a `future_class` argument allows overriding this. diff --git a/docs/src/features/command_line.md b/docs/src/features/command_line.md index 9d06f468b..bb668cb1f 100644 --- a/docs/src/features/command_line.md +++ b/docs/src/features/command_line.md @@ -9,13 +9,14 @@ List Workflow Executions ------------------------ $ simpleflow workflow.list TestDomain + Workflow ID Workflow Type Status basic-example-1438722273 basic OPEN Workflow Execution Status ------------------------- - $ simpleflow --header workflow.info TestDomain basic-example-1438722273 + $ simpleflow workflow.info TestDomain basic-example-1438722273 domain workflow_type.name workflow_type.version task_list workflow_id run_id tag_list execution_time input TestDomain basic example basic-example-1438722273 22QFVi362TnCh6BdoFgkQFlocunh24zEOemo1L12Yl5Go= 1.70 {'args': [1], 'kwargs': {}} @@ -25,11 +26,11 @@ Tasks Status You can check the status of the workflow execution with: - $ simpleflow --header workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3 - $ simpleflow --header workflow.tasks TestDomain basic-example-1438722273 + $ simpleflow workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3 + $ simpleflow workflow.tasks TestDomain basic-example-1438722273 Tasks Last State Last State Time Scheduled Time examples.basic.increment scheduled 2015-08-04 23:04:34.510000 2015-08-04 23:04:34.510000 - $ simpleflow --header workflow.tasks TestDomain basic-example-1438722273 + $ simpleflow workflow.tasks TestDomain basic-example-1438722273 Tasks Last State Last State Time Scheduled Time examples.basic.double completed 2015-08-04 23:06:19.200000 2015-08-04 23:06:17.738000 examples.basic.delay completed 2015-08-04 23:08:18.402000 2015-08-04 23:06:17.738000 @@ -41,7 +42,7 @@ Profiling You can profile the execution of the workflow with: - $ simpleflow --header workflow.profile TestDomain basic-example-1438722273 + $ simpleflow workflow.profile TestDomain basic-example-1438722273 Task Last State Scheduled Time 
Scheduled Start Time Running End Percentage of total time activity-examples.basic.double-1 completed 2015-08-04 23:06 0.07 2015-08-04 23:06 1.39 2015-08-04 23:06 1.15 activity-examples.basic.increment-1 completed 2015-08-04 23:04 102.20 2015-08-04 23:06 0.79 2015-08-04 23:06 0.65 diff --git a/docs/src/features/continue_as_new.md b/docs/src/features/continue_as_new.md index b1ac8c344..d51f4732c 100644 --- a/docs/src/features/continue_as_new.md +++ b/docs/src/features/continue_as_new.md @@ -1,7 +1,8 @@ # Continue As New -In long-running workflow executions, the history can hit the 25,000-events hard SWF limit. This causes execution -termination. To prevent this, the workflow can itself close the current execution and start another one by submitting +In long-running workflow executions, the history can hit the 25,000-events hard SWF limit. +This causes execution termination. +To prevent this, the workflow can itself close the current execution and start another one by submitting `self.continue_as_new(*args, **kwargs)`: it is then restarted with a new run ID and an empty history. See `examples/continue_as_new.py` for a demonstration of this pattern: diff --git a/docs/src/features/program_tasks.md b/docs/src/features/program_tasks.md index 367122163..83d0d2e1b 100644 --- a/docs/src/features/program_tasks.md +++ b/docs/src/features/program_tasks.md @@ -1,7 +1,7 @@ Execution of Tasks as Programs ============================== -The `simpleflow.execute` module allows to define functions that will be +The `simpleflow.execute` module allows defining functions that will be executed as a program. 
There are two modes: diff --git a/docs/src/features/proxy.md b/docs/src/features/proxy.md new file mode 100644 index 000000000..36ab92c34 --- /dev/null +++ b/docs/src/features/proxy.md @@ -0,0 +1,34 @@ +# Worker Proxy Support + +When deploying a fleet of workers on an instance, their number +may cause the maximum number of connections to SWF to be reached +and leave some of these workers unable to fetch activities. + +A proxy can be used to prevent this. Simpleflow can now use one, +and provides a simplistic single-threaded proxy. + +## Starting the proxy + +The `simpleflow proxy.start` command starts an HTTP proxy server. +It processes the CONNECT proxy method; the rest of the connection +is encrypted as before (it doesn't handle HTTPS to MITM the +communication). + +## Using the proxy + +The `simpleflow worker.start` command accepts a new `-x, --proxy` argument. + +## Example + +```shell +# Running the `pirate` example with multiple processes. +# Starting the decider, then the workflow, are left as exercises. +[screen 1] $ simpleflow proxy.start +Serving HTTP Proxy on ::1:4242 +[screen 2] $ SWF_PROXY=localhost:4242 PYTHONPATH=$PWD SWF_DOMAIN=TestDomain \ + simpleflow worker.start -N 1 -t pirate +``` + +## Environment setting + +Both commands honor a new `SWF_PROXY` environment variable. diff --git a/docs/src/features/swf_layer.md b/docs/src/features/swf_layer.md index a0ac4c5c6..83dd6c94e 100644 --- a/docs/src/features/swf_layer.md +++ b/docs/src/features/swf_layer.md @@ -22,7 +22,7 @@ Settings -------- !!! bug - The informations in this "Settings" section may be outdated, they need some love. + The information in this "Settings" section may be outdated, it needs some love. Optional: @@ -84,7 +84,7 @@ local and remote objects. retention_period=60 ) -# a Domain model local instance has been created, but nothing has been +# A Domain model local instance has been created, but nothing has been # sent to amazon. To do so, you have to save it.
>>> D.save() ``` @@ -133,8 +133,8 @@ ModelDiff() ### QuerySets -Models can be retrieved and instantiated via querysets. To continue over the django comparison, -they’re behaving like django managers. +Models can be retrieved and instantiated via querysets. To continue over the Django comparison, +they’re behaving like Django managers. ```python # As querying for models needs a valid connection to amazon service, diff --git a/docs/src/features/task_lists.md b/docs/src/features/task_lists.md index dbdc74df8..375487571 100644 --- a/docs/src/features/task_lists.md +++ b/docs/src/features/task_lists.md @@ -1,7 +1,6 @@ # Task Lists -Task lists are often used to route different tasks to specific groups -of workers. +Task lists are used to route different tasks to specific groups of workers. The decider and activity task lists are distinct, even if they have the same name. For SWF activities, the task list is typically specified with `@activity.with_attributes`: @@ -56,7 +55,7 @@ class MyWorkflow(Workflow): ... 
@classmethod - def get_task_list(cls, task_list, *args, **kwargs): + def get_task_list(cls, *args, task_list, **kwargs): return task_list def run(self, x, task_list, *args, **kwargs): diff --git a/examples/pirate/README.md b/examples/pirate/README.md index a02a6831c..c3ab42938 100644 --- a/examples/pirate/README.md +++ b/examples/pirate/README.md @@ -7,7 +7,7 @@ It can be executed like this: ``` export AWS_DEFAULT_REGION=eu-west-1 export SWF_DOMAIN=TestDomain -simpleflow standalone \ +PYTHONPATH="$PWD" simpleflow standalone \ --nb-deciders 1 \ --nb-workers 2 \ --input '{"kwargs":{"money_needed": 120}}' \ diff --git a/simpleflow/boto3_utils.py b/simpleflow/boto3_utils.py index 658492ca4..5c03f4c7b 100644 --- a/simpleflow/boto3_utils.py +++ b/simpleflow/boto3_utils.py @@ -5,18 +5,45 @@ from typing import Any import boto3 +from botocore import config from simpleflow.utils import json_dumps _client_var: ContextVar[dict] = ContextVar("boto3_clients") +def clean_config(cfg: config.Config) -> dict[str, Any]: + rc = {} + for k in vars(cfg): + if k.startswith("_"): + continue + v = getattr(cfg, k) + if callable(v) or v is None: + continue + rc[k] = v + return rc + + +def clean_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]: + """ + We don't know how (or want) to serialize botocore.config.Config instances; + they currently only contain POD. 
+ """ + rc = {} + for k, v in kwargs.items(): + if isinstance(v, config.Config): + v = clean_config(v) + rc[k] = v + return rc + + def get_or_create_boto3_client(*, region_name: str | None, service_name: str, **kwargs: Any): d = { "region_name": region_name, "service_name": service_name, } - d.update(kwargs) + cleaned_kwargs = clean_kwargs(kwargs) + d.update(cleaned_kwargs) key = hashlib.sha1(json_dumps(d).encode()).hexdigest() boto3_clients = _client_var.get({}) if not boto3_clients: diff --git a/simpleflow/command.py b/simpleflow/command.py index 040f09b2a..ed9c3357b 100644 --- a/simpleflow/command.py +++ b/simpleflow/command.py @@ -21,6 +21,7 @@ from simpleflow.settings import print_settings from simpleflow.swf import helpers from simpleflow.swf.process import decider, worker +from simpleflow.swf.process.proxy.command import start_proxy from simpleflow.swf.stats import pretty from simpleflow.swf.task import ActivityTask from simpleflow.swf.utils import get_workflow_execution, set_workflow_class_name @@ -42,7 +43,7 @@ def comma_separated_list(value): @click.group() @click.option("--format") -@click.option("--header/--no-header", default=False) +@click.option("--header/--no-header", default=True) @click.option( "--color", type=click.Choice([log.ColorModes.AUTO, log.ColorModes.ALWAYS, log.ColorModes.NEVER]), @@ -394,14 +395,11 @@ def task_info(ctx, domain, workflow_id, task_id, details): @click.option("--nb-processes", "-N", type=int) -@click.option("--log-level", "-l") @click.option("--task-list", "-t") @click.option("--domain", "-d", envvar="SWF_DOMAIN", required=True, help="SWF Domain") @click.argument("workflows", nargs=-1, required=False) @cli.command("decider.start", help="Start a decider process to manage workflow executions.") -def start_decider(workflows, domain, task_list, log_level, nb_processes): - if log_level: - logger.warning("Deprecated: --log-level will be removed, use LOG_LEVEL environment variable instead") +def start_decider(workflows, domain, 
task_list, nb_processes): decider.command.start( workflows, domain, @@ -411,6 +409,7 @@ def start_decider(workflows, domain, task_list, log_level, nb_processes): ) +@click.option("--proxy", "-x", envvar="SWF_PROXY", required=False, help="Proxy URL.") @click.option("--middleware-pre-execution", required=False, multiple=True) @click.option("--middleware-post-execution", required=False, multiple=True) @click.option( @@ -426,26 +425,24 @@ def start_decider(workflows, domain, task_list, log_level, nb_processes): help="Heartbeat interval in seconds (0 to disable heartbeating).", ) @click.option("--nb-processes", "-N", type=int) -@click.option("--log-level", "-l") @click.option("--task-list", "-t") @click.option("--domain", "-d", envvar="SWF_DOMAIN", required=True, help="SWF Domain") @cli.command("worker.start", help="Start a worker process to handle activity tasks.") def start_worker( domain, task_list, - log_level, nb_processes, heartbeat, one_task, poll_data, middleware_pre_execution, middleware_post_execution, + proxy, ): - if log_level: - logger.warning("Deprecated: --log-level will be removed, use LOG_LEVEL environment variable instead") - if not task_list and not poll_data: raise ValueError("Please provide a --task-list or some data via --poll-data") + if poll_data and proxy: + raise ValueError("Please provide either --poll-data or --proxy, not both") middlewares = { "pre": middleware_pre_execution, @@ -460,6 +457,7 @@ def start_worker( heartbeat=heartbeat, one_task=one_task, poll_data=poll_data, + proxy=proxy, ) @@ -784,6 +782,17 @@ def section(title): print(f"{key}={value}") +@click.option("--address", "-a", required=False, default="::1", help="Address to bind.") +@click.option("--port", "-p", required=False, type=int, default=4242, help="Port to bind.") +@cli.command("proxy.start", help="Start a proxy process to handle worker tasks.") +def proxy_start(address: str, port: int): + proxy = os.environ.get("SWF_PROXY") + if proxy: + address, sport = proxy.rsplit(":", 
1) + port = int(sport) + start_proxy(address=address, port=port) + + @click.argument("locations", nargs=-1) @cli.command( "binaries.download", diff --git a/simpleflow/swf/mapper/actors/core.py b/simpleflow/swf/mapper/actors/core.py index fc2b11005..9826fed17 100644 --- a/simpleflow/swf/mapper/actors/core.py +++ b/simpleflow/swf/mapper/actors/core.py @@ -18,8 +18,8 @@ class Actor(ConnectedSWFObject): :ivar task_list: task list the Actor should watch for tasks on """ - def __init__(self, domain: Domain, task_list: str) -> None: - super().__init__() + def __init__(self, domain: Domain, task_list: str, **kwargs) -> None: + super().__init__(**kwargs) self._set_domain(domain) self.task_list = task_list diff --git a/simpleflow/swf/mapper/actors/worker.py b/simpleflow/swf/mapper/actors/worker.py index bbae3980e..e312f8a3c 100644 --- a/simpleflow/swf/mapper/actors/worker.py +++ b/simpleflow/swf/mapper/actors/worker.py @@ -41,8 +41,8 @@ class ActivityWorker(Actor): The form of this identity is user defined. 
""" - def __init__(self, domain: Domain, task_list: str, identity: str | None = None): - super().__init__(domain, task_list) + def __init__(self, domain: Domain, task_list: str, identity: str | None = None, **kwargs): + super().__init__(domain, task_list, **kwargs) self._identity = identity diff --git a/simpleflow/swf/mapper/core.py b/simpleflow/swf/mapper/core.py index 594c0d91c..1af8ac95b 100644 --- a/simpleflow/swf/mapper/core.py +++ b/simpleflow/swf/mapper/core.py @@ -9,6 +9,7 @@ from typing import Any import boto3 +from botocore import config from botocore.exceptions import NoCredentialsError # NB: import logger directly from simpleflow so we benefit from the logging @@ -37,18 +38,23 @@ class ConnectedSWFObject: delay=retry.exponential, on_exceptions=(TypeError, NoCredentialsError), ) - def __init__(self, *args, **kwargs): - self.region = SETTINGS.get("region") or kwargs.get("region") or DEFAULT_AWS_REGION + def __init__( + self, + *, + region: str | None = None, + proxy: str | None = None, + ): + self.region = SETTINGS.get("region") or region or DEFAULT_AWS_REGION # Use settings-provided keys if available, otherwise pass empty # dictionary to boto SWF client, which will use its default credentials # chain provider. 
cred_keys = ["aws_access_key_id", "aws_secret_access_key"] - creds_ = {k: SETTINGS[k] for k in cred_keys if SETTINGS.get(k, None)} + creds_: dict[str, Any] = {k: SETTINGS[k] for k in cred_keys if SETTINGS.get(k, None)} - self.boto3_client = kwargs.pop("boto3_client", None) - if not self.boto3_client: - # raises EndpointConnectionError if region is wrong - self.boto3_client = get_or_create_boto3_client(region_name=self.region, service_name="swf", **creds_) + if proxy: + creds_["config"] = config.Config(proxies={"https": proxy}) + # raises EndpointConnectionError if region is wrong + self.boto3_client = get_or_create_boto3_client(region_name=self.region, service_name="swf", **creds_) logger.debug(f"initiated connection to region={self.region}") @@ -70,7 +76,7 @@ def list_open_workflow_executions( "domain": domain, "startTimeFilter": { "oldestDate": datetime.fromtimestamp(oldest_date), - "latestDate": datetime.fromtimestamp(latest_date) if latest_date is not None else None, + "latestDate": (datetime.fromtimestamp(latest_date) if latest_date is not None else None), }, "nextPageToken": next_page_token, "maximumPageSize": maximum_page_size, @@ -83,7 +89,7 @@ def list_open_workflow_executions( } if tag: kwargs["tagFilter"] = { - "name": tag, + "tag": tag, } if workflow_id: kwargs["executionFilter"] = { @@ -120,12 +126,12 @@ def list_closed_workflow_executions( if start_oldest_date is not None: kwargs["startTimeFilter"] = { "oldestDate": datetime.fromtimestamp(start_oldest_date), - "latestDate": datetime.fromtimestamp(start_latest_date) if start_latest_date is not None else None, + "latestDate": (datetime.fromtimestamp(start_latest_date) if start_latest_date is not None else None), } if close_oldest_date is not None: kwargs["closeTimeFilter"] = { "oldestDate": datetime.fromtimestamp(close_oldest_date), - "latestDate": datetime.fromtimestamp(close_latest_date) if close_latest_date is not None else None, + "latestDate": (datetime.fromtimestamp(close_latest_date) if 
close_latest_date is not None else None), } if close_status: kwargs["closeStatusFilter"] = { @@ -138,7 +144,7 @@ def list_closed_workflow_executions( } if tag: kwargs["tagFilter"] = { - "name": tag, + "tag": tag, } if workflow_id: kwargs["executionFilter"] = { @@ -485,11 +491,13 @@ def list_activity_types( reverse_order: bool | None = None, ): kwargs = { - "activityType": { - "name": name, - } - if name - else None, + "activityType": ( + { + "name": name, + } + if name + else None + ), "maximumPageSize": maximum_page_size, "nextPageToken": next_page_token, "reverseOrder": reverse_order, @@ -537,11 +545,13 @@ def start_workflow_execution( task_start_to_close_timeout: str | None = None, ): kwargs = { - "taskList": { - "name": task_list, - } - if task_list - else None, + "taskList": ( + { + "name": task_list, + } + if task_list + else None + ), "childPolicy": child_policy, "executionStartToCloseTimeout": execution_start_to_close_timeout, "input": input if input is not None else "", diff --git a/simpleflow/swf/mapper/querysets/workflow.py b/simpleflow/swf/mapper/querysets/workflow.py index e91de7529..dbcb602c1 100644 --- a/simpleflow/swf/mapper/querysets/workflow.py +++ b/simpleflow/swf/mapper/querysets/workflow.py @@ -401,7 +401,7 @@ class WorkflowExecutionQuerySet(BaseWorkflowQuerySet): _infos = "executionInfo" _infos_plural = "executionInfos" - def _is_valid_status_param(self, status, param): + def _is_valid_status_param(self, status: str, param: str) -> bool: statuses = { WorkflowExecution.STATUS_OPEN: {"oldest_date", "latest_date"}, WorkflowExecution.STATUS_CLOSED: { @@ -414,10 +414,10 @@ def _is_valid_status_param(self, status, param): } return param in statuses.get(status, set()) - def _validate_status_parameters(self, status, params): + def _validate_status_parameters(self, status: str, params: list[str]) -> list[str]: return [param for param in params if not self._is_valid_status_param(status, param)] - def list_workflow_executions(self, status, *args, 
**kwargs): + def list_workflow_executions(self, status: str, *args, **kwargs): statuses = { WorkflowExecution.STATUS_OPEN: "open", WorkflowExecution.STATUS_CLOSED: "closed", @@ -436,7 +436,7 @@ def list_workflow_executions(self, status, *args, **kwargs): method = f"list_{statuses[status]}_workflow_executions" return getattr(self, method)(*args, **kwargs) - def get_workflow_type(self, execution_info): + def get_workflow_type(self, execution_info: dict[str, Any]) -> WorkflowType: workflow_type = execution_info["workflowType"] workflow_type_qs = WorkflowTypeQuerySet(self.domain) @@ -467,7 +467,7 @@ def to_WorkflowExecution(self, domain: Domain, execution_info: dict[str, Any], * **kwargs, ) - def get(self, workflow_id, run_id, *args, **kwargs): + def get(self, workflow_id: str, run_id: str, *args, **kwargs): """ """ try: response = self.describe_workflow_execution(self.domain.name, run_id, workflow_id) @@ -496,11 +496,11 @@ def get(self, workflow_id, run_id, *args, **kwargs): def filter( self, - status=WorkflowExecution.STATUS_OPEN, - tag=None, - workflow_id=None, - workflow_type_name=None, - workflow_type_version=None, + status: str = WorkflowExecution.STATUS_OPEN, + tag: str | None = None, + workflow_id: str | None = None, + workflow_type_name: str | None = None, + workflow_type_version: str | None = None, *args, **kwargs, ): @@ -510,21 +510,16 @@ def filter( Valid values are: * ``simpleflow.swf.mapper.models.WorkflowExecution.STATUS_OPEN`` * ``simpleflow.swf.mapper.models.WorkflowExecution.STATUS_CLOSED`` - :type status: string :param tag: workflow executions containing the tag will be kept - :type tag: String :param workflow_id: workflow executions attached to the id will be kept - :type workflow_id: String :param workflow_type_name: workflow executions attached to the workflow type with provided name will be kept - :type workflow_type_name: String :param workflow_type_version: workflow executions attached to the workflow type of the provided version will be kept - 
:type workflow_type_version: String **Be aware that** querying over status allows the usage of statuses specific kwargs @@ -618,8 +613,8 @@ def _list(self, *args, **kwargs): def all( self, - status=WorkflowExecution.STATUS_OPEN, - start_oldest_date=MAX_WORKFLOW_AGE, + status: str = WorkflowExecution.STATUS_OPEN, + start_oldest_date: int = MAX_WORKFLOW_AGE, *args, **kwargs, ): diff --git a/simpleflow/swf/mapper/settings.py b/simpleflow/swf/mapper/settings.py index 153287a3a..55e080b5c 100644 --- a/simpleflow/swf/mapper/settings.py +++ b/simpleflow/swf/mapper/settings.py @@ -99,9 +99,6 @@ def from_home(path: str | os.PathLike = ".swf") -> dict[str, str]: """Retrieves settings from home environment If HOME environment is applicable, search $HOME/path. - - :rtype: dict - """ if "HOME" in os.environ: swf_path = os.path.join(os.environ["HOME"], path) @@ -116,7 +113,7 @@ def get(path: str | os.PathLike = ".swf") -> dict[str, str]: First, it will try to retrieve settings from a *path* in the user's home directory. Other it tries to load the settings from the environment. - If both return an empty dict, it will also return a empty dict. + If both return an empty dict, it will also return an empty dict. 
""" return from_home(path) or from_env() diff --git a/simpleflow/swf/process/poller.py b/simpleflow/swf/process/poller.py index 42cb87a12..f5582c06e 100644 --- a/simpleflow/swf/process/poller.py +++ b/simpleflow/swf/process/poller.py @@ -21,11 +21,11 @@ class Poller(simpleflow.swf.mapper.actors.Actor, NamedMixin): """Multi-processing implementation of a SWF actor.""" - def __init__(self, domain: Domain, task_list: str | None = None) -> None: + def __init__(self, domain: Domain, task_list: str | None = None, **kwargs) -> None: self.is_alive = False self._named_mixin_properties = ["task_list"] - super().__init__(domain, task_list) + super().__init__(domain, task_list, **kwargs) def __repr__(self): return f"{self.__class__.__name__}(domain={self.domain.name}, task_list={self.task_list})" diff --git a/simpleflow/swf/process/proxy/__init__.py b/simpleflow/swf/process/proxy/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/simpleflow/swf/process/proxy/command.py b/simpleflow/swf/process/proxy/command.py new file mode 100644 index 000000000..88980f8e5 --- /dev/null +++ b/simpleflow/swf/process/proxy/command.py @@ -0,0 +1,117 @@ +""" +Adapted (much simplified) from https://github.com/inaz2/proxy2. 
+""" + +from __future__ import annotations + +import itertools +import select +import socket +import ssl +import sys +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, HTTPServer + +import botocore.endpoint + +import simpleflow + + +class ProxyHTTPServer(HTTPServer): + address_family = socket.AF_INET6 + + def handle_error(self, request, client_address): + # suppress socket/ssl related errors + cls, e = sys.exc_info()[:2] + if cls is socket.error or cls is ssl.SSLError: + pass + else: + super().handle_error(request, client_address) + + +class ProxyRequestHandler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + server_version = "Simpleflow Proxy/" + simpleflow.__version__ + error_content_type = "application/json" + error_message_format = '{"code": %(code)d, "message": "%(message)s", "explain": "%(explain)s"}\n' + timeout = botocore.endpoint.DEFAULT_TIMEOUT + 1 + + def log_error(self, format, *args): + # suppress "Request timed out: timeout('timed out',)" + if isinstance(args[0], socket.timeout): + return + + self.log_message(format, *args) + + # https://en.wikipedia.org/wiki/List_of_Unicode_characters#Control_codes + _control_char_table = str.maketrans({c: rf"\x{c:02x}" for c in itertools.chain(range(0x20), range(0x7F, 0xA0))}) + _control_char_table[ord("\\")] = r"\\" + + def log_message(self, format, *args): + """Log an arbitrary message. + Copy-pasted from gh-100001 in case a vulnerable version of Python is used. + + This is used by all other logging functions. Override + it if you have specific logging wishes. + + The first argument, FORMAT, is a format string for the + message to be logged. If the format string contains + any % escapes requiring parameters, they should be + specified as subsequent arguments (it's just like + printf!). + + The client ip and current date/time are prefixed to + every message. + + Unicode control characters are replaced with escaped hex + before writing the output to stderr. 
+ + """ + + message = format % args + sys.stderr.write( + f"{self.address_string()} - - [{self.log_date_time_string()}]" + f" {message.translate(self._control_char_table)}\n" + ) + + def do_CONNECT(self): + address = self.path.split(":", 1) + parsed_address = (address[0], int(address[1]) or 443) + try: + s = socket.create_connection(parsed_address, timeout=self.timeout) + except Exception: + self.send_error(HTTPStatus.BAD_GATEWAY) + return + self.send_response(HTTPStatus.OK, "Connection Established") + self.end_headers() + + self.proxy_connection(s) + + def proxy_connection(self, server_connection: socket.socket) -> None: + conns = (self.connection, server_connection) + self.close_connection = False + while not self.close_connection: + rlist, wlist, xlist = select.select(conns, [], conns, self.timeout) + if xlist or not rlist: + break + for r in rlist: + other = conns[1] if r is conns[0] else conns[0] + data = r.recv(8192) + if not data: + self.close_connection = True + break + other.sendall(data) + + def do_GET(self) -> None: + if self.path == "/status": + self.send_response(HTTPStatus.OK) + else: + self.send_error(HTTPStatus.NOT_FOUND) + + +def start_proxy(address: str = "::1", port: int = 4242) -> None: + server_address = (address, port) + httpd = ProxyHTTPServer(server_address, ProxyRequestHandler) + sa = httpd.socket.getsockname() + print(f"Serving HTTP Proxy on {sa[0]}:{sa[1]}") + httpd.serve_forever() diff --git a/simpleflow/swf/process/worker/base.py b/simpleflow/swf/process/worker/base.py index 376ddae03..e67662bfd 100644 --- a/simpleflow/swf/process/worker/base.py +++ b/simpleflow/swf/process/worker/base.py @@ -50,10 +50,10 @@ def __init__( middlewares: dict[str, list[str]] | None = None, heartbeat: int = 60, poll_data: str | None = None, + proxy: str | None = None, ) -> None: """ :param middlewares: Paths to middleware functions to execute before and after any Activity - :param process_mode: Whether to process locally (default) """ self.nb_retries = 3 # 
heartbeat=0 is a special value to disable heartbeating. We want to @@ -63,7 +63,7 @@ def __init__( self.middlewares = middlewares self.poll_data = poll_data - super().__init__(domain, task_list) + super().__init__(domain, task_list, proxy=proxy) @property def name(self): @@ -96,7 +96,7 @@ def process(self, response: Response) -> None: """ Process a simpleflow.swf.mapper.actors.ActivityWorker poll response. """ - token = response.task_token + token: str = response.task_token task = response.activity_task spawn(self, token, task, self.middlewares, self._heartbeat) @@ -107,7 +107,11 @@ def complete(self, token: str, result: str | None = None) -> None: # noinspection PyMethodOverriding @with_state("failing") def fail( - self, token: str, task: ActivityTask, reason: str | None = None, details: str | None = None + self, + token: str, + task: ActivityTask, + reason: str | None = None, + details: str | None = None, ) -> dict[str, Any] | None: """ Fail the activity, log and ignore exceptions. @@ -132,7 +136,11 @@ def dispatch(self, task: ActivityTask) -> Activity: return self._dispatcher.dispatch_activity(name) def process( - self, poller: ActivityPoller, token: str, task: ActivityTask, middlewares: dict[str, list[str]] | None = None + self, + poller: ActivityPoller, + token: str, + task: ActivityTask, + middlewares: dict[str, list[str]] | None = None, ) -> Any: logger.debug("ActivityWorker.process()") try: @@ -142,6 +150,7 @@ def process( kwargs = input.get("kwargs", {}) context = sanitize_activity_context(task.context) context["domain_name"] = poller.domain.name + context["task_list"] = poller.task_list if input.get("meta", {}).get("binaries"): download_binaries(input["meta"]["binaries"]) result = ActivityTask( @@ -180,7 +189,12 @@ def process( poller.fail_with_retry(token, task, reason) -def process_task(poller, token: str, task: ActivityTask, middlewares: dict[str, list[str]] | None = None) -> None: +def process_task( + poller: ActivityPoller, + token: str, + task: 
ActivityTask, + middlewares: dict[str, list[str]] | None = None, +) -> None: logger.debug("process_task()") format.JUMBO_FIELDS_MEMORY_CACHE.clear() worker = ActivityWorker() @@ -271,7 +285,7 @@ def worker_alive(): except simpleflow.swf.mapper.exceptions.RateLimitExceededError as error: # ignore rate limit errors: high chances the next heartbeat will be # ok anyway, so it would be stupid to break the task for that - logger.warning( + logger.info( f'got a "ThrottlingException / Rate exceeded" when heartbeating for task {task.activity_type.name}:' f" {error}" ) diff --git a/simpleflow/swf/process/worker/command.py b/simpleflow/swf/process/worker/command.py index f4b25771f..b74308521 100644 --- a/simpleflow/swf/process/worker/command.py +++ b/simpleflow/swf/process/worker/command.py @@ -11,6 +11,7 @@ def make_worker_poller( middlewares: dict[str, list[str]] | None, heartbeat: int, poll_data: str, + proxy: str | None = None, ) -> ActivityPoller: """ Make a worker poller for the domain and task list. @@ -22,6 +23,7 @@ def make_worker_poller( middlewares=middlewares, heartbeat=heartbeat, poll_data=poll_data, + proxy=proxy, ) @@ -33,6 +35,7 @@ def start( heartbeat: int = 60, one_task: bool = False, poll_data: str | None = None, + proxy: str | None = None, ): """ Start a worker for the given domain and task_list. @@ -48,6 +51,7 @@ def start( middlewares=middlewares, heartbeat=heartbeat, poll_data=poll_data, + proxy=proxy, ) if poll_data: