 use alloc::{
     boxed::Box,
     collections::{btree_map::BTreeMap, btree_set::BTreeSet},
+    format,
     rc::Rc,
     string::String,
     vec::Vec,
@@ -9,9 +10,13 @@ use core::{
     cell::RefCell,
     cmp::min,
     hash::{BuildHasher, Hash},
+    ops::AddAssign,
 };
 use rustc_hash::FxBuildHasher;
-use serde::Serialize;
+use serde::{
+    Serialize,
+    ser::{SerializeMap, SerializeStruct},
+};
 use sqlite_nostd::ResultCode;
 
 use crate::{
@@ -28,7 +33,7 @@ use super::{
 };
 
 /// Information about a progressing download.
-#[derive(Serialize, Hash)]
+#[derive(Hash)]
 pub struct DownloadSyncStatus {
     /// Whether the socket to the sync service is currently open and connected.
     ///
@@ -136,6 +141,72 @@ impl Default for DownloadSyncStatus {
     }
 }
 
+impl Serialize for DownloadSyncStatus {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        struct SerializeStreamsWithProgress<'a>(&'a DownloadSyncStatus);
+
+        impl<'a> Serialize for SerializeStreamsWithProgress<'a> {
+            fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+            where
+                S: serde::Serializer,
+            {
+                #[derive(Serialize)]
+                struct StreamWithProgress<'a> {
+                    #[serde(flatten)]
+                    subscription: &'a ActiveStreamSubscription,
+                    progress: ProgressCounters,
+                }
+
+                let streams = self.0.streams.iter().map(|sub| {
+                    let mut stream_progress = ProgressCounters::default();
+                    if let Some(sync_progress) = &self.0.downloading {
+                        for bucket in &sub.associated_buckets {
+                            if let Some(bucket_progress) = sync_progress.buckets.get(bucket) {
+                                stream_progress += bucket_progress;
+                            }
+                        }
+                    }
+
+                    StreamWithProgress {
+                        subscription: sub,
+                        progress: stream_progress,
+                    }
+                });
+
+                serializer.collect_seq(streams)
+            }
+        }
+
+        let mut serializer = serializer.serialize_struct("DownloadSyncStatus", 5)?;
+        serializer.serialize_field("connected", &self.connected)?;
+        serializer.serialize_field("connecting", &self.connecting)?;
+        serializer.serialize_field("priority_status", &self.priority_status)?;
+        serializer.serialize_field("downloading", &self.downloading)?;
+        serializer.serialize_field("streams", &SerializeStreamsWithProgress(self))?;
+
+        serializer.end()
+    }
+}
+
+#[derive(Serialize, Default)]
+struct ProgressCounters {
+    total: i64,
+    downloaded: i64,
+}
+
+impl<'a> AddAssign<&'a BucketProgress> for ProgressCounters {
+    fn add_assign(&mut self, rhs: &'a BucketProgress) {
+        let downloaded = rhs.since_last;
+        let total = rhs.target_count - rhs.at_last;
+
+        self.total += total;
+        self.downloaded += downloaded;
+    }
+}
+
 pub struct SyncStatusContainer {
     status: Rc<RefCell<DownloadSyncStatus>>,
     last_published_hash: u64,
@@ -204,11 +275,57 @@ pub struct BucketProgress {
     pub target_count: i64,
 }
 
-#[derive(Serialize, Hash)]
+#[derive(Hash)]
 pub struct SyncDownloadProgress {
     buckets: BTreeMap<String, BucketProgress>,
 }
 
+impl Serialize for SyncDownloadProgress {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        // When we publish a SyncDownloadProgress to clients, avoid serializing every bucket since
+        // that can lead to very large status updates.
+        // Instead, we report one entry per priority group.
+        let mut by_priority = BTreeMap::<BucketPriority, ProgressCounters>::new();
+        for progress in self.buckets.values() {
+            let priority_progress = by_priority.entry(progress.priority).or_default();
+            *priority_progress += progress;
+        }
+
+        // We used to serialize SyncDownloadProgress as-is. To keep backwards compatibility with
+        // the general format, we now synthesize a fake bucket id for each priority and report
+        // each priority as a single-bucket entry. This allows client logic to remain unchanged.
+        struct SerializeWithFakeBucketNames(BTreeMap<BucketPriority, ProgressCounters>);
+
+        impl Serialize for SerializeWithFakeBucketNames {
+            fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+            where
+                S: serde::Serializer,
+            {
+                let mut serializer = serializer.serialize_map(Some(self.0.len()))?;
+                for (priority, progress) in &self.0 {
+                    serializer.serialize_entry(
+                        &format!("prio_{}", priority.number),
+                        &BucketProgress {
+                            priority: *priority,
+                            at_last: 0,
+                            since_last: progress.downloaded,
+                            target_count: progress.total,
+                        },
+                    )?;
+                }
+                serializer.end()
+            }
+        }
+
+        let mut serializer = serializer.serialize_struct("SyncDownloadProgress", 1)?;
+        serializer.serialize_field("buckets", &SerializeWithFakeBucketNames(by_priority))?;
+        serializer.end()
+    }
+}
+
 pub struct SyncProgressFromCheckpoint {
     pub progress: SyncDownloadProgress,
     pub needs_counter_reset: bool,
@@ -282,6 +399,7 @@ pub struct ActiveStreamSubscription {
     pub id: i64,
     pub name: String,
     pub parameters: Option<Box<JsonString>>,
+    #[serde(skip)]
     pub associated_buckets: BTreeSet<String>,
     pub priority: Option<BucketPriority>,
     pub active: bool,
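
For context on what the custom serializers above produce, here is a small standalone sketch of the per-priority aggregation. It uses simplified stand-in types (a plain `i32` in place of the crate's `BucketPriority`, and local `BucketProgress`/`ProgressCounters` definitions) and serde_json purely to show the resulting shape; it is an illustration under those assumptions, not part of the change itself.

```rust
use std::collections::BTreeMap;

use serde::Serialize;

// Simplified stand-ins for the crate's types, only for illustration.
#[derive(Serialize, Clone, Copy)]
struct BucketProgress {
    priority: i32,
    at_last: i64,
    since_last: i64,
    target_count: i64,
}

#[derive(Default)]
struct ProgressCounters {
    total: i64,
    downloaded: i64,
}

fn main() {
    // Per-bucket progress as it would appear in SyncDownloadProgress::buckets.
    let buckets: BTreeMap<String, BucketProgress> = BTreeMap::from([
        ("a".to_string(), BucketProgress { priority: 1, at_last: 10, since_last: 5, target_count: 40 }),
        ("b".to_string(), BucketProgress { priority: 1, at_last: 0, since_last: 2, target_count: 10 }),
        ("c".to_string(), BucketProgress { priority: 3, at_last: 0, since_last: 0, target_count: 7 }),
    ]);

    // Aggregate per priority, mirroring the AddAssign impl above:
    // downloaded += since_last, total += target_count - at_last.
    let mut by_priority = BTreeMap::<i32, ProgressCounters>::new();
    for p in buckets.values() {
        let counters = by_priority.entry(p.priority).or_default();
        counters.downloaded += p.since_last;
        counters.total += p.target_count - p.at_last;
    }

    // Re-emit one synthetic "bucket" per priority, keeping the old wire shape.
    let synthetic: BTreeMap<String, BucketProgress> = by_priority
        .into_iter()
        .map(|(priority, counters)| {
            (
                format!("prio_{priority}"),
                BucketProgress {
                    priority,
                    at_last: 0,
                    since_last: counters.downloaded,
                    target_count: counters.total,
                },
            )
        })
        .collect();

    // Prints one map entry per priority, keyed "prio_1" and "prio_3" here.
    println!("{}", serde_json::to_string(&synthetic).unwrap());
}
```

Clients that already parse a `buckets` map keep working unchanged; they simply see one synthetic `prio_{n}` entry per priority instead of one entry per real bucket.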