@@ -3,8 +3,14 @@ use core::{cmp::Ordering, hash::Hash, time::Duration};
33use alloc:: { boxed:: Box , string:: String } ;
44use serde:: Deserialize ;
55use serde_with:: { serde_as, DurationSeconds } ;
6+ use sqlite_nostd:: { self as sqlite, Connection } ;
67
7- use crate :: { sync:: BucketPriority , util:: JsonString } ;
8+ use crate :: {
9+ error:: { PSResult , PowerSyncError } ,
10+ ext:: SafeManagedStmt ,
11+ sync:: BucketPriority ,
12+ util:: JsonString ,
13+ } ;
814
915/// A key that uniquely identifies a stream subscription.
1016#[ derive( Debug , PartialEq , PartialOrd , Eq , Ord ) ]
@@ -38,16 +44,20 @@ impl LocallyTrackedSubscription {
3844/// A request sent from a PowerSync SDK to alter the subscriptions managed by this client.
3945#[ derive( Deserialize ) ]
4046pub enum SubscriptionChangeRequest {
47+ #[ serde( rename = "subscribe" ) ]
4148 Subscribe ( SubscribeToStream ) ,
4249}
4350
4451#[ serde_as]
4552#[ derive( Deserialize ) ]
4653pub struct SubscribeToStream {
4754 pub stream : String ,
55+ #[ serde( default ) ]
4856 pub params : Option < Box < serde_json:: value:: RawValue > > ,
4957 #[ serde_as( as = "Option<DurationSeconds>" ) ]
58+ #[ serde( default ) ]
5059 pub ttl : Option < Duration > ,
60+ #[ serde( default ) ]
5161 pub priority : Option < BucketPriority > ,
5262}
5363
@@ -57,3 +67,37 @@ pub struct UnsubscribeFromStream {
5767 pub params : Option < Box < serde_json:: value:: RawValue > > ,
5868 pub immediate : bool ,
5969}
70+
71+ pub fn apply_subscriptions (
72+ db : * mut sqlite:: sqlite3 ,
73+ subscription : SubscriptionChangeRequest ,
74+ ) -> Result < ( ) , PowerSyncError > {
75+ match subscription {
76+ SubscriptionChangeRequest :: Subscribe ( subscription) => {
77+ let stmt = db
78+ . prepare_v2 ( "INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4, is_default = FALSE" )
79+ . into_db_result ( db) ?;
80+
81+ stmt. bind_text ( 1 , & subscription. stream , sqlite:: Destructor :: STATIC ) ?;
82+ match & subscription. priority {
83+ Some ( priority) => stmt. bind_int ( 2 , priority. number ) ,
84+ None => stmt. bind_null ( 2 ) ,
85+ } ?;
86+ stmt. bind_text (
87+ 3 ,
88+ match & subscription. params {
89+ Some ( params) => params. get ( ) ,
90+ None => "null" ,
91+ } ,
92+ sqlite:: Destructor :: STATIC ,
93+ ) ?;
94+ match & subscription. ttl {
95+ Some ( ttl) => stmt. bind_int64 ( 4 , ttl. as_secs ( ) as i64 ) ,
96+ None => stmt. bind_null ( 4 ) ,
97+ } ?;
98+ stmt. exec ( ) ?;
99+ }
100+ }
101+
102+ Ok ( ( ) )
103+ }
0 commit comments