Skip to content

Commit 4fe296e

Browse files
Add start delay (#1897)
1 parent 06ef0df commit 4fe296e

File tree

6 files changed

+130
-5
lines changed

6 files changed

+130
-5
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.base.Objects;
2424
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
2525
import io.temporal.common.CronSchedule;
26+
import io.temporal.common.Experimental;
2627
import io.temporal.common.MethodRetry;
2728
import io.temporal.common.RetryOptions;
2829
import io.temporal.common.SearchAttributes;
@@ -76,6 +77,7 @@ public static WorkflowOptions merge(
7677
.setTypedSearchAttributes(o.getTypedSearchAttributes())
7778
.setContextPropagators(o.getContextPropagators())
7879
.setDisableEagerExecution(o.isDisableEagerExecution())
80+
.setStartDelay(o.getStartDelay())
7981
.validateBuildWithDefaults();
8082
}
8183

@@ -107,6 +109,8 @@ public static final class Builder {
107109

108110
private boolean disableEagerExecution = true;
109111

112+
private Duration startDelay;
113+
110114
private Builder() {}
111115

112116
private Builder(WorkflowOptions options) {
@@ -126,6 +130,7 @@ private Builder(WorkflowOptions options) {
126130
this.typedSearchAttributes = options.typedSearchAttributes;
127131
this.contextPropagators = options.contextPropagators;
128132
this.disableEagerExecution = options.disableEagerExecution;
133+
this.startDelay = options.startDelay;
129134
}
130135

131136
/**
@@ -344,6 +349,18 @@ public Builder setDisableEagerExecution(boolean disableEagerExecution) {
344349
return this;
345350
}
346351

352+
/**
353+
* Time to wait before dispatching the first workflow task. If the workflow gets a signal before
354+
* the delay, a workflow task will be dispatched and the rest of the delay will be ignored. A
355+
* signal from signal with start will not trigger a workflow task. Cannot be set the same time
356+
* as a CronSchedule.
357+
*/
358+
@Experimental
359+
public Builder setStartDelay(Duration startDelay) {
360+
this.startDelay = startDelay;
361+
return this;
362+
}
363+
347364
public WorkflowOptions build() {
348365
return new WorkflowOptions(
349366
workflowId,
@@ -358,7 +375,8 @@ public WorkflowOptions build() {
358375
searchAttributes,
359376
typedSearchAttributes,
360377
contextPropagators,
361-
disableEagerExecution);
378+
disableEagerExecution,
379+
startDelay);
362380
}
363381

364382
/**
@@ -378,7 +396,8 @@ public WorkflowOptions validateBuildWithDefaults() {
378396
searchAttributes,
379397
typedSearchAttributes,
380398
contextPropagators,
381-
disableEagerExecution);
399+
disableEagerExecution,
400+
startDelay);
382401
}
383402
}
384403

@@ -408,6 +427,8 @@ public WorkflowOptions validateBuildWithDefaults() {
408427

409428
private final boolean disableEagerExecution;
410429

430+
private final Duration startDelay;
431+
411432
private WorkflowOptions(
412433
String workflowId,
413434
WorkflowIdReusePolicy workflowIdReusePolicy,
@@ -421,7 +442,8 @@ private WorkflowOptions(
421442
Map<String, ?> searchAttributes,
422443
SearchAttributes typedSearchAttributes,
423444
List<ContextPropagator> contextPropagators,
424-
boolean disableEagerExecution) {
445+
boolean disableEagerExecution,
446+
Duration startDelay) {
425447
this.workflowId = workflowId;
426448
this.workflowIdReusePolicy = workflowIdReusePolicy;
427449
this.workflowRunTimeout = workflowRunTimeout;
@@ -435,6 +457,7 @@ private WorkflowOptions(
435457
this.typedSearchAttributes = typedSearchAttributes;
436458
this.contextPropagators = contextPropagators;
437459
this.disableEagerExecution = disableEagerExecution;
460+
this.startDelay = startDelay;
438461
}
439462

440463
public String getWorkflowId() {
@@ -498,6 +521,10 @@ public boolean isDisableEagerExecution() {
498521
return disableEagerExecution;
499522
}
500523

524+
public @Nullable Duration getStartDelay() {
525+
return startDelay;
526+
}
527+
501528
public Builder toBuilder() {
502529
return new Builder(this);
503530
}
@@ -519,7 +546,8 @@ public boolean equals(Object o) {
519546
&& Objects.equal(searchAttributes, that.searchAttributes)
520547
&& Objects.equal(typedSearchAttributes, that.typedSearchAttributes)
521548
&& Objects.equal(contextPropagators, that.contextPropagators)
522-
&& Objects.equal(disableEagerExecution, that.disableEagerExecution);
549+
&& Objects.equal(disableEagerExecution, that.disableEagerExecution)
550+
&& Objects.equal(startDelay, that.startDelay);
523551
}
524552

525553
@Override
@@ -537,7 +565,8 @@ public int hashCode() {
537565
searchAttributes,
538566
typedSearchAttributes,
539567
contextPropagators,
540-
disableEagerExecution);
568+
disableEagerExecution,
569+
startDelay);
541570
}
542571

543572
@Override
@@ -572,6 +601,8 @@ public String toString() {
572601
+ contextPropagators
573602
+ ", disableEagerExecution="
574603
+ disableEagerExecution
604+
+ ", startDelay="
605+
+ startDelay
575606
+ '}';
576607
}
577608
}

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
100100
request.setMemo(memo);
101101
}
102102

103+
if (options.getStartDelay() != null) {
104+
request.setWorkflowStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay()));
105+
}
106+
103107
if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) {
104108
if (options.getTypedSearchAttributes() != null) {
105109
throw new IllegalArgumentException(
@@ -170,6 +174,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut
170174
request.setHeader(startParameters.getHeader());
171175
}
172176

177+
if (startParameters.hasWorkflowStartDelay()) {
178+
request.setWorkflowStartDelay(startParameters.getWorkflowStartDelay());
179+
}
180+
173181
return request;
174182
}
175183

temporal-sdk/src/test/java/io/temporal/client/functional/SignalTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,23 @@
2020

2121
package io.temporal.client.functional;
2222

23+
import static org.junit.Assert.assertEquals;
2324
import static org.junit.Assert.assertThrows;
2425

26+
import io.temporal.api.common.v1.WorkflowExecution;
27+
import io.temporal.api.history.v1.HistoryEvent;
28+
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
2529
import io.temporal.client.WorkflowNotFoundException;
2630
import io.temporal.client.WorkflowOptions;
31+
import io.temporal.client.WorkflowStub;
32+
import io.temporal.common.WorkflowExecutionHistory;
33+
import io.temporal.internal.common.ProtobufTimeUtils;
34+
import io.temporal.testing.internal.SDKTestOptions;
2735
import io.temporal.testing.internal.SDKTestWorkflowRule;
2836
import io.temporal.workflow.shared.TestWorkflows;
37+
import java.time.Duration;
38+
import java.util.List;
39+
import java.util.stream.Collectors;
2940
import org.junit.Rule;
3041
import org.junit.Test;
3142

@@ -56,6 +67,35 @@ public void signalCompletedWorkflow() {
5667
assertThrows(WorkflowNotFoundException.class, () -> workflow.signal("some-value"));
5768
}
5869

70+
@Test(timeout = 50000)
71+
public void signalWithStartWithDelay() {
72+
WorkflowOptions workflowOptions =
73+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
74+
.setStartDelay(Duration.ofSeconds(5))
75+
.build();
76+
WorkflowStub stubF =
77+
testWorkflowRule
78+
.getWorkflowClient()
79+
.newUntypedWorkflowStub("TestSignaledWorkflow", workflowOptions);
80+
81+
WorkflowExecution workflowExecution =
82+
stubF.signalWithStart("testSignal", new Object[] {"testArg"}, new Object[] {});
83+
84+
assertEquals("done", stubF.getResult(String.class));
85+
WorkflowExecutionHistory workflowExecutionHistory =
86+
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
87+
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
88+
workflowExecutionHistory.getEvents().stream()
89+
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
90+
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
91+
.collect(Collectors.toList());
92+
assertEquals(1, workflowExecutionStartedEvents.size());
93+
assertEquals(
94+
Duration.ofSeconds(5),
95+
ProtobufTimeUtils.toJavaDuration(
96+
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
97+
}
98+
5999
public static class QuickWorkflowWithSignalImpl implements TestWorkflows.TestSignaledWorkflow {
60100

61101
@Override

temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,20 @@
2424

2525
import io.temporal.api.common.v1.WorkflowExecution;
2626
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
27+
import io.temporal.api.history.v1.HistoryEvent;
28+
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
2729
import io.temporal.client.WorkflowClient;
2830
import io.temporal.client.WorkflowOptions;
31+
import io.temporal.client.WorkflowStub;
32+
import io.temporal.common.WorkflowExecutionHistory;
33+
import io.temporal.internal.common.ProtobufTimeUtils;
2934
import io.temporal.testing.internal.SDKTestOptions;
3035
import io.temporal.testing.internal.SDKTestWorkflowRule;
3136
import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*;
37+
import java.time.Duration;
38+
import java.util.List;
3239
import java.util.Optional;
40+
import java.util.stream.Collectors;
3341
import org.junit.Assert;
3442
import org.junit.Rule;
3543
import org.junit.Test;
@@ -40,6 +48,32 @@ public class StartTest {
4048
public SDKTestWorkflowRule testWorkflowRule =
4149
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestMultiArgWorkflowImpl.class).build();
4250

51+
@Test
52+
public void startWithDelay() {
53+
WorkflowOptions workflowOptions =
54+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
55+
.setStartDelay(Duration.ofSeconds(5))
56+
.build();
57+
TestNoArgsWorkflowFunc stubF =
58+
testWorkflowRule
59+
.getWorkflowClient()
60+
.newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions);
61+
assertResult("func", WorkflowClient.start(stubF::func));
62+
WorkflowExecution workflowExecution = WorkflowStub.fromTyped(stubF).getExecution();
63+
WorkflowExecutionHistory workflowExecutionHistory =
64+
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
65+
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
66+
workflowExecutionHistory.getEvents().stream()
67+
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
68+
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
69+
.collect(Collectors.toList());
70+
assertEquals(1, workflowExecutionStartedEvents.size());
71+
assertEquals(
72+
Duration.ofSeconds(5),
73+
ProtobufTimeUtils.toJavaDuration(
74+
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
75+
}
76+
4377
@Test
4478
public void startNoArgFuncWithRejectDuplicate() {
4579
WorkflowOptions workflowOptions =

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,11 @@ private static void startWorkflow(
841841
.withDescription("negative workflowTaskTimeoutSeconds")
842842
.asRuntimeException();
843843
}
844+
if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) {
845+
throw Status.INVALID_ARGUMENT
846+
.withDescription("CronSchedule and WorkflowStartDelay may not be used together.")
847+
.asRuntimeException();
848+
}
844849

845850
WorkflowExecutionStartedEventAttributes.Builder a =
846851
WorkflowExecutionStartedEventAttributes.newBuilder()
@@ -860,6 +865,9 @@ private static void startWorkflow(
860865
if (data.lastCompletionResult != null) {
861866
a.setLastCompletionResult(data.lastCompletionResult);
862867
}
868+
if (request.hasWorkflowStartDelay()) {
869+
a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
870+
}
863871
data.lastFailure.ifPresent(a::setContinuedFailure);
864872
if (request.hasMemo()) {
865873
a.setMemo(request.getMemo());

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,10 @@ public void signalWithStartWorkflowExecution(
883883
if (r.hasSearchAttributes()) {
884884
startRequest.setSearchAttributes(r.getSearchAttributes());
885885
}
886+
if (r.hasWorkflowStartDelay()) {
887+
startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
888+
}
889+
886890
StartWorkflowExecutionResponse startResult =
887891
startWorkflowExecutionImpl(
888892
startRequest.build(),

0 commit comments

Comments
 (0)