From f7a8d5029fa2fd78a0d3c54bf8a333c564c733a0 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 13 Oct 2025 13:30:07 -0700 Subject: [PATCH 01/16] Plumb plugins to core --- temporalio/bridge/src/worker.rs | 15 +++++++++++++-- temporalio/bridge/worker.py | 1 + temporalio/worker/_replayer.py | 3 +++ temporalio/worker/_worker.py | 4 ++++ tests/test_plugins.py | 7 +++++++ 5 files changed, 28 insertions(+), 2 deletions(-) diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 92b43f356..c5cf46453 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -14,8 +14,9 @@ use temporal_sdk_core::api::errors::PollError; use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; use temporal_sdk_core_api::errors::WorkflowErrorType; use temporal_sdk_core_api::worker::{ - SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, - SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, + PluginInfo as CorePluginInfo, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, + SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, + SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; use temporal_sdk_core_api::Worker; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; @@ -63,6 +64,7 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail: bool, nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, + plugins: Vec, } #[derive(FromPyObject)] @@ -722,6 +724,15 @@ fn convert_worker_config( .collect::>>(), ) .nexus_task_poller_behavior(conf.nexus_task_poller_behavior) + .plugins( + conf.plugins + .into_iter() + .map(|name| CorePluginInfo { + name, + version: None, + }) + .collect(), + ) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 8e20b670a..fbcfc9c3c 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -64,6 +64,7 @@ class WorkerConfig: nondeterminism_as_workflow_fail: bool nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[str] @dataclass diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 04a32be17..d792a596f 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -80,6 +80,7 @@ def __init__( runtime=runtime, disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, + plugins=[plugin.name() for plugin in plugins], ) # Apply plugin configuration @@ -293,6 +294,7 @@ def on_eviction_hook( nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( 1 ), + plugins=self._config.get("plugins", []), ), ) # Start worker @@ -368,6 +370,7 @@ class ReplayerConfig(TypedDict, total=False): runtime: Optional[temporalio.runtime.Runtime] disable_safe_workflow_eviction: bool header_codec_behavior: HeaderCodecBehavior + plugins: Sequence[str] @dataclass(frozen=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index fc2c2241d..880fb93b0 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -376,6 +376,7 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) + config["plugins"] = [plugin.name() for plugin in plugins] self.plugins = plugins for plugin in plugins: @@ -609,6 +610,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), + plugins=config.get("plugins", []), ), ) @@ -621,6 +623,7 @@ def config(self) -> WorkerConfig: config = self._config.copy() config["activities"] = list(config.get("activities", [])) config["workflows"] = list(config.get("workflows", [])) + config["plugins"] = list(config.get("plugins", [])) return config @property @@ -902,6 +905,7 @@ class WorkerConfig(TypedDict, total=False): workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[str] def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 5571841b4..adde010da 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -149,6 +149,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert worker.config().get("plugins") == [MyWorkerPlugin().name()] # Test client plugin propagation to worker plugins new_config = client.config() @@ -156,6 +157,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" + assert worker.config().get("plugins") == [MyCombinedPlugin().name()] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -165,6 +167,10 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert worker.config().get("plugins") == [ + MyCombinedPlugin().name(), + MyWorkerPlugin().name(), + ] async def test_worker_duplicated_plugin(client: Client) -> None: @@ -271,6 +277,7 @@ async def test_replay(client: Client) -> None: replayer = Replayer(workflows=[], plugins=[plugin]) assert len(replayer.config().get("workflows") or []) == 1 assert replayer.config().get("data_converter") == pydantic_data_converter + assert replayer.config().get("plugins") == [plugin.name()] await replayer.replay_workflow(await handle.fetch_history()) From d1cfb314411608e802e947f1908dd02364343cfb Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 14 Oct 2025 09:20:52 -0700 Subject: [PATCH 02/16] Plumb skip_client_worker_set_check --- temporalio/bridge/Cargo.lock | 223 ++++++++++++++++++++++++++++++- temporalio/bridge/runtime.py | 12 +- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/lib.rs | 4 +- temporalio/bridge/src/runtime.rs | 48 +++++-- temporalio/bridge/src/worker.rs | 14 +- temporalio/bridge/worker.py | 1 + temporalio/runtime.py | 58 +++++++- temporalio/worker/_replayer.py | 1 + temporalio/worker/_worker.py | 9 ++ tests/conftest.py | 14 ++ tests/test_runtime.py | 17 +++ tests/worker/test_activity.py | 2 + 13 files changed, 377 insertions(+), 28 deletions(-) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index b40d5fc72..764d8c792 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -17,6 +17,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -159,6 +170,15 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a65b545ab31d687cff52899d4890855fec459eb6afe0da6417b8a18da87aa29" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -213,6 +233,22 @@ dependencies = [ "serde", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + +[[package]] +name = "constant_time_eq" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" + [[package]] name = "core-foundation" version = "0.10.1" @@ -229,6 +265,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -262,6 +307,16 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "darling" version = "0.20.11" @@ -311,6 +366,21 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "deflate64" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26bf8fc351c5ed29b5c2f0cbbac1b209b74f60ecd62e675a998df72c49af5204" + +[[package]] +name = "deranged" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive_arbitrary" version = "1.4.2" @@ -374,6 +444,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "dirs" version = "6.0.0" @@ -644,6 +725,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "gethostname" version = "1.0.2" @@ -752,6 +843,15 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "1.3.1" @@ -1004,6 +1104,15 @@ version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "generic-array", +] + [[package]] name = "instant" version = "0.1.13" @@ -1102,6 +1211,26 @@ version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +[[package]] +name = "liblzma" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" +dependencies = [ + "liblzma-sys", +] + +[[package]] +name = "liblzma-sys" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "libredox" version = "0.1.9" @@ -1277,6 +1406,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.19" @@ -1431,6 +1566,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1518,6 +1663,18 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "ppmd-rust" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c834641d8ad1b348c9ee86dec3b9840d805acd5f24daa5f90c788951a52ff59b" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2260,6 +2417,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2569,6 +2737,7 @@ dependencies = [ "tracing", "tracing-core", "url", + "uuid", ] [[package]] @@ -2647,6 +2816,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "num-conv", + "powerfmt", + "serde", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + [[package]] name = "tinystr" version = "0.8.1" @@ -2931,6 +3119,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "typetag" version = "0.2.20" @@ -2998,9 +3192,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ "getrandom 0.3.3", "js-sys", @@ -3537,6 +3731,20 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "zerotrie" @@ -3577,12 +3785,23 @@ version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caa8cd6af31c3b31c6631b8f483848b91589021b28fffe50adada48d4f4d2ed1" dependencies = [ + "aes", "arbitrary", "bzip2", + "constant_time_eq", "crc32fast", + "deflate64", "flate2", + "getrandom 0.3.3", + "hmac", "indexmap", + "liblzma", "memchr", + "pbkdf2", + "ppmd-rust", + "sha1", + "time", + "zeroize", "zopfli", "zstd", ] diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index afc79f0f5..8f00d3c30 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -23,9 +23,9 @@ def _raise_in_thread(thread_id: int, exc_type: Type[BaseException]) -> bool: thread_id, exc_type ) - def __init__(self, *, telemetry: TelemetryConfig) -> None: + def __init__(self, *, options: RuntimeOptions) -> None: """Create SDK Core runtime.""" - self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry) + self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(options) def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]: """Get buffered metrics.""" @@ -91,6 +91,14 @@ class TelemetryConfig: metrics: Optional[MetricsConfig] +@dataclass(frozen=True) +class RuntimeOptions: + """Python representation of the Rust struct for runtime options.""" + + telemetry: TelemetryConfig + worker_heartbeat_interval_millis: Optional[int] = None + + # WARNING: This must match Rust runtime::BufferedLogEntry class BufferedLogEntry(Protocol): """A buffered log entry.""" diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 682d441dd..eb74962b5 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 682d441dd3b830e1477af3edb7c2330b403c4c33 +Subproject commit eb74962b5e4ed71b046c81f1cba35b2b9080a127 diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index cbd5be10e..ee157fb18 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -82,8 +82,8 @@ fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option PyResult { - runtime::init_runtime(telemetry_config) +fn init_runtime(options: runtime::RuntimeOptions) -> PyResult { + runtime::init_runtime(options) } #[pyfunction] diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 72cc905ae..a33d91946 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -13,7 +13,7 @@ use temporal_sdk_core::telemetry::{ build_otlp_metric_exporter, start_prometheus_metric_exporter, CoreLogStreamConsumer, MetricsCallBuffer, }; -use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder}; +use temporal_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder}; use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; use temporal_sdk_core_api::telemetry::{ CoreLog, Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol, @@ -86,16 +86,27 @@ pub struct PrometheusConfig { histogram_bucket_overrides: Option>>, } +#[derive(FromPyObject)] +pub struct RuntimeOptions { + telemetry: TelemetryConfig, + worker_heartbeat_interval_millis: Option, +} + const FORWARD_LOG_BUFFER_SIZE: usize = 2048; const FORWARD_LOG_MAX_FREQ_MS: u64 = 10; -pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { +pub fn init_runtime(options: RuntimeOptions) -> PyResult { + let RuntimeOptions { + telemetry: TelemetryConfig { logging, metrics }, + worker_heartbeat_interval_millis, + } = options; + // Have to build/start telemetry config pieces let mut telemetry_build = TelemetryOptionsBuilder::default(); // Build logging config, capturing forwarding info to start later let mut log_forwarding: Option<(Receiver, PyObject)> = None; - if let Some(logging_conf) = telemetry_config.logging { + if let Some(logging_conf) = logging { telemetry_build.logging(if let Some(forward_to) = logging_conf.forward_to { // Note, actual log forwarding is started later let (consumer, stream) = CoreLogStreamConsumer::new(FORWARD_LOG_BUFFER_SIZE); @@ -113,26 +124,39 @@ pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { // Build metric config, but actual metrics instance is late-bound after // CoreRuntime is created since it needs Tokio runtime - if let Some(metrics_conf) = telemetry_config.metrics.as_ref() { - telemetry_build.attach_service_name(metrics_conf.attach_service_name); - if let Some(prefix) = &metrics_conf.metric_prefix { + let mut metrics_conf = metrics; + if let Some(metrics_conf_ref) = metrics_conf.as_ref() { + telemetry_build.attach_service_name(metrics_conf_ref.attach_service_name); + if let Some(prefix) = &metrics_conf_ref.metric_prefix { telemetry_build.metric_prefix(prefix.to_string()); } } - // Create core runtime which starts tokio multi-thread runtime - let mut core = CoreRuntime::new( + let mut runtime_options_build = RuntimeOptionsBuilder::default(); + runtime_options_build.telemetry_options( telemetry_build .build() .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, - TokioRuntimeBuilder::default(), - ) - .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; + ); + + if let Some(ms) = worker_heartbeat_interval_millis { + runtime_options_build.heartbeat_interval(Some(Duration::from_millis(ms))); + } else { + runtime_options_build.heartbeat_interval(None); + } + + let runtime_options = runtime_options_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid runtime options: {err}")))?; + + // Create core runtime which starts tokio multi-thread runtime + let mut core = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) + .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; // We late-bind the metrics after core runtime is created since it needs // the Tokio handle let mut metrics_call_buffer: Option>> = None; - if let Some(metrics_conf) = telemetry_config.metrics { + if let Some(metrics_conf) = metrics_conf.take() { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered if metrics_conf.buffered_with_size > 0 { diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index c5cf46453..191149d0d 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -14,9 +14,8 @@ use temporal_sdk_core::api::errors::PollError; use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; use temporal_sdk_core_api::errors::WorkflowErrorType; use temporal_sdk_core_api::worker::{ - PluginInfo as CorePluginInfo, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, - SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, - SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, + SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, + SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; use temporal_sdk_core_api::Worker; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; @@ -24,6 +23,7 @@ use temporal_sdk_core_protos::coresdk::{ nexus::NexusTaskCompletion, ActivityHeartbeat, ActivityTaskCompletion, }; use temporal_sdk_core_protos::temporal::api::history::v1::History; +use temporal_sdk_core_protos::temporal::api::worker::v1::PluginInfo; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::error; @@ -65,6 +65,7 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, plugins: Vec, + skip_client_worker_set_check: bool, } #[derive(FromPyObject)] @@ -727,12 +728,13 @@ fn convert_worker_config( .plugins( conf.plugins .into_iter() - .map(|name| CorePluginInfo { + .map(|name| PluginInfo { name, - version: None, + version: String::new(), }) - .collect(), + .collect::>(), ) + .skip_client_worker_set_check(conf.skip_client_worker_set_check) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index fbcfc9c3c..d82f07b14 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -65,6 +65,7 @@ class WorkerConfig: nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior plugins: Sequence[str] + skip_client_worker_set_check: bool @dataclass diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 64fa12192..a70862030 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -70,13 +70,37 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: raise RuntimeError("Runtime default already set") _default_runtime = runtime - def __init__(self, *, telemetry: TelemetryConfig) -> None: - """Create a default runtime with the given telemetry config. + def __init__( + self, + *, + telemetry: Optional[TelemetryConfig] = None, + runtime_options: Optional["RuntimeOptions"] = None, + ) -> None: + """Create a runtime with the provided configuration. Each new runtime creates a new internal thread pool, so use sparingly. + + Args: + telemetry: Telemetry configuration when not supplying + ``runtime_options``. + runtime_options: Full runtime configuration including telemetry and + worker heartbeating options. + + Raises: + ValueError: If both ``telemetry`` and ``runtime_options`` are + provided. """ + if runtime_options and telemetry: + raise ValueError("Cannot supply both telemetry and runtime_options") + + if runtime_options is None: + telemetry = telemetry or TelemetryConfig() + runtime_options = RuntimeOptions(telemetry=telemetry) + else: + telemetry = runtime_options.telemetry + self._core_runtime = temporalio.bridge.runtime.Runtime( - telemetry=telemetry._to_bridge_config() + options=runtime_options._to_bridge_config() ) if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self @@ -391,6 +415,34 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig: ) +@dataclass(frozen=True) +class RuntimeOptions: + """Configuration for runtime initialization.""" + + telemetry: TelemetryConfig = field(default_factory=TelemetryConfig) + """Telemetry configuration applied to the runtime.""" + + worker_heartbeat_interval: Optional[timedelta] = None + """Interval for worker heartbeats. ``None`` disables heartbeating.""" + + def _to_bridge_config(self) -> temporalio.bridge.runtime.RuntimeOptions: + heartbeat_millis: Optional[int] + if self.worker_heartbeat_interval is None: + heartbeat_millis = None + else: + if self.worker_heartbeat_interval <= timedelta(0): + raise ValueError("worker_heartbeat_interval must be positive") + heartbeat_millis = int( + self.worker_heartbeat_interval.total_seconds() * 1000 + ) + if heartbeat_millis == 0: + heartbeat_millis = 1 + return temporalio.bridge.runtime.RuntimeOptions( + telemetry=self.telemetry._to_bridge_config(), + worker_heartbeat_interval_millis=heartbeat_millis, + ) + + BufferedMetricKind = NewType("BufferedMetricKind", int) """Representation of a buffered metric kind.""" diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index d792a596f..a1df98074 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -295,6 +295,7 @@ def on_eviction_hook( 1 ), plugins=self._config.get("plugins", []), + skip_client_worker_set_check=True, ), ) # Start worker diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 880fb93b0..bea64b217 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -148,6 +148,7 @@ def __init__( nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( maximum=5 ), + skip_client_worker_set_check: bool = False, ) -> None: """Create a worker to process workflows and/or activities. @@ -322,6 +323,9 @@ def __init__( Defaults to a 5-poller maximum. nexus_task_poller_behavior: Specify the behavior of Nexus task polling. Defaults to a 5-poller maximum. + skip_client_worker_set_check: Skip the runtime validation that ensures + the client is registered with the worker set. This should only be + used in tests. """ config = WorkerConfig( client=client, @@ -364,6 +368,7 @@ def __init__( workflow_task_poller_behavior=workflow_task_poller_behavior, activity_task_poller_behavior=activity_task_poller_behavior, nexus_task_poller_behavior=nexus_task_poller_behavior, + skip_client_worker_set_check=skip_client_worker_set_check, ) plugins_from_client = cast( @@ -390,6 +395,8 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf """ self._config = config + config.setdefault("skip_client_worker_set_check", False) + if not ( config["activities"] or config["nexus_service_handlers"] @@ -611,6 +618,7 @@ def check_activity(activity): "nexus_task_poller_behavior" ]._to_bridge(), plugins=config.get("plugins", []), + skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) @@ -906,6 +914,7 @@ class WorkerConfig(TypedDict, total=False): activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior plugins: Sequence[str] + skip_client_worker_set_check: bool def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/conftest.py b/tests/conftest.py index 8ffd3a456..edefef2cf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,8 @@ import pytest import pytest_asyncio +import temporalio.worker + from . import DEV_SERVER_DOWNLOAD_VERSION # If there is an integration test environment variable set, we must remove the @@ -57,6 +59,18 @@ def pytest_addoption(parser): ) +@pytest.fixture(autouse=True) +def _force_worker_skip_client_set(monkeypatch): + original_init = temporalio.worker.Worker.__init__ + + def patched_init(self, *args, **kwargs): + kwargs.setdefault("skip_client_worker_set_check", True) + return original_init(self, *args, **kwargs) + + monkeypatch.setattr(temporalio.worker.Worker, "__init__", patched_init) + yield + + @pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 4505ebfcf..330b45881 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -7,6 +7,8 @@ from typing import List, cast from urllib.request import urlopen +import pytest + from temporalio import workflow from temporalio.client import Client from temporalio.runtime import ( @@ -14,6 +16,7 @@ LoggingConfig, PrometheusConfig, Runtime, + RuntimeOptions, TelemetryConfig, TelemetryFilter, ) @@ -254,3 +257,17 @@ async def check_metrics() -> None: # Wait for metrics to appear and match the expected buckets await assert_eventually(check_metrics) + + +def test_runtime_options_to_bridge_config() -> None: + assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis is None + options = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=30)) + bridge_config = options._to_bridge_config() + assert bridge_config.worker_heartbeat_interval_millis == 30_000 + + +def test_runtime_options_invalid_heartbeat() -> None: + with pytest.raises(ValueError): + RuntimeOptions( + worker_heartbeat_interval=timedelta(seconds=-5) + )._to_bridge_config() diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 203b89a5a..05eabb73d 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1370,6 +1370,8 @@ async def _execute_workflow_with_activity( worker_config["task_queue"] = str(uuid.uuid4()) worker_config["activities"] = [fn] + additional_activities worker_config["shared_state_manager"] = _default_shared_state_manager + worker_config["skip_client_worker_set_check"] = True + print("worker_config[skip_client_worker_set_check] = True\n") if not worker_config.get("max_concurrent_activities"): worker_config["max_concurrent_activities"] = default_max_concurrent_activities async with Worker(**worker_config): From 9f5bbd56aeeba5bd1ebfff8d91b365b3d36f5763 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 14 Oct 2025 10:26:21 -0700 Subject: [PATCH 03/16] New test passing plugin names to core --- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/runtime.rs | 9 ++++----- temporalio/bridge/src/worker.rs | 5 +++-- temporalio/worker/_worker.py | 2 -- tests/test_plugins.py | 30 ++++++++++++++++++++++++++++++ tests/worker/test_activity.py | 1 - 6 files changed, 38 insertions(+), 11 deletions(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index eb74962b5..401a38506 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit eb74962b5e4ed71b046c81f1cba35b2b9080a127 +Subproject commit 401a38506b8b5780c8c6dd87016c825b05c2a70e diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index a33d91946..4d6ec90fb 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -124,10 +124,9 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult { // Build metric config, but actual metrics instance is late-bound after // CoreRuntime is created since it needs Tokio runtime - let mut metrics_conf = metrics; - if let Some(metrics_conf_ref) = metrics_conf.as_ref() { - telemetry_build.attach_service_name(metrics_conf_ref.attach_service_name); - if let Some(prefix) = &metrics_conf_ref.metric_prefix { + if let Some(metrics_conf) = metrics.as_ref() { + telemetry_build.attach_service_name(metrics_conf.attach_service_name); + if let Some(prefix) = &metrics_conf.metric_prefix { telemetry_build.metric_prefix(prefix.to_string()); } } @@ -156,7 +155,7 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult { // We late-bind the metrics after core runtime is created since it needs // the Tokio handle let mut metrics_call_buffer: Option>> = None; - if let Some(metrics_conf) = metrics_conf.take() { + if let Some(metrics_conf) = metrics { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered if metrics_conf.buffered_with_size > 0 { diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 191149d0d..0a5deb928 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -647,11 +647,12 @@ impl WorkerRef { Ok(()) } - fn replace_client(&self, client: &client::ClientRef) { + fn replace_client(&self, client: &client::ClientRef) -> PyResult<()> { self.worker .as_ref() .expect("missing worker") - .replace_client(client.retry_client.clone().into_inner()); + .replace_client(client.retry_client.clone().into_inner()) + .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } fn initiate_shutdown(&self) -> PyResult<()> { diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index bea64b217..fa20ef6a7 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -395,8 +395,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf """ self._config = config - config.setdefault("skip_client_worker_set_check", False) - if not ( config["activities"] or config["nexus_service_handlers"] diff --git a/tests/test_plugins.py b/tests/test_plugins.py index adde010da..ab2077064 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -9,6 +9,7 @@ import temporalio.client import temporalio.converter import temporalio.worker +import temporalio.bridge.temporal_sdk_bridge from temporalio import workflow from temporalio.client import Client, ClientConfig, OutboundInterceptor, WorkflowHistory from temporalio.contrib.pydantic import pydantic_data_converter @@ -173,6 +174,35 @@ async def test_worker_plugin_basic_config(client: Client) -> None: ] +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + assert captured_plugins == [plugin1.name(), plugin2.name()] + + async def test_worker_duplicated_plugin(client: Client) -> None: new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 05eabb73d..bf5956fed 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1371,7 +1371,6 @@ async def _execute_workflow_with_activity( worker_config["activities"] = [fn] + additional_activities worker_config["shared_state_manager"] = _default_shared_state_manager worker_config["skip_client_worker_set_check"] = True - print("worker_config[skip_client_worker_set_check] = True\n") if not worker_config.get("max_concurrent_activities"): worker_config["max_concurrent_activities"] = default_max_concurrent_activities async with Worker(**worker_config): From 2c38883a7ea6447e1523e26db5a35005848f4c46 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 17 Oct 2025 13:26:02 -0700 Subject: [PATCH 04/16] lint --- temporalio/bridge/runtime.py | 2 +- temporalio/runtime.py | 2 +- tests/test_plugins.py | 2 +- tests/worker/test_worker.py | 8 +++++++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index 8f00d3c30..a06f9686c 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -96,7 +96,7 @@ class RuntimeOptions: """Python representation of the Rust struct for runtime options.""" telemetry: TelemetryConfig - worker_heartbeat_interval_millis: Optional[int] = None + worker_heartbeat_interval_millis: Optional[int] = 30000 # 30s # WARNING: This must match Rust runtime::BufferedLogEntry diff --git a/temporalio/runtime.py b/temporalio/runtime.py index a70862030..268cdc418 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -422,7 +422,7 @@ class RuntimeOptions: telemetry: TelemetryConfig = field(default_factory=TelemetryConfig) """Telemetry configuration applied to the runtime.""" - worker_heartbeat_interval: Optional[timedelta] = None + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30) """Interval for worker heartbeats. ``None`` disables heartbeating.""" def _to_bridge_config(self) -> temporalio.bridge.runtime.RuntimeOptions: diff --git a/tests/test_plugins.py b/tests/test_plugins.py index ab2077064..651f197aa 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -6,10 +6,10 @@ import pytest +import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker -import temporalio.bridge.temporal_sdk_bridge from temporalio import workflow from temporalio.client import Client, ClientConfig, OutboundInterceptor, WorkflowHistory from temporalio.contrib.pydantic import pydantic_data_converter diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 32f27f631..02e2a69ec 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -17,6 +17,7 @@ from temporalio.api.workflowservice.v1 import ( DescribeWorkerDeploymentRequest, DescribeWorkerDeploymentResponse, + ListWorkersRequest, SetWorkerDeploymentCurrentVersionRequest, SetWorkerDeploymentCurrentVersionResponse, SetWorkerDeploymentRampingVersionRequest, @@ -28,7 +29,12 @@ TaskReachabilityType, ) from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.runtime import ( + PrometheusConfig, + Runtime, + RuntimeOptions, + TelemetryConfig, +) from temporalio.service import RPCError from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( From aaf8e343250ac94530aa25a7746aa5a9a9cbe5b0 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 20 Oct 2025 15:42:03 -0700 Subject: [PATCH 05/16] Fix and simplify plumbing plugins from WorkerConfig to core, impose runtime client identity requirement --- temporalio/bridge/Cargo.lock | 345 +++++++++----------------------- temporalio/bridge/Cargo.toml | 4 +- temporalio/bridge/runtime.py | 2 +- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/client.rs | 9 +- temporalio/worker/_replayer.py | 4 +- temporalio/worker/_worker.py | 7 +- tests/test_plugins.py | 7 +- tests/test_runtime.py | 12 +- tests/worker/test_worker.py | 2 +- tests/worker/test_workflow.py | 64 ++---- 11 files changed, 130 insertions(+), 328 deletions(-) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index 764d8c792..b9046647e 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -17,17 +17,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" -[[package]] -name = "aes" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" -dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", -] - [[package]] name = "aho-corasick" version = "1.1.3" @@ -170,15 +159,6 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a65b545ab31d687cff52899d4890855fec459eb6afe0da6417b8a18da87aa29" -[[package]] -name = "block-buffer" -version = "0.10.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" -dependencies = [ - "generic-array", -] - [[package]] name = "bumpalo" version = "3.19.0" @@ -233,22 +213,6 @@ dependencies = [ "serde", ] -[[package]] -name = "cipher" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" -dependencies = [ - "crypto-common", - "inout", -] - -[[package]] -name = "constant_time_eq" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" - [[package]] name = "core-foundation" version = "0.10.1" @@ -265,15 +229,6 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" -[[package]] -name = "cpufeatures" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" -dependencies = [ - "libc", -] - [[package]] name = "crc32fast" version = "1.5.0" @@ -307,16 +262,6 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" -[[package]] -name = "crypto-common" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" -dependencies = [ - "generic-array", - "typenum", -] - [[package]] name = "darling" version = "0.20.11" @@ -366,21 +311,6 @@ dependencies = [ "parking_lot_core", ] -[[package]] -name = "deflate64" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26bf8fc351c5ed29b5c2f0cbbac1b209b74f60ecd62e675a998df72c49af5204" - -[[package]] -name = "deranged" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" -dependencies = [ - "powerfmt", -] - [[package]] name = "derive_arbitrary" version = "1.4.2" @@ -444,17 +374,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "digest" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" -dependencies = [ - "block-buffer", - "crypto-common", - "subtle", -] - [[package]] name = "dirs" version = "6.0.0" @@ -493,6 +412,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -725,16 +650,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.14.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" -dependencies = [ - "typenum", - "version_check", -] - [[package]] name = "gethostname" version = "1.0.2" @@ -843,15 +758,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hmac" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest", -] - [[package]] name = "http" version = "1.3.1" @@ -1104,15 +1010,6 @@ version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" -[[package]] -name = "inout" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" -dependencies = [ - "generic-array", -] - [[package]] name = "instant" version = "0.1.13" @@ -1211,26 +1108,6 @@ version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" -[[package]] -name = "liblzma" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" -dependencies = [ - "liblzma-sys", -] - -[[package]] -name = "liblzma-sys" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "libredox" version = "0.1.9" @@ -1406,12 +1283,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "num-conv" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" - [[package]] name = "num-traits" version = "0.2.19" @@ -1463,9 +1334,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", @@ -1477,9 +1348,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", @@ -1490,9 +1361,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" dependencies = [ "http", "opentelemetry", @@ -1509,21 +1380,22 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry_sdk" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", @@ -1531,7 +1403,6 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.2", - "serde_json", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -1566,16 +1437,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "pbkdf2" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" -dependencies = [ - "digest", - "hmac", -] - [[package]] name = "percent-encoding" version = "2.3.1" @@ -1663,18 +1524,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "powerfmt" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" - -[[package]] -name = "ppmd-rust" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c834641d8ad1b348c9ee86dec3b9840d805acd5f24daa5f90c788951a52ff59b" - [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1746,9 +1595,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" dependencies = [ "bytes", "prost-derive", @@ -1756,9 +1605,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck", "itertools", @@ -1769,6 +1618,8 @@ dependencies = [ "prettyplease", "prost", "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", "regex", "syn", "tempfile", @@ -1776,9 +1627,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", "itertools", @@ -1789,18 +1640,18 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" dependencies = [ "prost", ] [[package]] name = "prost-wkt" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" +checksum = "655944d0ce015e71b3ec21279437e6a09e58433e50c7b0677901f3d5235e74f5" dependencies = [ "chrono", "inventory", @@ -1813,9 +1664,9 @@ dependencies = [ [[package]] name = "prost-wkt-build" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" +checksum = "f869f1443fee474b785e935d92e1007f57443e485f51668ed41943fc01a321a2" dependencies = [ "heck", "prost", @@ -1826,9 +1677,9 @@ dependencies = [ [[package]] name = "prost-wkt-types" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" +checksum = "eeeffd6b9becd4600dd461399f3f71aeda2ff0848802a9ed526cf12e8f42902a" dependencies = [ "chrono", "prost", @@ -1862,6 +1713,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "pyo3" version = "0.25.1" @@ -2417,17 +2288,6 @@ dependencies = [ "serde", ] -[[package]] -name = "sha1" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "sharded-slab" version = "0.1.7" @@ -2618,6 +2478,7 @@ dependencies = [ "bytes", "derive_builder", "derive_more", + "dyn-clone", "futures-retry", "futures-util", "http", @@ -2748,16 +2609,15 @@ dependencies = [ "base64", "derive_more", "prost", - "prost-build", "prost-wkt", - "prost-wkt-build", "prost-wkt-types", "rand 0.9.2", "serde", "serde_json", "thiserror 2.0.15", "tonic", - "tonic-build", + "tonic-prost", + "tonic-prost-build", "uuid", ] @@ -2816,25 +2676,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "time" -version = "0.3.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" -dependencies = [ - "deranged", - "num-conv", - "powerfmt", - "serde", - "time-core", -] - -[[package]] -name = "time-core" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" - [[package]] name = "tinystr" version = "0.8.1" @@ -2966,9 +2807,9 @@ checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" [[package]] name = "tonic" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" dependencies = [ "async-trait", "axum", @@ -2983,9 +2824,9 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", "rustls-native-certs", - "socket2 0.5.10", + "socket2 0.6.0", + "sync_wrapper", "tokio", "tokio-rustls", "tokio-stream", @@ -2997,9 +2838,32 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.13.1" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" +checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" dependencies = [ "prettyplease", "proc-macro2", @@ -3007,6 +2871,8 @@ dependencies = [ "prost-types", "quote", "syn", + "tempfile", + "tonic-build", ] [[package]] @@ -3119,12 +2985,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" -[[package]] -name = "typenum" -version = "1.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" - [[package]] name = "typetag" version = "0.2.20" @@ -3149,6 +3009,12 @@ dependencies = [ "syn", ] +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -3731,20 +3597,6 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] [[package]] name = "zerotrie" @@ -3785,23 +3637,12 @@ version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caa8cd6af31c3b31c6631b8f483848b91589021b28fffe50adada48d4f4d2ed1" dependencies = [ - "aes", "arbitrary", "bzip2", - "constant_time_eq", "crc32fast", - "deflate64", "flate2", - "getrandom 0.3.3", - "hmac", "indexmap", - "liblzma", "memchr", - "pbkdf2", - "ppmd-rust", - "sha1", - "time", - "zeroize", "zopfli", "zstd", ] diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index ad839c19b..1da8d5d2e 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["cdylib"] anyhow = "1.0" async-trait = "0.1" futures = "0.3" -prost = "0.13" +prost = "0.14" pyo3 = { version = "0.25", features = [ "extension-module", "abi3-py310", @@ -38,7 +38,7 @@ temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", featu temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" tokio-stream = "0.1" -tonic = "0.13" +tonic = "0.14" tracing = "0.1" url = "2.2" diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index a06f9686c..b08b97584 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -96,7 +96,7 @@ class RuntimeOptions: """Python representation of the Rust struct for runtime options.""" telemetry: TelemetryConfig - worker_heartbeat_interval_millis: Optional[int] = 30000 # 30s + worker_heartbeat_interval_millis: Optional[int] = 30_000 # 30s # WARNING: This must match Rust runtime::BufferedLogEntry diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 401a38506..9e9a46191 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 401a38506b8b5780c8c6dd87016c825b05c2a70e +Subproject commit 9e9a46191656fc9ccd95589dac3552410561d620 diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index dfbd432a1..68ebb1cb4 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -6,10 +6,11 @@ use std::time::Duration; use temporal_client::{ ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder, ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig, - TemporalServiceClientWithMetrics, TlsConfig, + TlsConfig, TemporalServiceClient, }; -use tonic::metadata::{ - AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue, +use temporal_client::tonic::{ + self, + metadata::{AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue}, }; use url::Url; @@ -17,7 +18,7 @@ use crate::runtime; pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException); -type Client = RetryClient>; +type Client = RetryClient>; #[pyclass] pub struct ClientRef { diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index a1df98074..fae1e1c05 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -80,7 +80,6 @@ def __init__( runtime=runtime, disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, - plugins=[plugin.name() for plugin in plugins], ) # Apply plugin configuration @@ -294,7 +293,7 @@ def on_eviction_hook( nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( 1 ), - plugins=self._config.get("plugins", []), + plugins=[plugin.name() for plugin in self.plugins], skip_client_worker_set_check=True, ), ) @@ -371,7 +370,6 @@ class ReplayerConfig(TypedDict, total=False): runtime: Optional[temporalio.runtime.Runtime] disable_safe_workflow_eviction: bool header_codec_behavior: HeaderCodecBehavior - plugins: Sequence[str] @dataclass(frozen=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index fa20ef6a7..1cc9a045a 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -381,7 +381,7 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) - config["plugins"] = [plugin.name() for plugin in plugins] + config["plugins"] = plugins self.plugins = plugins for plugin in plugins: @@ -615,7 +615,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), - plugins=config.get("plugins", []), + plugins=[plugin.name() for plugin in config.get("plugins", [])], skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) @@ -629,7 +629,6 @@ def config(self) -> WorkerConfig: config = self._config.copy() config["activities"] = list(config.get("activities", [])) config["workflows"] = list(config.get("workflows", [])) - config["plugins"] = list(config.get("plugins", [])) return config @property @@ -911,7 +910,7 @@ class WorkerConfig(TypedDict, total=False): workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior - plugins: Sequence[str] + plugins: Sequence[Plugin] skip_client_worker_set_check: bool diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 651f197aa..23a4ddd9e 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -150,7 +150,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert worker.config().get("plugins") == [MyWorkerPlugin().name()] + assert [p.name() for p in worker.config().get("plugins")] == [MyWorkerPlugin().name()] # Test client plugin propagation to worker plugins new_config = client.config() @@ -158,7 +158,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" - assert worker.config().get("plugins") == [MyCombinedPlugin().name()] + assert [p.name() for p in worker.config().get("plugins")] == [MyCombinedPlugin().name()] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -168,7 +168,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert worker.config().get("plugins") == [ + assert [p.name() for p in worker.config().get("plugins")] == [ MyCombinedPlugin().name(), MyWorkerPlugin().name(), ] @@ -307,7 +307,6 @@ async def test_replay(client: Client) -> None: replayer = Replayer(workflows=[], plugins=[plugin]) assert len(replayer.config().get("workflows") or []) == 1 assert replayer.config().get("data_converter") == pydantic_data_converter - assert replayer.config().get("plugins") == [plugin.name()] await replayer.replay_workflow(await handle.fetch_history()) diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 330b45881..a11b17b51 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -21,7 +21,7 @@ TelemetryFilter, ) from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port +from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port, worker_versioning_enabled @workflow.defn @@ -260,10 +260,12 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: - assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis is None - options = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=30)) - bridge_config = options._to_bridge_config() - assert bridge_config.worker_heartbeat_interval_millis == 30_000 + assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 + bridge_config = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=60))._to_bridge_config() + assert bridge_config.worker_heartbeat_interval_millis == 60_000 + + bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config() + assert bridge_config1.worker_heartbeat_interval_millis is None def test_runtime_options_invalid_heartbeat() -> None: diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 02e2a69ec..daf1f0f47 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1139,7 +1139,7 @@ async def test_can_run_autoscaling_polling_worker( activity_pollers = [l for l in matches if "activity_task" in l] assert len(activity_pollers) == 1 assert activity_pollers[0].endswith("2") - workflow_pollers = [l for l in matches if "workflow_task" in l] + workflow_pollers = [l for l in matches if "workflow_task" in l and w.task_queue in l] assert len(workflow_pollers) == 2 # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on # initialization timing. diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 220bb5e2b..8616ff099 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5210,57 +5210,19 @@ async def run(self) -> None: await asyncio.sleep(0.1) -async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Only testing against two real servers") - # We are going to start a second ephemeral server and then replace the - # client. So we will start a no-cache ticking workflow with the current - # client and confirm it has accomplished at least one task. Then we will - # start another on the other client, and confirm it gets started too. Then - # we will terminate both. We have to use a ticking workflow with only one - # poller to force a quick re-poll to recognize our client change quickly (as - # opposed to just waiting the minute for poll timeout). - async with await WorkflowEnvironment.start_local( - dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION - ) as other_env: - # Start both workflows on different servers - task_queue = f"tq-{uuid.uuid4()}" - handle1 = await client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - handle2 = await other_env.client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - - async def any_task_completed(handle: WorkflowHandle) -> bool: - async for e in handle.fetch_history_events(): - if e.HasField("workflow_task_completed_event_attributes"): - return True - return False - - # Now start the worker on the first env - async with Worker( - client, - task_queue=task_queue, - workflows=[TickingWorkflow], - max_cached_workflows=0, - max_concurrent_workflow_task_polls=1, - ) as worker: - # Confirm the first ticking workflow has completed a task but not - # the second - await assert_eq_eventually(True, lambda: any_task_completed(handle1)) - assert not await any_task_completed(handle2) - - # Now replace the client, which should be used fairly quickly - # because we should have timer-done poll completions every 100ms - worker.client = other_env.client - - # Now confirm the other workflow has started - await assert_eq_eventually(True, lambda: any_task_completed(handle2)) - - # Terminate both - await handle1.terminate() - await handle2.terminate() +async def test_workflow_replace_worker_client(client: Client): + other_runtime = Runtime() + other_client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=other_runtime, + ) + async with new_worker(client, HelloWorkflow) as worker: + with pytest.raises( + ValueError, + match="New client is not on the same runtime as the existing client", + ): + worker.client = other_client @activity.defn(dynamic=True) From 1124b55837266d4961f19f44899b59f6e59f5ed9 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 20 Oct 2025 18:03:46 -0700 Subject: [PATCH 06/16] poe lint --- tests/test_plugins.py | 10 +++++++--- tests/test_runtime.py | 15 ++++++++++++--- tests/worker/test_worker.py | 4 +++- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 23a4ddd9e..b438d6808 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -150,7 +150,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert [p.name() for p in worker.config().get("plugins")] == [MyWorkerPlugin().name()] + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyWorkerPlugin().name() + ] # Test client plugin propagation to worker plugins new_config = client.config() @@ -158,7 +160,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" - assert [p.name() for p in worker.config().get("plugins")] == [MyCombinedPlugin().name()] + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name() + ] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -168,7 +172,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert [p.name() for p in worker.config().get("plugins")] == [ + assert [p.name() for p in worker.config().get("plugins", [])] == [ MyCombinedPlugin().name(), MyWorkerPlugin().name(), ] diff --git a/tests/test_runtime.py b/tests/test_runtime.py index a11b17b51..32219951d 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -21,7 +21,12 @@ TelemetryFilter, ) from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port, worker_versioning_enabled +from tests.helpers import ( + assert_eq_eventually, + assert_eventually, + find_free_port, + worker_versioning_enabled, +) @workflow.defn @@ -260,8 +265,12 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: - assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 - bridge_config = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=60))._to_bridge_config() + assert ( + RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 + ) + bridge_config = RuntimeOptions( + worker_heartbeat_interval=timedelta(seconds=60) + )._to_bridge_config() assert bridge_config.worker_heartbeat_interval_millis == 60_000 bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config() diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index daf1f0f47..0bc706091 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1139,7 +1139,9 @@ async def test_can_run_autoscaling_polling_worker( activity_pollers = [l for l in matches if "activity_task" in l] assert len(activity_pollers) == 1 assert activity_pollers[0].endswith("2") - workflow_pollers = [l for l in matches if "workflow_task" in l and w.task_queue in l] + workflow_pollers = [ + l for l in matches if "workflow_task" in l and w.task_queue in l + ] assert len(workflow_pollers) == 2 # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on # initialization timing. From a305b2caaa6f779b0af4891ae5ba839e650e165e Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 10:36:41 -0700 Subject: [PATCH 07/16] More rusty, clarify naming --- temporalio/bridge/src/runtime.rs | 21 +++++++-------------- tests/conftest.py | 2 +- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 4d6ec90fb..be77a1676 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -131,20 +131,13 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult { } } - let mut runtime_options_build = RuntimeOptionsBuilder::default(); - runtime_options_build.telemetry_options( - telemetry_build - .build() - .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, - ); - - if let Some(ms) = worker_heartbeat_interval_millis { - runtime_options_build.heartbeat_interval(Some(Duration::from_millis(ms))); - } else { - runtime_options_build.heartbeat_interval(None); - } - - let runtime_options = runtime_options_build + let runtime_options = RuntimeOptionsBuilder::default() + .telemetry_options( + telemetry_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, + ) + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) .build() .map_err(|err| PyValueError::new_err(format!("Invalid runtime options: {err}")))?; diff --git a/tests/conftest.py b/tests/conftest.py index edefef2cf..0e2156f86 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -60,7 +60,7 @@ def pytest_addoption(parser): @pytest.fixture(autouse=True) -def _force_worker_skip_client_set(monkeypatch): +def _skip_client_worker_set_check(monkeypatch): original_init = temporalio.worker.Worker.__init__ def patched_init(self, *args, **kwargs): From 9d40b54f44b314dfb3a782cdf76241c929cc02ee Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 10:53:01 -0700 Subject: [PATCH 08/16] More rusty, clarify naming, simplify Runtime args, add Client plugins to list --- temporalio/client.py | 5 +++ temporalio/runtime.py | 62 +++++++++++------------------------ temporalio/worker/_worker.py | 6 +++- tests/test_runtime.py | 27 ++++++++------- tests/worker/test_worker.py | 1 - tests/worker/test_workflow.py | 2 +- 6 files changed, 45 insertions(+), 58 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 770a51392..336645375 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -350,6 +350,11 @@ def api_key(self, value: Optional[str]) -> None: self.service_client.config.api_key = value self.service_client.update_api_key(value) + @property + def plugins(self) -> Sequence[Plugin]: + """Plugins used by this client.""" + return self._config["plugins"] + # Overload for no-param workflow @overload async def start_workflow( diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 268cdc418..376794d6c 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -73,8 +73,8 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: def __init__( self, *, - telemetry: Optional[TelemetryConfig] = None, - runtime_options: Optional["RuntimeOptions"] = None, + telemetry: TelemetryConfig, + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30), ) -> None: """Create a runtime with the provided configuration. @@ -83,25 +83,29 @@ def __init__( Args: telemetry: Telemetry configuration when not supplying ``runtime_options``. - runtime_options: Full runtime configuration including telemetry and - worker heartbeating options. + worker_heartbeat_interval: Interval for worker heartbeats. ``None`` + disables heartbeating. Raises: - ValueError: If both ``telemetry`` and ``runtime_options`` are - provided. + ValueError: If both ```runtime_options`` is a negative value. """ - if runtime_options and telemetry: - raise ValueError("Cannot supply both telemetry and runtime_options") - - if runtime_options is None: - telemetry = telemetry or TelemetryConfig() - runtime_options = RuntimeOptions(telemetry=telemetry) + if worker_heartbeat_interval is None: + heartbeat_millis = None else: - telemetry = runtime_options.telemetry + if worker_heartbeat_interval <= timedelta(0): + raise ValueError("worker_heartbeat_interval must be positive") + heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000) + if heartbeat_millis == 0: + heartbeat_millis = 1 + + self._heartbeat_millis = heartbeat_millis - self._core_runtime = temporalio.bridge.runtime.Runtime( - options=runtime_options._to_bridge_config() + runtime_options = temporalio.bridge.runtime.RuntimeOptions( + telemetry=telemetry._to_bridge_config(), + worker_heartbeat_interval_millis=heartbeat_millis, ) + + self._core_runtime = temporalio.bridge.runtime.Runtime(options=runtime_options) if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime) @@ -415,34 +419,6 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig: ) -@dataclass(frozen=True) -class RuntimeOptions: - """Configuration for runtime initialization.""" - - telemetry: TelemetryConfig = field(default_factory=TelemetryConfig) - """Telemetry configuration applied to the runtime.""" - - worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30) - """Interval for worker heartbeats. ``None`` disables heartbeating.""" - - def _to_bridge_config(self) -> temporalio.bridge.runtime.RuntimeOptions: - heartbeat_millis: Optional[int] - if self.worker_heartbeat_interval is None: - heartbeat_millis = None - else: - if self.worker_heartbeat_interval <= timedelta(0): - raise ValueError("worker_heartbeat_interval must be positive") - heartbeat_millis = int( - self.worker_heartbeat_interval.total_seconds() * 1000 - ) - if heartbeat_millis == 0: - heartbeat_millis = 1 - return temporalio.bridge.runtime.RuntimeOptions( - telemetry=self.telemetry._to_bridge_config(), - worker_heartbeat_interval_millis=heartbeat_millis, - ) - - BufferedMetricKind = NewType("BufferedMetricKind", int) """Representation of a buffered metric kind.""" diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 1cc9a045a..b56b0d488 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -561,6 +561,10 @@ def check_activity(activity): maximum=config["max_concurrent_activity_task_polls"] ) + worker_plugins = [plugin.name() for plugin in config.get("plugins", [])] + client_plugins = [plugin.name() for plugin in config["client"].plugins] + plugins = list(set(worker_plugins + client_plugins)) + # Create bridge worker last. We have empirically observed that if it is # created before an error is raised from the activity worker # constructor, a deadlock/hang will occur presumably while trying to @@ -615,7 +619,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), - plugins=[plugin.name() for plugin in config.get("plugins", [])], + plugins=plugins, skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 32219951d..5677eb040 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -16,7 +16,6 @@ LoggingConfig, PrometheusConfig, Runtime, - RuntimeOptions, TelemetryConfig, TelemetryFilter, ) @@ -265,20 +264,24 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: - assert ( - RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 + runtime = Runtime(telemetry=TelemetryConfig()) + assert runtime._heartbeat_millis == 30_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=timedelta(seconds=60), ) - bridge_config = RuntimeOptions( - worker_heartbeat_interval=timedelta(seconds=60) - )._to_bridge_config() - assert bridge_config.worker_heartbeat_interval_millis == 60_000 + assert runtime._heartbeat_millis == 60_000 - bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config() - assert bridge_config1.worker_heartbeat_interval_millis is None + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=None, + ) + assert runtime._heartbeat_millis is None def test_runtime_options_invalid_heartbeat() -> None: with pytest.raises(ValueError): - RuntimeOptions( - worker_heartbeat_interval=timedelta(seconds=-5) - )._to_bridge_config() + Runtime( + telemetry=TelemetryConfig(), worker_heartbeat_interval=timedelta(seconds=-5) + ) diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 0bc706091..116fe4336 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -32,7 +32,6 @@ from temporalio.runtime import ( PrometheusConfig, Runtime, - RuntimeOptions, TelemetryConfig, ) from temporalio.service import RPCError diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 8616ff099..ebfb3f04a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5211,7 +5211,7 @@ async def run(self) -> None: async def test_workflow_replace_worker_client(client: Client): - other_runtime = Runtime() + other_runtime = Runtime(telemetry=TelemetryConfig()) other_client = await Client.connect( client.service_client.config.target_host, namespace=client.namespace, From 5b9e0c20db8d7dad466f0b595459cb0f8a7fb9ef Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 14:02:55 -0700 Subject: [PATCH 09/16] Move new plugin test to bridge --- tests/bridge/test_plugins.py | 39 ++++++++++++++++++++++++++++++++++++ tests/test_plugins.py | 30 --------------------------- 2 files changed, 39 insertions(+), 30 deletions(-) create mode 100644 tests/bridge/test_plugins.py diff --git a/tests/bridge/test_plugins.py b/tests/bridge/test_plugins.py new file mode 100644 index 000000000..28f7af43f --- /dev/null +++ b/tests/bridge/test_plugins.py @@ -0,0 +1,39 @@ +from collections import Counter + +import pytest + +import temporalio.bridge.temporal_sdk_bridge +from temporalio.client import Client +from temporalio.plugin import SimplePlugin +from temporalio.worker import Worker +from tests.worker.test_worker import never_run_activity + + +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + # Use counter to compare unordered lists + assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index b438d6808..d51c4781c 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -6,7 +6,6 @@ import pytest -import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker @@ -178,35 +177,6 @@ async def test_worker_plugin_basic_config(client: Client) -> None: ] -async def test_worker_plugin_names_forwarded_to_core( - client: Client, monkeypatch: pytest.MonkeyPatch -) -> None: - captured_plugins: list[str] = [] - - original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker - - def new_worker_wrapper(runtime_ref, client_ref, config): - nonlocal captured_plugins - captured_plugins = list(config.plugins) - return original_new_worker(runtime_ref, client_ref, config) - - monkeypatch.setattr( - temporalio.bridge.temporal_sdk_bridge, - "new_worker", - new_worker_wrapper, - ) - - plugin1 = SimplePlugin("test-worker-plugin1") - plugin2 = SimplePlugin("test-worker-plugin2") - worker = Worker( - client, - task_queue="queue", - activities=[never_run_activity], - plugins=[plugin1, plugin2], - ) - assert captured_plugins == [plugin1.name(), plugin2.name()] - - async def test_worker_duplicated_plugin(client: Client) -> None: new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] From ab41d1005f72bfe20b9120b974ffe2671500cf00 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 14:30:56 -0700 Subject: [PATCH 10/16] switch poe lint and poe build-develop order in CI, revert test move --- .github/workflows/ci.yml | 2 +- tests/bridge/test_plugins.py | 39 ------------------------------------ tests/test_plugins.py | 30 +++++++++++++++++++++++++++ 3 files changed, 31 insertions(+), 40 deletions(-) delete mode 100644 tests/bridge/test_plugins.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e24187c5f..56bdf131c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,8 +62,8 @@ jobs: - run: uv sync --all-extras - run: poe bridge-lint if: ${{ matrix.clippyLinter }} - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml timeout-minutes: 15 diff --git a/tests/bridge/test_plugins.py b/tests/bridge/test_plugins.py deleted file mode 100644 index 28f7af43f..000000000 --- a/tests/bridge/test_plugins.py +++ /dev/null @@ -1,39 +0,0 @@ -from collections import Counter - -import pytest - -import temporalio.bridge.temporal_sdk_bridge -from temporalio.client import Client -from temporalio.plugin import SimplePlugin -from temporalio.worker import Worker -from tests.worker.test_worker import never_run_activity - - -async def test_worker_plugin_names_forwarded_to_core( - client: Client, monkeypatch: pytest.MonkeyPatch -) -> None: - captured_plugins: list[str] = [] - - original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker - - def new_worker_wrapper(runtime_ref, client_ref, config): - nonlocal captured_plugins - captured_plugins = list(config.plugins) - return original_new_worker(runtime_ref, client_ref, config) - - monkeypatch.setattr( - temporalio.bridge.temporal_sdk_bridge, - "new_worker", - new_worker_wrapper, - ) - - plugin1 = SimplePlugin("test-worker-plugin1") - plugin2 = SimplePlugin("test-worker-plugin2") - Worker( - client, - task_queue="queue", - activities=[never_run_activity], - plugins=[plugin1, plugin2], - ) - # Use counter to compare unordered lists - assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index d51c4781c..b438d6808 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -6,6 +6,7 @@ import pytest +import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker @@ -177,6 +178,35 @@ async def test_worker_plugin_basic_config(client: Client) -> None: ] +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + assert captured_plugins == [plugin1.name(), plugin2.name()] + + async def test_worker_duplicated_plugin(client: Client) -> None: new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] From 692e0e39ffb1091115e5c60825a76d7254c509bd Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 14:34:10 -0700 Subject: [PATCH 11/16] missed a spot for poe lint in CI --- .github/workflows/ci.yml | 2 +- tests/test_plugins.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 56bdf131c..60c350d95 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -153,8 +153,8 @@ jobs: - run: uv tool install poethepoet - run: uv lock --upgrade - run: uv sync --all-extras - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test -s --junit-xml=junit-xml/latest-deps.xml timeout-minutes: 10 diff --git a/tests/test_plugins.py b/tests/test_plugins.py index b438d6808..f4b98397a 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,6 +1,7 @@ import dataclasses import uuid import warnings +from collections import Counter from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import AsyncIterator, Awaitable, Callable, Optional, cast @@ -204,7 +205,8 @@ def new_worker_wrapper(runtime_ref, client_ref, config): activities=[never_run_activity], plugins=[plugin1, plugin2], ) - assert captured_plugins == [plugin1.name(), plugin2.name()] + # Use counter to compare unordered lists + assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) async def test_worker_duplicated_plugin(client: Client) -> None: From 0bc7bce6776d9868e710a18e599ea9388048cdfc Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 22 Oct 2025 11:11:02 -0700 Subject: [PATCH 12/16] Default is 60, not 30 --- temporalio/bridge/runtime.py | 2 +- temporalio/runtime.py | 2 +- tests/test_runtime.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index b08b97584..3583c1d46 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -96,7 +96,7 @@ class RuntimeOptions: """Python representation of the Rust struct for runtime options.""" telemetry: TelemetryConfig - worker_heartbeat_interval_millis: Optional[int] = 30_000 # 30s + worker_heartbeat_interval_millis: Optional[int] = 60_000 # 60s # WARNING: This must match Rust runtime::BufferedLogEntry diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 376794d6c..3047f0b51 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -74,7 +74,7 @@ def __init__( self, *, telemetry: TelemetryConfig, - worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30), + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=60), ) -> None: """Create a runtime with the provided configuration. diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 5677eb040..98b0f884a 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -265,13 +265,13 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: runtime = Runtime(telemetry=TelemetryConfig()) - assert runtime._heartbeat_millis == 30_000 + assert runtime._heartbeat_millis == 60_000 runtime = Runtime( telemetry=TelemetryConfig(), - worker_heartbeat_interval=timedelta(seconds=60), + worker_heartbeat_interval=timedelta(seconds=10), ) - assert runtime._heartbeat_millis == 60_000 + assert runtime._heartbeat_millis == 10_000 runtime = Runtime( telemetry=TelemetryConfig(), From 18873a5854bfff2de59da02bc9d16689c6f07562 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 12 Nov 2025 12:13:33 -0800 Subject: [PATCH 13/16] update core to 850db67 --- temporalio/bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 9e9a46191..850db67c8 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 9e9a46191656fc9ccd95589dac3552410561d620 +Subproject commit 850db67c87ac9208da53df1cd82f8a36d71c5227 From e83a260f12eae34b43ec514f8a285b5297654c0e Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 12 Nov 2025 12:45:18 -0800 Subject: [PATCH 14/16] Remove exposure of skip_client_worker_set_check --- temporalio/bridge/src/worker.rs | 2 -- temporalio/bridge/worker.py | 1 - temporalio/worker/_replayer.py | 1 - temporalio/worker/_worker.py | 7 ------- tests/conftest.py | 12 ------------ 5 files changed, 23 deletions(-) diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 0a5deb928..7ce8888a8 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -65,7 +65,6 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, plugins: Vec, - skip_client_worker_set_check: bool, } #[derive(FromPyObject)] @@ -735,7 +734,6 @@ fn convert_worker_config( }) .collect::>(), ) - .skip_client_worker_set_check(conf.skip_client_worker_set_check) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index d82f07b14..fbcfc9c3c 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -65,7 +65,6 @@ class WorkerConfig: nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior plugins: Sequence[str] - skip_client_worker_set_check: bool @dataclass diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index fae1e1c05..df5a4087c 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -294,7 +294,6 @@ def on_eviction_hook( 1 ), plugins=[plugin.name() for plugin in self.plugins], - skip_client_worker_set_check=True, ), ) # Start worker diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index b56b0d488..d67925600 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -148,7 +148,6 @@ def __init__( nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( maximum=5 ), - skip_client_worker_set_check: bool = False, ) -> None: """Create a worker to process workflows and/or activities. @@ -323,9 +322,6 @@ def __init__( Defaults to a 5-poller maximum. nexus_task_poller_behavior: Specify the behavior of Nexus task polling. Defaults to a 5-poller maximum. - skip_client_worker_set_check: Skip the runtime validation that ensures - the client is registered with the worker set. This should only be - used in tests. """ config = WorkerConfig( client=client, @@ -368,7 +364,6 @@ def __init__( workflow_task_poller_behavior=workflow_task_poller_behavior, activity_task_poller_behavior=activity_task_poller_behavior, nexus_task_poller_behavior=nexus_task_poller_behavior, - skip_client_worker_set_check=skip_client_worker_set_check, ) plugins_from_client = cast( @@ -620,7 +615,6 @@ def check_activity(activity): "nexus_task_poller_behavior" ]._to_bridge(), plugins=plugins, - skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) @@ -915,7 +909,6 @@ class WorkerConfig(TypedDict, total=False): activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior plugins: Sequence[Plugin] - skip_client_worker_set_check: bool def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/conftest.py b/tests/conftest.py index 0e2156f86..177e2ab51 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -59,18 +59,6 @@ def pytest_addoption(parser): ) -@pytest.fixture(autouse=True) -def _skip_client_worker_set_check(monkeypatch): - original_init = temporalio.worker.Worker.__init__ - - def patched_init(self, *args, **kwargs): - kwargs.setdefault("skip_client_worker_set_check", True) - return original_init(self, *args, **kwargs) - - monkeypatch.setattr(temporalio.worker.Worker, "__init__", patched_init) - yield - - @pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() From a8c3399c715622ee9d0d806ea03d274ef020b53a Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 12 Nov 2025 13:23:59 -0800 Subject: [PATCH 15/16] Pick up core crate renames --- temporalio/bridge/Cargo.lock | 139 +++++++----------- temporalio/bridge/Cargo.toml | 11 +- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/client.rs | 8 +- temporalio/bridge/src/client_rpc_generated.rs | 10 +- temporalio/bridge/src/envconfig.rs | 2 +- temporalio/bridge/src/metric.rs | 2 +- temporalio/bridge/src/runtime.rs | 10 +- temporalio/bridge/src/testing.rs | 2 +- temporalio/bridge/src/worker.rs | 76 +++++----- tests/worker/test_activity.py | 1 - 11 files changed, 116 insertions(+), 147 deletions(-) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index b9046647e..ee5e5e668 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -2101,29 +2101,6 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" -[[package]] -name = "rustfsm" -version = "0.1.0" -dependencies = [ - "rustfsm_procmacro", - "rustfsm_trait", -] - -[[package]] -name = "rustfsm_procmacro" -version = "0.1.0" -dependencies = [ - "derive_more", - "proc-macro2", - "quote", - "rustfsm_trait", - "syn", -] - -[[package]] -name = "rustfsm_trait" -version = "0.1.0" - [[package]] name = "rustix" version = "1.0.8" @@ -2468,7 +2445,28 @@ dependencies = [ ] [[package]] -name = "temporal-client" +name = "temporal-sdk-bridge" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "prost", + "pyo3", + "pyo3-async-runtimes", + "pythonize", + "temporalio-client", + "temporalio-common", + "temporalio-sdk-core", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "url", +] + +[[package]] +name = "temporalio-client" version = "0.1.0" dependencies = [ "anyhow", @@ -2486,9 +2484,9 @@ dependencies = [ "hyper", "hyper-util", "parking_lot", + "rand 0.9.2", "slotmap", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", + "temporalio-common", "thiserror 2.0.15", "tokio", "tonic", @@ -2499,29 +2497,45 @@ dependencies = [ ] [[package]] -name = "temporal-sdk-bridge" +name = "temporalio-common" version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "futures", + "base64", + "derive_builder", + "derive_more", + "dirs", + "opentelemetry", "prost", - "pyo3", - "pyo3-async-runtimes", - "pythonize", - "temporal-client", - "temporal-sdk-core", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", - "tokio", - "tokio-stream", + "prost-wkt", + "prost-wkt-types", + "rand 0.9.2", + "serde", + "serde_json", + "thiserror 2.0.15", + "toml", "tonic", + "tonic-prost", + "tonic-prost-build", "tracing", + "tracing-core", "url", + "uuid", +] + +[[package]] +name = "temporalio-macros" +version = "0.1.0" +dependencies = [ + "derive_more", + "proc-macro2", + "quote", + "syn", ] [[package]] -name = "temporal-sdk-core" +name = "temporalio-sdk-core" version = "0.1.0" dependencies = [ "anyhow", @@ -2557,16 +2571,15 @@ dependencies = [ "rand 0.9.2", "reqwest", "ringbuf", - "rustfsm", "serde", "serde_json", "siphasher", "slotmap", "sysinfo", "tar", - "temporal-client", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", + "temporalio-client", + "temporalio-common", + "temporalio-macros", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -2579,48 +2592,6 @@ dependencies = [ "zip", ] -[[package]] -name = "temporal-sdk-core-api" -version = "0.1.0" -dependencies = [ - "async-trait", - "derive_builder", - "derive_more", - "dirs", - "opentelemetry", - "prost", - "serde", - "serde_json", - "temporal-sdk-core-protos", - "thiserror 2.0.15", - "toml", - "tonic", - "tracing", - "tracing-core", - "url", - "uuid", -] - -[[package]] -name = "temporal-sdk-core-protos" -version = "0.1.0" -dependencies = [ - "anyhow", - "base64", - "derive_more", - "prost", - "prost-wkt", - "prost-wkt-types", - "rand 0.9.2", - "serde", - "serde_json", - "thiserror 2.0.15", - "tonic", - "tonic-prost", - "tonic-prost-build", - "uuid", -] - [[package]] name = "termtree" version = "0.5.1" diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index 1da8d5d2e..b2d186b5e 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -28,14 +28,13 @@ pyo3 = { version = "0.25", features = [ ] } pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"] } pythonize = "0.25" -temporal-client = { version = "0.1.0", path = "./sdk-core/client" } -temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = [ - "ephemeral-server", -] } -temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", features = [ +temporalio-client = { version = "0.1.0", path = "./sdk-core/crates/client" } +temporalio-common = { version = "0.1.0", path = "./sdk-core/crates/common", features = [ "envconfig", +]} +temporalio-sdk-core = { version = "0.1.0", path = "./sdk-core/crates/sdk-core", features = [ + "ephemeral-server", ] } -temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" tokio-stream = "0.1" tonic = "0.14" diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 850db67c8..6dbe4449d 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 850db67c87ac9208da53df1cd82f8a36d71c5227 +Subproject commit 6dbe4449d75dbeac31061ab8a70725fc3f947c70 diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index 68ebb1cb4..f760f2ec6 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -3,12 +3,12 @@ use pyo3::prelude::*; use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; -use temporal_client::{ +use temporalio_client::{ ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder, ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig, TlsConfig, TemporalServiceClient, }; -use temporal_client::tonic::{ +use temporalio_client::tonic::{ self, metadata::{AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue}, }; @@ -258,7 +258,7 @@ impl TryFrom for ClientOptions { } } -impl TryFrom for temporal_client::TlsConfig { +impl TryFrom for temporalio_client::TlsConfig { type Error = PyErr; fn try_from(conf: ClientTlsConfig) -> PyResult { @@ -268,7 +268,7 @@ impl TryFrom for temporal_client::TlsConfig { client_tls_config: match (conf.client_cert, conf.client_private_key) { (None, None) => None, (Some(client_cert), Some(client_private_key)) => { - Some(temporal_client::ClientTlsConfig { + Some(temporalio_client::ClientTlsConfig { client_cert, client_private_key, }) diff --git a/temporalio/bridge/src/client_rpc_generated.rs b/temporalio/bridge/src/client_rpc_generated.rs index 659f5d8cf..e9ab11186 100644 --- a/temporalio/bridge/src/client_rpc_generated.rs +++ b/temporalio/bridge/src/client_rpc_generated.rs @@ -15,7 +15,7 @@ impl ClientRef { py: Python<'p>, call: RpcCall, ) -> PyResult> { - use temporal_client::WorkflowService; + use temporalio_client::WorkflowService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -566,7 +566,7 @@ impl ClientRef { py: Python<'p>, call: RpcCall, ) -> PyResult> { - use temporal_client::OperatorService; + use temporalio_client::OperatorService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -628,7 +628,7 @@ impl ClientRef { } fn call_cloud_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { - use temporal_client::CloudService; + use temporalio_client::CloudService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -842,7 +842,7 @@ impl ClientRef { } fn call_test_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { - use temporal_client::TestService; + use temporalio_client::TestService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -881,7 +881,7 @@ impl ClientRef { } fn call_health_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { - use temporal_client::HealthService; + use temporalio_client::HealthService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { diff --git a/temporalio/bridge/src/envconfig.rs b/temporalio/bridge/src/envconfig.rs index 9277a4588..fb0da290c 100644 --- a/temporalio/bridge/src/envconfig.rs +++ b/temporalio/bridge/src/envconfig.rs @@ -4,7 +4,7 @@ use pyo3::{ types::{PyBytes, PyDict}, }; use std::collections::HashMap; -use temporal_sdk_core_api::envconfig::{ +use temporalio_common::envconfig::{ load_client_config as core_load_client_config, load_client_config_profile as core_load_client_config_profile, ClientConfig as CoreClientConfig, ClientConfigCodec, diff --git a/temporalio/bridge/src/metric.rs b/temporalio/bridge/src/metric.rs index bc516a08a..0eb5afd60 100644 --- a/temporalio/bridge/src/metric.rs +++ b/temporalio/bridge/src/metric.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use pyo3::prelude::*; use pyo3::{exceptions::PyTypeError, types::PyDict}; -use temporal_sdk_core_api::telemetry::metrics::{ +use temporalio_common::telemetry::metrics::{ self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, NewAttributes, }; diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index be77a1676..98aa9fd57 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -9,13 +9,13 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; -use temporal_sdk_core::telemetry::{ +use temporalio_sdk_core::telemetry::{ build_otlp_metric_exporter, start_prometheus_metric_exporter, CoreLogStreamConsumer, MetricsCallBuffer, }; -use temporal_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder}; -use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; -use temporal_sdk_core_api::telemetry::{ +use temporalio_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder}; +use temporalio_common::telemetry::metrics::{CoreMeter, MetricCallBufferer}; +use temporalio_common::telemetry::{ CoreLog, Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, }; @@ -369,7 +369,7 @@ impl TryFrom for Arc { } if let Some(overrides) = prom_conf.histogram_bucket_overrides { build.histogram_bucket_overrides( - temporal_sdk_core_api::telemetry::HistogramBucketOverrides { overrides }, + temporalio_common::telemetry::HistogramBucketOverrides { overrides }, ); } let prom_options = build.build().map_err(|err| { diff --git a/temporalio/bridge/src/testing.rs b/temporalio/bridge/src/testing.rs index 04eea1286..5df3ee24d 100644 --- a/temporalio/bridge/src/testing.rs +++ b/temporalio/bridge/src/testing.rs @@ -2,7 +2,7 @@ use std::time::Duration; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; -use temporal_sdk_core::ephemeral_server; +use temporalio_sdk_core::ephemeral_server; use crate::runtime; diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 7ce8888a8..acf260bc1 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -10,20 +10,20 @@ use std::collections::HashSet; use std::marker::PhantomData; use std::sync::{Arc, OnceLock}; use std::time::Duration; -use temporal_sdk_core::api::errors::PollError; -use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; -use temporal_sdk_core_api::errors::WorkflowErrorType; -use temporal_sdk_core_api::worker::{ +use temporalio_common::errors::PollError; +use temporalio_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; +use temporalio_common::errors::WorkflowErrorType; +use temporalio_common::worker::{ SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; -use temporal_sdk_core_api::Worker; -use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; -use temporal_sdk_core_protos::coresdk::{ +use temporalio_common::Worker; +use temporalio_common::protos::coresdk::workflow_completion::WorkflowActivationCompletion; +use temporalio_common::protos::coresdk::{ nexus::NexusTaskCompletion, ActivityHeartbeat, ActivityTaskCompletion, }; -use temporal_sdk_core_protos::temporal::api::history::v1::History; -use temporal_sdk_core_protos::temporal::api::worker::v1::PluginInfo; +use temporalio_common::protos::temporal::api::history::v1::History; +use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::error; @@ -35,7 +35,7 @@ pyo3::create_exception!(temporal_sdk_bridge, PollShutdownError, PyException); #[pyclass] pub struct WorkerRef { - worker: Option>, + worker: Option>, /// Set upon the call to `validate`, with the task locals for the event loop at that time, which /// is whatever event loop the user is running their worker in. This loop might be needed by /// other rust-created threads that want to run async python code. @@ -79,21 +79,21 @@ pub struct PollerBehaviorAutoscaling { pub initial: usize, } -/// Recreates [temporal_sdk_core_api::worker::PollerBehavior] +/// Recreates [temporalio_common::worker::PollerBehavior] #[derive(FromPyObject)] pub enum PollerBehavior { SimpleMaximum(PollerBehaviorSimpleMaximum), Autoscaling(PollerBehaviorAutoscaling), } -impl From for temporal_sdk_core_api::worker::PollerBehavior { +impl From for temporalio_common::worker::PollerBehavior { fn from(value: PollerBehavior) -> Self { match value { PollerBehavior::SimpleMaximum(simple) => { - temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum) + temporalio_common::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum) } PollerBehavior::Autoscaling(auto) => { - temporal_sdk_core_api::worker::PollerBehavior::Autoscaling { + temporalio_common::worker::PollerBehavior::Autoscaling { minimum: auto.minimum, maximum: auto.maximum, initial: auto.initial, @@ -103,7 +103,7 @@ impl From for temporal_sdk_core_api::worker::PollerBehavior { } } -/// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy] +/// Recreates [temporalio_common::worker::WorkerVersioningStrategy] #[derive(FromPyObject)] pub enum WorkerVersioningStrategy { None(WorkerVersioningNone), @@ -116,7 +116,7 @@ pub struct WorkerVersioningNone { pub build_id_no_versioning: String, } -/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentOptions] +/// Recreates [temporalio_common::worker::WorkerDeploymentOptions] #[derive(FromPyObject)] pub struct WorkerDeploymentOptions { pub version: WorkerDeploymentVersion, @@ -130,15 +130,15 @@ pub struct LegacyBuildIdBased { pub build_id_with_versioning: String, } -/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentVersion] +/// Recreates [temporalio_common::worker::WorkerDeploymentVersion] #[derive(FromPyObject, IntoPyObject, Clone)] pub struct WorkerDeploymentVersion { pub deployment_name: String, pub build_id: String, } -impl From for WorkerDeploymentVersion { - fn from(version: temporal_sdk_core_api::worker::WorkerDeploymentVersion) -> Self { +impl From for WorkerDeploymentVersion { + fn from(version: temporalio_common::worker::WorkerDeploymentVersion) -> Self { WorkerDeploymentVersion { deployment_name: version.deployment_name, build_id: version.build_id, @@ -464,7 +464,7 @@ pub struct ResourceBasedTunerConfig { macro_rules! enter_sync { ($runtime:expr) => { if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() { - temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber); + temporalio_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber); } let _guard = $runtime.core.tokio_handle().enter(); }; @@ -478,7 +478,7 @@ pub fn new_worker( enter_sync!(runtime_ref.runtime); let event_loop_task_locals = Arc::new(OnceLock::new()); let config = convert_worker_config(config, event_loop_task_locals.clone())?; - let worker = temporal_sdk_core::init_worker( + let worker = temporalio_sdk_core::init_worker( &runtime_ref.runtime.core, config, client.retry_client.clone().into_inner(), @@ -502,7 +502,7 @@ pub fn new_replay_worker<'a>( let (history_pusher, stream) = HistoryPusher::new(runtime_ref.runtime.clone()); let worker = WorkerRef { worker: Some(Arc::new( - temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new(config, stream)).map_err( + temporalio_sdk_core::init_replay_worker(ReplayWorkerInput::new(config, stream)).map_err( |err| PyValueError::new_err(format!("Failed creating replay worker: {err}")), )?, )), @@ -679,10 +679,10 @@ impl WorkerRef { fn convert_worker_config( conf: WorkerConfig, task_locals: Arc>, -) -> PyResult { +) -> PyResult { let converted_tuner = convert_tuner_holder(conf.tuner, task_locals)?; let converted_versioning_strategy = convert_versioning_strategy(conf.versioning_strategy); - temporal_sdk_core::WorkerConfigBuilder::default() + temporalio_sdk_core::WorkerConfigBuilder::default() .namespace(conf.namespace) .task_queue(conf.task_queue) .versioning_strategy(converted_versioning_strategy) @@ -741,7 +741,7 @@ fn convert_worker_config( fn convert_tuner_holder( holder: TunerHolder, task_locals: Arc>, -) -> PyResult { +) -> PyResult { // Verify all resource-based options are the same if any are set let maybe_wf_resource_opts = if let SlotSupplier::ResourceBased(ref ss) = holder.workflow_slot_supplier { @@ -786,10 +786,10 @@ fn convert_tuner_holder( )); } - let mut options = temporal_sdk_core::TunerHolderOptionsBuilder::default(); + let mut options = temporalio_sdk_core::TunerHolderOptionsBuilder::default(); if let Some(first) = first { options.resource_based_options( - temporal_sdk_core::ResourceBasedSlotsOptionsBuilder::default() + temporalio_sdk_core::ResourceBasedSlotsOptionsBuilder::default() .target_mem_usage(first.target_memory_usage) .target_cpu_usage(first.target_cpu_usage) .build() @@ -823,19 +823,19 @@ fn convert_tuner_holder( fn convert_slot_supplier( supplier: SlotSupplier, task_locals: Arc>, -) -> PyResult> { +) -> PyResult> { Ok(match supplier { - SlotSupplier::FixedSize(fs) => temporal_sdk_core::SlotSupplierOptions::FixedSize { + SlotSupplier::FixedSize(fs) => temporalio_sdk_core::SlotSupplierOptions::FixedSize { slots: fs.num_slots, }, - SlotSupplier::ResourceBased(ss) => temporal_sdk_core::SlotSupplierOptions::ResourceBased( - temporal_sdk_core::ResourceSlotOptions::new( + SlotSupplier::ResourceBased(ss) => temporalio_sdk_core::SlotSupplierOptions::ResourceBased( + temporalio_sdk_core::ResourceSlotOptions::new( ss.minimum_slots, ss.maximum_slots, Duration::from_millis(ss.ramp_throttle_ms), ), ), - SlotSupplier::Custom(cs) => temporal_sdk_core::SlotSupplierOptions::Custom(Arc::new( + SlotSupplier::Custom(cs) => temporalio_sdk_core::SlotSupplierOptions::Custom(Arc::new( CustomSlotSupplierOfType:: { inner: cs.inner, event_loop_task_locals: task_locals, @@ -847,17 +847,17 @@ fn convert_slot_supplier( fn convert_versioning_strategy( strategy: WorkerVersioningStrategy, -) -> temporal_sdk_core_api::worker::WorkerVersioningStrategy { +) -> temporalio_common::worker::WorkerVersioningStrategy { match strategy { WorkerVersioningStrategy::None(vn) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::None { + temporalio_common::worker::WorkerVersioningStrategy::None { build_id: vn.build_id_no_versioning, } } WorkerVersioningStrategy::DeploymentBased(options) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::WorkerDeploymentBased( - temporal_sdk_core_api::worker::WorkerDeploymentOptions { - version: temporal_sdk_core_api::worker::WorkerDeploymentVersion { + temporalio_common::worker::WorkerVersioningStrategy::WorkerDeploymentBased( + temporalio_common::worker::WorkerDeploymentOptions { + version: temporalio_common::worker::WorkerDeploymentVersion { deployment_name: options.version.deployment_name, build_id: options.version.build_id, }, @@ -872,7 +872,7 @@ fn convert_versioning_strategy( ) } WorkerVersioningStrategy::LegacyBuildIdBased(lb) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::LegacyBuildIdBased { + temporalio_common::worker::WorkerVersioningStrategy::LegacyBuildIdBased { build_id: lb.build_id_with_versioning, } } diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index bf5956fed..203b89a5a 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1370,7 +1370,6 @@ async def _execute_workflow_with_activity( worker_config["task_queue"] = str(uuid.uuid4()) worker_config["activities"] = [fn] + additional_activities worker_config["shared_state_manager"] = _default_shared_state_manager - worker_config["skip_client_worker_set_check"] = True if not worker_config.get("max_concurrent_activities"): worker_config["max_concurrent_activities"] = default_max_concurrent_activities async with Worker(**worker_config): From a4c50eef341dd9cbd9729c4be5ae8c5c5586d29f Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 12 Nov 2025 13:26:08 -0800 Subject: [PATCH 16/16] fix core commit --- temporalio/bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 6dbe4449d..850db67c8 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 6dbe4449d75dbeac31061ab8a70725fc3f947c70 +Subproject commit 850db67c87ac9208da53df1cd82f8a36d71c5227