2121package io .temporal .internal .common ;
2222
2323import com .google .common .base .Defaults ;
24+ import io .nexusrpc .Header ;
2425import io .temporal .api .common .v1 .Callback ;
2526import io .temporal .api .enums .v1 .TaskQueueKind ;
2627import io .temporal .api .taskqueue .v1 .TaskQueue ;
2728import io .temporal .client .WorkflowOptions ;
2829import io .temporal .client .WorkflowStub ;
2930import io .temporal .internal .client .NexusStartWorkflowRequest ;
3031import java .util .Arrays ;
32+ import java .util .Map ;
33+ import java .util .TreeMap ;
3134import java .util .stream .Collectors ;
3235import org .slf4j .Logger ;
3336import org .slf4j .LoggerFactory ;
@@ -78,6 +81,20 @@ public static WorkflowStub createNexusBoundStub(
7881 throw new IllegalArgumentException (
7982 "WorkflowId is expected to be set on WorkflowOptions when used with Nexus" );
8083 }
84+ // Add the Nexus operation ID to the headers if it is not already present to support fabricating
85+ // a NexusOperationStarted event if the completion is received before the response to a
86+ // StartOperation request.
87+ Map <String , String > headers =
88+ request .getCallbackHeaders ().entrySet ().stream ()
89+ .collect (
90+ Collectors .toMap (
91+ (k ) -> k .getKey ().toLowerCase (),
92+ Map .Entry ::getValue ,
93+ (a , b ) -> a ,
94+ () -> new TreeMap <>(String .CASE_INSENSITIVE_ORDER )));
95+ if (!headers .containsKey (Header .OPERATION_ID )) {
96+ headers .put (Header .OPERATION_ID .toLowerCase (), options .getWorkflowId ());
97+ }
8198 WorkflowOptions .Builder nexusWorkflowOptions =
8299 WorkflowOptions .newBuilder (options )
83100 .setRequestId (request .getRequestId ())
@@ -87,7 +104,7 @@ public static WorkflowStub createNexusBoundStub(
87104 .setNexus (
88105 Callback .Nexus .newBuilder ()
89106 .setUrl (request .getCallbackUrl ())
90- .putAllHeader (request . getCallbackHeaders () )
107+ .putAllHeader (headers )
91108 .build ())
92109 .build ()));
93110 if (options .getTaskQueue () == null ) {
0 commit comments