@@ -219,4 +219,45 @@ void main() {
219219 var status = await stream.next;
220220 expect (status.forStream (subscription), isNotNull);
221221 });
222+
223+ test ('unsubscribing multiple times has no effect' , () async {
224+ final a = await database.syncStream ('a' ).subscribe ();
225+ final aAgain = await database.syncStream ('a' ).subscribe ();
226+ a.unsubscribe ();
227+ a.unsubscribe (); // Should not decrement the refcount again
228+
229+ // Pretend the streams are expired - they should still be requested because
230+ // the core extension extends the lifetime of streams currently referenced
231+ // before connecting.
232+ await database.execute (
233+ 'UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000' );
234+
235+ await waitForConnection ();
236+ final request = await syncService.waitForListener;
237+ expect (
238+ json.decode (await request.readAsString ()),
239+ containsPair (
240+ 'streams' ,
241+ containsPair ('subscriptions' , isNotEmpty),
242+ ),
243+ );
244+ aAgain.unsubscribe ();
245+ });
246+
247+ test ('unsubscribeAll' , () async {
248+ final a = await database.syncStream ('a' ).subscribe ();
249+ await database.syncStream ('a' ).unsubscribeAll ();
250+
251+ // Despite a being active, it should not be requested.
252+ await waitForConnection ();
253+ final request = await syncService.waitForListener;
254+ expect (
255+ json.decode (await request.readAsString ()),
256+ containsPair (
257+ 'streams' ,
258+ containsPair ('subscriptions' , isEmpty),
259+ ),
260+ );
261+ a.unsubscribe ();
262+ });
222263}
0 commit comments