From ada2e54df9c33051a014c230f98f38cf88c6ae73 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 30 Oct 2025 04:24:45 -0700 Subject: [PATCH 1/9] gate compilation at module and call site, use dbg_panic macro at state machines instead of direct calls to antithesis assert_always (dbg_panic just logs in non-release builds with no antithesis_assertions feature flag anyways) --- crates/sdk-core-c-bridge/Cargo.toml | 1 + crates/sdk-core/Cargo.toml | 2 ++ crates/sdk-core/src/abstractions.rs | 21 ++++++++++--- crates/sdk-core/src/antithesis.rs | 30 +++++++++++++++++++ crates/sdk-core/src/lib.rs | 2 ++ .../machines/activity_state_machine.rs | 5 +++- .../machines/child_workflow_state_machine.rs | 6 +++- .../complete_workflow_state_machine.rs | 7 +++-- crates/sdk/Cargo.toml | 4 +++ 9 files changed, 70 insertions(+), 8 deletions(-) create mode 100644 crates/sdk-core/src/antithesis.rs diff --git a/crates/sdk-core-c-bridge/Cargo.toml b/crates/sdk-core-c-bridge/Cargo.toml index 30df33eed..ea4e9d4d9 100644 --- a/crates/sdk-core-c-bridge/Cargo.toml +++ b/crates/sdk-core-c-bridge/Cargo.toml @@ -55,4 +55,5 @@ thiserror = { workspace = true } cbindgen = { version = "0.29", default-features = false } [features] +antithesis_assertions = ["temporalio-sdk-core/antithesis_assertions"] xz2-static = ["xz2/static"] diff --git a/crates/sdk-core/Cargo.toml b/crates/sdk-core/Cargo.toml index c57a32ef2..c4d6d964f 100644 --- a/crates/sdk-core/Cargo.toml +++ b/crates/sdk-core/Cargo.toml @@ -27,9 +27,11 @@ tokio-console = ["console-subscriber"] ephemeral-server = ["dep:flate2", "dep:reqwest", "dep:tar", "dep:zip"] debug-plugin = ["dep:reqwest"] test-utilities = ["dep:assert_matches", "dep:bimap"] +antithesis_assertions = ["dep:antithesis_sdk"] [dependencies] anyhow = "1.0" +antithesis_sdk = { version = "0.2.1", optional = true, default-features = false, features = ["full"] } assert_matches = { version = "1.5", optional = true } bimap = { version = "0.6.3", optional = true } async-trait = "0.1" diff --git a/crates/sdk-core/src/abstractions.rs b/crates/sdk-core/src/abstractions.rs index f246ab40a..d6f6956af 100644 --- a/crates/sdk-core/src/abstractions.rs +++ b/crates/sdk-core/src/abstractions.rs @@ -423,10 +423,23 @@ impl OwnedMeteredSemPermit { pub(crate) struct UsedMeteredSemPermit(#[allow(dead_code)] OwnedMeteredSemPermit); macro_rules! dbg_panic { - ($($arg:tt)*) => { - error!($($arg)*); - debug_assert!(false, $($arg)*); - }; + ($($arg:tt)*) => {{ + let message = format!($($arg)*); + error!("{}", message); + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + false, + "dbg_panic invariant triggered", + ::serde_json::json!({ + "message": message, + "file": file!(), + "line": line!(), + "module": module_path!(), + }) + ); + debug_assert!(false, "{}", message); + }}; } pub(crate) use dbg_panic; diff --git a/crates/sdk-core/src/antithesis.rs b/crates/sdk-core/src/antithesis.rs new file mode 100644 index 000000000..e6342784a --- /dev/null +++ b/crates/sdk-core/src/antithesis.rs @@ -0,0 +1,30 @@ +//! Antithesis SDK integration for invariant testing. +//! +//! This module provides assertion macros that integrate with the Antithesis +//! testing platform to detect invariant violations during fuzz testing. + +use std::sync::OnceLock; + +/// Ensure Antithesis is initialized exactly once. +pub(crate) fn ensure_init() { + static INIT: OnceLock<()> = OnceLock::new(); + INIT.get_or_init(|| { + ::antithesis_sdk::antithesis_init(); + }); +} + +/// Assert that a condition is always true during Antithesis fuzz testing. +/// Use `false` as the condition to log an invariant violation. +macro_rules! assert_always { + ($condition:expr, $message:literal, $details:expr) => {{ + $crate::antithesis::ensure_init(); + let details: ::serde_json::Value = $details; + ::antithesis_sdk::assert_always!($condition, $message, &details); + }}; + ($condition:expr, $message:literal) => {{ + $crate::antithesis::ensure_init(); + ::antithesis_sdk::assert_always!($condition, $message); + }}; +} + +pub(crate) use assert_always; diff --git a/crates/sdk-core/src/lib.rs b/crates/sdk-core/src/lib.rs index 1832f4a9c..663af8fd8 100644 --- a/crates/sdk-core/src/lib.rs +++ b/crates/sdk-core/src/lib.rs @@ -12,6 +12,8 @@ extern crate tracing; extern crate core; mod abstractions; +#[cfg(feature = "antithesis_assertions")] +mod antithesis; #[cfg(feature = "debug-plugin")] pub mod debug_client; #[cfg(feature = "ephemeral-server")] diff --git a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs index 03ae9627f..a45fb3966 100644 --- a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs @@ -193,7 +193,10 @@ impl ActivityMachine { ActivityMachineCommand::Cancel(details) => { vec![self.create_cancelation_resolve(details).into()] } - x => panic!("Invalid cancel event response {x:?}"), + x => { + dbg_panic!("Invalid cancel event response {x:?}"); + panic!("Invalid cancel event response {x:?}"); + } }) .collect(); Ok(res) diff --git a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs index d87325141..69b499882 100644 --- a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs @@ -3,6 +3,7 @@ use super::{ WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; use crate::{ + abstractions::dbg_panic, internal_flags::CoreInternalFlags, worker::workflow::{InternalFlagsRef, machines::HistEventData}, }; @@ -496,7 +497,10 @@ impl ChildWorkflowMachine { | c @ ChildWorkflowCommand::IssueCancelAfterStarted { .. } => { self.adapt_response(c, None) } - x => panic!("Invalid cancel event response {x:?}"), + x => { + dbg_panic!("Invalid cancel event response {x:?}"); + panic!("Invalid cancel event response {x:?}"); + } }) .flatten_ok() .try_collect()?; diff --git a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs index cbdc9d015..d70b16011 100644 --- a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::{abstractions::dbg_panic, worker::workflow::machines::HistEventData}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CompleteWorkflowExecution, @@ -41,7 +41,10 @@ pub(super) fn complete_workflow(attribs: CompleteWorkflowExecution) -> NewMachin .pop() { Some(CompleteWFCommand::AddCommand(c)) => c, - _ => panic!("complete wf machine on_schedule must produce command"), + unexpected => { + dbg_panic!("complete wf machine on_schedule must produce command: {unexpected:?}"); + panic!("complete wf machine on_schedule must produce command"); + } }; NewMachineWithCommand { command: add_cmd, diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml index 29a5d93e5..acab33f0d 100644 --- a/crates/sdk/Cargo.toml +++ b/crates/sdk/Cargo.toml @@ -42,5 +42,9 @@ version = "0.1" path = "../client" version = "0.1" +[features] +default = [] +antithesis_assertions = ["temporalio-sdk-core/antithesis_assertions"] + [lints] workspace = true From a4d4b9c35bd43ee22de1a0a7f0304e25780a9f3e Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 11 Nov 2025 15:38:10 -0800 Subject: [PATCH 2/9] add assert_sometimes --- crates/sdk-core/src/antithesis.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/sdk-core/src/antithesis.rs b/crates/sdk-core/src/antithesis.rs index e6342784a..5052d3865 100644 --- a/crates/sdk-core/src/antithesis.rs +++ b/crates/sdk-core/src/antithesis.rs @@ -27,4 +27,19 @@ macro_rules! assert_always { }}; } +/// Assert that a condition is sometimes true during Antithesis fuzz testing. +/// This checks that the condition occurs at least once across the entire test session. +macro_rules! assert_sometimes { + ($condition:expr, $message:literal, $details:expr) => {{ + $crate::antithesis::ensure_init(); + let details: ::serde_json::Value = $details; + ::antithesis_sdk::assert_sometimes!($condition, $message, &details); + }}; + ($condition:expr, $message:literal) => {{ + $crate::antithesis::ensure_init(); + ::antithesis_sdk::assert_sometimes!($condition, $message); + }}; +} + pub(crate) use assert_always; +pub(crate) use assert_sometimes; From 808709ed72756fe12678988ec3e8bb110dda190c Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 11 Nov 2025 15:39:46 -0800 Subject: [PATCH 3/9] instrument retries --- crates/sdk-core/src/retry_logic.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/crates/sdk-core/src/retry_logic.rs b/crates/sdk-core/src/retry_logic.rs index b4eacbb4d..723ccd7c4 100644 --- a/crates/sdk-core/src/retry_logic.rs +++ b/crates/sdk-core/src/retry_logic.rs @@ -71,6 +71,15 @@ impl ValidatedRetryPolicy { application_failure: Option<&ApplicationFailureInfo>, ) -> Option { if self.maximum_attempts > 0 && attempt_number.get() >= self.maximum_attempts { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Retry maximum_attempts limit reached", + ::serde_json::json!({ + "attempt": attempt_number.get(), + "maximum_attempts": self.maximum_attempts + }) + ); return None; } @@ -78,6 +87,15 @@ impl ValidatedRetryPolicy { .map(|f| f.non_retryable) .unwrap_or_default(); if non_retryable { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Non-retryable application failure encountered", + ::serde_json::json!({ + "attempt": attempt_number.get(), + "error_type": application_failure.map(|f| &f.r#type) + }) + ); return None; } From f1092fb46323b8ad64eeb3365a3a5cf908be26f5 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 11 Nov 2025 15:42:07 -0800 Subject: [PATCH 4/9] sometimes assertions for timers --- .../workflow/machines/timer_state_machine.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs index 32fd70702..3532a53cb 100644 --- a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs @@ -192,6 +192,14 @@ impl StartCommandCreated { pub(super) fn on_cancel(self, dat: &mut SharedState) -> TimerMachineTransition { dat.cancelled_before_sent = true; + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Timer cancelled before sent to server", + ::serde_json::json!({"timer_seq": dat.attrs.seq}) + ); + TransitionResult::default() } } @@ -219,6 +227,13 @@ impl StartCommandRecorded { self, dat: &mut SharedState, ) -> TimerMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Timer cancelled after started", + ::serde_json::json!({"timer_seq": dat.attrs.seq}) + ); + TransitionResult::ok( vec![TimerMachineCommand::IssueCancelCmd( CancelTimer { seq: dat.attrs.seq }.into(), From 1cb3f2d39add0644012219d8c88cac9fdcf81565 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 11 Nov 2025 15:44:54 -0800 Subject: [PATCH 5/9] add sometimes assertions to activities --- .../machines/activity_state_machine.rs | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs index a45fb3966..d78e2c046 100644 --- a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs @@ -173,6 +173,16 @@ impl ActivityMachine { | ActivityMachineState::TimedOut(_) ) { // Ignore attempted cancels in terminal states + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + true, + "Activity cancel ignored in terminal state", + ::serde_json::json!({ + "seq": self.shared_state.attrs.seq, + "state": format!("{:?}", self.state()) + }) + ); + debug!( "Attempted to cancel already resolved activity (seq {})", self.shared_state.attrs.seq @@ -430,6 +440,16 @@ impl ScheduledEventRecorded { dat: &mut SharedState, attrs: ActivityTaskTimedOutEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity timed out before starting", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "timeout_type": attrs.timeout_type, + "state": "ScheduledEventRecorded" + }) + ); notify_lang_activity_timed_out(dat, attrs) } @@ -437,12 +457,27 @@ impl ScheduledEventRecorded { self, dat: &mut SharedState, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity cancelled in scheduled state", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); create_request_cancel_activity_task_command( dat, ScheduledActivityCancelCommandCreated::default(), ) } pub(super) fn on_abandoned(self) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity abandoned", + ::serde_json::json!({"mode": "Abandon"}) + ); notify_lang_activity_cancelled(None) } } @@ -455,6 +490,14 @@ impl Started { self, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity completed successfully", + ::serde_json::json!({ + "has_result": attrs.result.is_some() + }) + ); ActivityMachineTransition::ok( vec![ActivityMachineCommand::Complete(attrs.result)], Completed::default(), @@ -465,6 +508,16 @@ impl Started { dat: &mut SharedState, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity task failed", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "activity_type": dat.attrs.activity_type, + "retry_state": attrs.retry_state + }) + ); ActivityMachineTransition::ok( vec![ActivityMachineCommand::Fail(new_failure(dat, attrs))], Failed::default(), @@ -476,6 +529,16 @@ impl Started { dat: &mut SharedState, attrs: ActivityTaskTimedOutEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity timed out after starting", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "timeout_type": attrs.timeout_type, + "state": "Started" + }) + ); notify_lang_activity_timed_out(dat, attrs) } @@ -483,12 +546,27 @@ impl Started { self, dat: &mut SharedState, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity cancelled after started", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); create_request_cancel_activity_task_command( dat, StartedActivityCancelCommandCreated::default(), ) } pub(super) fn on_abandoned(self) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity abandoned in started state", + ::serde_json::json!({"state": "Started"}) + ); notify_lang_activity_cancelled(None) } } @@ -505,6 +583,14 @@ impl ScheduledActivityCancelEventRecorded { dat: &mut SharedState, attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity cancellation completed", + ::serde_json::json!({ + "has_details": attrs.details.is_some() + }) + ); notify_if_not_already_cancelled(dat, |_| notify_lang_activity_cancelled(Some(attrs))) } @@ -535,6 +621,14 @@ impl StartedActivityCancelEventRecorded { dat: &mut SharedState, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity completed despite cancel request", + ::serde_json::json!({ + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); notify_if_not_already_cancelled(dat, |_| { TransitionResult::commands(vec![ActivityMachineCommand::Complete(attrs.result)]) }) @@ -544,6 +638,14 @@ impl StartedActivityCancelEventRecorded { dat: &mut SharedState, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity failed despite cancel request", + ::serde_json::json!({ + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); notify_if_not_already_cancelled(dat, |dat| { TransitionResult::commands(vec![ActivityMachineCommand::Fail(new_failure(dat, attrs))]) }) @@ -650,6 +752,15 @@ impl Canceled { ) -> ActivityMachineTransition { // Abandoned activities might start anyway. Ignore the result. if dat.cancellation_type == ActivityCancellationType::Abandon { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + true, + "Abandoned activity can start after cancellation", + ::serde_json::json!({ + "seq_num": seq_num, + "cancellation_type": "Abandon" + }) + ); TransitionResult::default() } else { TransitionResult::Err(WFMachinesError::Nondeterminism(format!( @@ -665,6 +776,14 @@ impl Canceled { ) -> ActivityMachineTransition { // Abandoned activities might complete anyway. Ignore the result. if dat.cancellation_type == ActivityCancellationType::Abandon { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + true, + "Abandoned activity can complete after cancellation", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id + }) + ); TransitionResult::default() } else { TransitionResult::Err(WFMachinesError::Nondeterminism(format!( From 4b82b30f4f09c1cf91c24d7eb9605c1f92150319 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 11 Nov 2025 15:46:20 -0800 Subject: [PATCH 6/9] add assert_unreachable --- crates/sdk-core/src/antithesis.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/sdk-core/src/antithesis.rs b/crates/sdk-core/src/antithesis.rs index 5052d3865..750dd10b0 100644 --- a/crates/sdk-core/src/antithesis.rs +++ b/crates/sdk-core/src/antithesis.rs @@ -41,5 +41,20 @@ macro_rules! assert_sometimes { }}; } +/// Assert that a code location is unreachable during Antithesis fuzz testing. +/// Use this for code paths that should never be reached (bugs, invariant violations). +macro_rules! assert_unreachable { + ($message:literal, $details:expr) => {{ + $crate::antithesis::ensure_init(); + let details: ::serde_json::Value = $details; + ::antithesis_sdk::assert_unreachable!($message, &details); + }}; + ($message:literal) => {{ + $crate::antithesis::ensure_init(); + ::antithesis_sdk::assert_unreachable!($message); + }}; +} + pub(crate) use assert_always; pub(crate) use assert_sometimes; +pub(crate) use assert_unreachable; From 53651a231f28f2e3e91ea950a419a822031ace8e Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 11 Nov 2025 16:26:03 -0800 Subject: [PATCH 7/9] nondeterminism! and fatal! --- .../machines/activity_state_machine.rs | 50 ++++++------ .../machines/cancel_external_state_machine.rs | 10 +-- .../machines/cancel_nexus_op_state_machine.rs | 6 +- .../machines/cancel_workflow_state_machine.rs | 6 +- .../machines/child_workflow_state_machine.rs | 36 ++++----- .../complete_workflow_state_machine.rs | 6 +- .../continue_as_new_workflow_state_machine.rs | 6 +- .../machines/fail_workflow_state_machine.rs | 6 +- .../machines/local_activity_state_machine.rs | 44 ++++++----- .../src/worker/workflow/machines/mod.rs | 17 ++-- .../machines/nexus_operation_state_machine.rs | 24 +++--- .../workflow/machines/patch_state_machine.rs | 12 +-- .../machines/signal_external_state_machine.rs | 12 +-- .../workflow/machines/timer_state_machine.rs | 14 ++-- .../workflow/machines/update_state_machine.rs | 20 +++-- .../workflow/machines/workflow_machines.rs | 76 +++++++++--------- .../machines/workflow_task_state_machine.rs | 19 +++-- .../src/worker/workflow/managed_run.rs | 4 +- crates/sdk-core/src/worker/workflow/mod.rs | 78 ++++++++++++++++++- 19 files changed, 265 insertions(+), 181 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs index d78e2c046..3d549b406 100644 --- a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs @@ -7,7 +7,7 @@ use super::{ use crate::{ abstractions::dbg_panic, internal_flags::CoreInternalFlags, - worker::workflow::{InternalFlagsRef, machines::HistEventData}, + worker::workflow::{InternalFlagsRef, machines::HistEventData, fatal, nondeterminism}, }; use std::convert::{TryFrom, TryInto}; use temporalio_common::protos::{ @@ -179,7 +179,7 @@ impl ActivityMachine { "Activity cancel ignored in terminal state", ::serde_json::json!({ "seq": self.shared_state.attrs.seq, - "state": format!("{:?}", self.state()) + "state": format!("{}", self.state()) }) ); @@ -236,9 +236,9 @@ impl TryFrom for ActivityMachineEvents { last_task_in_history, }) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Activity scheduled attributes were unset: {e}" - ))); + )); } } EventType::ActivityTaskStarted => Self::ActivityTaskStarted(e.event_id), @@ -249,9 +249,9 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskCompleted(attrs) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Activity completion attributes were unset: {e}" - ))); + )); } } EventType::ActivityTaskFailed => { @@ -260,9 +260,9 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskFailed(attrs) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Activity failure attributes were unset: {e}" - ))); + )); } } EventType::ActivityTaskTimedOut => { @@ -271,9 +271,9 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskTimedOut(attrs) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Activity timeout attributes were unset: {e}" - ))); + )); } } EventType::ActivityTaskCancelRequested => Self::ActivityTaskCancelRequested, @@ -283,15 +283,15 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskCanceled(attrs) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Activity cancellation attributes were unset: {e}" - ))); + )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Activity machine does not handle this event: {e}" - ))); + )); } }) } @@ -388,18 +388,18 @@ impl ScheduleCommandCreated { sched_dat.last_task_in_history, ) { if sched_dat.act_id != dat.attrs.activity_id { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Activity id of scheduled event '{}' does not \ match activity id of activity command '{}'", sched_dat.act_id, dat.attrs.activity_id - ))); + )); } if sched_dat.act_type != dat.attrs.activity_type { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Activity type of scheduled event '{}' does not \ match activity type of activity command '{}'", sched_dat.act_type, dat.attrs.activity_type - ))); + )); } } dat.scheduled_event_id = sched_dat.event_id; @@ -446,7 +446,6 @@ impl ScheduledEventRecorded { "Activity timed out before starting", ::serde_json::json!({ "activity_id": dat.attrs.activity_id, - "timeout_type": attrs.timeout_type, "state": "ScheduledEventRecorded" }) ); @@ -535,7 +534,6 @@ impl Started { "Activity timed out after starting", ::serde_json::json!({ "activity_id": dat.attrs.activity_id, - "timeout_type": attrs.timeout_type, "state": "Started" }) ); @@ -763,10 +761,10 @@ impl Canceled { ); TransitionResult::default() } else { - TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + TransitionResult::Err(nondeterminism!( "Non-Abandon cancel mode activities cannot be started after being cancelled. \ Seq: {seq_num:?}" - ))) + )) } } pub(super) fn on_activity_task_completed( @@ -786,9 +784,9 @@ impl Canceled { ); TransitionResult::default() } else { - TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + TransitionResult::Err(nondeterminism!( "Non-Abandon cancel mode activities cannot be completed after being cancelled: {attrs:?}" - ))) + )) } } } @@ -914,9 +912,9 @@ fn convert_payloads( result: Option, ) -> Result, WFMachinesError> { result.map(TryInto::try_into).transpose().map_err(|pe| { - WFMachinesError::Fatal(format!( + fatal!( "Not exactly one payload in activity result ({pe}) for event: {event_info:?}" - )) + ) }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs index 4e308e4a4..518d81521 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, fatal, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -153,15 +153,15 @@ impl TryFrom for CancelExternalMachineEvents { if let Some(history_event::Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(attrs)) = e.attributes { Self::RequestCancelExternalWorkflowExecutionFailed(attrs.cause()) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Cancelworkflow failed attributes were unset: {e}" - ))); + )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Cancel external WF machine does not handle this event: {e}" - ))) + )) } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs index 898965b78..502030bbe 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs @@ -3,7 +3,7 @@ use super::{ workflow_machines::MachineResponse, }; use super::{StateMachine, TransitionResult, fsm}; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_activation::ResolveCancelNexusOperation, @@ -76,9 +76,9 @@ impl TryFrom for CancelNexusOpMachineEvents { Ok(match e.event_type() { EventType::NexusOperationCancelRequested => Self::NexusOpCancelRequested, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Cancel external WF machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs index 5de44b996..1edbdef22 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CancelWorkflowExecution, @@ -67,9 +67,9 @@ impl TryFrom for CancelWorkflowMachineEvents { Ok(match EventType::try_from(e.event_type) { Ok(EventType::WorkflowExecutionCanceled) => Self::WorkflowExecutionCanceled, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Cancel workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs index 69b499882..f27022e2c 100644 --- a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs @@ -5,7 +5,7 @@ use super::{ use crate::{ abstractions::dbg_panic, internal_flags::CoreInternalFlags, - worker::workflow::{InternalFlagsRef, machines::HistEventData}, + worker::workflow::{InternalFlagsRef, machines::HistEventData, fatal, nondeterminism}, }; use itertools::Itertools; use std::{ @@ -134,7 +134,7 @@ pub(super) struct Cancelled { } fn completion_of_not_abandoned_err() -> WFMachinesError { - WFMachinesError::Nondeterminism( + nondeterminism!("{}", "Child workflows which don't have the ABANDON cancellation type cannot complete after \ being cancelled." .to_string(), @@ -146,7 +146,7 @@ impl Cancelled { self, ) -> ChildWorkflowMachineTransition { if self.seen_cancelled_event { - ChildWorkflowMachineTransition::Err(WFMachinesError::Fatal( + ChildWorkflowMachineTransition::Err(fatal!("{}", "Child workflow has already seen a ChildWorkflowExecutionCanceledEvent, and now \ another is being applied! This is a bug, please report." .to_string(), @@ -234,18 +234,18 @@ impl StartCommandCreated { event_dat.last_task_in_history, ) { if event_dat.wf_id != state.workflow_id { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Child workflow id of scheduled event '{}' does not \ match child workflow id of command '{}'", event_dat.wf_id, state.workflow_id - ))); + )); } if event_dat.wf_type != state.workflow_type { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Child workflow type of scheduled event '{}' does not \ match child workflow type of command '{}'", event_dat.wf_type, state.workflow_type - ))); + )); } } state.initiated_event_id = event_dat.event_id; @@ -533,7 +533,7 @@ impl TryFrom for ChildWorkflowMachineEvents { last_task_in_history, }) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "StartChildWorkflowExecutionInitiated attributes were unset".to_string(), )); } @@ -547,13 +547,13 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::StartChildWorkflowExecutionFailed( StartChildWorkflowExecutionFailedCause::try_from(cause).map_err(|_| { - WFMachinesError::Fatal( + fatal!("{}", "Invalid StartChildWorkflowExecutionFailedCause".to_string(), ) })?, ) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "StartChildWorkflowExecutionFailed attributes were unset".to_string(), )); } @@ -573,7 +573,7 @@ impl TryFrom for ChildWorkflowMachineEvents { started_event_id: e.event_id, }) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "ChildWorkflowExecutionStarted attributes were unset or malformed" .to_string(), )); @@ -588,7 +588,7 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionCompleted(result) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "ChildWorkflowExecutionCompleted attributes were unset or malformed" .to_string(), )); @@ -601,7 +601,7 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionFailed(attrs) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "ChildWorkflowExecutionFailed attributes were unset".to_string(), )); } @@ -613,7 +613,7 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionTimedOut(atts.retry_state()) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "ChildWorkflowExecutionTimedOut attributes were unset or malformed" .to_string(), )); @@ -624,9 +624,9 @@ impl TryFrom for ChildWorkflowMachineEvents { } Ok(EventType::ChildWorkflowExecutionCanceled) => Self::ChildWorkflowExecutionCancelled, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Child workflow machine does not handle this event: {e:?}" - ))); + )) } }) } @@ -773,9 +773,9 @@ fn convert_payloads( result: Option, ) -> Result, WFMachinesError> { result.map(TryInto::try_into).transpose().map_err(|pe| { - WFMachinesError::Fatal(format!( + fatal!( "Not exactly one payload in child workflow result ({pe}) for event: {event_info:?}" - )) + ) }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs index d70b16011..b118661e3 100644 --- a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::{abstractions::dbg_panic, worker::workflow::machines::HistEventData}; +use crate::{abstractions::dbg_panic, worker::workflow::{machines::HistEventData, nondeterminism, fatal}}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CompleteWorkflowExecution, @@ -60,9 +60,9 @@ impl TryFrom for CompleteWorkflowMachineEvents { Ok(match e.event_type() { EventType::WorkflowExecutionCompleted => Self::WorkflowExecutionCompleted, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Complete workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs index becf3b19e..f407b9b44 100644 --- a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::ContinueAsNewWorkflowExecution, @@ -73,9 +73,9 @@ impl TryFrom for ContinueAsNewWorkflowMachineEvents { Ok(match e.event_type() { EventType::WorkflowExecutionContinuedAsNew => Self::WorkflowExecutionContinuedAsNew, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Continue as new workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs index b61009eb2..66be30214 100644 --- a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::FailWorkflowExecution, @@ -84,9 +84,9 @@ impl TryFrom for FailWorkflowMachineEvents { Ok(match e.event_type() { EventType::WorkflowExecutionFailed => Self::WorkflowExecutionFailed, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Fail workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index ef8873533..04402cdee 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -10,6 +10,8 @@ use crate::{ workflow::{ InternalFlagsRef, machines::{HistEventData, activity_state_machine::activity_fail_info}, + fatal, + nondeterminism, }, }, }; @@ -162,7 +164,7 @@ pub(super) fn new_local_activity( } } else { if maybe_pre_resolved.is_some() { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "Local activity cannot be created as pre-resolved while not replaying".to_string(), )); } @@ -210,10 +212,10 @@ impl LocalActivityMachine { LocalActivityMachineState::ResultNotified(_) => Ok(false), LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true), LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true), - _ => Err(WFMachinesError::Fatal(format!( + _ => Err(fatal!( "Attempted to check for LA marker handling in invalid state {}", self.state() - ))), + )), } } @@ -241,12 +243,12 @@ impl LocalActivityMachine { let mut res = OnEventWrapper::on_event_mut(self, LocalActivityMachineEvents::StartedNonReplayWFT) .map_err(|e| match e { - MachineError::InvalidTransition => WFMachinesError::Fatal(format!( + MachineError::InvalidTransition => fatal!( "Invalid transition while notifying local activity (seq {})\ of non-replay-wft-started in {}", self.shared_state.attrs.seq, self.state(), - )), + ), MachineError::Underlying(e) => e, })?; let res = res.pop().expect("Always produces one response"); @@ -295,12 +297,12 @@ impl LocalActivityMachine { LocalActivityMachineEvents::HandleResult(dat) }; let res = OnEventWrapper::on_event_mut(self, evt).map_err(|e| match e { - MachineError::InvalidTransition => WFMachinesError::Fatal(format!( + MachineError::InvalidTransition => fatal!( "Invalid transition resolving local activity (seq {}, from marker: {}) in {}", self.shared_state.attrs.seq, from_marker, self.state(), - )), + ), MachineError::Underlying(e) => e, })?; @@ -542,16 +544,16 @@ impl ResultNotified { dat: CompleteLocalActivityData, ) -> LocalActivityMachineTransition { if self.result_type == ResultType::Completed && dat.result.is_err() { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Local activity (seq {}) completed successfully locally, but history said \ it failed!", shared.attrs.seq - ))); + )); } else if self.result_type == ResultType::Failed && dat.result.is_ok() { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Local activity (seq {}) failed locally, but history said it completed!", shared.attrs.seq - ))); + )); } verify_marker_dat!(shared, &dat, TransitionResult::default()) } @@ -811,15 +813,15 @@ impl TryFrom for LocalActivityMachineEvents { fn try_from(e: HistEventData) -> Result { let e = e.event; if e.event_type() != EventType::MarkerRecorded { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Local activity machine cannot handle this event: {e}" - ))); + )); } match e.into_local_activity_marker_details() { Some(marker_dat) => Ok(LocalActivityMachineEvents::MarkerRecorded(marker_dat)), - _ => Err(WFMachinesError::Nondeterminism( - "Local activity machine encountered an unparsable marker".to_string(), + _ => Err(nondeterminism!( + "Local activity machine encountered an unparsable marker" )), } } @@ -830,11 +832,11 @@ fn verify_marker_data_matches( dat: &CompleteLocalActivityData, ) -> Result<(), WFMachinesError> { if shared.attrs.seq != dat.marker_dat.seq { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Local activity marker data has sequence number {} but matched against LA \ command with sequence number {}", dat.marker_dat.seq, shared.attrs.seq - ))); + )); } // Here we use whether or not we were replaying when we _first invoked_ the LA, because we // are always replaying when we see the marker recorded event, and that would make this check @@ -844,18 +846,18 @@ fn verify_marker_data_matches( !shared.replaying_when_invoked, ) { if dat.marker_dat.activity_id != shared.attrs.activity_id { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Activity id of recorded marker '{}' does not \ match activity id of local activity command '{}'", dat.marker_dat.activity_id, shared.attrs.activity_id - ))); + )); } if dat.marker_dat.activity_type != shared.attrs.activity_type { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Activity type of recorded marker '{}' does not \ match activity type of local activity command '{}'", dat.marker_dat.activity_type, shared.attrs.activity_type - ))); + )); } } diff --git a/crates/sdk-core/src/worker/workflow/machines/mod.rs b/crates/sdk-core/src/worker/workflow/machines/mod.rs index d46f5b3d9..16be6d4ef 100644 --- a/crates/sdk-core/src/worker/workflow/machines/mod.rs +++ b/crates/sdk-core/src/worker/workflow/machines/mod.rs @@ -26,7 +26,10 @@ mod transition_coverage; pub(crate) use temporalio_common::fsm_trait::MachineError; pub(crate) use workflow_machines::{MachinesWFTResponseContent, WorkflowMachines}; -use crate::{telemetry::VecDisplayer, worker::workflow::WFMachinesError}; +use crate::{ + telemetry::VecDisplayer, + worker::workflow::{WFMachinesError, fatal, nondeterminism}, +}; use activity_state_machine::ActivityMachine; use cancel_external_state_machine::CancelExternalMachine; use cancel_workflow_state_machine::CancelWorkflowMachine; @@ -128,20 +131,20 @@ where match OnEventWrapper::on_event_mut(self, converted_command) { Ok(c) => process_machine_commands(self, c, None), Err(MachineError::InvalidTransition) => { - Err(WFMachinesError::Nondeterminism(format!( + Err(nondeterminism!( "Unexpected command producing an invalid transition {:?} in state {}", command_type, self.state() - ))) + )) } Err(MachineError::Underlying(e)) => Err(e.into()), } } else { - Err(WFMachinesError::Nondeterminism(format!( + Err(nondeterminism!( "Unexpected command {:?} generated by a {:?} machine", command_type, self.name() - ))) + )) } } @@ -163,12 +166,12 @@ where match OnEventWrapper::on_event_mut(self, converted_event) { Ok(c) => process_machine_commands(self, c, Some(event_info)), - Err(MachineError::InvalidTransition) => Err(WFMachinesError::Fatal(format!( + Err(MachineError::InvalidTransition) => Err(fatal!( "{} in state {} says the transition is invalid during event {:?}", self.name(), self.state(), event_info - ))), + )), Err(MachineError::Underlying(e)) => Err(e.into()), } } diff --git a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs index e15cbced8..3414526df 100644 --- a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs @@ -5,6 +5,8 @@ use crate::worker::workflow::{ EventInfo, HistEventData, NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter, workflow_machines::MachineResponse, }, + fatal, + nondeterminism, }; use itertools::Itertools; use temporalio_common::protos::{ @@ -392,7 +394,7 @@ pub(super) struct TimedOut; pub(super) struct Cancelled; fn completion_of_not_abandoned_err() -> WFMachinesError { - WFMachinesError::Nondeterminism( + nondeterminism!("{}", "Nexus operation which don't have the ABANDON cancellation type cannot complete after \ being cancelled." .to_string(), @@ -455,7 +457,7 @@ impl TryFrom for NexusOperationMachineEvents { event_id: e.event_id, }) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationScheduled attributes were unset or malformed".to_string(), )); } @@ -466,7 +468,7 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationStarted(sa) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationStarted attributes were unset or malformed".to_string(), )); } @@ -477,7 +479,7 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCompleted(ca) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationCompleted attributes were unset or malformed".to_string(), )); } @@ -488,7 +490,7 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationFailed(fa) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationFailed attributes were unset or malformed".to_string(), )); } @@ -499,7 +501,7 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCanceled(ca) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationCanceled attributes were unset or malformed".to_string(), )); } @@ -510,7 +512,7 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationTimedOut(toa) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationTimedOut attributes were unset or malformed".to_string(), )); } @@ -525,7 +527,7 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCancelRequestCompleted(attrs) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationCancelRequestCompleted attributes were unset or malformed" .to_string(), )); @@ -540,16 +542,16 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCancelRequestFailed(attrs) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!("{}", "NexusOperationCancelRequestFailed attributes were unset or malformed" .to_string(), )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Nexus operation machine does not handle this event: {e:?}" - ))); + )) } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs index 94105f7a3..b149c7bb3 100644 --- a/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs @@ -29,6 +29,8 @@ use crate::{ machines::{ HistEventData, upsert_search_attributes_state_machine::MAX_SEARCH_ATTR_PAYLOAD_SIZE, }, + fatal, + nondeterminism, }, }; use anyhow::Context; @@ -129,7 +131,7 @@ pub(super) fn has_change<'a>( let mut serialized = all_ids .as_json_payload() .context("Could not serialize search attribute value for patch machine") - .map_err(|e| WFMachinesError::Fatal(e.to_string()))?; + .map_err(|e| fatal!("{}", e.to_string()))?; if serialized.data.len() >= MAX_SEARCH_ATTR_PAYLOAD_SIZE { warn!( @@ -219,10 +221,10 @@ impl Notified { id: String, ) -> PatchMachineTransition { if id != dat.patch_id { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Change id {} does not match expected id {}", id, dat.patch_id - ))); + )); } TransitionResult::default() } @@ -256,9 +258,9 @@ impl TryFrom for PatchMachineEvents { let e = e.event; match e.get_patch_marker_details() { Some((id, _)) => Ok(Self::MarkerRecorded(id)), - _ => Err(WFMachinesError::Nondeterminism(format!( + _ => Err(nondeterminism!( "Change machine cannot handle this event: {e}" - ))), + )), } } } diff --git a/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs index db128d5ed..57271deff 100644 --- a/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, MachineError, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, fatal, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -70,7 +70,7 @@ pub(super) fn new_external_signal( ) -> Result { let (workflow_execution, only_child) = match attrs.target { None => { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "Signal external workflow command had empty target field".to_string(), )); } @@ -195,15 +195,15 @@ impl TryFrom for SignalExternalMachineEvents { { Self::SignalExternalWorkflowExecutionFailed(attrs.cause()) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Signal workflow failed attributes were unset: {e}" - ))); + )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Signal external WF machine does not handle this event: {e}" - ))); + )) } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs index 3532a53cb..0c0d2d85d 100644 --- a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs @@ -4,7 +4,7 @@ use super::{ EventInfo, MachineError, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{WFMachinesError, machines::HistEventData}; +use crate::worker::workflow::{WFMachinesError, machines::HistEventData, fatal, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -120,15 +120,15 @@ impl TryFrom for TimerMachineEvents { { Self::TimerFired(attrs) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Timer fired attribs were unset: {e}" - ))); + )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Timer machine does not handle this event: {e}" - ))); + )) } }) } @@ -216,10 +216,10 @@ impl StartCommandRecorded { if dat.attrs.seq.to_string() == attrs.timer_id { TransitionResult::ok(vec![TimerMachineCommand::Complete], Fired::default()) } else { - TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + TransitionResult::Err(nondeterminism!( "Timer fired event did not have expected timer id {}, it was {}!", dat.attrs.seq, attrs.timer_id - ))) + )) } } diff --git a/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs index 998e318ac..633be6e84 100644 --- a/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs @@ -4,7 +4,11 @@ use super::{ }; use crate::{ protosext::protocol_messages::UpdateRequest, - worker::workflow::machines::{HistEventData, NewMachineWithResponse}, + worker::workflow::{ + machines::{HistEventData, NewMachineWithResponse}, + fatal, + nondeterminism, + }, }; use itertools::Itertools; use prost::EncodeError; @@ -121,10 +125,10 @@ impl UpdateMachine { ) -> Result, WFMachinesError> { let cmds = match resp.response { None => { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Update response for update {} had an empty result, this is a lang layer bug.", &self.shared_state.meta.update_id - ))); + )); } Some(update_response::Response::Accepted(_)) => { self.on_event(UpdateMachineEvents::Accept) @@ -137,11 +141,11 @@ impl UpdateMachine { } } .map_err(|e| match e { - MachineError::InvalidTransition => WFMachinesError::Nondeterminism(format!( + MachineError::InvalidTransition => nondeterminism!( "Invalid transition while handling update response (id {}) in state {}", &self.shared_state.meta.update_id, self.state(), - )), + ), MachineError::Underlying(e) => e, })?; cmds.into_iter() @@ -175,7 +179,7 @@ impl UpdateMachine { msg: UpdateMsg, ) -> Result { let accept_body = msg.pack().map_err(|e| { - WFMachinesError::Fatal(format!("Failed to serialize update response: {e:?}")) + fatal!("Failed to serialize update response: {e:?}") })?; Ok(MachineResponse::IssueNewMessage(ProtocolMessage { id: outgoing_id.clone(), @@ -223,9 +227,9 @@ impl TryFrom for UpdateMachineEvents { UpdateMachineEvents::WorkflowExecutionUpdateCompleted } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Update machine does not handle this event: {e}" - ))); + )) } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 23411f008..1d00e55e7 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -26,6 +26,8 @@ use crate::{ CommandID, DrivenWorkflow, HistoryUpdate, InternalFlagsRef, LocalResolution, OutgoingJob, RunBasics, WFCommand, WFCommandVariant, WFMachinesError, WorkflowStartedInfo, + fatal, + nondeterminism, history_update::NextWFT, machines::{ HistEventData, activity_state_machine::ActivityMachine, @@ -246,11 +248,11 @@ macro_rules! cancel_machine { let machine = if let Machines::$machine_variant(m) = $self.machine_mut(m_key) { m } else { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Machine was not a {} when it should have been during cancellation: {:?}", stringify!($machine_variant), $cmd_id - ))); + )); }; let machine_resps = machine.$cancel_method($($args),*)?; $self.process_machine_responses(m_key, machine_resps)? @@ -366,10 +368,10 @@ impl WorkflowMachines { } self.process_machine_responses(mk, resps)?; } else { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Command matching activity with seq num {seq} existed but was not a \ local activity!" - ))); + )); } self.local_activity_data.done_executing(seq); } @@ -573,7 +575,7 @@ impl WorkflowMachines { (evts, has_final_event) } NextWFT::NeedFetch => { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "Need to fetch history events to continue applying workflow task, but this \ should be prevented ahead of time! This is a Core SDK bug." .to_string(), @@ -649,10 +651,10 @@ impl WorkflowMachines { while let Some(event) = history.next() { let eid = event.event_id; if eid != self.last_processed_event + 1 { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "History is out of order. Last processed event: {}, event id: {}", self.last_processed_event, eid - ))); + )); } let next_event = history.peek(); @@ -782,9 +784,9 @@ impl WorkflowMachines { self.local_activity_data.insert_peeked_marker(la_dat); } } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Local activity marker was unparsable: {e:?}" - ))); + )); } } else if let Some( history_event::Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(ref atts), @@ -856,7 +858,7 @@ impl WorkflowMachines { let are_more_events = next_event.is_some() || !event_dat.current_task_is_last_in_history; return if are_more_events { - Err(WFMachinesError::Fatal( + Err(fatal!("{}", "Machines were fed a history which has an event after workflow execution was \ terminated!" .to_string(), @@ -867,9 +869,9 @@ impl WorkflowMachines { } if event.event_type() == EventType::Unspecified || event.attributes.is_none() { return if !event.worker_may_ignore { - Err(WFMachinesError::Fatal(format!( + Err(fatal!( "Event type is unspecified! This history is invalid. Event detail: {event:?}" - ))) + )) } else { debug!("Event is ignorable"); Ok(EventHandlingOutcome::SkipEvent { @@ -894,10 +896,10 @@ impl WorkflowMachines { .machines_by_event_id .get(&initial_cmd_id) .ok_or_else(|| { - WFMachinesError::Nondeterminism(format!( + nondeterminism!( "During event handling, this event had an initial command ID but we \ could not find a matching command for it: {event:?}" - )) + ) })?; self.submachine_handle_event(*mkey, event_dat)?; } else { @@ -925,7 +927,7 @@ impl WorkflowMachines { if event.is_local_activity_marker() { let deets = event.extract_local_activity_marker_data().ok_or_else(|| { - WFMachinesError::Fatal(format!("Local activity marker was unparsable: {event:?}")) + fatal!("Local activity marker was unparsable: {event:?}") })?; let cmdid = CommandID::LocalActivity(deets.seq); let mkey = self.get_machine_key(cmdid)?; @@ -935,10 +937,10 @@ impl WorkflowMachines { return Ok(EventHandlingOutcome::Normal); } } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Encountered local activity marker but the associated machine was of the \ wrong type! {event:?}" - ))); + )); } } @@ -959,9 +961,9 @@ impl WorkflowMachines { let command = if let Some(c) = maybe_command { c } else { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "No command scheduled for event {event}" - ))); + )); }; let canceled_before_sent = self @@ -1008,9 +1010,9 @@ impl WorkflowMachines { attrs, ); } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "WorkflowExecutionStarted event did not have appropriate attributes: {event_dat}" - ))); + )); } } Ok(EventType::WorkflowTaskScheduled) => { @@ -1044,9 +1046,9 @@ impl WorkflowMachines { } } _ => { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "The event is not a non-stateful event, but we tried to handle it as one: {event_dat}" - ))); + )); } } Ok(()) @@ -1084,11 +1086,11 @@ impl WorkflowMachines { IncomingProtocolMessageBody::UpdateRequest(ur) => { let seq_id = if let SequencingId::EventId(eid) = message .sequencing_id - .ok_or_else(|| WFMachinesError::Fatal(SEQIDERR.to_string()))? + .ok_or_else(|| fatal!("{}", SEQIDERR.to_string()))? { eid } else { - return Err(WFMachinesError::Fatal(SEQIDERR.to_string())); + return Err(fatal!("{}", SEQIDERR.to_string())); }; let um = UpdateMachine::init( message.id, @@ -1213,9 +1215,9 @@ impl WorkflowMachines { ); } c => { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "A machine requested to create a new command of an unsupported type: {c:?}" - ))); + )); } }, MachineResponse::IssueFakeLocalActivityMarker(seq) => { @@ -1360,9 +1362,9 @@ impl WorkflowMachines { let seq = attrs.seq; let attrs: ValidScheduleLA = ValidScheduleLA::from_schedule_la(attrs, cmd.metadata).map_err(|e| { - WFMachinesError::Fatal(format!( + fatal!( "Invalid schedule local activity request (seq {seq}): {e}" - )) + ) })?; let (la, mach_resp) = new_local_activity( attrs, @@ -1470,7 +1472,7 @@ impl WorkflowMachines { } WFCommandVariant::RequestCancelExternalWorkflow(attrs) => { let we = attrs.workflow_execution.ok_or_else(|| { - WFMachinesError::Fatal( + fatal!("{}", "Cancel external workflow command had no workflow_execution field" .to_string(), ) @@ -1521,11 +1523,11 @@ impl WorkflowMachines { let m = if let Machines::UpdateMachine(m) = self.machine_mut(m_key) { m } else { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Tried to handle an update response for \ update with instance id {} but it was not found!", &ur.protocol_instance_id - ))); + )); }; let resps = m.handle_response(ur)?; self.process_machine_responses(m_key, resps)?; @@ -1554,7 +1556,7 @@ impl WorkflowMachines { fn get_machine_key(&self, id: CommandID) -> Result { Ok(*self.id_to_machine.get(&id).ok_or_else(|| { - WFMachinesError::Nondeterminism(format!("Missing associated machine for {id:?}")) + nondeterminism!("Missing associated machine for {id:?}") })?) } @@ -1563,9 +1565,9 @@ impl WorkflowMachines { .machines_by_protocol_instance_id .get(protocol_instance_id) .ok_or_else(|| { - WFMachinesError::Fatal(format!( + fatal!( "Missing associated machine for protocol message {protocol_instance_id}" - )) + ) })?) } @@ -1758,10 +1760,10 @@ fn patch_marker_handling( debug!("Deprecated patch marker tried against non-patch machine, skipping."); skip_one_or_two_events(next_event) } else { - Err(WFMachinesError::Nondeterminism(format!( + Err(nondeterminism!( "Non-deprecated patch marker encountered for change {patch_name}, but there is \ no corresponding change command!" - ))) + )) } } } else if patch_machine.is_some() { diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs index 2ad275e30..a27da4676 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs @@ -4,7 +4,7 @@ use super::{ EventInfo, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, fatal, nondeterminism}; use std::{ convert::{TryFrom, TryInto}, time::SystemTime, @@ -67,7 +67,7 @@ impl WFMachinesAdapter for WorkflowTaskMachine { let (event_id, event_type) = if let Some(ei) = event_info { (ei.event_id, ei.event_type) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!("{}", "WF Task machine should never issue a task started trigger \ command in response to non-history events" .to_string(), @@ -102,16 +102,15 @@ impl TryFrom for WorkflowTaskMachineEvents { match time.try_into() { Ok(t) => t, Err(_) => { - return Err(WFMachinesError::Fatal( + return Err(fatal!( "Workflow task started event timestamp was inconvertible" - .to_string(), )); } } } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Workflow task started event must contain timestamp: {e}" - ))); + )); }; WFTStartedDat { started_event_id: e.event_id, @@ -137,15 +136,15 @@ impl TryFrom for WorkflowTaskMachineEvents { }, }) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Workflow task failed is missing attributes: {e}" - ))); + )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Event does not apply to a wf task machine: {e}" - ))); + )) } }) } diff --git a/crates/sdk-core/src/worker/workflow/managed_run.rs b/crates/sdk-core/src/worker/workflow/managed_run.rs index 79cb5c55c..7d10b6d6c 100644 --- a/crates/sdk-core/src/worker/workflow/managed_run.rs +++ b/crates/sdk-core/src/worker/workflow/managed_run.rs @@ -243,10 +243,10 @@ impl ManagedRun { let r = self.wfm.get_next_activation()?; if r.jobs.is_empty() { return Err(RunUpdateErr { - source: WFMachinesError::Fatal(format!( + source: crate::worker::workflow::fatal!( "Machines created for {} with no jobs", self.wfm.machines.run_id - )), + ), complete_resp_chan: None, }); } diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index abddc6593..880d7aa12 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -1426,7 +1426,79 @@ pub(crate) enum WFMachinesError { Fatal(String), } +/// Helper macro to create Nondeterminism errors with automatic assertion +/// +/// Usage: `nondeterminism!("Activity id mismatch: {} vs {}", id1, id2)` +macro_rules! nondeterminism { + ($($arg:tt)*) => {{ + let msg = format!($($arg)*); + #[cfg(feature = "antithesis_assertions")] + $crate::antithesis::assert_unreachable!( + "Nondeterminism error detected", + ::serde_json::json!({ + "error_message": &msg, + "error_type": "Nondeterminism", + "location": format!("{}:{}", file!(), line!()) + }) + ); + WFMachinesError::Nondeterminism(msg) + }}; +} +pub(crate) use nondeterminism; + +/// Helper macro to create Fatal errors with automatic assertion +/// +/// Usage: `fatal!("Invalid state transition: {:?}", state)` +macro_rules! fatal { + ($($arg:tt)*) => {{ + let msg = format!($($arg)*); + #[cfg(feature = "antithesis_assertions")] + $crate::antithesis::assert_unreachable!( + "Fatal error detected", + ::serde_json::json!({ + "error_message": &msg, + "error_type": "Fatal", + "location": format!("{}:{}", file!(), line!()) + }) + ); + WFMachinesError::Fatal(msg) + }}; +} +pub(crate) use fatal; + impl WFMachinesError { + /// Create a new Nondeterminism error with assertion instrumentation + pub(crate) fn nondeterminism(message: impl Into) -> Self { + let msg = message.into(); + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_unreachable!( + "Nondeterminism error detected", + ::serde_json::json!({ + "error_message": msg, + "error_type": "Nondeterminism" + }) + ); + + WFMachinesError::Nondeterminism(msg) + } + + /// Create a new Fatal error with assertion instrumentation + pub(crate) fn fatal(message: impl Into) -> Self { + let msg = message.into(); + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_unreachable!( + "Fatal error detected", + ::serde_json::json!({ + "error_message": msg, + "error_type": "Fatal" + }) + ); + + WFMachinesError::Fatal(msg) + } + fn evict_reason(&self) -> EvictionReason { match self { WFMachinesError::Nondeterminism(_) => EvictionReason::Nondeterminism, @@ -1452,7 +1524,7 @@ impl From> for WFMachinesError { match v { MachineError::InvalidTransition => { // TODO: Get states back - WFMachinesError::Nondeterminism("Invalid transition in state machine".to_string()) + WFMachinesError::nondeterminism("Invalid transition in state machine") } MachineError::Underlying(e) => e, } @@ -1461,13 +1533,13 @@ impl From> for WFMachinesError { impl From for WFMachinesError { fn from(_: TimestampError) -> Self { - Self::Fatal("Could not decode timestamp".to_string()) + Self::fatal("Could not decode timestamp") } } impl From for WFMachinesError { fn from(value: anyhow::Error) -> Self { - WFMachinesError::Fatal(value.to_string()) + WFMachinesError::fatal(value.to_string()) } } From ca665bf4a65a1688e2156a52b76c9b6cfcfc5b9a Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 14 Nov 2025 16:03:54 -0800 Subject: [PATCH 8/9] fmt --- .../machines/activity_state_machine.rs | 32 ++++-------- .../machines/cancel_external_state_machine.rs | 2 +- .../machines/cancel_workflow_state_machine.rs | 2 +- .../machines/child_workflow_state_machine.rs | 45 +++++++--------- .../complete_workflow_state_machine.rs | 5 +- .../continue_as_new_workflow_state_machine.rs | 2 +- .../machines/fail_workflow_state_machine.rs | 2 +- .../machines/local_activity_state_machine.rs | 16 +++--- .../src/worker/workflow/machines/mod.rs | 12 ++--- .../machines/nexus_operation_state_machine.rs | 38 ++++++-------- .../workflow/machines/patch_state_machine.rs | 8 +-- .../machines/signal_external_state_machine.rs | 12 ++--- .../workflow/machines/timer_state_machine.rs | 11 ++-- .../workflow/machines/update_state_machine.rs | 10 ++-- .../workflow/machines/workflow_machines.rs | 52 +++++++------------ .../machines/workflow_task_state_machine.rs | 11 ++-- 16 files changed, 111 insertions(+), 149 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs index 3d549b406..33ea7a963 100644 --- a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs @@ -7,7 +7,7 @@ use super::{ use crate::{ abstractions::dbg_panic, internal_flags::CoreInternalFlags, - worker::workflow::{InternalFlagsRef, machines::HistEventData, fatal, nondeterminism}, + worker::workflow::{InternalFlagsRef, fatal, machines::HistEventData, nondeterminism}, }; use std::convert::{TryFrom, TryInto}; use temporalio_common::protos::{ @@ -236,9 +236,7 @@ impl TryFrom for ActivityMachineEvents { last_task_in_history, }) } else { - return Err(fatal!( - "Activity scheduled attributes were unset: {e}" - )); + return Err(fatal!("Activity scheduled attributes were unset: {e}")); } } EventType::ActivityTaskStarted => Self::ActivityTaskStarted(e.event_id), @@ -249,9 +247,7 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskCompleted(attrs) } else { - return Err(fatal!( - "Activity completion attributes were unset: {e}" - )); + return Err(fatal!("Activity completion attributes were unset: {e}")); } } EventType::ActivityTaskFailed => { @@ -260,9 +256,7 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskFailed(attrs) } else { - return Err(fatal!( - "Activity failure attributes were unset: {e}" - )); + return Err(fatal!("Activity failure attributes were unset: {e}")); } } EventType::ActivityTaskTimedOut => { @@ -271,9 +265,7 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskTimedOut(attrs) } else { - return Err(fatal!( - "Activity timeout attributes were unset: {e}" - )); + return Err(fatal!("Activity timeout attributes were unset: {e}")); } } EventType::ActivityTaskCancelRequested => Self::ActivityTaskCancelRequested, @@ -283,9 +275,7 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskCanceled(attrs) } else { - return Err(fatal!( - "Activity cancellation attributes were unset: {e}" - )); + return Err(fatal!("Activity cancellation attributes were unset: {e}")); } } _ => { @@ -391,14 +381,16 @@ impl ScheduleCommandCreated { return TransitionResult::Err(nondeterminism!( "Activity id of scheduled event '{}' does not \ match activity id of activity command '{}'", - sched_dat.act_id, dat.attrs.activity_id + sched_dat.act_id, + dat.attrs.activity_id )); } if sched_dat.act_type != dat.attrs.activity_type { return TransitionResult::Err(nondeterminism!( "Activity type of scheduled event '{}' does not \ match activity type of activity command '{}'", - sched_dat.act_type, dat.attrs.activity_type + sched_dat.act_type, + dat.attrs.activity_type )); } } @@ -912,9 +904,7 @@ fn convert_payloads( result: Option, ) -> Result, WFMachinesError> { result.map(TryInto::try_into).transpose().map_err(|pe| { - fatal!( - "Not exactly one payload in activity result ({pe}) for event: {event_info:?}" - ) + fatal!("Not exactly one payload in activity result ({pe}) for event: {event_info:?}") }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs index 518d81521..2250da8af 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{machines::HistEventData, fatal, nondeterminism}; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs index 1edbdef22..65688472f 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CancelWorkflowExecution, diff --git a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs index f27022e2c..e70a7ba76 100644 --- a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs @@ -5,7 +5,7 @@ use super::{ use crate::{ abstractions::dbg_panic, internal_flags::CoreInternalFlags, - worker::workflow::{InternalFlagsRef, machines::HistEventData, fatal, nondeterminism}, + worker::workflow::{InternalFlagsRef, fatal, machines::HistEventData, nondeterminism}, }; use itertools::Itertools; use std::{ @@ -134,10 +134,9 @@ pub(super) struct Cancelled { } fn completion_of_not_abandoned_err() -> WFMachinesError { - nondeterminism!("{}", + nondeterminism!( "Child workflows which don't have the ABANDON cancellation type cannot complete after \ being cancelled." - .to_string(), ) } @@ -146,10 +145,9 @@ impl Cancelled { self, ) -> ChildWorkflowMachineTransition { if self.seen_cancelled_event { - ChildWorkflowMachineTransition::Err(fatal!("{}", + ChildWorkflowMachineTransition::Err(fatal!( "Child workflow has already seen a ChildWorkflowExecutionCanceledEvent, and now \ another is being applied! This is a bug, please report." - .to_string(), )) } else { ChildWorkflowMachineTransition::ok( @@ -237,14 +235,16 @@ impl StartCommandCreated { return TransitionResult::Err(nondeterminism!( "Child workflow id of scheduled event '{}' does not \ match child workflow id of command '{}'", - event_dat.wf_id, state.workflow_id + event_dat.wf_id, + state.workflow_id )); } if event_dat.wf_type != state.workflow_type { return TransitionResult::Err(nondeterminism!( "Child workflow type of scheduled event '{}' does not \ match child workflow type of command '{}'", - event_dat.wf_type, state.workflow_type + event_dat.wf_type, + state.workflow_type )); } } @@ -533,8 +533,8 @@ impl TryFrom for ChildWorkflowMachineEvents { last_task_in_history, }) } else { - return Err(fatal!("{}", - "StartChildWorkflowExecutionInitiated attributes were unset".to_string(), + return Err(fatal!( + "StartChildWorkflowExecutionInitiated attributes were unset" )); } } @@ -547,14 +547,12 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::StartChildWorkflowExecutionFailed( StartChildWorkflowExecutionFailedCause::try_from(cause).map_err(|_| { - fatal!("{}", - "Invalid StartChildWorkflowExecutionFailedCause".to_string(), - ) + fatal!("Invalid StartChildWorkflowExecutionFailedCause") })?, ) } else { - return Err(fatal!("{}", - "StartChildWorkflowExecutionFailed attributes were unset".to_string(), + return Err(fatal!( + "StartChildWorkflowExecutionFailed attributes were unset" )); } } @@ -573,9 +571,8 @@ impl TryFrom for ChildWorkflowMachineEvents { started_event_id: e.event_id, }) } else { - return Err(fatal!("{}", + return Err(fatal!( "ChildWorkflowExecutionStarted attributes were unset or malformed" - .to_string(), )); } } @@ -588,9 +585,8 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionCompleted(result) } else { - return Err(fatal!("{}", + return Err(fatal!( "ChildWorkflowExecutionCompleted attributes were unset or malformed" - .to_string(), )); } } @@ -601,9 +597,7 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionFailed(attrs) } else { - return Err(fatal!("{}", - "ChildWorkflowExecutionFailed attributes were unset".to_string(), - )); + return Err(fatal!("ChildWorkflowExecutionFailed attributes were unset")); } } Ok(EventType::ChildWorkflowExecutionTimedOut) => { @@ -613,9 +607,8 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionTimedOut(atts.retry_state()) } else { - return Err(fatal!("{}", + return Err(fatal!( "ChildWorkflowExecutionTimedOut attributes were unset or malformed" - .to_string(), )); } } @@ -626,7 +619,7 @@ impl TryFrom for ChildWorkflowMachineEvents { _ => { return Err(nondeterminism!( "Child workflow machine does not handle this event: {e:?}" - )) + )); } }) } @@ -773,9 +766,7 @@ fn convert_payloads( result: Option, ) -> Result, WFMachinesError> { result.map(TryInto::try_into).transpose().map_err(|pe| { - fatal!( - "Not exactly one payload in child workflow result ({pe}) for event: {event_info:?}" - ) + fatal!("Not exactly one payload in child workflow result ({pe}) for event: {event_info:?}") }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs index b118661e3..14f65f1d9 100644 --- a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs @@ -2,7 +2,10 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::{abstractions::dbg_panic, worker::workflow::{machines::HistEventData, nondeterminism, fatal}}; +use crate::{ + abstractions::dbg_panic, + worker::workflow::{fatal, machines::HistEventData, nondeterminism}, +}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CompleteWorkflowExecution, diff --git a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs index f407b9b44..925d8aa4d 100644 --- a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, }; -use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::ContinueAsNewWorkflowExecution, diff --git a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs index 66be30214..0c4dd8112 100644 --- a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::FailWorkflowExecution, diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 04402cdee..6a12e11dd 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -8,9 +8,8 @@ use crate::{ worker::{ LocalActivityExecutionResult, workflow::{ - InternalFlagsRef, + InternalFlagsRef, fatal, machines::{HistEventData, activity_state_machine::activity_fail_info}, - fatal, nondeterminism, }, }, @@ -164,8 +163,8 @@ pub(super) fn new_local_activity( } } else { if maybe_pre_resolved.is_some() { - return Err(nondeterminism!("{}", - "Local activity cannot be created as pre-resolved while not replaying".to_string(), + return Err(nondeterminism!( + "Local activity cannot be created as pre-resolved while not replaying" )); } Executing {}.into() @@ -835,7 +834,8 @@ fn verify_marker_data_matches( return Err(nondeterminism!( "Local activity marker data has sequence number {} but matched against LA \ command with sequence number {}", - dat.marker_dat.seq, shared.attrs.seq + dat.marker_dat.seq, + shared.attrs.seq )); } // Here we use whether or not we were replaying when we _first invoked_ the LA, because we @@ -849,14 +849,16 @@ fn verify_marker_data_matches( return Err(nondeterminism!( "Activity id of recorded marker '{}' does not \ match activity id of local activity command '{}'", - dat.marker_dat.activity_id, shared.attrs.activity_id + dat.marker_dat.activity_id, + shared.attrs.activity_id )); } if dat.marker_dat.activity_type != shared.attrs.activity_type { return Err(nondeterminism!( "Activity type of recorded marker '{}' does not \ match activity type of local activity command '{}'", - dat.marker_dat.activity_type, shared.attrs.activity_type + dat.marker_dat.activity_type, + shared.attrs.activity_type )); } } diff --git a/crates/sdk-core/src/worker/workflow/machines/mod.rs b/crates/sdk-core/src/worker/workflow/machines/mod.rs index 16be6d4ef..bc5e3fcb9 100644 --- a/crates/sdk-core/src/worker/workflow/machines/mod.rs +++ b/crates/sdk-core/src/worker/workflow/machines/mod.rs @@ -130,13 +130,11 @@ where if let Ok(converted_command) = command_type.try_into() { match OnEventWrapper::on_event_mut(self, converted_command) { Ok(c) => process_machine_commands(self, c, None), - Err(MachineError::InvalidTransition) => { - Err(nondeterminism!( - "Unexpected command producing an invalid transition {:?} in state {}", - command_type, - self.state() - )) - } + Err(MachineError::InvalidTransition) => Err(nondeterminism!( + "Unexpected command producing an invalid transition {:?} in state {}", + command_type, + self.state() + )), Err(MachineError::Underlying(e)) => Err(e.into()), } } else { diff --git a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs index 3414526df..81b0dc028 100644 --- a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs @@ -1,11 +1,10 @@ use super::{MachineError, StateMachine, TransitionResult, fsm}; use crate::worker::workflow::{ - WFMachinesError, + WFMachinesError, fatal, machines::{ EventInfo, HistEventData, NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter, workflow_machines::MachineResponse, }, - fatal, nondeterminism, }; use itertools::Itertools; @@ -394,10 +393,9 @@ pub(super) struct TimedOut; pub(super) struct Cancelled; fn completion_of_not_abandoned_err() -> WFMachinesError { - nondeterminism!("{}", + nondeterminism!( "Nexus operation which don't have the ABANDON cancellation type cannot complete after \ being cancelled." - .to_string(), ) } @@ -457,8 +455,8 @@ impl TryFrom for NexusOperationMachineEvents { event_id: e.event_id, }) } else { - return Err(nondeterminism!("{}", - "NexusOperationScheduled attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationScheduled attributes were unset or malformed" )); } } @@ -468,8 +466,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationStarted(sa) } else { - return Err(nondeterminism!("{}", - "NexusOperationStarted attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationStarted attributes were unset or malformed" )); } } @@ -479,8 +477,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCompleted(ca) } else { - return Err(nondeterminism!("{}", - "NexusOperationCompleted attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationCompleted attributes were unset or malformed" )); } } @@ -490,8 +488,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationFailed(fa) } else { - return Err(nondeterminism!("{}", - "NexusOperationFailed attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationFailed attributes were unset or malformed" )); } } @@ -501,8 +499,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCanceled(ca) } else { - return Err(nondeterminism!("{}", - "NexusOperationCanceled attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationCanceled attributes were unset or malformed" )); } } @@ -512,8 +510,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationTimedOut(toa) } else { - return Err(nondeterminism!("{}", - "NexusOperationTimedOut attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationTimedOut attributes were unset or malformed" )); } } @@ -527,9 +525,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCancelRequestCompleted(attrs) } else { - return Err(nondeterminism!("{}", + return Err(nondeterminism!( "NexusOperationCancelRequestCompleted attributes were unset or malformed" - .to_string(), )); } } @@ -542,16 +539,15 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCancelRequestFailed(attrs) } else { - return Err(nondeterminism!("{}", + return Err(nondeterminism!( "NexusOperationCancelRequestFailed attributes were unset or malformed" - .to_string(), )); } } _ => { return Err(nondeterminism!( "Nexus operation machine does not handle this event: {e:?}" - )) + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs index b149c7bb3..6c6f14821 100644 --- a/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs @@ -25,11 +25,10 @@ use crate::{ internal_flags::CoreInternalFlags, protosext::HistoryEventExt, worker::workflow::{ - InternalFlagsRef, + InternalFlagsRef, fatal, machines::{ HistEventData, upsert_search_attributes_state_machine::MAX_SEARCH_ATTR_PAYLOAD_SIZE, }, - fatal, nondeterminism, }, }; @@ -131,7 +130,7 @@ pub(super) fn has_change<'a>( let mut serialized = all_ids .as_json_payload() .context("Could not serialize search attribute value for patch machine") - .map_err(|e| fatal!("{}", e.to_string()))?; + .map_err(|e| fatal!("{}", e))?; if serialized.data.len() >= MAX_SEARCH_ATTR_PAYLOAD_SIZE { warn!( @@ -223,7 +222,8 @@ impl Notified { if id != dat.patch_id { return TransitionResult::Err(nondeterminism!( "Change id {} does not match expected id {}", - id, dat.patch_id + id, + dat.patch_id )); } TransitionResult::default() diff --git a/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs index 57271deff..548c198b3 100644 --- a/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, MachineError, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{machines::HistEventData, fatal, nondeterminism}; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -70,8 +70,8 @@ pub(super) fn new_external_signal( ) -> Result { let (workflow_execution, only_child) = match attrs.target { None => { - return Err(fatal!("{}", - "Signal external workflow command had empty target field".to_string(), + return Err(fatal!( + "Signal external workflow command had empty target field" )); } Some(sig_we::Target::ChildWorkflowId(wfid)) => ( @@ -195,15 +195,13 @@ impl TryFrom for SignalExternalMachineEvents { { Self::SignalExternalWorkflowExecutionFailed(attrs.cause()) } else { - return Err(fatal!( - "Signal workflow failed attributes were unset: {e}" - )); + return Err(fatal!("Signal workflow failed attributes were unset: {e}")); } } _ => { return Err(nondeterminism!( "Signal external WF machine does not handle this event: {e}" - )) + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs index 0c0d2d85d..f193fd41d 100644 --- a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs @@ -4,7 +4,7 @@ use super::{ EventInfo, MachineError, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{WFMachinesError, machines::HistEventData, fatal, nondeterminism}; +use crate::worker::workflow::{WFMachinesError, fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -120,15 +120,13 @@ impl TryFrom for TimerMachineEvents { { Self::TimerFired(attrs) } else { - return Err(fatal!( - "Timer fired attribs were unset: {e}" - )); + return Err(fatal!("Timer fired attribs were unset: {e}")); } } _ => { return Err(nondeterminism!( "Timer machine does not handle this event: {e}" - )) + )); } }) } @@ -218,7 +216,8 @@ impl StartCommandRecorded { } else { TransitionResult::Err(nondeterminism!( "Timer fired event did not have expected timer id {}, it was {}!", - dat.attrs.seq, attrs.timer_id + dat.attrs.seq, + attrs.timer_id )) } } diff --git a/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs index 633be6e84..b3222355b 100644 --- a/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs @@ -5,8 +5,8 @@ use super::{ use crate::{ protosext::protocol_messages::UpdateRequest, worker::workflow::{ - machines::{HistEventData, NewMachineWithResponse}, fatal, + machines::{HistEventData, NewMachineWithResponse}, nondeterminism, }, }; @@ -178,9 +178,9 @@ impl UpdateMachine { outgoing_id: String, msg: UpdateMsg, ) -> Result { - let accept_body = msg.pack().map_err(|e| { - fatal!("Failed to serialize update response: {e:?}") - })?; + let accept_body = msg + .pack() + .map_err(|e| fatal!("Failed to serialize update response: {e:?}"))?; Ok(MachineResponse::IssueNewMessage(ProtocolMessage { id: outgoing_id.clone(), protocol_instance_id: self.shared_state.instance_id.clone(), @@ -229,7 +229,7 @@ impl TryFrom for UpdateMachineEvents { _ => { return Err(nondeterminism!( "Update machine does not handle this event: {e}" - )) + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 1d00e55e7..bbc46d5e5 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -25,9 +25,7 @@ use crate::{ workflow::{ CommandID, DrivenWorkflow, HistoryUpdate, InternalFlagsRef, LocalResolution, OutgoingJob, RunBasics, WFCommand, WFCommandVariant, WFMachinesError, - WorkflowStartedInfo, - fatal, - nondeterminism, + WorkflowStartedInfo, fatal, history_update::NextWFT, machines::{ HistEventData, activity_state_machine::ActivityMachine, @@ -37,6 +35,7 @@ use crate::{ update_state_machine::UpdateMachine, upsert_search_attributes_state_machine::upsert_search_attrs_internal, }, + nondeterminism, }, }, }; @@ -575,10 +574,9 @@ impl WorkflowMachines { (evts, has_final_event) } NextWFT::NeedFetch => { - return Err(fatal!("{}", + return Err(fatal!( "Need to fetch history events to continue applying workflow task, but this \ should be prevented ahead of time! This is a Core SDK bug." - .to_string(), )); } }; @@ -653,7 +651,8 @@ impl WorkflowMachines { if eid != self.last_processed_event + 1 { return Err(fatal!( "History is out of order. Last processed event: {}, event id: {}", - self.last_processed_event, eid + self.last_processed_event, + eid )); } let next_event = history.peek(); @@ -784,9 +783,7 @@ impl WorkflowMachines { self.local_activity_data.insert_peeked_marker(la_dat); } } else { - return Err(fatal!( - "Local activity marker was unparsable: {e:?}" - )); + return Err(fatal!("Local activity marker was unparsable: {e:?}")); } } else if let Some( history_event::Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(ref atts), @@ -858,10 +855,9 @@ impl WorkflowMachines { let are_more_events = next_event.is_some() || !event_dat.current_task_is_last_in_history; return if are_more_events { - Err(fatal!("{}", + Err(fatal!( "Machines were fed a history which has an event after workflow execution was \ terminated!" - .to_string(), )) } else { Ok(EventHandlingOutcome::Normal) @@ -926,9 +922,9 @@ impl WorkflowMachines { let event = &event_dat.event; if event.is_local_activity_marker() { - let deets = event.extract_local_activity_marker_data().ok_or_else(|| { - fatal!("Local activity marker was unparsable: {event:?}") - })?; + let deets = event + .extract_local_activity_marker_data() + .ok_or_else(|| fatal!("Local activity marker was unparsable: {event:?}"))?; let cmdid = CommandID::LocalActivity(deets.seq); let mkey = self.get_machine_key(cmdid)?; if let Machines::LocalActivityMachine(lam) = self.machine(mkey) { @@ -961,9 +957,7 @@ impl WorkflowMachines { let command = if let Some(c) = maybe_command { c } else { - return Err(nondeterminism!( - "No command scheduled for event {event}" - )); + return Err(nondeterminism!("No command scheduled for event {event}")); }; let canceled_before_sent = self @@ -1086,11 +1080,11 @@ impl WorkflowMachines { IncomingProtocolMessageBody::UpdateRequest(ur) => { let seq_id = if let SequencingId::EventId(eid) = message .sequencing_id - .ok_or_else(|| fatal!("{}", SEQIDERR.to_string()))? + .ok_or_else(|| fatal!("{}", SEQIDERR))? { eid } else { - return Err(fatal!("{}", SEQIDERR.to_string())); + return Err(fatal!("{}", SEQIDERR)); }; let um = UpdateMachine::init( message.id, @@ -1362,9 +1356,7 @@ impl WorkflowMachines { let seq = attrs.seq; let attrs: ValidScheduleLA = ValidScheduleLA::from_schedule_la(attrs, cmd.metadata).map_err(|e| { - fatal!( - "Invalid schedule local activity request (seq {seq}): {e}" - ) + fatal!("Invalid schedule local activity request (seq {seq}): {e}") })?; let (la, mach_resp) = new_local_activity( attrs, @@ -1472,10 +1464,7 @@ impl WorkflowMachines { } WFCommandVariant::RequestCancelExternalWorkflow(attrs) => { let we = attrs.workflow_execution.ok_or_else(|| { - fatal!("{}", - "Cancel external workflow command had no workflow_execution field" - .to_string(), - ) + fatal!("Cancel external workflow command had no workflow_execution field") })?; self.add_cmd_to_wf_task( new_external_cancel( @@ -1555,9 +1544,10 @@ impl WorkflowMachines { } fn get_machine_key(&self, id: CommandID) -> Result { - Ok(*self.id_to_machine.get(&id).ok_or_else(|| { - nondeterminism!("Missing associated machine for {id:?}") - })?) + Ok(*self + .id_to_machine + .get(&id) + .ok_or_else(|| nondeterminism!("Missing associated machine for {id:?}"))?) } fn get_machine_by_msg(&self, protocol_instance_id: &str) -> Result { @@ -1565,9 +1555,7 @@ impl WorkflowMachines { .machines_by_protocol_instance_id .get(protocol_instance_id) .ok_or_else(|| { - fatal!( - "Missing associated machine for protocol message {protocol_instance_id}" - ) + fatal!("Missing associated machine for protocol message {protocol_instance_id}") })?) } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs index a27da4676..f82d3008b 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs @@ -4,7 +4,7 @@ use super::{ EventInfo, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{machines::HistEventData, fatal, nondeterminism}; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::{ convert::{TryFrom, TryInto}, time::SystemTime, @@ -67,10 +67,9 @@ impl WFMachinesAdapter for WorkflowTaskMachine { let (event_id, event_type) = if let Some(ei) = event_info { (ei.event_id, ei.event_type) } else { - return Err(fatal!("{}", + return Err(fatal!( "WF Task machine should never issue a task started trigger \ command in response to non-history events" - .to_string(), )); }; @@ -136,15 +135,13 @@ impl TryFrom for WorkflowTaskMachineEvents { }, }) } else { - return Err(fatal!( - "Workflow task failed is missing attributes: {e}" - )); + return Err(fatal!("Workflow task failed is missing attributes: {e}")); } } _ => { return Err(nondeterminism!( "Event does not apply to a wf task machine: {e}" - )) + )); } }) } From 00071cff3aeba4eb2d4682dc7cb68ae09ce540da Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 14 Nov 2025 16:15:15 -0800 Subject: [PATCH 9/9] fmt II --- .../workflow/machines/cancel_workflow_state_machine.rs | 2 +- .../workflow/machines/complete_workflow_state_machine.rs | 2 +- .../machines/continue_as_new_workflow_state_machine.rs | 2 +- .../workflow/machines/fail_workflow_state_machine.rs | 2 +- .../workflow/machines/nexus_operation_state_machine.rs | 2 +- .../src/worker/workflow/machines/workflow_machines.rs | 7 +++---- 6 files changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs index 65688472f..30331c4e9 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; +use crate::worker::workflow::{machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CancelWorkflowExecution, diff --git a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs index 14f65f1d9..2d6579b03 100644 --- a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs @@ -4,7 +4,7 @@ use super::{ }; use crate::{ abstractions::dbg_panic, - worker::workflow::{fatal, machines::HistEventData, nondeterminism}, + worker::workflow::{machines::HistEventData, nondeterminism}, }; use std::convert::TryFrom; use temporalio_common::protos::{ diff --git a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs index 925d8aa4d..148203457 100644 --- a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, }; -use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; +use crate::worker::workflow::{machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::ContinueAsNewWorkflowExecution, diff --git a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs index 0c4dd8112..290ee9278 100644 --- a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; +use crate::worker::workflow::{machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::FailWorkflowExecution, diff --git a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs index 81b0dc028..b8c744982 100644 --- a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs @@ -1,6 +1,6 @@ use super::{MachineError, StateMachine, TransitionResult, fsm}; use crate::worker::workflow::{ - WFMachinesError, fatal, + WFMachinesError, machines::{ EventInfo, HistEventData, NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter, workflow_machines::MachineResponse, diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index bbc46d5e5..dd7ba43f2 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -1078,13 +1078,12 @@ impl WorkflowMachines { match message.body { IncomingProtocolMessageBody::UpdateRequest(ur) => { - let seq_id = if let SequencingId::EventId(eid) = message - .sequencing_id - .ok_or_else(|| fatal!("{}", SEQIDERR))? + let seq_id = if let SequencingId::EventId(eid) = + message.sequencing_id.ok_or_else(|| fatal!("{SEQIDERR}"))? { eid } else { - return Err(fatal!("{}", SEQIDERR)); + return Err(fatal!("{SEQIDERR}")); }; let um = UpdateMachine::init( message.id,