3939import io .temporal .serviceclient .rpcretry .DefaultStubServiceOperationRpcRetryOptions ;
4040import io .temporal .worker .MetricsType ;
4141import io .temporal .worker .WorkerMetricsTag ;
42+ import io .temporal .worker .tuning .*;
4243import java .util .Objects ;
44+ import java .util .Optional ;
4345import java .util .concurrent .CompletableFuture ;
44- import java .util .concurrent .Semaphore ;
4546import java .util .concurrent .TimeUnit ;
4647import javax .annotation .Nonnull ;
4748import org .slf4j .Logger ;
@@ -64,16 +65,16 @@ final class ActivityWorker implements SuspendableWorker {
6465 private final Scope workerMetricsScope ;
6566 private final GrpcRetryer grpcRetryer ;
6667 private final GrpcRetryer .GrpcRetryerOptions replyGrpcRetryerOptions ;
67- private final int executorSlots ;
68- private final Semaphore executorSlotsSemaphore ;
68+ private final TrackingSlotSupplier <ActivitySlotInfo > slotSupplier ;
6969
7070 public ActivityWorker (
7171 @ Nonnull WorkflowServiceStubs service ,
7272 @ Nonnull String namespace ,
7373 @ Nonnull String taskQueue ,
7474 double taskQueueActivitiesPerSecond ,
7575 @ Nonnull SingleWorkerOptions options ,
76- @ Nonnull ActivityTaskHandler handler ) {
76+ @ Nonnull ActivityTaskHandler handler ,
77+ @ Nonnull TrackingSlotSupplier <ActivitySlotInfo > slotSupplier ) {
7778 this .service = Objects .requireNonNull (service );
7879 this .namespace = Objects .requireNonNull (namespace );
7980 this .taskQueue = Objects .requireNonNull (taskQueue );
@@ -87,8 +88,8 @@ public ActivityWorker(
8788 this .replyGrpcRetryerOptions =
8889 new GrpcRetryer .GrpcRetryerOptions (
8990 DefaultStubServiceOperationRpcRetryOptions .INSTANCE , null );
90- this .executorSlots = options . getTaskExecutorThreadPoolSize () ;
91- this .executorSlotsSemaphore = new Semaphore ( executorSlots );
91+ this .slotSupplier = slotSupplier ;
92+ this .slotSupplier . setMetricsScope ( this . workerMetricsScope );
9293 }
9394
9495 @ Override
@@ -101,8 +102,7 @@ public boolean start() {
101102 options .getIdentity (),
102103 new TaskHandlerImpl (handler ),
103104 pollerOptions ,
104- options .getTaskExecutorThreadPoolSize (),
105- workerMetricsScope ,
105+ slotSupplier .maximumSlots (),
106106 true );
107107 poller =
108108 new Poller <>(
@@ -115,7 +115,7 @@ public boolean start() {
115115 options .getBuildId (),
116116 options .isUsingBuildIdForVersioning (),
117117 taskQueueActivitiesPerSecond ,
118- executorSlotsSemaphore ,
118+ this . slotSupplier ,
119119 workerMetricsScope ,
120120 service .getServerCapabilities ()),
121121 this .pollTaskExecutor ,
@@ -131,14 +131,14 @@ public boolean start() {
131131
132132 @ Override
133133 public CompletableFuture <Void > shutdown (ShutdownManager shutdownManager , boolean interruptTasks ) {
134- String semaphoreName = this + "#executorSlotsSemaphore " ;
134+ String supplierName = this + "#executorSlots " ;
135135 return poller
136136 .shutdown (shutdownManager , interruptTasks )
137137 .thenCompose (
138138 ignore ->
139139 !interruptTasks
140- ? shutdownManager .waitForSemaphorePermitsReleaseUntimed (
141- executorSlotsSemaphore , executorSlots , semaphoreName )
140+ ? shutdownManager .waitForSupplierPermitsReleasedUnlimited (
141+ slotSupplier , supplierName )
142142 : CompletableFuture .completedFuture (null ))
143143 .thenCompose (
144144 ignore ->
@@ -416,23 +416,33 @@ private void logExceptionDuringResultReporting(
416416
417417 private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
418418 @ Override
419- public boolean tryReserveActivitySlot (
419+ public Optional < SlotPermit > tryReserveActivitySlot (
420420 ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes ) {
421- return WorkerLifecycleState .ACTIVE .equals (ActivityWorker .this .getLifecycleState ())
422- && Objects .equals (
423- commandAttributes .getTaskQueue ().getName (), ActivityWorker .this .taskQueue )
424- && ActivityWorker .this .executorSlotsSemaphore .tryAcquire ();
421+ if (!WorkerLifecycleState .ACTIVE .equals (ActivityWorker .this .getLifecycleState ())
422+ || !Objects .equals (
423+ commandAttributes .getTaskQueue ().getName (), ActivityWorker .this .taskQueue )) {
424+ return Optional .empty ();
425+ }
426+ return ActivityWorker .this .slotSupplier .tryReserveSlot (
427+ new SlotReservationData (
428+ ActivityWorker .this .taskQueue , options .getIdentity (), options .getBuildId ()));
425429 }
426430
427431 @ Override
428- public void releaseActivitySlotReservations (int slotCounts ) {
429- ActivityWorker .this .executorSlotsSemaphore .release (slotCounts );
432+ public void releaseActivitySlotReservations (Iterable <SlotPermit > permits ) {
433+ for (SlotPermit permit : permits ) {
434+ ActivityWorker .this .slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), permit );
435+ }
430436 }
431437
432438 @ Override
433- public void dispatchActivity (PollActivityTaskQueueResponse activity ) {
439+ public void dispatchActivity (PollActivityTaskQueueResponse activity , SlotPermit permit ) {
434440 ActivityWorker .this .pollTaskExecutor .process (
435- new ActivityTask (activity , ActivityWorker .this .executorSlotsSemaphore ::release ));
441+ new ActivityTask (
442+ activity ,
443+ () ->
444+ ActivityWorker .this .slotSupplier .releaseSlot (
445+ SlotReleaseReason .taskComplete (), permit )));
436446 }
437447 }
438448}
0 commit comments