@@ -27,6 +27,13 @@ class StreamingSyncImplementation {
2727
2828 final Future <void > Function () uploadCrud;
2929
30+ // An internal controller which is used to trigger CRUD uploads internally
31+ // e.g. when reconnecting.
32+ // This is only a broadcast controller since the `crudLoop` method is public
33+ // and could potentially be called multiple times externally.
34+ final StreamController <Null > _internalCrudTriggerController =
35+ StreamController <Null >.broadcast ();
36+
3037 final Stream crudUpdateTriggerStream;
3138
3239 final StreamController <SyncStatus > _statusStreamController =
@@ -92,6 +99,9 @@ class StreamingSyncImplementation {
9299 if (_safeToClose) {
93100 _client.close ();
94101 }
102+
103+ await _internalCrudTriggerController.close ();
104+
95105 // wait for completeAbort() to be called
96106 await future;
97107
@@ -144,7 +154,7 @@ class StreamingSyncImplementation {
144154
145155 // On error, wait a little before retrying
146156 // When aborting, don't wait
147- await Future . any ([ Future . delayed (retryDelay), _abort ! .onAbort] );
157+ await _delayRetry ( );
148158 }
149159 }
150160 } finally {
@@ -155,10 +165,14 @@ class StreamingSyncImplementation {
155165 Future <void > crudLoop () async {
156166 await uploadAllCrud ();
157167
158- await for (var _ in crudUpdateTriggerStream) {
159- if (_abort? .aborted == true ) {
160- break ;
161- }
168+ // Trigger a CRUD upload whenever the upstream trigger fires
169+ // as-well-as whenever the sync stream reconnects.
170+ // This has the potential (in rare cases) to affect the crudThrottleTime,
171+ // but it should not result in excessive uploads since the
172+ // sync reconnects are also throttled.
173+ // The stream here is closed on abort.
174+ await for (var _ in mergeStreams (
175+ [crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
162176 await uploadAllCrud ();
163177 }
164178 }
@@ -170,6 +184,13 @@ class StreamingSyncImplementation {
170184
171185 while (true ) {
172186 try {
187+ // It's possible that an abort or disconnect operation could
188+ // be followed by a `close` operation. The close would cause these
189+ // operations, which use the DB, to throw an exception. Breaking the loop
190+ // here prevents unnecessary potential (caught) exceptions.
191+ if (aborted) {
192+ break ;
193+ }
173194 // This is the first item in the FIFO CRUD queue.
174195 CrudEntry ? nextCrudItem = await adapter.nextCrudItem ();
175196 if (nextCrudItem != null ) {
@@ -196,7 +217,7 @@ class StreamingSyncImplementation {
196217 checkedCrudItem = null ;
197218 isolateLogger.warning ('Data upload error' , e, stacktrace);
198219 _updateStatus (uploading: false , uploadError: e);
199- await Future . delayed (retryDelay );
220+ await _delayRetry ( );
200221 if (! isConnected) {
201222 // Exit the upload loop if the sync stream is no longer connected
202223 break ;
@@ -298,6 +319,9 @@ class StreamingSyncImplementation {
298319 Future <void >? credentialsInvalidation;
299320 bool haveInvalidated = false ;
300321
322+ // Trigger a CRUD upload on reconnect
323+ _internalCrudTriggerController.add (null );
324+
301325 await for (var line in merged) {
302326 if (aborted) {
303327 break ;
@@ -465,6 +489,12 @@ class StreamingSyncImplementation {
465489 yield parseStreamingSyncLine (line as Map <String , dynamic >);
466490 }
467491 }
492+
493+ /// Delays the standard `retryDelay` Duration, but exits early if
494+ /// an abort has been requested.
495+ Future <void > _delayRetry () async {
496+ await Future .any ([Future .delayed (retryDelay), _abort! .onAbort]);
497+ }
468498}
469499
470500/// Attempt to give a basic summary of the error for cases where the full error
0 commit comments