2121package io .temporal .functional .serialization ;
2222
2323import static org .junit .Assert .*;
24+ import static org .junit .Assume .assumeFalse ;
25+ import static org .junit .Assume .assumeTrue ;
2426
27+ import com .google .common .collect .ImmutableMap ;
2528import com .google .protobuf .ByteString ;
2629import io .temporal .activity .*;
2730import io .temporal .api .common .v1 .Payload ;
28- import io .temporal .client . WorkflowClientOptions ;
29- import io .temporal .common . converter . CodecDataConverter ;
30- import io .temporal .common . converter . DataConverter ;
31- import io .temporal .common .converter .DefaultDataConverter ;
32- import io .temporal .common . converter . EncodingKeys ;
31+ import io .temporal .api . common . v1 . WorkflowExecution ;
32+ import io .temporal .client .* ;
33+ import io .temporal .client . schedules .* ;
34+ import io .temporal .common .converter .* ;
35+ import io .temporal .failure . CanceledFailure ;
3336import io .temporal .payload .codec .PayloadCodec ;
3437import io .temporal .payload .codec .PayloadCodecException ;
3538import io .temporal .payload .context .ActivitySerializationContext ;
3639import io .temporal .payload .context .HasWorkflowSerializationContext ;
3740import io .temporal .payload .context .SerializationContext ;
41+ import io .temporal .testing .internal .SDKTestOptions ;
3842import io .temporal .testing .internal .SDKTestWorkflowRule ;
43+ import io .temporal .workflow .ChildWorkflowOptions ;
44+ import io .temporal .workflow .ContinueAsNewOptions ;
3945import io .temporal .workflow .Workflow ;
46+ import io .temporal .workflow .shared .TestWorkflowWithCronScheduleImpl ;
4047import io .temporal .workflow .shared .TestWorkflows ;
4148import java .io .IOException ;
4249import java .time .Duration ;
43- import java .util .Arrays ;
44- import java .util .Collections ;
45- import java .util .List ;
46- import java .util .Optional ;
50+ import java .util .*;
4751import java .util .stream .Collectors ;
4852import javax .annotation .Nonnull ;
4953import javax .annotation .Nullable ;
5054import org .junit .Rule ;
5155import org .junit .Test ;
56+ import org .junit .rules .TestName ;
5257
5358/**
5459 * This test emulates a scenario when users may be using WorkflowId in their encoding to sign every
5863 * explode on decoding.
5964 */
6065public class WorkflowIdSignedPayloadsTest {
66+ private static final String MEMO_KEY = "testKey" ;
67+ private static final String MEMO_VALUE = "testValue" ;
68+ private static final Map <String , Object > MEMO = ImmutableMap .of (MEMO_KEY , MEMO_VALUE );
6169 private final SimpleActivity heartbeatingActivity = new HeartbeatingIfNotLocalActivityImpl ();
6270 private final ManualCompletionActivity manualCompletionActivity =
6371 new ManualCompletionActivityImpl ();
@@ -70,19 +78,90 @@ public class WorkflowIdSignedPayloadsTest {
7078 @ Rule
7179 public SDKTestWorkflowRule testWorkflowRule =
7280 SDKTestWorkflowRule .newBuilder ()
73- .setWorkflowTypes (SimpleWorkflowWithAnActivity .class )
81+ .setWorkflowTypes (
82+ SimpleWorkflowWithAnActivity .class , TestWorkflowWithCronScheduleImpl .class )
7483 .setWorkflowClientOptions (
7584 WorkflowClientOptions .newBuilder ().setDataConverter (codecDataConverter ).build ())
7685 .setActivityImplementations (heartbeatingActivity , manualCompletionActivity )
7786 .build ();
7887
88+ @ Rule public TestName testName = new TestName ();
89+
7990 @ Test
8091 public void testSimpleWorkflowWithAnActivity () {
8192 TestWorkflows .TestWorkflow1 workflowStub =
8293 testWorkflowRule .newWorkflowStubTimeoutOptions (TestWorkflows .TestWorkflow1 .class );
8394 assertEquals ("result" , workflowStub .execute ("input" ));
8495 }
8596
97+ @ Test
98+ public void testSimpleWorkflowWithMemo () throws InterruptedException {
99+ assumeTrue (
100+ "skipping as test server does not support list" , SDKTestWorkflowRule .useExternalService );
101+
102+ WorkflowOptions options =
103+ SDKTestOptions .newWorkflowOptionsWithTimeouts (testWorkflowRule .getTaskQueue ());
104+ options = WorkflowOptions .newBuilder (options ).setMemo (MEMO ).build ();
105+ TestWorkflows .TestWorkflow1 workflowStub =
106+ testWorkflowRule
107+ .getWorkflowClient ()
108+ .newWorkflowStub (TestWorkflows .TestWorkflow1 .class , options );
109+ assertEquals ("result" , workflowStub .execute ("input" ));
110+ WorkflowExecution execution = WorkflowStub .fromTyped (workflowStub ).getExecution ();
111+ String workflowId = execution .getWorkflowId ();
112+ String runId = execution .getRunId ();
113+
114+ // listWorkflowExecutions is Visibility API
115+ // Temporal Visibility has latency and is not transactional with the Server API call
116+ Thread .sleep (4_000 );
117+
118+ List <WorkflowExecutionMetadata > executions =
119+ testWorkflowRule
120+ .getWorkflowClient ()
121+ .listExecutions ("WorkflowId = '" + workflowId + "' AND " + " RunId = '" + runId + "'" )
122+ .collect (Collectors .toList ());
123+ assertEquals (1 , executions .size ());
124+ assertEquals (MEMO_VALUE , executions .get (0 ).getMemo (MEMO_KEY , String .class ));
125+ }
126+
127+ @ Test
128+ public void testSimpleCronWorkflow () {
129+ assumeFalse ("skipping as test will timeout" , SDKTestWorkflowRule .useExternalService );
130+
131+ WorkflowOptions options =
132+ SDKTestOptions .newWorkflowOptionsWithTimeouts (testWorkflowRule .getTaskQueue ());
133+ options =
134+ WorkflowOptions .newBuilder (options )
135+ .setWorkflowRunTimeout (Duration .ofHours (1 ))
136+ .setCronSchedule ("0 */6 * * *" )
137+ .build ();
138+ TestWorkflows .TestWorkflowWithCronSchedule workflow =
139+ testWorkflowRule
140+ .getWorkflowClient ()
141+ .newWorkflowStub (TestWorkflows .TestWorkflowWithCronSchedule .class , options );
142+
143+ testWorkflowRule .registerDelayedCallback (
144+ Duration .ofDays (1 ), WorkflowStub .fromTyped (workflow )::cancel );
145+ WorkflowClient .start (workflow ::execute , testName .getMethodName ());
146+
147+ try {
148+ workflow .execute (testName .getMethodName ());
149+ fail ("unreachable" );
150+ } catch (WorkflowFailedException e ) {
151+ assertTrue (e .getCause () instanceof CanceledFailure );
152+ }
153+
154+ Map <Integer , String > lastCompletionResults =
155+ TestWorkflowWithCronScheduleImpl .lastCompletionResults .get (testName .getMethodName ());
156+ assertEquals (4 , lastCompletionResults .size ());
157+ // Run 3 failed. So on run 4 we get the last completion result from run 2.
158+ assertEquals ("run 2" , lastCompletionResults .get (4 ));
159+ // The last failure ought to be the one from run 3
160+ assertTrue (TestWorkflowWithCronScheduleImpl .lastFail .isPresent ());
161+ assertTrue (
162+ TestWorkflowWithCronScheduleImpl .lastFail .get ().getMessage ().contains ("simulated error" ));
163+ }
164+
86165 @ ActivityInterface
87166 public interface SimpleActivity {
88167 @ ActivityMethod (name = "simple" )
@@ -159,14 +238,21 @@ public String execute(String input) {
159238 assertEquals ("result" , result );
160239 // Child Workflow
161240 if (!Workflow .getInfo ().getParentWorkflowId ().isPresent ()) {
241+ ChildWorkflowOptions childOptions = ChildWorkflowOptions .newBuilder ().setMemo (MEMO ).build ();
162242 TestWorkflows .TestWorkflow1 child =
163- Workflow .newChildWorkflowStub (TestWorkflows .TestWorkflow1 .class );
243+ Workflow .newChildWorkflowStub (TestWorkflows .TestWorkflow1 .class , childOptions );
164244 result = child .execute (input );
165245 assertEquals ("result" , result );
166246 }
247+ // Memo
248+ String memoValue = (String ) Workflow .getMemo (MEMO_KEY , String .class );
249+ if (memoValue != null ) {
250+ assertEquals (MEMO_VALUE , memoValue );
251+ }
167252 // continueAsNew
168253 if (!Workflow .getInfo ().getContinuedExecutionRunId ().isPresent ()) {
169- Workflow .continueAsNew (input );
254+ ContinueAsNewOptions casOptions = ContinueAsNewOptions .newBuilder ().setMemo (MEMO ).build ();
255+ Workflow .continueAsNew (casOptions , input );
170256 }
171257 return result ;
172258 }
0 commit comments