7272 service workflowserviceclient.Interface
7373 taskHandler WorkflowTaskHandler
7474 ldaTunnel * locallyDispatchedActivityTunnel
75- metricsScope tally. Scope
75+ metricsScope * metrics. TaggedScope
7676 logger * zap.Logger
7777
7878 stickyUUID string
@@ -276,7 +276,7 @@ func newWorkflowTaskPoller(
276276 identity : params .Identity ,
277277 taskHandler : taskHandler ,
278278 ldaTunnel : ldaTunnel ,
279- metricsScope : params .MetricsScope ,
279+ metricsScope : metrics . NewTaggedScope ( params .MetricsScope ) ,
280280 logger : params .Logger ,
281281 stickyUUID : uuid .New (),
282282 disableStickyExecution : params .DisableStickyExecution ,
@@ -385,8 +385,9 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa
385385
386386func (wtp * workflowTaskPoller ) RespondTaskCompletedWithMetrics (completedRequest interface {}, taskErr error , task * s.PollForDecisionTaskResponse , startTime time.Time ) (response * s.RespondDecisionTaskCompletedResponse , err error ) {
387387
388+ metricsScope := wtp .metricsScope .GetTaggedScope (tagWorkflowType , task .WorkflowType .GetName ())
388389 if taskErr != nil {
389- wtp . metricsScope .Counter (metrics .DecisionExecutionFailedCounter ).Inc (1 )
390+ metricsScope .Counter (metrics .DecisionExecutionFailedCounter ).Inc (1 )
390391 wtp .logger .Warn ("Failed to process decision task." ,
391392 zap .String (tagWorkflowType , task .WorkflowType .GetName ()),
392393 zap .String (tagWorkflowID , task .WorkflowExecution .GetWorkflowId ()),
@@ -395,17 +396,17 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
395396 // convert err to DecisionTaskFailed
396397 completedRequest = errorToFailDecisionTask (task .TaskToken , taskErr , wtp .identity )
397398 } else {
398- wtp . metricsScope .Counter (metrics .DecisionTaskCompletedCounter ).Inc (1 )
399+ metricsScope .Counter (metrics .DecisionTaskCompletedCounter ).Inc (1 )
399400 }
400401
401- wtp . metricsScope .Timer (metrics .DecisionExecutionLatency ).Record (time .Now ().Sub (startTime ))
402+ metricsScope .Timer (metrics .DecisionExecutionLatency ).Record (time .Now ().Sub (startTime ))
402403
403404 responseStartTime := time .Now ()
404405 if response , err = wtp .RespondTaskCompleted (completedRequest , task ); err != nil {
405- wtp . metricsScope .Counter (metrics .DecisionResponseFailedCounter ).Inc (1 )
406+ metricsScope .Counter (metrics .DecisionResponseFailedCounter ).Inc (1 )
406407 return
407408 }
408- wtp . metricsScope .Timer (metrics .DecisionResponseLatency ).Record (time .Now ().Sub (responseStartTime ))
409+ metricsScope .Timer (metrics .DecisionResponseLatency ).Record (time .Now ().Sub (responseStartTime ))
409410
410411 return
411412}
@@ -787,11 +788,12 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
787788 zap .Bool ("IsQueryTask" , response .Query != nil ))
788789 })
789790
790- wtp .metricsScope .Counter (metrics .DecisionPollSucceedCounter ).Inc (1 )
791- wtp .metricsScope .Timer (metrics .DecisionPollLatency ).Record (time .Now ().Sub (startTime ))
791+ metricsScope := wtp .metricsScope .GetTaggedScope (tagWorkflowType , response .WorkflowType .GetName ())
792+ metricsScope .Counter (metrics .DecisionPollSucceedCounter ).Inc (1 )
793+ metricsScope .Timer (metrics .DecisionPollLatency ).Record (time .Now ().Sub (startTime ))
792794
793795 scheduledToStartLatency := time .Duration (response .GetStartedTimestamp () - response .GetScheduledTimestamp ())
794- wtp . metricsScope .Timer (metrics .DecisionScheduledToStartLatency ).Record (scheduledToStartLatency )
796+ metricsScope .Timer (metrics .DecisionScheduledToStartLatency ).Record (scheduledToStartLatency )
795797 return task , nil
796798}
797799
@@ -966,11 +968,14 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
966968 return & activityTask {}, nil
967969 }
968970
969- atp .metricsScope .Counter (metrics .ActivityPollSucceedCounter ).Inc (1 )
970- atp .metricsScope .Timer (metrics .ActivityPollLatency ).Record (time .Now ().Sub (startTime ))
971+ workflowType := response .WorkflowType .GetName ()
972+ activityType := response .ActivityType .GetName ()
973+ metricsScope := getMetricsScopeForActivity (atp .metricsScope , workflowType , activityType )
974+ metricsScope .Counter (metrics .ActivityPollSucceedCounter ).Inc (1 )
975+ metricsScope .Timer (metrics .ActivityPollLatency ).Record (time .Now ().Sub (startTime ))
971976
972977 scheduledToStartLatency := time .Duration (response .GetStartedTimestamp () - response .GetScheduledTimestampOfThisAttempt ())
973- atp . metricsScope .Timer (metrics .ActivityScheduledToStartLatency ).Record (scheduledToStartLatency )
978+ metricsScope .Timer (metrics .ActivityScheduledToStartLatency ).Record (scheduledToStartLatency )
974979
975980 return & activityTask {task : response , pollStartTime : startTime }, nil
976981}
0 commit comments