@@ -193,4 +193,102 @@ public void concurrentPollRequestLockTest() throws Exception {
193193 // Verify we only handled two tasks
194194 verify (taskHandler , times (2 )).handleWorkflowTask (any ());
195195 }
196+
197+ @ Test
198+ public void respondWorkflowTaskFailureMetricTest () throws Exception {
199+ // Test that if the SDK gets a failure on RespondWorkflowTaskCompleted it does not increment
200+ // workflow_task_execution_failed.
201+ WorkflowServiceStubs client = mock (WorkflowServiceStubs .class );
202+ when (client .getServerCapabilities ())
203+ .thenReturn (() -> GetSystemInfoResponse .Capabilities .newBuilder ().build ());
204+
205+ WorkflowRunLockManager runLockManager = new WorkflowRunLockManager ();
206+
207+ Scope metricsScope =
208+ new RootScopeBuilder ()
209+ .reporter (reporter )
210+ .reportEvery (com .uber .m3 .util .Duration .ofMillis (1 ));
211+ WorkflowExecutorCache cache = new WorkflowExecutorCache (10 , runLockManager , metricsScope );
212+
213+ WorkflowTaskHandler taskHandler = mock (WorkflowTaskHandler .class );
214+ when (taskHandler .isAnyTypeSupported ()).thenReturn (true );
215+
216+ EagerActivityDispatcher eagerActivityDispatcher = mock (EagerActivityDispatcher .class );
217+ WorkflowWorker worker =
218+ new WorkflowWorker (
219+ client ,
220+ "default" ,
221+ "task_queue" ,
222+ "sticky_task_queue" ,
223+ SingleWorkerOptions .newBuilder ()
224+ .setIdentity ("test_identity" )
225+ .setBuildId (UUID .randomUUID ().toString ())
226+ .setPollerOptions (PollerOptions .newBuilder ().setPollThreadCount (1 ).build ())
227+ .setMetricsScope (metricsScope )
228+ .build (),
229+ runLockManager ,
230+ cache ,
231+ taskHandler ,
232+ eagerActivityDispatcher );
233+
234+ WorkflowServiceGrpc .WorkflowServiceBlockingStub blockingStub =
235+ mock (WorkflowServiceGrpc .WorkflowServiceBlockingStub .class );
236+ when (client .blockingStub ()).thenReturn (blockingStub );
237+ when (blockingStub .withOption (any (), any ())).thenReturn (blockingStub );
238+
239+ PollWorkflowTaskQueueResponse pollResponse =
240+ PollWorkflowTaskQueueResponse .newBuilder ()
241+ .setTaskToken (ByteString .copyFrom ("token" , UTF_8 ))
242+ .setWorkflowExecution (
243+ WorkflowExecution .newBuilder ().setWorkflowId (WORKFLOW_ID ).setRunId (RUN_ID ).build ())
244+ .setWorkflowType (WorkflowType .newBuilder ().setName (WORKFLOW_TYPE ).build ())
245+ .build ();
246+
247+ CountDownLatch pollTaskQueueLatch = new CountDownLatch (1 );
248+ CountDownLatch blockPollTaskQueueLatch = new CountDownLatch (1 );
249+
250+ when (blockingStub .pollWorkflowTaskQueue (any (PollWorkflowTaskQueueRequest .class )))
251+ .thenReturn (pollResponse )
252+ .thenAnswer (
253+ (Answer <PollWorkflowTaskQueueResponse >)
254+ invocation -> {
255+ pollTaskQueueLatch .countDown ();
256+ blockPollTaskQueueLatch .await ();
257+ return null ;
258+ });
259+ ;
260+
261+ CountDownLatch handleTaskLatch = new CountDownLatch (1 );
262+
263+ when (taskHandler .handleWorkflowTask (any (PollWorkflowTaskQueueResponse .class )))
264+ .thenAnswer (
265+ (Answer <WorkflowTaskHandler .Result >)
266+ invocation -> {
267+ handleTaskLatch .countDown ();
268+
269+ return new WorkflowTaskHandler .Result (
270+ WORKFLOW_TYPE ,
271+ RespondWorkflowTaskCompletedRequest .newBuilder ().build (),
272+ null ,
273+ null ,
274+ null ,
275+ false ,
276+ null );
277+ });
278+
279+ when (blockingStub .respondWorkflowTaskCompleted (any (RespondWorkflowTaskCompletedRequest .class )))
280+ .thenThrow (new RuntimeException ());
281+
282+ assertTrue (worker .start ());
283+ // Wait until we have got all the polls
284+ pollTaskQueueLatch .await ();
285+ // Wait until the worker handles at least one WFT
286+ handleTaskLatch .await ();
287+ // Cleanup
288+ worker .shutdown (new ShutdownManager (), false ).get ();
289+ // Make sure we don't report workflow task failure
290+ reporter .assertNoMetric (
291+ MetricsType .WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER ,
292+ ImmutableMap .of ("worker_type" , "WorkflowWorker" , "workflow_type" , "test-workflow-type" ));
293+ }
196294}
0 commit comments