2727import io .temporal .common .interceptors .WorkflowOutboundCallsInterceptor ;
2828import io .temporal .common .interceptors .WorkflowOutboundCallsInterceptorBase ;
2929import io .temporal .opentracing .OpenTracingOptions ;
30+ import io .temporal .workflow .Functions ;
31+ import io .temporal .workflow .Promise ;
3032import io .temporal .workflow .Workflow ;
3133import io .temporal .workflow .WorkflowInfo ;
3234import io .temporal .workflow .unsafe .WorkflowUnsafe ;
35+ import java .util .concurrent .TimeUnit ;
36+ import java .util .concurrent .TimeoutException ;
3337
3438public class OpenTracingWorkflowOutboundCallsInterceptor
3539 extends WorkflowOutboundCallsInterceptorBase {
3640 private final SpanFactory spanFactory ;
3741 private final Tracer tracer ;
3842 private final ContextAccessor contextAccessor ;
3943
44+ private class PromiseWrapper <R > implements Promise <R > {
45+ private final Span capturedSpan ;
46+ private final Promise <R > delegate ;
47+
48+ PromiseWrapper (Span capturedSpan , Promise <R > delegate ) {
49+ this .capturedSpan = capturedSpan ;
50+ this .delegate = delegate ;
51+ }
52+
53+ private <O > O wrap (Functions .Func <O > fn ) {
54+ Span activeSpan = tracer .scopeManager ().activeSpan ();
55+ if (activeSpan == null && capturedSpan != null ) {
56+ try (Scope ignored = tracer .scopeManager ().activate (capturedSpan )) {
57+ return fn .apply ();
58+ }
59+ } else {
60+ return fn .apply ();
61+ }
62+ }
63+
64+ @ Override
65+ public boolean isCompleted () {
66+ return delegate .isCompleted ();
67+ }
68+
69+ @ Override
70+ public R get () {
71+ return delegate .get ();
72+ }
73+
74+ @ Override
75+ public R cancellableGet () {
76+ return delegate .cancellableGet ();
77+ }
78+
79+ @ Override
80+ public R get (long timeout , TimeUnit unit ) throws TimeoutException {
81+ return delegate .get (timeout , unit );
82+ }
83+
84+ @ Override
85+ public R cancellableGet (long timeout , TimeUnit unit ) throws TimeoutException {
86+ return delegate .cancellableGet (timeout , unit );
87+ }
88+
89+ @ Override
90+ public RuntimeException getFailure () {
91+ return delegate .getFailure ();
92+ }
93+
94+ @ Override
95+ public <U > Promise <U > thenApply (Functions .Func1 <? super R , ? extends U > fn ) {
96+ return delegate .thenApply ((r ) -> wrap (() -> fn .apply (r )));
97+ }
98+
99+ @ Override
100+ public <U > Promise <U > handle (Functions .Func2 <? super R , RuntimeException , ? extends U > fn ) {
101+ return delegate .handle ((r , e ) -> wrap (() -> fn .apply (r , e )));
102+ }
103+
104+ @ Override
105+ public <U > Promise <U > thenCompose (Functions .Func1 <? super R , ? extends Promise <U >> fn ) {
106+ return delegate .thenCompose ((r ) -> wrap (() -> fn .apply (r )));
107+ }
108+
109+ @ Override
110+ public Promise <R > exceptionally (Functions .Func1 <Throwable , ? extends R > fn ) {
111+ return delegate .exceptionally ((t ) -> wrap (() -> fn .apply (t )));
112+ }
113+ }
114+
40115 public OpenTracingWorkflowOutboundCallsInterceptor (
41116 WorkflowOutboundCallsInterceptor next ,
42117 OpenTracingOptions options ,
@@ -51,13 +126,16 @@ public OpenTracingWorkflowOutboundCallsInterceptor(
51126 @ Override
52127 public <R > ActivityOutput <R > executeActivity (ActivityInput <R > input ) {
53128 if (!WorkflowUnsafe .isReplaying ()) {
129+ Span capturedSpan = tracer .scopeManager ().activeSpan ();
54130 Span activityStartSpan =
55131 contextAccessor .writeSpanContextToHeader (
56132 () -> createActivityStartSpanBuilder (input .getActivityName ()).start (),
57133 input .getHeader (),
58134 tracer );
59135 try (Scope ignored = tracer .scopeManager ().activate (activityStartSpan )) {
60- return super .executeActivity (input );
136+ ActivityOutput <R > output = super .executeActivity (input );
137+ return new ActivityOutput <>(
138+ output .getActivityId (), new PromiseWrapper <>(capturedSpan , output .getResult ()));
61139 } finally {
62140 activityStartSpan .finish ();
63141 }
@@ -69,13 +147,15 @@ public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
69147 @ Override
70148 public <R > LocalActivityOutput <R > executeLocalActivity (LocalActivityInput <R > input ) {
71149 if (!WorkflowUnsafe .isReplaying ()) {
150+ Span capturedSpan = tracer .scopeManager ().activeSpan ();
72151 Span activityStartSpan =
73152 contextAccessor .writeSpanContextToHeader (
74153 () -> createActivityStartSpanBuilder (input .getActivityName ()).start (),
75154 input .getHeader (),
76155 tracer );
77156 try (Scope ignored = tracer .scopeManager ().activate (activityStartSpan )) {
78- return super .executeLocalActivity (input );
157+ LocalActivityOutput <R > output = super .executeLocalActivity (input );
158+ return new LocalActivityOutput <>(new PromiseWrapper <>(capturedSpan , output .getResult ()));
79159 } finally {
80160 activityStartSpan .finish ();
81161 }
@@ -87,11 +167,15 @@ public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> inp
87167 @ Override
88168 public <R > ChildWorkflowOutput <R > executeChildWorkflow (ChildWorkflowInput <R > input ) {
89169 if (!WorkflowUnsafe .isReplaying ()) {
170+ Span capturedSpan = tracer .scopeManager ().activeSpan ();
90171 Span childWorkflowStartSpan =
91172 contextAccessor .writeSpanContextToHeader (
92173 () -> createChildWorkflowStartSpanBuilder (input ).start (), input .getHeader (), tracer );
93174 try (Scope ignored = tracer .scopeManager ().activate (childWorkflowStartSpan )) {
94- return super .executeChildWorkflow (input );
175+ ChildWorkflowOutput <R > output = super .executeChildWorkflow (input );
176+ return new ChildWorkflowOutput <>(
177+ new PromiseWrapper <>(capturedSpan , output .getResult ()),
178+ new PromiseWrapper <>(capturedSpan , output .getWorkflowExecution ()));
95179 } finally {
96180 childWorkflowStartSpan .finish ();
97181 }
@@ -104,13 +188,17 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
104188 public <R > ExecuteNexusOperationOutput <R > executeNexusOperation (
105189 ExecuteNexusOperationInput <R > input ) {
106190 if (!WorkflowUnsafe .isReplaying ()) {
191+ Span capturedSpan = tracer .scopeManager ().activeSpan ();
107192 Span nexusOperationExecuteSpan =
108193 contextAccessor .writeSpanContextToHeader (
109194 () -> createStartNexusOperationSpanBuilder (input ).start (),
110195 input .getHeaders (),
111196 tracer );
112197 try (Scope ignored = tracer .scopeManager ().activate (nexusOperationExecuteSpan )) {
113- return super .executeNexusOperation (input );
198+ ExecuteNexusOperationOutput <R > output = super .executeNexusOperation (input );
199+ return new ExecuteNexusOperationOutput <>(
200+ new PromiseWrapper <>(capturedSpan , output .getResult ()),
201+ new PromiseWrapper <>(capturedSpan , output .getOperationExecution ()));
114202 } finally {
115203 nexusOperationExecuteSpan .finish ();
116204 }
@@ -122,6 +210,7 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
122210 @ Override
123211 public SignalExternalOutput signalExternalWorkflow (SignalExternalInput input ) {
124212 if (!WorkflowUnsafe .isReplaying ()) {
213+ Span capturedSpan = tracer .scopeManager ().activeSpan ();
125214 WorkflowInfo workflowInfo = Workflow .getInfo ();
126215 Span childWorkflowStartSpan =
127216 contextAccessor .writeSpanContextToHeader (
@@ -136,7 +225,8 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
136225 input .getHeader (),
137226 tracer );
138227 try (Scope ignored = tracer .scopeManager ().activate (childWorkflowStartSpan )) {
139- return super .signalExternalWorkflow (input );
228+ SignalExternalOutput output = super .signalExternalWorkflow (input );
229+ return new SignalExternalOutput (new PromiseWrapper <>(capturedSpan , output .getResult ()));
140230 } finally {
141231 childWorkflowStartSpan .finish ();
142232 }
0 commit comments