diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e24187c5f..60c350d95 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,8 +62,8 @@ jobs: - run: uv sync --all-extras - run: poe bridge-lint if: ${{ matrix.clippyLinter }} - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml timeout-minutes: 15 @@ -153,8 +153,8 @@ jobs: - run: uv tool install poethepoet - run: uv lock --upgrade - run: uv sync --all-extras - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test -s --junit-xml=junit-xml/latest-deps.xml timeout-minutes: 10 diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index b40d5fc72..ee5e5e668 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -412,6 +412,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -1328,9 +1334,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", @@ -1342,9 +1348,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", @@ -1355,9 +1361,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" dependencies = [ "http", "opentelemetry", @@ -1374,21 +1380,22 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry_sdk" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", @@ -1396,7 +1403,6 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.2", - "serde_json", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -1589,9 +1595,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" dependencies = [ "bytes", "prost-derive", @@ -1599,9 +1605,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck", "itertools", @@ -1612,6 +1618,8 @@ dependencies = [ "prettyplease", "prost", "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", "regex", "syn", "tempfile", @@ -1619,9 +1627,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", "itertools", @@ -1632,18 +1640,18 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" dependencies = [ "prost", ] [[package]] name = "prost-wkt" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" +checksum = "655944d0ce015e71b3ec21279437e6a09e58433e50c7b0677901f3d5235e74f5" dependencies = [ "chrono", "inventory", @@ -1656,9 +1664,9 @@ dependencies = [ [[package]] name = "prost-wkt-build" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" +checksum = "f869f1443fee474b785e935d92e1007f57443e485f51668ed41943fc01a321a2" dependencies = [ "heck", "prost", @@ -1669,9 +1677,9 @@ dependencies = [ [[package]] name = "prost-wkt-types" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" +checksum = "eeeffd6b9becd4600dd461399f3f71aeda2ff0848802a9ed526cf12e8f42902a" dependencies = [ "chrono", "prost", @@ -1705,6 +1713,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "pyo3" version = "0.25.1" @@ -2073,29 +2101,6 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" -[[package]] -name = "rustfsm" -version = "0.1.0" -dependencies = [ - "rustfsm_procmacro", - "rustfsm_trait", -] - -[[package]] -name = "rustfsm_procmacro" -version = "0.1.0" -dependencies = [ - "derive_more", - "proc-macro2", - "quote", - "rustfsm_trait", - "syn", -] - -[[package]] -name = "rustfsm_trait" -version = "0.1.0" - [[package]] name = "rustix" version = "1.0.8" @@ -2440,7 +2445,28 @@ dependencies = [ ] [[package]] -name = "temporal-client" +name = "temporal-sdk-bridge" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "prost", + "pyo3", + "pyo3-async-runtimes", + "pythonize", + "temporalio-client", + "temporalio-common", + "temporalio-sdk-core", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "url", +] + +[[package]] +name = "temporalio-client" version = "0.1.0" dependencies = [ "anyhow", @@ -2450,6 +2476,7 @@ dependencies = [ "bytes", "derive_builder", "derive_more", + "dyn-clone", "futures-retry", "futures-util", "http", @@ -2457,9 +2484,9 @@ dependencies = [ "hyper", "hyper-util", "parking_lot", + "rand 0.9.2", "slotmap", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", + "temporalio-common", "thiserror 2.0.15", "tokio", "tonic", @@ -2470,29 +2497,45 @@ dependencies = [ ] [[package]] -name = "temporal-sdk-bridge" +name = "temporalio-common" version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "futures", + "base64", + "derive_builder", + "derive_more", + "dirs", + "opentelemetry", "prost", - "pyo3", - "pyo3-async-runtimes", - "pythonize", - "temporal-client", - "temporal-sdk-core", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", - "tokio", - "tokio-stream", + "prost-wkt", + "prost-wkt-types", + "rand 0.9.2", + "serde", + "serde_json", + "thiserror 2.0.15", + "toml", "tonic", + "tonic-prost", + "tonic-prost-build", "tracing", + "tracing-core", "url", + "uuid", +] + +[[package]] +name = "temporalio-macros" +version = "0.1.0" +dependencies = [ + "derive_more", + "proc-macro2", + "quote", + "syn", ] [[package]] -name = "temporal-sdk-core" +name = "temporalio-sdk-core" version = "0.1.0" dependencies = [ "anyhow", @@ -2528,16 +2571,15 @@ dependencies = [ "rand 0.9.2", "reqwest", "ringbuf", - "rustfsm", "serde", "serde_json", "siphasher", "slotmap", "sysinfo", "tar", - "temporal-client", - "temporal-sdk-core-api", - "temporal-sdk-core-protos", + "temporalio-client", + "temporalio-common", + "temporalio-macros", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -2550,48 +2592,6 @@ dependencies = [ "zip", ] -[[package]] -name = "temporal-sdk-core-api" -version = "0.1.0" -dependencies = [ - "async-trait", - "derive_builder", - "derive_more", - "dirs", - "opentelemetry", - "prost", - "serde", - "serde_json", - "temporal-sdk-core-protos", - "thiserror 2.0.15", - "toml", - "tonic", - "tracing", - "tracing-core", - "url", -] - -[[package]] -name = "temporal-sdk-core-protos" -version = "0.1.0" -dependencies = [ - "anyhow", - "base64", - "derive_more", - "prost", - "prost-build", - "prost-wkt", - "prost-wkt-build", - "prost-wkt-types", - "rand 0.9.2", - "serde", - "serde_json", - "thiserror 2.0.15", - "tonic", - "tonic-build", - "uuid", -] - [[package]] name = "termtree" version = "0.5.1" @@ -2778,9 +2778,9 @@ checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" [[package]] name = "tonic" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" dependencies = [ "async-trait", "axum", @@ -2795,9 +2795,9 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", "rustls-native-certs", - "socket2 0.5.10", + "socket2 0.6.0", + "sync_wrapper", "tokio", "tokio-rustls", "tokio-stream", @@ -2809,9 +2809,32 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" dependencies = [ "prettyplease", "proc-macro2", @@ -2819,6 +2842,8 @@ dependencies = [ "prost-types", "quote", "syn", + "tempfile", + "tonic-build", ] [[package]] @@ -2955,6 +2980,12 @@ dependencies = [ "syn", ] +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -2998,9 +3029,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index ad839c19b..b2d186b5e 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["cdylib"] anyhow = "1.0" async-trait = "0.1" futures = "0.3" -prost = "0.13" +prost = "0.14" pyo3 = { version = "0.25", features = [ "extension-module", "abi3-py310", @@ -28,17 +28,16 @@ pyo3 = { version = "0.25", features = [ ] } pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"] } pythonize = "0.25" -temporal-client = { version = "0.1.0", path = "./sdk-core/client" } -temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = [ - "ephemeral-server", -] } -temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", features = [ +temporalio-client = { version = "0.1.0", path = "./sdk-core/crates/client" } +temporalio-common = { version = "0.1.0", path = "./sdk-core/crates/common", features = [ "envconfig", +]} +temporalio-sdk-core = { version = "0.1.0", path = "./sdk-core/crates/sdk-core", features = [ + "ephemeral-server", ] } -temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" tokio-stream = "0.1" -tonic = "0.13" +tonic = "0.14" tracing = "0.1" url = "2.2" diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index afc79f0f5..3583c1d46 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -23,9 +23,9 @@ def _raise_in_thread(thread_id: int, exc_type: Type[BaseException]) -> bool: thread_id, exc_type ) - def __init__(self, *, telemetry: TelemetryConfig) -> None: + def __init__(self, *, options: RuntimeOptions) -> None: """Create SDK Core runtime.""" - self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry) + self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(options) def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]: """Get buffered metrics.""" @@ -91,6 +91,14 @@ class TelemetryConfig: metrics: Optional[MetricsConfig] +@dataclass(frozen=True) +class RuntimeOptions: + """Python representation of the Rust struct for runtime options.""" + + telemetry: TelemetryConfig + worker_heartbeat_interval_millis: Optional[int] = 60_000 # 60s + + # WARNING: This must match Rust runtime::BufferedLogEntry class BufferedLogEntry(Protocol): """A buffered log entry.""" diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 682d441dd..850db67c8 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 682d441dd3b830e1477af3edb7c2330b403c4c33 +Subproject commit 850db67c87ac9208da53df1cd82f8a36d71c5227 diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index dfbd432a1..f760f2ec6 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -3,13 +3,14 @@ use pyo3::prelude::*; use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; -use temporal_client::{ +use temporalio_client::{ ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder, ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig, - TemporalServiceClientWithMetrics, TlsConfig, + TlsConfig, TemporalServiceClient, }; -use tonic::metadata::{ - AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue, +use temporalio_client::tonic::{ + self, + metadata::{AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue}, }; use url::Url; @@ -17,7 +18,7 @@ use crate::runtime; pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException); -type Client = RetryClient>; +type Client = RetryClient>; #[pyclass] pub struct ClientRef { @@ -257,7 +258,7 @@ impl TryFrom for ClientOptions { } } -impl TryFrom for temporal_client::TlsConfig { +impl TryFrom for temporalio_client::TlsConfig { type Error = PyErr; fn try_from(conf: ClientTlsConfig) -> PyResult { @@ -267,7 +268,7 @@ impl TryFrom for temporal_client::TlsConfig { client_tls_config: match (conf.client_cert, conf.client_private_key) { (None, None) => None, (Some(client_cert), Some(client_private_key)) => { - Some(temporal_client::ClientTlsConfig { + Some(temporalio_client::ClientTlsConfig { client_cert, client_private_key, }) diff --git a/temporalio/bridge/src/client_rpc_generated.rs b/temporalio/bridge/src/client_rpc_generated.rs index 659f5d8cf..e9ab11186 100644 --- a/temporalio/bridge/src/client_rpc_generated.rs +++ b/temporalio/bridge/src/client_rpc_generated.rs @@ -15,7 +15,7 @@ impl ClientRef { py: Python<'p>, call: RpcCall, ) -> PyResult> { - use temporal_client::WorkflowService; + use temporalio_client::WorkflowService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -566,7 +566,7 @@ impl ClientRef { py: Python<'p>, call: RpcCall, ) -> PyResult> { - use temporal_client::OperatorService; + use temporalio_client::OperatorService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -628,7 +628,7 @@ impl ClientRef { } fn call_cloud_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { - use temporal_client::CloudService; + use temporalio_client::CloudService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -842,7 +842,7 @@ impl ClientRef { } fn call_test_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { - use temporal_client::TestService; + use temporalio_client::TestService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { @@ -881,7 +881,7 @@ impl ClientRef { } fn call_health_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult> { - use temporal_client::HealthService; + use temporalio_client::HealthService; let mut retry_client = self.retry_client.clone(); self.runtime.future_into_py(py, async move { let bytes = match call.rpc.as_str() { diff --git a/temporalio/bridge/src/envconfig.rs b/temporalio/bridge/src/envconfig.rs index 9277a4588..fb0da290c 100644 --- a/temporalio/bridge/src/envconfig.rs +++ b/temporalio/bridge/src/envconfig.rs @@ -4,7 +4,7 @@ use pyo3::{ types::{PyBytes, PyDict}, }; use std::collections::HashMap; -use temporal_sdk_core_api::envconfig::{ +use temporalio_common::envconfig::{ load_client_config as core_load_client_config, load_client_config_profile as core_load_client_config_profile, ClientConfig as CoreClientConfig, ClientConfigCodec, diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index cbd5be10e..ee157fb18 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -82,8 +82,8 @@ fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option PyResult { - runtime::init_runtime(telemetry_config) +fn init_runtime(options: runtime::RuntimeOptions) -> PyResult { + runtime::init_runtime(options) } #[pyfunction] diff --git a/temporalio/bridge/src/metric.rs b/temporalio/bridge/src/metric.rs index bc516a08a..0eb5afd60 100644 --- a/temporalio/bridge/src/metric.rs +++ b/temporalio/bridge/src/metric.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use pyo3::prelude::*; use pyo3::{exceptions::PyTypeError, types::PyDict}; -use temporal_sdk_core_api::telemetry::metrics::{ +use temporalio_common::telemetry::metrics::{ self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, NewAttributes, }; diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 72cc905ae..98aa9fd57 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -9,13 +9,13 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; -use temporal_sdk_core::telemetry::{ +use temporalio_sdk_core::telemetry::{ build_otlp_metric_exporter, start_prometheus_metric_exporter, CoreLogStreamConsumer, MetricsCallBuffer, }; -use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder}; -use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; -use temporal_sdk_core_api::telemetry::{ +use temporalio_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder}; +use temporalio_common::telemetry::metrics::{CoreMeter, MetricCallBufferer}; +use temporalio_common::telemetry::{ CoreLog, Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, }; @@ -86,16 +86,27 @@ pub struct PrometheusConfig { histogram_bucket_overrides: Option>>, } +#[derive(FromPyObject)] +pub struct RuntimeOptions { + telemetry: TelemetryConfig, + worker_heartbeat_interval_millis: Option, +} + const FORWARD_LOG_BUFFER_SIZE: usize = 2048; const FORWARD_LOG_MAX_FREQ_MS: u64 = 10; -pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { +pub fn init_runtime(options: RuntimeOptions) -> PyResult { + let RuntimeOptions { + telemetry: TelemetryConfig { logging, metrics }, + worker_heartbeat_interval_millis, + } = options; + // Have to build/start telemetry config pieces let mut telemetry_build = TelemetryOptionsBuilder::default(); // Build logging config, capturing forwarding info to start later let mut log_forwarding: Option<(Receiver, PyObject)> = None; - if let Some(logging_conf) = telemetry_config.logging { + if let Some(logging_conf) = logging { telemetry_build.logging(if let Some(forward_to) = logging_conf.forward_to { // Note, actual log forwarding is started later let (consumer, stream) = CoreLogStreamConsumer::new(FORWARD_LOG_BUFFER_SIZE); @@ -113,26 +124,31 @@ pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { // Build metric config, but actual metrics instance is late-bound after // CoreRuntime is created since it needs Tokio runtime - if let Some(metrics_conf) = telemetry_config.metrics.as_ref() { + if let Some(metrics_conf) = metrics.as_ref() { telemetry_build.attach_service_name(metrics_conf.attach_service_name); if let Some(prefix) = &metrics_conf.metric_prefix { telemetry_build.metric_prefix(prefix.to_string()); } } + let runtime_options = RuntimeOptionsBuilder::default() + .telemetry_options( + telemetry_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, + ) + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid runtime options: {err}")))?; + // Create core runtime which starts tokio multi-thread runtime - let mut core = CoreRuntime::new( - telemetry_build - .build() - .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, - TokioRuntimeBuilder::default(), - ) - .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; + let mut core = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) + .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; // We late-bind the metrics after core runtime is created since it needs // the Tokio handle let mut metrics_call_buffer: Option>> = None; - if let Some(metrics_conf) = telemetry_config.metrics { + if let Some(metrics_conf) = metrics { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered if metrics_conf.buffered_with_size > 0 { @@ -353,7 +369,7 @@ impl TryFrom for Arc { } if let Some(overrides) = prom_conf.histogram_bucket_overrides { build.histogram_bucket_overrides( - temporal_sdk_core_api::telemetry::HistogramBucketOverrides { overrides }, + temporalio_common::telemetry::HistogramBucketOverrides { overrides }, ); } let prom_options = build.build().map_err(|err| { diff --git a/temporalio/bridge/src/testing.rs b/temporalio/bridge/src/testing.rs index 04eea1286..5df3ee24d 100644 --- a/temporalio/bridge/src/testing.rs +++ b/temporalio/bridge/src/testing.rs @@ -2,7 +2,7 @@ use std::time::Duration; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; -use temporal_sdk_core::ephemeral_server; +use temporalio_sdk_core::ephemeral_server; use crate::runtime; diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 92b43f356..acf260bc1 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -10,19 +10,20 @@ use std::collections::HashSet; use std::marker::PhantomData; use std::sync::{Arc, OnceLock}; use std::time::Duration; -use temporal_sdk_core::api::errors::PollError; -use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; -use temporal_sdk_core_api::errors::WorkflowErrorType; -use temporal_sdk_core_api::worker::{ +use temporalio_common::errors::PollError; +use temporalio_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; +use temporalio_common::errors::WorkflowErrorType; +use temporalio_common::worker::{ SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; -use temporal_sdk_core_api::Worker; -use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; -use temporal_sdk_core_protos::coresdk::{ +use temporalio_common::Worker; +use temporalio_common::protos::coresdk::workflow_completion::WorkflowActivationCompletion; +use temporalio_common::protos::coresdk::{ nexus::NexusTaskCompletion, ActivityHeartbeat, ActivityTaskCompletion, }; -use temporal_sdk_core_protos::temporal::api::history::v1::History; +use temporalio_common::protos::temporal::api::history::v1::History; +use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::error; @@ -34,7 +35,7 @@ pyo3::create_exception!(temporal_sdk_bridge, PollShutdownError, PyException); #[pyclass] pub struct WorkerRef { - worker: Option>, + worker: Option>, /// Set upon the call to `validate`, with the task locals for the event loop at that time, which /// is whatever event loop the user is running their worker in. This loop might be needed by /// other rust-created threads that want to run async python code. @@ -63,6 +64,7 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail: bool, nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, + plugins: Vec, } #[derive(FromPyObject)] @@ -77,21 +79,21 @@ pub struct PollerBehaviorAutoscaling { pub initial: usize, } -/// Recreates [temporal_sdk_core_api::worker::PollerBehavior] +/// Recreates [temporalio_common::worker::PollerBehavior] #[derive(FromPyObject)] pub enum PollerBehavior { SimpleMaximum(PollerBehaviorSimpleMaximum), Autoscaling(PollerBehaviorAutoscaling), } -impl From for temporal_sdk_core_api::worker::PollerBehavior { +impl From for temporalio_common::worker::PollerBehavior { fn from(value: PollerBehavior) -> Self { match value { PollerBehavior::SimpleMaximum(simple) => { - temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum) + temporalio_common::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum) } PollerBehavior::Autoscaling(auto) => { - temporal_sdk_core_api::worker::PollerBehavior::Autoscaling { + temporalio_common::worker::PollerBehavior::Autoscaling { minimum: auto.minimum, maximum: auto.maximum, initial: auto.initial, @@ -101,7 +103,7 @@ impl From for temporal_sdk_core_api::worker::PollerBehavior { } } -/// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy] +/// Recreates [temporalio_common::worker::WorkerVersioningStrategy] #[derive(FromPyObject)] pub enum WorkerVersioningStrategy { None(WorkerVersioningNone), @@ -114,7 +116,7 @@ pub struct WorkerVersioningNone { pub build_id_no_versioning: String, } -/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentOptions] +/// Recreates [temporalio_common::worker::WorkerDeploymentOptions] #[derive(FromPyObject)] pub struct WorkerDeploymentOptions { pub version: WorkerDeploymentVersion, @@ -128,15 +130,15 @@ pub struct LegacyBuildIdBased { pub build_id_with_versioning: String, } -/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentVersion] +/// Recreates [temporalio_common::worker::WorkerDeploymentVersion] #[derive(FromPyObject, IntoPyObject, Clone)] pub struct WorkerDeploymentVersion { pub deployment_name: String, pub build_id: String, } -impl From for WorkerDeploymentVersion { - fn from(version: temporal_sdk_core_api::worker::WorkerDeploymentVersion) -> Self { +impl From for WorkerDeploymentVersion { + fn from(version: temporalio_common::worker::WorkerDeploymentVersion) -> Self { WorkerDeploymentVersion { deployment_name: version.deployment_name, build_id: version.build_id, @@ -462,7 +464,7 @@ pub struct ResourceBasedTunerConfig { macro_rules! enter_sync { ($runtime:expr) => { if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() { - temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber); + temporalio_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber); } let _guard = $runtime.core.tokio_handle().enter(); }; @@ -476,7 +478,7 @@ pub fn new_worker( enter_sync!(runtime_ref.runtime); let event_loop_task_locals = Arc::new(OnceLock::new()); let config = convert_worker_config(config, event_loop_task_locals.clone())?; - let worker = temporal_sdk_core::init_worker( + let worker = temporalio_sdk_core::init_worker( &runtime_ref.runtime.core, config, client.retry_client.clone().into_inner(), @@ -500,7 +502,7 @@ pub fn new_replay_worker<'a>( let (history_pusher, stream) = HistoryPusher::new(runtime_ref.runtime.clone()); let worker = WorkerRef { worker: Some(Arc::new( - temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new(config, stream)).map_err( + temporalio_sdk_core::init_replay_worker(ReplayWorkerInput::new(config, stream)).map_err( |err| PyValueError::new_err(format!("Failed creating replay worker: {err}")), )?, )), @@ -644,11 +646,12 @@ impl WorkerRef { Ok(()) } - fn replace_client(&self, client: &client::ClientRef) { + fn replace_client(&self, client: &client::ClientRef) -> PyResult<()> { self.worker .as_ref() .expect("missing worker") - .replace_client(client.retry_client.clone().into_inner()); + .replace_client(client.retry_client.clone().into_inner()) + .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } fn initiate_shutdown(&self) -> PyResult<()> { @@ -676,10 +679,10 @@ impl WorkerRef { fn convert_worker_config( conf: WorkerConfig, task_locals: Arc>, -) -> PyResult { +) -> PyResult { let converted_tuner = convert_tuner_holder(conf.tuner, task_locals)?; let converted_versioning_strategy = convert_versioning_strategy(conf.versioning_strategy); - temporal_sdk_core::WorkerConfigBuilder::default() + temporalio_sdk_core::WorkerConfigBuilder::default() .namespace(conf.namespace) .task_queue(conf.task_queue) .versioning_strategy(converted_versioning_strategy) @@ -722,6 +725,15 @@ fn convert_worker_config( .collect::>>(), ) .nexus_task_poller_behavior(conf.nexus_task_poller_behavior) + .plugins( + conf.plugins + .into_iter() + .map(|name| PluginInfo { + name, + version: String::new(), + }) + .collect::>(), + ) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } @@ -729,7 +741,7 @@ fn convert_worker_config( fn convert_tuner_holder( holder: TunerHolder, task_locals: Arc>, -) -> PyResult { +) -> PyResult { // Verify all resource-based options are the same if any are set let maybe_wf_resource_opts = if let SlotSupplier::ResourceBased(ref ss) = holder.workflow_slot_supplier { @@ -774,10 +786,10 @@ fn convert_tuner_holder( )); } - let mut options = temporal_sdk_core::TunerHolderOptionsBuilder::default(); + let mut options = temporalio_sdk_core::TunerHolderOptionsBuilder::default(); if let Some(first) = first { options.resource_based_options( - temporal_sdk_core::ResourceBasedSlotsOptionsBuilder::default() + temporalio_sdk_core::ResourceBasedSlotsOptionsBuilder::default() .target_mem_usage(first.target_memory_usage) .target_cpu_usage(first.target_cpu_usage) .build() @@ -811,19 +823,19 @@ fn convert_tuner_holder( fn convert_slot_supplier( supplier: SlotSupplier, task_locals: Arc>, -) -> PyResult> { +) -> PyResult> { Ok(match supplier { - SlotSupplier::FixedSize(fs) => temporal_sdk_core::SlotSupplierOptions::FixedSize { + SlotSupplier::FixedSize(fs) => temporalio_sdk_core::SlotSupplierOptions::FixedSize { slots: fs.num_slots, }, - SlotSupplier::ResourceBased(ss) => temporal_sdk_core::SlotSupplierOptions::ResourceBased( - temporal_sdk_core::ResourceSlotOptions::new( + SlotSupplier::ResourceBased(ss) => temporalio_sdk_core::SlotSupplierOptions::ResourceBased( + temporalio_sdk_core::ResourceSlotOptions::new( ss.minimum_slots, ss.maximum_slots, Duration::from_millis(ss.ramp_throttle_ms), ), ), - SlotSupplier::Custom(cs) => temporal_sdk_core::SlotSupplierOptions::Custom(Arc::new( + SlotSupplier::Custom(cs) => temporalio_sdk_core::SlotSupplierOptions::Custom(Arc::new( CustomSlotSupplierOfType:: { inner: cs.inner, event_loop_task_locals: task_locals, @@ -835,17 +847,17 @@ fn convert_slot_supplier( fn convert_versioning_strategy( strategy: WorkerVersioningStrategy, -) -> temporal_sdk_core_api::worker::WorkerVersioningStrategy { +) -> temporalio_common::worker::WorkerVersioningStrategy { match strategy { WorkerVersioningStrategy::None(vn) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::None { + temporalio_common::worker::WorkerVersioningStrategy::None { build_id: vn.build_id_no_versioning, } } WorkerVersioningStrategy::DeploymentBased(options) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::WorkerDeploymentBased( - temporal_sdk_core_api::worker::WorkerDeploymentOptions { - version: temporal_sdk_core_api::worker::WorkerDeploymentVersion { + temporalio_common::worker::WorkerVersioningStrategy::WorkerDeploymentBased( + temporalio_common::worker::WorkerDeploymentOptions { + version: temporalio_common::worker::WorkerDeploymentVersion { deployment_name: options.version.deployment_name, build_id: options.version.build_id, }, @@ -860,7 +872,7 @@ fn convert_versioning_strategy( ) } WorkerVersioningStrategy::LegacyBuildIdBased(lb) => { - temporal_sdk_core_api::worker::WorkerVersioningStrategy::LegacyBuildIdBased { + temporalio_common::worker::WorkerVersioningStrategy::LegacyBuildIdBased { build_id: lb.build_id_with_versioning, } } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 8e20b670a..fbcfc9c3c 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -64,6 +64,7 @@ class WorkerConfig: nondeterminism_as_workflow_fail: bool nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[str] @dataclass diff --git a/temporalio/client.py b/temporalio/client.py index 770a51392..336645375 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -350,6 +350,11 @@ def api_key(self, value: Optional[str]) -> None: self.service_client.config.api_key = value self.service_client.update_api_key(value) + @property + def plugins(self) -> Sequence[Plugin]: + """Plugins used by this client.""" + return self._config["plugins"] + # Overload for no-param workflow @overload async def start_workflow( diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 64fa12192..3047f0b51 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -70,14 +70,42 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: raise RuntimeError("Runtime default already set") _default_runtime = runtime - def __init__(self, *, telemetry: TelemetryConfig) -> None: - """Create a default runtime with the given telemetry config. + def __init__( + self, + *, + telemetry: TelemetryConfig, + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=60), + ) -> None: + """Create a runtime with the provided configuration. Each new runtime creates a new internal thread pool, so use sparingly. + + Args: + telemetry: Telemetry configuration when not supplying + ``runtime_options``. + worker_heartbeat_interval: Interval for worker heartbeats. ``None`` + disables heartbeating. + + Raises: + ValueError: If both ```runtime_options`` is a negative value. """ - self._core_runtime = temporalio.bridge.runtime.Runtime( - telemetry=telemetry._to_bridge_config() + if worker_heartbeat_interval is None: + heartbeat_millis = None + else: + if worker_heartbeat_interval <= timedelta(0): + raise ValueError("worker_heartbeat_interval must be positive") + heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000) + if heartbeat_millis == 0: + heartbeat_millis = 1 + + self._heartbeat_millis = heartbeat_millis + + runtime_options = temporalio.bridge.runtime.RuntimeOptions( + telemetry=telemetry._to_bridge_config(), + worker_heartbeat_interval_millis=heartbeat_millis, ) + + self._core_runtime = temporalio.bridge.runtime.Runtime(options=runtime_options) if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 04a32be17..df5a4087c 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -293,6 +293,7 @@ def on_eviction_hook( nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( 1 ), + plugins=[plugin.name() for plugin in self.plugins], ), ) # Start worker diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index fc2c2241d..d67925600 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -376,6 +376,7 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) + config["plugins"] = plugins self.plugins = plugins for plugin in plugins: @@ -555,6 +556,10 @@ def check_activity(activity): maximum=config["max_concurrent_activity_task_polls"] ) + worker_plugins = [plugin.name() for plugin in config.get("plugins", [])] + client_plugins = [plugin.name() for plugin in config["client"].plugins] + plugins = list(set(worker_plugins + client_plugins)) + # Create bridge worker last. We have empirically observed that if it is # created before an error is raised from the activity worker # constructor, a deadlock/hang will occur presumably while trying to @@ -609,6 +614,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), + plugins=plugins, ), ) @@ -902,6 +908,7 @@ class WorkerConfig(TypedDict, total=False): workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[Plugin] def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/conftest.py b/tests/conftest.py index 8ffd3a456..177e2ab51 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,8 @@ import pytest import pytest_asyncio +import temporalio.worker + from . import DEV_SERVER_DOWNLOAD_VERSION # If there is an integration test environment variable set, we must remove the diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 5571841b4..f4b98397a 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,11 +1,13 @@ import dataclasses import uuid import warnings +from collections import Counter from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import AsyncIterator, Awaitable, Callable, Optional, cast import pytest +import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker @@ -149,6 +151,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyWorkerPlugin().name() + ] # Test client plugin propagation to worker plugins new_config = client.config() @@ -156,6 +161,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name() + ] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -165,6 +173,40 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name(), + MyWorkerPlugin().name(), + ] + + +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + # Use counter to compare unordered lists + assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) async def test_worker_duplicated_plugin(client: Client) -> None: diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 4505ebfcf..98b0f884a 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -7,6 +7,8 @@ from typing import List, cast from urllib.request import urlopen +import pytest + from temporalio import workflow from temporalio.client import Client from temporalio.runtime import ( @@ -18,7 +20,12 @@ TelemetryFilter, ) from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port +from tests.helpers import ( + assert_eq_eventually, + assert_eventually, + find_free_port, + worker_versioning_enabled, +) @workflow.defn @@ -254,3 +261,27 @@ async def check_metrics() -> None: # Wait for metrics to appear and match the expected buckets await assert_eventually(check_metrics) + + +def test_runtime_options_to_bridge_config() -> None: + runtime = Runtime(telemetry=TelemetryConfig()) + assert runtime._heartbeat_millis == 60_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=timedelta(seconds=10), + ) + assert runtime._heartbeat_millis == 10_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=None, + ) + assert runtime._heartbeat_millis is None + + +def test_runtime_options_invalid_heartbeat() -> None: + with pytest.raises(ValueError): + Runtime( + telemetry=TelemetryConfig(), worker_heartbeat_interval=timedelta(seconds=-5) + ) diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 32f27f631..116fe4336 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -17,6 +17,7 @@ from temporalio.api.workflowservice.v1 import ( DescribeWorkerDeploymentRequest, DescribeWorkerDeploymentResponse, + ListWorkersRequest, SetWorkerDeploymentCurrentVersionRequest, SetWorkerDeploymentCurrentVersionResponse, SetWorkerDeploymentRampingVersionRequest, @@ -28,7 +29,11 @@ TaskReachabilityType, ) from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.runtime import ( + PrometheusConfig, + Runtime, + TelemetryConfig, +) from temporalio.service import RPCError from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( @@ -1133,7 +1138,9 @@ async def test_can_run_autoscaling_polling_worker( activity_pollers = [l for l in matches if "activity_task" in l] assert len(activity_pollers) == 1 assert activity_pollers[0].endswith("2") - workflow_pollers = [l for l in matches if "workflow_task" in l] + workflow_pollers = [ + l for l in matches if "workflow_task" in l and w.task_queue in l + ] assert len(workflow_pollers) == 2 # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on # initialization timing. diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 220bb5e2b..ebfb3f04a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5210,57 +5210,19 @@ async def run(self) -> None: await asyncio.sleep(0.1) -async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Only testing against two real servers") - # We are going to start a second ephemeral server and then replace the - # client. So we will start a no-cache ticking workflow with the current - # client and confirm it has accomplished at least one task. Then we will - # start another on the other client, and confirm it gets started too. Then - # we will terminate both. We have to use a ticking workflow with only one - # poller to force a quick re-poll to recognize our client change quickly (as - # opposed to just waiting the minute for poll timeout). - async with await WorkflowEnvironment.start_local( - dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION - ) as other_env: - # Start both workflows on different servers - task_queue = f"tq-{uuid.uuid4()}" - handle1 = await client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - handle2 = await other_env.client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - - async def any_task_completed(handle: WorkflowHandle) -> bool: - async for e in handle.fetch_history_events(): - if e.HasField("workflow_task_completed_event_attributes"): - return True - return False - - # Now start the worker on the first env - async with Worker( - client, - task_queue=task_queue, - workflows=[TickingWorkflow], - max_cached_workflows=0, - max_concurrent_workflow_task_polls=1, - ) as worker: - # Confirm the first ticking workflow has completed a task but not - # the second - await assert_eq_eventually(True, lambda: any_task_completed(handle1)) - assert not await any_task_completed(handle2) - - # Now replace the client, which should be used fairly quickly - # because we should have timer-done poll completions every 100ms - worker.client = other_env.client - - # Now confirm the other workflow has started - await assert_eq_eventually(True, lambda: any_task_completed(handle2)) - - # Terminate both - await handle1.terminate() - await handle2.terminate() +async def test_workflow_replace_worker_client(client: Client): + other_runtime = Runtime(telemetry=TelemetryConfig()) + other_client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=other_runtime, + ) + async with new_worker(client, HelloWorkflow) as worker: + with pytest.raises( + ValueError, + match="New client is not on the same runtime as the existing client", + ): + worker.client = other_client @activity.defn(dynamic=True)