Skip to content

Commit 4e9cc06

Browse files
authored
Set worker version on eager start requests (#2704)
1 parent 1b29853 commit 4e9cc06

File tree

5 files changed

+140
-6
lines changed

5 files changed

+140
-6
lines changed

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.temporal.internal.client.external.GenericWorkflowClient;
2525
import io.temporal.internal.common.HeaderUtils;
2626
import io.temporal.internal.nexus.CurrentNexusOperationContext;
27+
import io.temporal.internal.worker.WorkerVersioningProtoUtils;
2728
import io.temporal.payload.context.WorkflowSerializationContext;
2829
import io.temporal.serviceclient.StatusUtils;
2930
import io.temporal.worker.WorkflowTaskDispatchHandle;
@@ -71,6 +72,11 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
7172
try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
7273
boolean requestEagerExecution = eagerDispatchHandle != null;
7374
startRequest.setRequestEagerExecution(requestEagerExecution);
75+
if (requestEagerExecution && eagerDispatchHandle.getDeploymentOptions() != null) {
76+
startRequest.setEagerWorkerDeploymentOptions(
77+
WorkerVersioningProtoUtils.deploymentOptionsToProto(
78+
eagerDispatchHandle.getDeploymentOptions()));
79+
}
7480
StartWorkflowExecutionResponse response = genericClient.start(startRequest.build());
7581
WorkflowExecution execution =
7682
WorkflowExecution.newBuilder()

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,8 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
328328
}
329329
},
330330
slotSupplier,
331-
slotPermit))
331+
slotPermit,
332+
options.getDeploymentOptions()))
332333
.orElse(null);
333334
}
334335

temporal-sdk/src/main/java/io/temporal/worker/WorkflowTaskDispatchHandle.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,31 @@
1111
import java.util.concurrent.atomic.AtomicBoolean;
1212
import java.util.function.Function;
1313
import javax.annotation.Nonnull;
14+
import javax.annotation.Nullable;
1415

