Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
use crate::{
abstractions::dbg_panic,
internal_flags::CoreInternalFlags,
worker::workflow::{InternalFlagsRef, machines::HistEventData},
worker::workflow::{InternalFlagsRef, machines::HistEventData, fatal, nondeterminism},
};
use std::convert::{TryFrom, TryInto};
use temporalio_common::protos::{
Expand Down Expand Up @@ -179,7 +179,7 @@ impl ActivityMachine {
"Activity cancel ignored in terminal state",
::serde_json::json!({
"seq": self.shared_state.attrs.seq,
"state": format!("{:?}", self.state())
"state": format!("{}", self.state())
})
);

Expand Down Expand Up @@ -236,9 +236,9 @@ impl TryFrom<HistEventData> for ActivityMachineEvents {
last_task_in_history,
})
} else {
return Err(WFMachinesError::Fatal(format!(
return Err(fatal!(
"Activity scheduled attributes were unset: {e}"
)));
));
}
}
EventType::ActivityTaskStarted => Self::ActivityTaskStarted(e.event_id),
Expand All @@ -249,9 +249,9 @@ impl TryFrom<HistEventData> for ActivityMachineEvents {
{
Self::ActivityTaskCompleted(attrs)
} else {
return Err(WFMachinesError::Fatal(format!(
return Err(fatal!(
"Activity completion attributes were unset: {e}"
)));
));
}
}
EventType::ActivityTaskFailed => {
Expand All @@ -260,9 +260,9 @@ impl TryFrom<HistEventData> for ActivityMachineEvents {
{
Self::ActivityTaskFailed(attrs)
} else {
return Err(WFMachinesError::Fatal(format!(
return Err(fatal!(
"Activity failure attributes were unset: {e}"
)));
));
}
}
EventType::ActivityTaskTimedOut => {
Expand All @@ -271,9 +271,9 @@ impl TryFrom<HistEventData> for ActivityMachineEvents {
{
Self::ActivityTaskTimedOut(attrs)
} else {
return Err(WFMachinesError::Fatal(format!(
return Err(fatal!(
"Activity timeout attributes were unset: {e}"
)));
));
}
}
EventType::ActivityTaskCancelRequested => Self::ActivityTaskCancelRequested,
Expand All @@ -283,15 +283,15 @@ impl TryFrom<HistEventData> for ActivityMachineEvents {
{
Self::ActivityTaskCanceled(attrs)
} else {
return Err(WFMachinesError::Fatal(format!(
return Err(fatal!(
"Activity cancellation attributes were unset: {e}"
)));
));
}
}
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
return Err(nondeterminism!(
"Activity machine does not handle this event: {e}"
)));
));
}
})
}
Expand Down Expand Up @@ -388,18 +388,18 @@ impl ScheduleCommandCreated {
sched_dat.last_task_in_history,
) {
if sched_dat.act_id != dat.attrs.activity_id {
return TransitionResult::Err(WFMachinesError::Nondeterminism(format!(
return TransitionResult::Err(nondeterminism!(
"Activity id of scheduled event '{}' does not \
match activity id of activity command '{}'",
sched_dat.act_id, dat.attrs.activity_id
)));
));
}
if sched_dat.act_type != dat.attrs.activity_type {
return TransitionResult::Err(WFMachinesError::Nondeterminism(format!(
return TransitionResult::Err(nondeterminism!(
"Activity type of scheduled event '{}' does not \
match activity type of activity command '{}'",
sched_dat.act_type, dat.attrs.activity_type
)));
));
}
}
dat.scheduled_event_id = sched_dat.event_id;
Expand Down Expand Up @@ -446,7 +446,6 @@ impl ScheduledEventRecorded {
"Activity timed out before starting",
::serde_json::json!({
"activity_id": dat.attrs.activity_id,
"timeout_type": attrs.timeout_type,
"state": "ScheduledEventRecorded"
})
);
Expand Down Expand Up @@ -535,7 +534,6 @@ impl Started {
"Activity timed out after starting",
::serde_json::json!({
"activity_id": dat.attrs.activity_id,
"timeout_type": attrs.timeout_type,
"state": "Started"
})
);
Expand Down Expand Up @@ -763,10 +761,10 @@ impl Canceled {
);
TransitionResult::default()
} else {
TransitionResult::Err(WFMachinesError::Nondeterminism(format!(
TransitionResult::Err(nondeterminism!(
"Non-Abandon cancel mode activities cannot be started after being cancelled. \
Seq: {seq_num:?}"
)))
))
}
}
pub(super) fn on_activity_task_completed(
Expand All @@ -786,9 +784,9 @@ impl Canceled {
);
TransitionResult::default()
} else {
TransitionResult::Err(WFMachinesError::Nondeterminism(format!(
TransitionResult::Err(nondeterminism!(
"Non-Abandon cancel mode activities cannot be completed after being cancelled: {attrs:?}"
)))
))
}
}
}
Expand Down Expand Up @@ -914,9 +912,9 @@ fn convert_payloads(
result: Option<Payloads>,
) -> Result<Option<Payload>, WFMachinesError> {
result.map(TryInto::try_into).transpose().map_err(|pe| {
WFMachinesError::Fatal(format!(
fatal!(
"Not exactly one payload in activity result ({pe}) for event: {event_info:?}"
))
)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult,
WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse,
};
use crate::worker::workflow::machines::HistEventData;
use crate::worker::workflow::{machines::HistEventData, fatal, nondeterminism};
use std::convert::TryFrom;
use temporalio_common::protos::{
coresdk::{
Expand Down Expand Up @@ -153,15 +153,15 @@ impl TryFrom<HistEventData> for CancelExternalMachineEvents {
if let Some(history_event::Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(attrs)) = e.attributes {
Self::RequestCancelExternalWorkflowExecutionFailed(attrs.cause())
} else {
return Err(WFMachinesError::Fatal(format!(
return Err(fatal!(
"Cancelworkflow failed attributes were unset: {e}"
)));
));
}
}
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
return Err(nondeterminism!(
"Cancel external WF machine does not handle this event: {e}"
)))
))
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
workflow_machines::MachineResponse,
};
use super::{StateMachine, TransitionResult, fsm};
use crate::worker::workflow::machines::HistEventData;
use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal};
use std::convert::TryFrom;
use temporalio_common::protos::{
coresdk::workflow_activation::ResolveCancelNexusOperation,
Expand Down Expand Up @@ -76,9 +76,9 @@ impl TryFrom<HistEventData> for CancelNexusOpMachineEvents {
Ok(match e.event_type() {
EventType::NexusOperationCancelRequested => Self::NexusOpCancelRequested,
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
return Err(nondeterminism!(
"Cancel external WF machine does not handle this event: {e}"
)));
));
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult,
WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse,
};
use crate::worker::workflow::machines::HistEventData;
use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal};
use std::convert::TryFrom;
use temporalio_common::protos::{
coresdk::workflow_commands::CancelWorkflowExecution,
Expand Down Expand Up @@ -67,9 +67,9 @@ impl TryFrom<HistEventData> for CancelWorkflowMachineEvents {
Ok(match EventType::try_from(e.event_type) {
Ok(EventType::WorkflowExecutionCanceled) => Self::WorkflowExecutionCanceled,
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
return Err(nondeterminism!(
"Cancel workflow machine does not handle this event: {e}"
)));
));
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
use crate::{
abstractions::dbg_panic,
internal_flags::CoreInternalFlags,
worker::workflow::{InternalFlagsRef, machines::HistEventData},
worker::workflow::{InternalFlagsRef, machines::HistEventData, fatal, nondeterminism},
};
use itertools::Itertools;
use std::{
Expand Down Expand Up @@ -134,7 +134,7 @@ pub(super) struct Cancelled {
}

fn completion_of_not_abandoned_err() -> WFMachinesError {
WFMachinesError::Nondeterminism(
nondeterminism!("{}",
"Child workflows which don't have the ABANDON cancellation type cannot complete after \
being cancelled."
.to_string(),
Expand All @@ -146,7 +146,7 @@ impl Cancelled {
self,
) -> ChildWorkflowMachineTransition<Cancelled> {
if self.seen_cancelled_event {
ChildWorkflowMachineTransition::Err(WFMachinesError::Fatal(
ChildWorkflowMachineTransition::Err(fatal!("{}",
"Child workflow has already seen a ChildWorkflowExecutionCanceledEvent, and now \
another is being applied! This is a bug, please report."
.to_string(),
Expand Down Expand Up @@ -234,18 +234,18 @@ impl StartCommandCreated {
event_dat.last_task_in_history,
) {
if event_dat.wf_id != state.workflow_id {
return TransitionResult::Err(WFMachinesError::Nondeterminism(format!(
return TransitionResult::Err(nondeterminism!(
"Child workflow id of scheduled event '{}' does not \
match child workflow id of command '{}'",
event_dat.wf_id, state.workflow_id
)));
));
}
if event_dat.wf_type != state.workflow_type {
return TransitionResult::Err(WFMachinesError::Nondeterminism(format!(
return TransitionResult::Err(nondeterminism!(
"Child workflow type of scheduled event '{}' does not \
match child workflow type of command '{}'",
event_dat.wf_type, state.workflow_type
)));
));
}
}
state.initiated_event_id = event_dat.event_id;
Expand Down Expand Up @@ -533,7 +533,7 @@ impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
last_task_in_history,
})
} else {
return Err(WFMachinesError::Fatal(
return Err(fatal!("{}",
"StartChildWorkflowExecutionInitiated attributes were unset".to_string(),
));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these ones are oddly formatted with {} just being replaced w/ a static string

}
Expand All @@ -547,13 +547,13 @@ impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
{
Self::StartChildWorkflowExecutionFailed(
StartChildWorkflowExecutionFailedCause::try_from(cause).map_err(|_| {
WFMachinesError::Fatal(
fatal!("{}",
"Invalid StartChildWorkflowExecutionFailedCause".to_string(),
)
})?,
)
} else {
return Err(WFMachinesError::Fatal(
return Err(fatal!("{}",
"StartChildWorkflowExecutionFailed attributes were unset".to_string(),
));
}
Expand All @@ -573,7 +573,7 @@ impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
started_event_id: e.event_id,
})
} else {
return Err(WFMachinesError::Fatal(
return Err(fatal!("{}",
"ChildWorkflowExecutionStarted attributes were unset or malformed"
.to_string(),
));
Expand All @@ -588,7 +588,7 @@ impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
{
Self::ChildWorkflowExecutionCompleted(result)
} else {
return Err(WFMachinesError::Fatal(
return Err(fatal!("{}",
"ChildWorkflowExecutionCompleted attributes were unset or malformed"
.to_string(),
));
Expand All @@ -601,7 +601,7 @@ impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
{
Self::ChildWorkflowExecutionFailed(attrs)
} else {
return Err(WFMachinesError::Fatal(
return Err(fatal!("{}",
"ChildWorkflowExecutionFailed attributes were unset".to_string(),
));
}
Expand All @@ -613,7 +613,7 @@ impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
{
Self::ChildWorkflowExecutionTimedOut(atts.retry_state())
} else {
return Err(WFMachinesError::Fatal(
return Err(fatal!("{}",
"ChildWorkflowExecutionTimedOut attributes were unset or malformed"
.to_string(),
));
Expand All @@ -624,9 +624,9 @@ impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
}
Ok(EventType::ChildWorkflowExecutionCanceled) => Self::ChildWorkflowExecutionCancelled,
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
return Err(nondeterminism!(
"Child workflow machine does not handle this event: {e:?}"
)));
))
}
})
}
Expand Down Expand Up @@ -773,9 +773,9 @@ fn convert_payloads(
result: Option<Payloads>,
) -> Result<Option<Payload>, WFMachinesError> {
result.map(TryInto::try_into).transpose().map_err(|pe| {
WFMachinesError::Fatal(format!(
fatal!(
"Not exactly one payload in child workflow result ({pe}) for event: {event_info:?}"
))
)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult,
WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse,
};
use crate::{abstractions::dbg_panic, worker::workflow::machines::HistEventData};
use crate::{abstractions::dbg_panic, worker::workflow::{machines::HistEventData, nondeterminism, fatal}};
use std::convert::TryFrom;
use temporalio_common::protos::{
coresdk::workflow_commands::CompleteWorkflowExecution,
Expand Down Expand Up @@ -60,9 +60,9 @@ impl TryFrom<HistEventData> for CompleteWorkflowMachineEvents {
Ok(match e.event_type() {
EventType::WorkflowExecutionCompleted => Self::WorkflowExecutionCompleted,
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
return Err(nondeterminism!(
"Complete workflow machine does not handle this event: {e}"
)));
));
}
})
}
Expand Down
Loading
Loading