Skip to content

Commit 2ceee6d

Browse files
authored
Change default WFT fail cause to worker unhandled (#1047)
1 parent acc1c86 commit 2ceee6d

File tree

4 files changed

+62
-17
lines changed

4 files changed

+62
-17
lines changed

crates/sdk-core/src/core_tests/workflow_tasks.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,40 @@ async fn fail_wft_then_recover() {
12861286
core.shutdown().await;
12871287
}
12881288

1289+
#[tokio::test]
1290+
async fn default_wft_fail_cause_is_worker_unhandled() {
1291+
let t = canned_histories::single_timer("1");
1292+
let mut mh = MockPollCfg::from_resp_batches(
1293+
"fake_wf_id",
1294+
t,
1295+
[ResponseType::AllHistory],
1296+
mock_worker_client(),
1297+
);
1298+
mh.num_expected_fails = 1;
1299+
mh.expect_fail_wft_matcher = Box::new(|_, cause, _| {
1300+
matches!(
1301+
cause,
1302+
WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure
1303+
)
1304+
});
1305+
let mock = build_mock_pollers(mh);
1306+
let core = mock_worker(mock);
1307+
1308+
let act = core.poll_workflow_activation().await.unwrap();
1309+
core.complete_workflow_activation(WorkflowActivationCompletion::fail(
1310+
act.run_id,
1311+
Failure {
1312+
message: "Simulated workflow failure".to_string(),
1313+
..Default::default()
1314+
},
1315+
None,
1316+
))
1317+
.await
1318+
.unwrap();
1319+
1320+
core.shutdown().await;
1321+
}
1322+
12891323
#[tokio::test]
12901324
async fn poll_response_triggers_wf_error() {
12911325
let mut t = TestHistoryBuilder::default();

crates/sdk-core/src/worker/workflow/managed_run.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ impl ManagedRun {
405405
}) {
406406
ActivationCompleteOutcome::ReportWFTFail(FailedActivationWFTReport::Report(
407407
tt,
408-
WorkflowTaskFailedCause::Unspecified,
408+
WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure,
409409
Failure::application_failure(reason, true).into(),
410410
))
411411
} else {
@@ -888,7 +888,7 @@ impl ManagedRun {
888888
// We just checked it is some, unwrap OK.
889889
let c = self.completion_waiting_on_page_fetch.take().unwrap();
890890
let run_upd = self.failed_completion(
891-
WorkflowTaskFailedCause::Unspecified,
891+
WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure,
892892
info.reason,
893893
Failure::application_failure(info.message, false).into(),
894894
true,
@@ -1022,7 +1022,7 @@ impl ManagedRun {
10221022
let fail_cause = if matches!(&fail.source, WFMachinesError::Nondeterminism(_)) {
10231023
WorkflowTaskFailedCause::NonDeterministicError
10241024
} else {
1025-
WorkflowTaskFailedCause::Unspecified
1025+
WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure
10261026
};
10271027
self.failed_completion(
10281028
fail_cause,

crates/sdk-core/src/worker/workflow/workflow_stream.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -288,17 +288,24 @@ impl WFStream {
288288
failure,
289289
is_autocomplete,
290290
..
291-
} => rh.failed_completion(
292-
failure.force_cause(),
293-
if is_autocomplete {
294-
EvictionReason::Unspecified
295-
} else {
296-
EvictionReason::LangFail
297-
},
298-
failure,
299-
is_autocomplete,
300-
complete.response_tx,
301-
),
291+
} => {
292+
let forced_cause = failure.force_cause();
293+
rh.failed_completion(
294+
if forced_cause == WorkflowTaskFailedCause::Unspecified {
295+
WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure
296+
} else {
297+
forced_cause
298+
},
299+
if is_autocomplete {
300+
EvictionReason::Unspecified
301+
} else {
302+
EvictionReason::LangFail
303+
},
304+
failure,
305+
is_autocomplete,
306+
complete.response_tx,
307+
)
308+
}
302309
},
303310
NewOrFetchedComplete::Fetched(update, paginator) => {
304311
rh.fetched_page_completion(update, *paginator)

crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,15 @@ async fn test_panic_wf_task_rejected_properly() {
121121
let t = canned_histories::workflow_fails_with_failure_after_timer("1");
122122
let mock = mock_worker_client();
123123
let mut mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 2], mock);
124-
// We should see one wft failure which has unspecified cause, since panics don't have a defined
124+
// We should see one wft failure which has the default cause, since panics don't have a defined
125125
// type.
126126
mh.num_expected_fails = 1;
127-
mh.expect_fail_wft_matcher =
128-
Box::new(|_, cause, _| matches!(cause, WorkflowTaskFailedCause::Unspecified));
127+
mh.expect_fail_wft_matcher = Box::new(|_, cause, _| {
128+
matches!(
129+
cause,
130+
WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure
131+
)
132+
});
129133
let mut worker = mock_sdk(mh);
130134

131135
worker.register_wf(wf_type.to_owned(), timer_wf_fails_once);

0 commit comments

Comments
 (0)