1516
public class WorkflowTaskDispatchHandle implements Closeable {
1617
private final AtomicBoolean completed = new AtomicBoolean();
1718
private final Function<WorkflowTask, Boolean> dispatchCallback;
1819
private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
1920
private final SlotPermit permit;
21+
private final WorkerDeploymentOptions deploymentOptions;
2022

2123
/**
2224
* @param dispatchCallback callback into a {@code WorkflowWorker} to dispatch a workflow task.
2325
* @param slotSupplier slot supplier that was used to reserve a slot for this workflow task
26+
* @param permit the slot permit reserved for this workflow task
27+
* @param deploymentOptions deployment options of the worker that reserved the slot, or null if
28+
* not configured
2429
*/
2530
public WorkflowTaskDispatchHandle(
2631
DispatchCallback dispatchCallback,
2732
TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier,
28-
SlotPermit permit) {
33+
SlotPermit permit,
34+
@Nullable WorkerDeploymentOptions deploymentOptions) {
2935
this.dispatchCallback = dispatchCallback;
3036
this.slotSupplier = slotSupplier;
3137
this.permit = permit;
38+
this.deploymentOptions = deploymentOptions;
3239
}
3340

3441
/**
@@ -47,6 +54,14 @@ public boolean dispatch(@Nonnull PollWorkflowTaskQueueResponse workflowTask) {
4754
}
4855
}
4956

57+
/**
58+
* @return deployment options of the worker that reserved the slot, or null if not configured
59+
*/
60+
@Nullable
61+
public WorkerDeploymentOptions getDeploymentOptions() {
62+
return deploymentOptions;
63+
}
64+
5065
@Override
5166
public void close() {
5267
if (completed.compareAndSet(false, true)) {

temporal-sdk/src/test/java/io/temporal/workflow/EagerWorkflowTaskDispatchTest.java

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,21 @@
33
import static org.junit.Assert.*;
44

55
import io.grpc.*;
6+
import io.temporal.api.deployment.v1.WorkerDeploymentOptions;
67
import io.temporal.api.enums.v1.EventType;
8+
import io.temporal.api.enums.v1.WorkerVersioningMode;
79
import io.temporal.api.history.v1.HistoryEvent;
810
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
911
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
1012
import io.temporal.client.WorkflowClient;
1113
import io.temporal.client.WorkflowOptions;
1214
import io.temporal.client.WorkflowStub;
15+
import io.temporal.common.VersioningBehavior;
16+
import io.temporal.common.WorkerDeploymentVersion;
1317
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
1418
import io.temporal.testUtils.CountingSlotSupplier;
1519
import io.temporal.testing.internal.SDKTestWorkflowRule;
16-
import io.temporal.worker.Worker;
17-
import io.temporal.worker.WorkerFactory;
18-
import io.temporal.worker.WorkerOptions;
20+
import io.temporal.worker.*;
1921
import io.temporal.worker.tuning.*;
2022
import io.temporal.workflow.shared.TestWorkflows;
2123
import java.util.ArrayList;
@@ -67,6 +69,14 @@ public void tearDown() throws Exception {
6769

6870
private WorkerFactory setupWorkerFactory(
6971
String workerIdentity, boolean registerWorkflows, boolean start) {
72+
return setupWorkerFactory(workerIdentity, registerWorkflows, start, null);
73+
}
74+
75+
private WorkerFactory setupWorkerFactory(
76+
String workerIdentity,
77+
boolean registerWorkflows,
78+
boolean start,
79+
io.temporal.worker.WorkerDeploymentOptions deploymentOptions) {
7080
WorkflowClient workflowClient =
7181
WorkflowClient.newInstance(
7282
testWorkflowRule.getWorkflowServiceStubs(),
@@ -86,6 +96,7 @@ private WorkerFactory setupWorkerFactory(
8696
activityTaskSlotSupplier,
8797
localActivitySlotSupplier,
8898
nexusSlotSupplier))
99+
.setDeploymentOptions(deploymentOptions)
89100
.build());
90101
if (registerWorkflows) {
91102
worker.registerWorkflowImplementationTypes(EagerWorkflowTaskWorkflowImpl.class);
@@ -265,6 +276,96 @@ public void testNoEagerWFTIfDisabledOnWorkflowOptions() {
265276
START_CALL_INTERCEPTOR.wasLastStartEager());
266277
}
267278

279+
@Test
280+
public void testDeploymentOptionsArePropagatedForVersionedWorker() {
281+
io.temporal.worker.WorkerDeploymentOptions deploymentOptions =
282+
io.temporal.worker.WorkerDeploymentOptions.newBuilder()
283+
.setUseVersioning(true)
284+
.setVersion(new WorkerDeploymentVersion("my-deployment", "build-id-123"))
285+
.setDefaultVersioningBehavior(VersioningBehavior.PINNED)
286+
.build();
287+
288+
WorkerFactory workerFactory = setupWorkerFactory("worker1", true, true, deploymentOptions);
289+
290+
TestWorkflows.NoArgsWorkflow workflowStub =
291+
workerFactory
292+
.getWorkflowClient()
293+
.newWorkflowStub(
294+
TestWorkflows.NoArgsWorkflow.class,
295+
WorkflowOptions.newBuilder()
296+
.setTaskQueue(testWorkflowRule.getTaskQueue())
297+
.setDisableEagerExecution(false)
298+
.build());
299+
workflowStub.execute();
300+
301+
assertTrue(START_CALL_INTERCEPTOR.wasLastStartEager());
302+
303+
WorkerDeploymentOptions capturedOptions = START_CALL_INTERCEPTOR.getLastDeploymentOptions();
304+
assertNotNull(
305+
"Deployment options should be present in StartWorkflowExecutionRequest", capturedOptions);
306+
assertEquals("my-deployment", capturedOptions.getDeploymentName());
307+
assertEquals("build-id-123", capturedOptions.getBuildId());
308+
assertEquals(
309+
WorkerVersioningMode.WORKER_VERSIONING_MODE_VERSIONED,
310+
capturedOptions.getWorkerVersioningMode());
311+
}
312+
313+
@Test
314+
public void testDeploymentOptionsArePropagatedForUnversionedWorker() {
315+
io.temporal.worker.WorkerDeploymentOptions deploymentOptions =
316+
io.temporal.worker.WorkerDeploymentOptions.newBuilder()
317+
.setUseVersioning(false)
318+
.setVersion(new WorkerDeploymentVersion("my-deployment", "build-id-456"))
319+
.build();
320+
321+
WorkerFactory workerFactory = setupWorkerFactory("worker1", true, true, deploymentOptions);
322+
323+
TestWorkflows.NoArgsWorkflow workflowStub =
324+
workerFactory
325+
.getWorkflowClient()
326+
.newWorkflowStub(
327+
TestWorkflows.NoArgsWorkflow.class,
328+
WorkflowOptions.newBuilder()
329+
.setTaskQueue(testWorkflowRule.getTaskQueue())
330+
.setDisableEagerExecution(false)
331+
.build());
332+
workflowStub.execute();
333+
334+
assertTrue(START_CALL_INTERCEPTOR.wasLastStartEager());
335+
336+
WorkerDeploymentOptions capturedOptions = START_CALL_INTERCEPTOR.getLastDeploymentOptions();
337+
assertNotNull(
338+
"Deployment options should be present in StartWorkflowExecutionRequest", capturedOptions);
339+
assertEquals("my-deployment", capturedOptions.getDeploymentName());
340+
assertEquals("build-id-456", capturedOptions.getBuildId());
341+
assertEquals(
342+
WorkerVersioningMode.WORKER_VERSIONING_MODE_UNVERSIONED,
343+
capturedOptions.getWorkerVersioningMode());
344+
}
345+
346+
@Test
347+
public void testNoDeploymentOptionsWhenWorkerHasNone() {
348+
WorkerFactory workerFactory = setupWorkerFactory("worker1", true, true, null);
349+
350+
TestWorkflows.NoArgsWorkflow workflowStub =
351+
workerFactory
352+
.getWorkflowClient()
353+
.newWorkflowStub(
354+
TestWorkflows.NoArgsWorkflow.class,
355+
WorkflowOptions.newBuilder()
356+
.setTaskQueue(testWorkflowRule.getTaskQueue())
357+
.setDisableEagerExecution(false)
358+
.build());
359+
workflowStub.execute();
360+
361+
assertTrue(START_CALL_INTERCEPTOR.wasLastStartEager());
362+
363+
WorkerDeploymentOptions capturedOptions = START_CALL_INTERCEPTOR.getLastDeploymentOptions();
364+
assertNull(
365+
"Deployment options should not be present when worker has no deployment options configured",
366+
capturedOptions);
367+
}
368+
268369
public static class EagerWorkflowTaskWorkflowImpl implements TestWorkflows.NoArgsWorkflow {
269370
@Override
270371
public void execute() {}
@@ -273,6 +374,7 @@ public void execute() {}
273374
private static class StartCallInterceptor implements ClientInterceptor {
274375

275376
private Boolean wasLastStartEager;
377+
private WorkerDeploymentOptions lastDeploymentOptions;
276378

277379
@Override
278380
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
@@ -287,8 +389,13 @@ public Boolean wasLastStartEager() {
287389
return wasLastStartEager;
288390
}
289391

392+
public WorkerDeploymentOptions getLastDeploymentOptions() {
393+
return lastDeploymentOptions;
394+
}
395+
290396
public void clear() {
291397
wasLastStartEager = null;
398+
lastDeploymentOptions = null;
292399
}
293400

294401
private final class EagerStartSniffingCall<ReqT, RespT>
@@ -302,6 +409,11 @@ private final class EagerStartSniffingCall<ReqT, RespT>
302409
public void sendMessage(ReqT message) {
303410
StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
304411
wasLastStartEager = request.getRequestEagerExecution();
412+
if (request.hasEagerWorkerDeploymentOptions()) {
413+
lastDeploymentOptions = request.getEagerWorkerDeploymentOptions();
414+
} else {
415+
lastDeploymentOptions = null;
416+
}
305417
super.sendMessage(message);
306418
}
307419
}

0 commit comments

Comments
 (0)