Skip to content

Commit eadc5cd

Browse files
authored
Avoid spamming grpc message too large task failures (#1050)
1 parent 76b66fa commit eadc5cd

File tree

3 files changed

+64
-3
lines changed

3 files changed

+64
-3
lines changed

crates/sdk-core/src/core_tests/workflow_tasks.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::{
2727
},
2828
time::Duration,
2929
};
30+
use temporalio_client::MESSAGE_TOO_LARGE_KEY;
3031
use temporalio_common::{
3132
Worker as WorkerTrait,
3233
errors::PollError,
@@ -3070,3 +3071,54 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() {
30703071
"At peak, there should be exactly 2 pollers active at the same time"
30713072
);
30723073
}
3074+
3075+
#[tokio::test]
3076+
async fn grpc_message_too_large_doesnt_spam_task_fails() {
3077+
let mut t = TestHistoryBuilder::default();
3078+
t.add_by_type(EventType::WorkflowExecutionStarted);
3079+
t.add_workflow_task_scheduled_and_started();
3080+
3081+
let mut mh = MockPollCfg::from_resp_batches(
3082+
"fake_wf_id",
3083+
t,
3084+
[
3085+
ResponseType::AllHistory,
3086+
ResponseType::AllHistory,
3087+
ResponseType::AllHistory,
3088+
ResponseType::AllHistory,
3089+
ResponseType::AllHistory,
3090+
ResponseType::AllHistory,
3091+
],
3092+
mock_worker_client(),
3093+
);
3094+
mh.num_expected_fails = 1;
3095+
let mut times = 1;
3096+
mh.completion_mock_fn = Some(Box::new(move |_| {
3097+
if times <= 5 {
3098+
let mut err = tonic::Status::new(tonic::Code::ResourceExhausted, "message too large");
3099+
// This key is what we look for
3100+
err.metadata_mut().insert(MESSAGE_TOO_LARGE_KEY, 1.into());
3101+
times += 1;
3102+
Err(err)
3103+
} else {
3104+
Ok(Default::default())
3105+
}
3106+
}));
3107+
3108+
let mut mock = build_mock_pollers(mh);
3109+
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
3110+
let core = mock_worker(mock);
3111+
3112+
// Since the mock makes us fail 5 times, we should succeed on the sixth
3113+
for _ in 1..=5 {
3114+
let act = core.poll_workflow_activation().await.unwrap();
3115+
core.complete_workflow_activation(WorkflowActivationCompletion::empty(&act.run_id))
3116+
.await
3117+
.unwrap();
3118+
core.handle_eviction().await;
3119+
}
3120+
let act = core.poll_workflow_activation().await.unwrap();
3121+
core.complete_execution(&act.run_id).await;
3122+
core.drain_pollers_and_shutdown().await;
3123+
// Mock only expects 1 task failure, and would fail here if we spammed
3124+
}

crates/sdk-core/src/worker/workflow/managed_run.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,7 @@ impl ManagedRun {
11421142
}
11431143
}
11441144

1145+
let attempt = self.wft.as_ref().map(|t| t.info.attempt).unwrap_or(1);
11451146
ActivationCompleteOutcome::ReportWFTSuccess(ServerCommandsWithWorkflowInfo {
11461147
task_token: data.task_token,
11471148
action: ActivationAction::WftComplete {
@@ -1151,6 +1152,7 @@ impl ManagedRun {
11511152
query_responses,
11521153
sdk_metadata: machines_wft_response.metadata_for_complete(),
11531154
versioning_behavior: data.versioning_behavior,
1155+
attempt,
11541156
},
11551157
})
11561158
} else {

crates/sdk-core/src/worker/workflow/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,13 +341,14 @@ impl Workflows {
341341
force_new_wft,
342342
sdk_metadata,
343343
mut versioning_behavior,
344+
attempt,
344345
},
345346
} => {
346347
let reserved_act_permits =
347348
self.reserve_activity_slots_for_outgoing_commands(commands.as_mut_slice());
348349
debug!(commands=%commands.display(), query_responses=%query_responses.display(),
349-
messages=%messages.display(), force_new_wft,
350-
"Sending responses to server");
350+
messages=%messages.display(), force_new_wft,
351+
"Sending responses to server");
351352
if let Some(default_vb) = self.default_versioning_behavior.as_ref()
352353
&& versioning_behavior == VersioningBehavior::Unspecified
353354
{
@@ -393,7 +394,11 @@ impl Workflows {
393394
response.activity_tasks,
394395
);
395396
}
396-
Err(e) if e.metadata().contains_key(MESSAGE_TOO_LARGE_KEY) => {
397+
// Reply with a task failure if we got grpc too large from server, but
398+
// not if this is a nonfirst attempt to avoid spamming.
399+
Err(e)
400+
if e.metadata().contains_key(MESSAGE_TOO_LARGE_KEY) && attempt < 2 =>
401+
{
397402
let failure = Failure {
398403
failure: Some(
399404
temporalio_common::protos::temporal::api::failure::v1::Failure {
@@ -417,6 +422,7 @@ impl Workflows {
417422
);
418423
self.handle_activation_failed(run_id, completion_time, new_outcome)
419424
.await;
425+
return Err(e);
420426
}
421427
e => {
422428
e?;
@@ -992,6 +998,7 @@ pub(crate) enum ActivationAction {
992998
force_new_wft: bool,
993999
sdk_metadata: WorkflowTaskCompletedMetadata,
9941000
versioning_behavior: VersioningBehavior,
1001+
attempt: u32,
9951002
},
9961003
/// We should respond to a legacy query request
9971004
RespondLegacyQuery { result: Box<QueryResult> },

0 commit comments

Comments
 (0)