11use alloc:: { boxed:: Box , collections:: btree_map:: BTreeMap , rc:: Rc , string:: String , vec:: Vec } ;
22use core:: {
33 cell:: RefCell ,
4+ cmp:: min,
45 hash:: { BuildHasher , Hash } ,
56} ;
67use rustc_hash:: FxBuildHasher ;
78use serde:: Serialize ;
89use sqlite_nostd:: ResultCode ;
910
1011use crate :: {
11- sync:: { storage_adapter:: StorageAdapter , subscriptions:: LocallyTrackedSubscription } ,
12+ sync:: {
13+ checkpoint:: OwnedBucketChecksum , storage_adapter:: StorageAdapter ,
14+ subscriptions:: LocallyTrackedSubscription ,
15+ } ,
1216 util:: JsonString ,
1317} ;
1418
@@ -37,7 +41,7 @@ pub struct DownloadSyncStatus {
3741 /// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been
3842 /// received), information about how far the download has progressed.
3943 pub downloading : Option < SyncDownloadProgress > ,
40- pub streams : Vec < ActiveStreamSubscription > ,
44+ pub streams : Option < Vec < ActiveStreamSubscription > > ,
4145}
4246
4347impl DownloadSyncStatus {
@@ -75,7 +79,7 @@ impl DownloadSyncStatus {
7579 self . mark_connected ( ) ;
7680
7781 self . downloading = Some ( progress) ;
78- self . streams = subscriptions;
82+ self . streams = Some ( subscriptions) ;
7983 }
8084
8185 /// Increments [SyncDownloadProgress] progress for the given [DataLine].
@@ -119,7 +123,7 @@ impl Default for DownloadSyncStatus {
119123 connecting : false ,
120124 downloading : None ,
121125 priority_status : Vec :: new ( ) ,
122- streams : Vec :: new ( ) ,
126+ streams : None ,
123127 }
124128 }
125129}
@@ -137,6 +141,10 @@ impl SyncStatusContainer {
137141 }
138142 }
139143
144+ pub fn inner ( & self ) -> & Rc < RefCell < DownloadSyncStatus > > {
145+ & self . status
146+ }
147+
140148 /// Invokes a function to update the sync status, then emits an [Instruction::UpdateSyncStatus]
141149 /// if the function did indeed change the status.
142150 pub fn update < F : FnOnce ( & mut DownloadSyncStatus ) -> ( ) > (
@@ -262,9 +270,12 @@ impl SyncDownloadProgress {
262270
263271#[ derive( Serialize , Hash ) ]
264272pub struct ActiveStreamSubscription {
273+ #[ serde( skip) ]
274+ pub id : i64 ,
265275 pub name : String ,
266276 pub parameters : Option < Box < JsonString > > ,
267277 pub associated_buckets : Vec < String > ,
278+ pub priority : Option < BucketPriority > ,
268279 pub active : bool ,
269280 pub is_default : bool ,
270281 pub expires_at : Option < Timestamp > ,
@@ -274,13 +285,30 @@ pub struct ActiveStreamSubscription {
274285impl ActiveStreamSubscription {
275286 pub fn from_local ( local : & LocallyTrackedSubscription ) -> Self {
276287 Self {
288+ id : local. id ,
277289 name : local. stream_name . clone ( ) ,
278290 parameters : local. local_params . clone ( ) ,
279291 is_default : local. is_default ,
292+ priority : None ,
280293 associated_buckets : Vec :: new ( ) ,
281294 active : local. active ,
282295 expires_at : local. expires_at . clone ( ) . map ( |e| Timestamp ( e) ) ,
283296 last_synced_at : local. last_synced_at . map ( |e| Timestamp ( e) ) ,
284297 }
285298 }
299+
300+ pub fn mark_associated_with_bucket ( & mut self , bucket : & OwnedBucketChecksum ) {
301+ self . associated_buckets . push ( bucket. bucket . clone ( ) ) ;
302+ self . priority = Some ( match self . priority {
303+ None => bucket. priority ,
304+ Some ( prio) => min ( prio, bucket. priority ) ,
305+ } ) ;
306+ }
307+
308+ pub fn is_in_priority ( & self , prio : Option < BucketPriority > ) -> bool {
309+ match prio {
310+ None => true ,
311+ Some ( prio) => self . priority >= Some ( prio) ,
312+ }
313+ }
286314}
0 commit comments