1818package com .uber .cadence .worker ;
1919
2020import com .google .common .annotations .VisibleForTesting ;
21+ import com .google .common .base .MoreObjects ;
22+ import com .google .common .base .Preconditions ;
23+ import com .google .common .base .Strings ;
2124import com .uber .cadence .WorkflowExecution ;
2225import com .uber .cadence .client .WorkflowClient ;
2326import com .uber .cadence .converter .DataConverter ;
3336import com .uber .m3 .util .ImmutableMap ;
3437import java .lang .reflect .Type ;
3538import java .time .Duration ;
39+ import java .util .ArrayList ;
40+ import java .util .List ;
3641import java .util .Map ;
3742import java .util .Objects ;
3843import java .util .concurrent .TimeUnit ;
@@ -49,95 +54,44 @@ public final class Worker {
4954 private final SyncWorkflowWorker workflowWorker ;
5055 private final SyncActivityWorker activityWorker ;
5156 private final AtomicBoolean started = new AtomicBoolean ();
57+ private final AtomicBoolean closed = new AtomicBoolean ();
5258
5359 /**
54- * Creates worker that connects to the local instance of the Cadence Service that listens on a
55- * default port (7933).
56- *
57- * @param domain domain that worker uses to poll.
58- * @param taskList task list name worker uses to poll. It uses this name for both decision and
59- * activity task list polls.
60- */
61- public Worker (String domain , String taskList ) {
62- this (domain , taskList , null );
63- }
64-
65- /**
66- * Creates worker that connects to the local instance of the Cadence Service that listens on a
67- * default port (7933).
60+ * Creates worker that connects to an instance of the Cadence Service.
6861 *
62+ * @param service client to the Cadence Service endpoint.
6963 * @param domain domain that worker uses to poll.
7064 * @param taskList task list name worker uses to poll. It uses this name for both decision and
7165 * activity task list polls.
7266 * @param options Options (like {@link DataConverter} override) for configuring worker.
7367 */
74- public Worker (String domain , String taskList , WorkerOptions options ) {
75- this (new WorkflowServiceTChannel (), domain , taskList , options );
76- }
68+ private Worker (IWorkflowService service , String domain , String taskList , WorkerOptions options ) {
69+ Objects .requireNonNull (service , "service should not be null" );
70+ Preconditions .checkArgument (
71+ !Strings .isNullOrEmpty (domain ), "domain should not be an empty string" );
72+ Preconditions .checkArgument (
73+ !Strings .isNullOrEmpty (taskList ), "taskList should not be an empty string" );
7774
78- /**
79- * Creates worker that connects to an instance of the Cadence Service.
80- *
81- * @param host of the Cadence Service endpoint
82- * @param port of the Cadence Service endpoint
83- * @param domain domain that worker uses to poll.
84- * @param taskList task list name worker uses to poll. It uses this name for both decision and
85- * activity task list polls.
86- */
87- public Worker (String host , int port , String domain , String taskList ) {
88- this (new WorkflowServiceTChannel (host , port ), domain , taskList , null );
89- }
75+ this .taskList = taskList ;
76+ this .options = MoreObjects .firstNonNull (options , new Builder ().build ());
9077
91- /**
92- * Creates worker that connects to an instance of the Cadence Service.
93- *
94- * @param host of the Cadence Service endpoint
95- * @param port of the Cadence Service endpoint
96- * @param domain domain that worker uses to poll.
97- * @param taskList task list name worker uses to poll. It uses this name for both decision and
98- * activity task list polls.
99- * @param options Options (like {@link DataConverter} override) for configuring worker.
100- */
101- public Worker (String host , int port , String domain , String taskList , WorkerOptions options ) {
102- this (new WorkflowServiceTChannel (host , port ), domain , taskList , options );
103- }
78+ SingleWorkerOptions activityOptions = toActivityOptions (this .options , domain , taskList );
79+ activityWorker =
80+ this .options .isDisableActivityWorker ()
81+ ? null
82+ : new SyncActivityWorker (service , domain , taskList , activityOptions );
10483
105- /**
106- * Creates worker that connects to an instance of the Cadence Service.
107- *
108- * @param service client to the Cadence Service endpoint.
109- * @param domain domain that worker uses to poll.
110- * @param taskList task list name worker uses to poll. It uses this name for both decision and
111- * activity task list polls.
112- * @param options Options (like {@link DataConverter} override) for configuring worker.
113- */
114- public Worker (IWorkflowService service , String domain , String taskList , WorkerOptions options ) {
115- Objects .requireNonNull (service , "service" );
116- Objects .requireNonNull (domain , "domain" );
117- this .taskList = Objects .requireNonNull (taskList , "taskList" );
118- if (options == null ) {
119- options = new Builder ().build ();
120- }
121- this .options = options ;
122- SingleWorkerOptions activityOptions = toActivityOptions (options , domain , taskList );
123- if (!options .isDisableActivityWorker ()) {
124- activityWorker = new SyncActivityWorker (service , domain , taskList , activityOptions );
125- } else {
126- activityWorker = null ;
127- }
128- SingleWorkerOptions workflowOptions = toWorkflowOptions (options , domain , taskList );
129- if (!options .isDisableWorkflowWorker ()) {
130- workflowWorker =
131- new SyncWorkflowWorker (
132- service ,
133- domain ,
134- taskList ,
135- options .getInterceptorFactory (),
136- workflowOptions ,
137- options .getMaxWorkflowThreads ());
138- } else {
139- workflowWorker = null ;
140- }
84+ SingleWorkerOptions workflowOptions = toWorkflowOptions (this .options , domain , taskList );
85+ workflowWorker =
86+ this .options .isDisableWorkflowWorker ()
87+ ? null
88+ : new SyncWorkflowWorker (
89+ service ,
90+ domain ,
91+ taskList ,
92+ this .options .getInterceptorFactory (),
93+ workflowOptions ,
94+ this .options .getMaxWorkflowThreads ());
14195 }
14296
14397 private SingleWorkerOptions toActivityOptions (
@@ -190,10 +144,13 @@ private SingleWorkerOptions toWorkflowOptions(
190144 * workflows are stateful and a new instance is created for each workflow execution.
191145 */
192146 public void registerWorkflowImplementationTypes (Class <?>... workflowImplementationClasses ) {
193- if (workflowWorker == null ) {
194- throw new IllegalStateException ("disableWorkflowWorker is set in worker options" );
195- }
196- checkNotStarted ();
147+ Preconditions .checkState (
148+ workflowWorker != null ,
149+ "registerWorkflowImplementationTypes is not allowed when disableWorkflowWorker is set in worker options" );
150+ Preconditions .checkState (
151+ !started .get (),
152+ "registerWorkflowImplementationTypes is not allowed after worker has started" );
153+
197154 workflowWorker .setWorkflowImplementationTypes (workflowImplementationClasses );
198155 }
199156
@@ -234,20 +191,17 @@ public <R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Fun
234191 * <p>
235192 */
236193 public void registerActivitiesImplementations (Object ... activityImplementations ) {
237- if ( activityWorker == null ) {
238- throw new IllegalStateException ( "disableActivityWorker is set in worker options" );
239- }
240- checkNotStarted ();
241- activityWorker . setActivitiesImplementation ( activityImplementations );
242- }
194+ Preconditions . checkState (
195+ activityWorker != null ,
196+ "registerActivitiesImplementations is not allowed when disableWorkflowWorker is set in worker options" );
197+ Preconditions . checkState (
198+ ! started . get (),
199+ "registerActivitiesImplementations is not allowed after worker has started" );
243200
244- private void checkNotStarted () {
245- if (started .get ()) {
246- throw new IllegalStateException ("already started" );
247- }
201+ activityWorker .setActivitiesImplementation (activityImplementations );
248202 }
249203
250- public void start () {
204+ private void start () {
251205 if (!started .compareAndSet (false , true )) {
252206 return ;
253207 }
@@ -263,10 +217,14 @@ public boolean isStarted() {
263217 return started .get ();
264218 }
265219
220+ public boolean isClosed () {
221+ return closed .get ();
222+ }
223+
266224 /**
267225 * Shutdown a worker, waiting for activities to complete execution up to the specified timeout.
268226 */
269- public void shutdown (Duration timeout ) {
227+ private void shutdown (Duration timeout ) {
270228 try {
271229 long time = System .currentTimeMillis ();
272230 if (activityWorker != null ) {
@@ -276,6 +234,7 @@ public void shutdown(Duration timeout) {
276234 long left = timeout .toMillis () - (System .currentTimeMillis () - time );
277235 workflowWorker .shutdownAndAwaitTermination (left , TimeUnit .MILLISECONDS );
278236 }
237+ closed .set (true );
279238 } catch (InterruptedException e ) {
280239 throw new RuntimeException (e );
281240 }
@@ -344,4 +303,85 @@ public <R> R queryWorkflowExecution(
344303 public String getTaskList () {
345304 return taskList ;
346305 }
306+
307+ public static final class Factory {
308+
309+ private final List <Worker > workers = new ArrayList <>();
310+ private final IWorkflowService workflowService ;
311+ private final String domain ;
312+ private State state = State .Initial ;
313+
314+ private final String statusErrorMessage =
315+ "attempted to %s while in %s state. Acceptable States: %s" ;
316+
317+ public Factory (String domain ) {
318+ this (new WorkflowServiceTChannel (), domain );
319+ }
320+
321+ public Factory (String host , int port , String domain ) {
322+ this (new WorkflowServiceTChannel (host , port ), domain );
323+ }
324+
325+ public Factory (IWorkflowService workflowService , String domain ) {
326+ Objects .requireNonNull (workflowService , "workflowService should not be null" );
327+ Preconditions .checkArgument (!Strings .isNullOrEmpty (domain ), "domain should not be an empty string" );
328+
329+ this .workflowService = workflowService ;
330+ this .domain = domain ;
331+ }
332+
333+ public Worker newWorker (String taskList ) {
334+ return newWorker (taskList , null );
335+ }
336+
337+ public Worker newWorker (String taskList , WorkerOptions options ) {
338+ Preconditions .checkArgument (!Strings .isNullOrEmpty (taskList ), "taskList should not be an empty string" );
339+
340+ synchronized (this ) {
341+ Preconditions .checkState (
342+ state == State .Initial ,
343+ String .format (
344+ statusErrorMessage , "create new worker" , state .name (), State .Initial .name ()));
345+ Worker worker = new Worker (workflowService , domain , taskList , options );
346+ workers .add (worker );
347+ return worker ;
348+ }
349+ }
350+
351+ public void start () {
352+ synchronized (this ) {
353+ Preconditions .checkState (
354+ state == State .Initial || state == State .Started ,
355+ String .format (
356+ statusErrorMessage ,
357+ "start WorkerFactory" ,
358+ state .name (),
359+ String .format ("%s, %s" , State .Initial .name (), State .Initial .name ())));
360+ if (state == State .Started ) {
361+ return ;
362+ }
363+ state = State .Started ;
364+
365+ for (Worker worker : workers ) {
366+ worker .start ();
367+ }
368+ }
369+ }
370+
371+ public void shutdown (Duration timeout ) {
372+ synchronized (this ) {
373+ state = State .Shutdown ;
374+
375+ for (Worker worker : workers ) {
376+ worker .shutdown (timeout );
377+ }
378+ }
379+ }
380+
381+ enum State {
382+ Initial ,
383+ Started ,
384+ Shutdown
385+ }
386+ }
347387}
0 commit comments