2323import java .util .concurrent .BlockingQueue ;
2424import java .util .concurrent .Executors ;
2525import java .util .concurrent .LinkedBlockingQueue ;
26+ import java .util .concurrent .RejectedExecutionException ;
2627import java .util .concurrent .ScheduledExecutorService ;
2728import java .util .concurrent .TimeUnit ;
2829import java .util .concurrent .atomic .AtomicBoolean ;
@@ -68,7 +69,11 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
6869 FlagSyncServiceGrpc .newStub (channelConnector .getChannel ()).withWaitForReady ();
6970 metadataStub = FlagSyncServiceGrpc .newBlockingStub (channelConnector .getChannel ())
7071 .withWaitForReady ();
71- scheduler = Executors .newSingleThreadScheduledExecutor ();
72+ scheduler = Executors .newSingleThreadScheduledExecutor (r -> {
73+ Thread t = new Thread (r , "flagd-sync-retry-scheduler" );
74+ t .setDaemon (true );
75+ return t ;
76+ });
7277 }
7378
7479 // internal use only
@@ -86,7 +91,11 @@ protected SyncStreamQueueSource(
8691 flagSyncStub = stubMock ;
8792 syncMetadataDisabled = options .isSyncMetadataDisabled ();
8893 metadataStub = blockingStubMock ;
89- scheduler = Executors .newSingleThreadScheduledExecutor ();
94+ scheduler = Executors .newSingleThreadScheduledExecutor (r -> {
95+ Thread t = new Thread (r , "flagd-sync-retry-scheduler" );
96+ t .setDaemon (true );
97+ return t ;
98+ });
9099 }
91100
92101 /** Initialize sync stream connector. */
@@ -115,7 +124,12 @@ public void shutdown() throws InterruptedException {
115124 return ;
116125 }
117126 this .scheduler .shutdownNow ();
118- this .scheduler .awaitTermination (deadline , TimeUnit .MILLISECONDS );
127+ try {
128+ this .scheduler .awaitTermination (deadline , TimeUnit .MILLISECONDS );
129+ } catch (InterruptedException e ) {
130+ log .debug ("Scheduler termination was interrupted" , e );
131+ Thread .currentThread ().interrupt ();
132+ }
119133 this .channelConnector .shutdown ();
120134 }
121135
@@ -167,7 +181,12 @@ private void scheduleRetry() {
167181 log .info ("Shutdown invoked, exiting event stream listener" );
168182 return ;
169183 }
170- scheduler .schedule (this ::observeSyncStream , this .maxBackoffMs , TimeUnit .MILLISECONDS );
184+ try {
185+ scheduler .schedule (this ::observeSyncStream , this .maxBackoffMs , TimeUnit .MILLISECONDS );
186+ } catch (RejectedExecutionException e ) {
187+ // Scheduler was shut down after the shutdown check, which is fine
188+ log .debug ("Retry scheduling rejected, scheduler is shut down" , e );
189+ }
171190 }
172191
173192 // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
0 commit comments