2121import io .grpc .StatusRuntimeException ;
2222import io .grpc .stub .StreamObserver ;
2323import java .util .concurrent .BlockingQueue ;
24+ import java .util .concurrent .Executors ;
2425import java .util .concurrent .LinkedBlockingQueue ;
26+ import java .util .concurrent .ScheduledExecutorService ;
2527import java .util .concurrent .TimeUnit ;
2628import java .util .concurrent .atomic .AtomicBoolean ;
2729import java .util .function .Consumer ;
@@ -49,6 +51,7 @@ public class SyncStreamQueueSource implements QueueSource {
4951 private final BlockingQueue <QueuePayload > outgoingQueue = new LinkedBlockingQueue <>(QUEUE_SIZE );
5052 private final FlagSyncServiceStub flagSyncStub ;
5153 private final FlagSyncServiceBlockingStub metadataStub ;
54+ private final ScheduledExecutorService scheduler ;
5255
5356 /**
5457 * Creates a new SyncStreamQueueSource responsible for observing the event stream.
@@ -65,6 +68,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
6568 FlagSyncServiceGrpc .newStub (channelConnector .getChannel ()).withWaitForReady ();
6669 metadataStub = FlagSyncServiceGrpc .newBlockingStub (channelConnector .getChannel ())
6770 .withWaitForReady ();
71+ scheduler = Executors .newSingleThreadScheduledExecutor ();
6872 }
6973
7074 // internal use only
@@ -82,6 +86,7 @@ protected SyncStreamQueueSource(
8286 flagSyncStub = stubMock ;
8387 syncMetadataDisabled = options .isSyncMetadataDisabled ();
8488 metadataStub = blockingStubMock ;
89+ scheduler = Executors .newSingleThreadScheduledExecutor ();
8590 }
8691
8792 /** Initialize sync stream connector. */
@@ -109,6 +114,8 @@ public void shutdown() throws InterruptedException {
109114 log .debug ("Shutdown already in progress or completed" );
110115 return ;
111116 }
117+ this .scheduler .shutdownNow ();
118+ this .scheduler .awaitTermination (deadline , TimeUnit .MILLISECONDS );
112119 this .channelConnector .shutdown ();
113120 }
114121
@@ -120,45 +127,49 @@ private void observeSyncStream() {
120127 // "waitForReady" on the channel, plus our retry policy slow this loop down in
121128 // error conditions
122129 while (!shutdown .get ()) {
130+ if (shouldThrottle .getAndSet (false )) {
131+ log .debug ("Previous stream ended with error, waiting {} ms before retry" , this .maxBackoffMs );
132+ scheduleRetry ();
133+ return ;
134+ }
135+
136+ log .debug ("Initializing sync stream request" );
137+ SyncStreamObserver observer = new SyncStreamObserver (outgoingQueue , shouldThrottle );
138+ try {
139+ observer .metadata = getMetadata ();
140+ } catch (Exception metaEx ) {
141+ // retry if getMetadata fails
142+ String message = metaEx .getMessage ();
143+ log .debug ("Metadata request error: {}, will restart" , message , metaEx );
144+ enqueueError (String .format ("Error in getMetadata request: %s" , message ));
145+ shouldThrottle .set (true );
146+ continue ;
147+ }
148+
123149 try {
124- if (shouldThrottle .getAndSet (false )) {
125- log .debug ("Previous stream ended with error, waiting {} ms before retry" , this .maxBackoffMs );
126- Thread .sleep (this .maxBackoffMs );
127-
128- // Check shutdown again after sleep to avoid unnecessary work
129- if (shutdown .get ()) {
130- break ;
131- }
132- }
133-
134- log .debug ("Initializing sync stream request" );
135- SyncStreamObserver observer = new SyncStreamObserver (outgoingQueue , shouldThrottle );
136- try {
137- observer .metadata = getMetadata ();
138- } catch (Exception metaEx ) {
139- // retry if getMetadata fails
140- String message = metaEx .getMessage ();
141- log .debug ("Metadata request error: {}, will restart" , message , metaEx );
142- enqueueError (String .format ("Error in getMetadata request: %s" , message ));
143- shouldThrottle .set (true );
144- continue ;
145- }
146-
147- try {
148- syncFlags (observer );
149- } catch (Exception ex ) {
150- log .error ("Unexpected sync stream exception, will restart." , ex );
151- enqueueError (String .format ("Error in syncStream: %s" , ex .getMessage ()));
152- shouldThrottle .set (true );
153- }
154- } catch (InterruptedException ie ) {
155- log .debug ("Stream loop interrupted, most likely shutdown was invoked" , ie );
150+ syncFlags (observer );
151+ } catch (Exception ex ) {
152+ log .error ("Unexpected sync stream exception, will restart." , ex );
153+ enqueueError (String .format ("Error in syncStream: %s" , ex .getMessage ()));
154+ shouldThrottle .set (true );
156155 }
157156 }
158157
159158 log .info ("Shutdown invoked, exiting event stream listener" );
160159 }
161160
161+ /**
162+ * Schedules a retry of the sync stream after the backoff period.
163+ * Uses a non-blocking approach instead of Thread.sleep().
164+ */
165+ private void scheduleRetry () {
166+ if (shutdown .get ()) {
167+ log .info ("Shutdown invoked, exiting event stream listener" );
168+ return ;
169+ }
170+ scheduler .schedule (this ::observeSyncStream , this .maxBackoffMs , TimeUnit .MILLISECONDS );
171+ }
172+
162173 // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
163174 private Struct getMetadata () {
164175 if (syncMetadataDisabled ) {
0 commit comments