From 850a62dfbafd959e0a9f70aa24d4c2a775740bb9 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 12 Nov 2025 16:36:10 -0800 Subject: [PATCH 1/2] Fix up plugin readme --- README.md | 171 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 100 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 1f991c908..f304c77ed 100644 --- a/README.md +++ b/README.md @@ -1498,10 +1498,79 @@ configuration, and worker execution. Common customizations may include but are n 3. Workflows 4. Interceptors +**Important Notes:** + +- Client plugins that also implement worker plugin interfaces are automatically propagated to workers +- Avoid providing the same plugin to both client and worker to prevent double execution +- Each plugin's `name()` method returns a unique identifier for debugging purposes + +#### Usage + +Plugins can be provided to both `Client` and `Worker`. + +```python +# Use the plugin when connecting +client = await Client.connect( + "my-server.com:7233", + plugins=[SomePlugin()] +) +``` +```python +# Use the plugin when creating a worker +worker = Worker( + client, + plugins=[SomePlugin()] +) +``` +In the case of `Client`, any plugins will also be provided to any workers created with that client. +```python +# Create client with the unified plugin +client = await Client.connect( + "localhost:7233", + plugins=[SomePlugin()] +) + +# Worker will automatically inherit the plugin from the client +worker = Worker( + client, + task_queue="my-task-queue", + workflows=[MyWorkflow], + activities=[my_activity] +) +``` +#### Plugin Implementations + +The easiest way to create your own plugin is to use `SimplePlugin`. This takes a number of possible configurations to produce +a relatively straightforward plugin. + +```python +plugin = SimplePlugin( + "MyPlugin", + data_converter=converter, +) +``` + +It is also possible to subclass `SimplePlugin` for some additional controls. This is what we do for `OpenAIAgentsPlugin`. + +```python +class MediumPlugin(SimplePlugin): + def __init__(self): + super().__init__("MediumPlugin", data_converter=pydantic_data_converter) + + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: + config = super().configure_worker(config) + config["task_queue"] = "override" + return config +``` + +#### Advanced Plugin Implementations + +`SimplePlugin` doesn't cover all possible uses of plugins. For more unusual use cases, an implementor can + A single plugin class can implement both client and worker plugin interfaces to share common logic between both contexts. When used with a client, it will automatically be propagated to any workers created with that client. -#### Client Plugins +##### Client Plugins Client plugins can intercept and modify client configuration and service connections. They are useful for adding authentication, modifying connection parameters, or adding custom behavior during client creation. @@ -1516,29 +1585,21 @@ class AuthenticationPlugin(Plugin): def __init__(self, api_key: str): self.api_key = api_key - def init_client_plugin(self, next: Plugin) -> None: - self.next_client_plugin = next - def configure_client(self, config: ClientConfig) -> ClientConfig: # Modify client configuration config["namespace"] = "my-secure-namespace" - return self.next_client_plugin.configure_client(config) + return config async def connect_service_client( - self, config: temporalio.service.ConnectConfig + self, + config: temporalio.service.ConnectConfig, + next: Callable[[ConnectConfig], Awaitable[ServiceClient]] ) -> temporalio.service.ServiceClient: - # Add authentication to the connection config.api_key = self.api_key - return await self.next_client_plugin.connect_service_client(config) - -# Use the plugin when connecting -client = await Client.connect( - "my-server.com:7233", - plugins=[AuthenticationPlugin("my-api-key")] -) + return await next(config) ``` -#### Worker Plugins +##### Worker Plugins Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring, custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay. @@ -1558,47 +1619,39 @@ class MonitoringPlugin(Plugin): def __init__(self): self.logger = logging.getLogger(__name__) - def init_worker_plugin(self, next: Plugin) -> None: - self.next_worker_plugin = next - def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Modify worker configuration original_task_queue = config["task_queue"] config["task_queue"] = f"monitored-{original_task_queue}" self.logger.info(f"Worker created for task queue: {config['task_queue']}") - return self.next_worker_plugin.configure_worker(config) + return config - async def run_worker(self, worker: Worker) -> None: + async def run_worker(self, worker: Worker, next: Callable[[Worker], Awaitable[None]]) -> None: self.logger.info("Starting worker execution") try: - await self.next_worker_plugin.run_worker(worker) + await next(worker) finally: self.logger.info("Worker execution completed") def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - return self.next_worker_plugin.configure_replayer(config) + return config @asynccontextmanager async def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], + next: Callable[ + [Replayer, AsyncIterator[WorkflowHistory]], + AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]], + ] ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: self.logger.info("Starting replay execution") try: - async with self.next_worker_plugin.run_replayer(replayer, histories) as results: - yield results + async with self.next_worker_plugin.run_replayer(replayer, histories) as results: + yield results finally: self.logger.info("Replay execution completed") - -# Use the plugin when creating a worker -worker = Worker( - client, - task_queue="my-task-queue", - workflows=[MyWorkflow], - activities=[my_activity], - plugins=[MonitoringPlugin()] -) ``` For plugins that need to work with both clients and workers, you can implement both interfaces in a single class: @@ -1612,67 +1665,43 @@ from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConf class UnifiedPlugin(ClientPlugin, WorkerPlugin): - def init_client_plugin(self, next: ClientPlugin) -> None: - self.next_client_plugin = next - - def init_worker_plugin(self, next: WorkerPlugin) -> None: - self.next_worker_plugin = next - def configure_client(self, config: ClientConfig) -> ClientConfig: # Client-side customization config["data_converter"] = pydantic_data_converter - return self.next_client_plugin.configure_client(config) + return config async def connect_service_client( - self, config: temporalio.service.ConnectConfig + self, + config: temporalio.service.ConnectConfig, + next: Callable[[ConnectConfig], Awaitable[ServiceClient]] ) -> temporalio.service.ServiceClient: - # Add authentication to the connection config.api_key = self.api_key - return await self.next_client_plugin.connect_service_client(config) + return await next(config) def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Worker-side customization - return self.next_worker_plugin.configure_worker(config) + return config - async def run_worker(self, worker: Worker) -> None: + async def run_worker(self, worker: Worker, next: Callable[[Worker], Awaitable[None]]) -> None: print("Starting unified worker") - await self.next_worker_plugin.run_worker(worker) + await next(worker) def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: config["data_converter"] = pydantic_data_converter - return self.next_worker_plugin.configure_replayer(config) + return config async def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], + next: Callable[ + [Replayer, AsyncIterator[WorkflowHistory]], + AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]], + ] ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: - return self.next_worker_plugin.run_replayer(replayer, histories) - -# Create client with the unified plugin -client = await Client.connect( - "localhost:7233", - plugins=[UnifiedPlugin()] -) - -# Worker will automatically inherit the plugin from the client -worker = Worker( - client, - task_queue="my-task-queue", - workflows=[MyWorkflow], - activities=[my_activity] -) + return next(replayer, histories) ``` -**Important Notes:** - -- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility -- Client plugins that also implement worker plugin interfaces are automatically propagated to workers -- Avoid providing the same plugin to both client and worker to prevent double execution -- Plugin methods should call the plugin provided during initialization to maintain the plugin chain -- Each plugin's `name()` method returns a unique identifier for debugging purposes - - ### Workflow Replay Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example, From 006d2c24cf46ad067cfaa850d17e1b06a3d1002b Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 13 Nov 2025 09:21:01 -0800 Subject: [PATCH 2/2] Finish sentence --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f304c77ed..110279410 100644 --- a/README.md +++ b/README.md @@ -1565,7 +1565,8 @@ class MediumPlugin(SimplePlugin): #### Advanced Plugin Implementations -`SimplePlugin` doesn't cover all possible uses of plugins. For more unusual use cases, an implementor can +`SimplePlugin` doesn't cover all possible uses of plugins. For more unusual use cases, an implementor can implement +the underlying plugin interfaces directly. A single plugin class can implement both client and worker plugin interfaces to share common logic between both contexts. When used with a client, it will automatically be propagated to any workers created with that client.