@@ -8,6 +8,7 @@ use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
88use crate :: {
99 error:: { PSResult , PowerSyncError } ,
1010 ext:: SafeManagedStmt ,
11+ kv:: client_id,
1112 operations:: delete_bucket,
1213 schema:: Schema ,
1314 state:: DatabaseState ,
@@ -36,6 +37,7 @@ pub struct StorageAdapter {
3637 pub progress_stmt : ManagedStmt ,
3738 time_stmt : ManagedStmt ,
3839 delete_subscription : ManagedStmt ,
40+ update_subscription : ManagedStmt ,
3941}
4042
4143impl StorageAdapter {
@@ -52,11 +54,16 @@ impl StorageAdapter {
5254 let delete_subscription =
5355 db. prepare_v2 ( "DELETE FROM ps_stream_subscriptions WHERE id = ?" ) ?;
5456
57+ // language=SQLite
58+ let update_subscription =
59+ db. prepare_v2 ( "UPDATE ps_stream_subscriptions SET active = ?2, is_default = ?3, ttl = ?, expires_at = ?, last_synced_at = ? WHERE id = ?1" ) ?;
60+
5561 Ok ( Self {
5662 db,
5763 progress_stmt : progress,
5864 time_stmt : time,
5965 delete_subscription,
66+ update_subscription,
6067 } )
6168 }
6269
@@ -256,7 +263,23 @@ impl StorageAdapter {
256263 & self ,
257264 include_defaults : bool ,
258265 ) -> Result < StreamSubscriptionRequest , PowerSyncError > {
266+ self . delete_outdated_subscriptions ( ) ?;
267+
259268 let mut subscriptions: Vec < RequestedStreamSubscription > = Vec :: new ( ) ;
269+ let stmt = self
270+ . db
271+ . prepare_v2 ( "SELECT * FROM ps_stream_subscriptions WHERE NOT is_default;" ) ?;
272+
273+ while let ResultCode :: ROW = stmt. step ( ) ? {
274+ let subscription = Self :: read_stream_subscription ( & stmt) ?;
275+
276+ subscriptions. push ( RequestedStreamSubscription {
277+ stream : subscription. stream_name ,
278+ parameters : subscription. local_params ,
279+ override_priority : subscription. local_priority ,
280+ client_id : subscription. id ,
281+ } ) ;
282+ }
260283
261284 Ok ( StreamSubscriptionRequest {
262285 include_defaults,
@@ -296,6 +319,12 @@ impl StorageAdapter {
296319 } )
297320 }
298321
322+ fn delete_outdated_subscriptions ( & self ) -> Result < ( ) , PowerSyncError > {
323+ self . db
324+ . exec_safe ( "DELETE FROM ps_stream_subscriptions WHERE expires_at < unixepoch()" ) ?;
325+ Ok ( ( ) )
326+ }
327+
299328 pub fn iterate_local_subscriptions < F : FnMut ( LocallyTrackedSubscription ) -> ( ) > (
300329 & self ,
301330 mut action : F ,
@@ -324,6 +353,39 @@ impl StorageAdapter {
324353 }
325354 }
326355
356+ pub fn update_subscription (
357+ & self ,
358+ subscription : & LocallyTrackedSubscription ,
359+ ) -> Result < ( ) , PowerSyncError > {
360+ let _ = self . update_subscription . reset ( ) ;
361+
362+ self . update_subscription . bind_int64 ( 1 , subscription. id ) ?;
363+ self . update_subscription
364+ . bind_int ( 2 , if subscription. active { 1 } else { 0 } ) ?;
365+ self . update_subscription
366+ . bind_int ( 3 , if subscription. is_default { 1 } else { 0 } ) ?;
367+ if let Some ( ttl) = subscription. ttl {
368+ self . update_subscription . bind_int64 ( 4 , ttl) ?;
369+ } else {
370+ self . update_subscription . bind_null ( 4 ) ?;
371+ }
372+
373+ if let Some ( expires_at) = subscription. expires_at {
374+ self . update_subscription . bind_int64 ( 5 , expires_at) ?;
375+ } else {
376+ self . update_subscription . bind_null ( 5 ) ?;
377+ }
378+
379+ if let Some ( last_synced_at) = subscription. last_synced_at {
380+ self . update_subscription . bind_int64 ( 6 , last_synced_at) ?;
381+ } else {
382+ self . update_subscription . bind_null ( 6 ) ?;
383+ }
384+
385+ self . update_subscription . exec ( ) ?;
386+ Ok ( ( ) )
387+ }
388+
327389 pub fn delete_subscription ( & self , id : i64 ) -> Result < ( ) , PowerSyncError > {
328390 let _ = self . delete_subscription . reset ( ) ;
329391 self . delete_subscription . bind_int64 ( 1 , id) ?;
0 commit comments