11use alloc:: string:: String ;
22use alloc:: vec:: Vec ;
3- use serde:: { de, Deserialize , Deserializer , Serialize } ;
4- use serde_json as json;
3+ use serde:: { Deserialize , Serialize } ;
54
6- use crate :: util:: { deserialize_string_to_i64, deserialize_optional_string_to_i64} ;
7- use alloc:: format;
8- use alloc:: string:: { ToString } ;
9- use core:: fmt;
10- use serde:: de:: { MapAccess , Visitor } ;
11-
12- use sqlite_nostd as sqlite;
13- use sqlite_nostd:: { Connection , ResultCode } ;
14- use uuid:: Uuid ;
15- use crate :: error:: { SQLiteError , PSResult } ;
16-
17- use crate :: ext:: SafeManagedStmt ;
5+ use crate :: util:: { deserialize_optional_string_to_i64, deserialize_string_to_i64} ;
186
197#[ derive( Serialize , Deserialize , Debug ) ]
208pub struct Checkpoint {
@@ -31,154 +19,3 @@ pub struct BucketChecksum {
3119 pub bucket : String ,
3220 pub checksum : i32 ,
3321}
34-
35-
36- #[ derive( Serialize , Deserialize , Debug ) ]
37- pub struct CheckpointComplete {
38- #[ serde( deserialize_with = "deserialize_string_to_i64" ) ]
39- last_op_id : i64
40- }
41-
42- #[ derive( Serialize , Deserialize , Debug ) ]
43- pub struct SyncBucketData {
44- // TODO: complete this
45- bucket : String
46- }
47-
48- #[ derive( Serialize , Deserialize , Debug ) ]
49- pub struct Keepalive {
50- token_expires_in : i32
51- }
52-
53- #[ derive( Serialize , Deserialize , Debug ) ]
54- pub struct CheckpointDiff {
55- #[ serde( deserialize_with = "deserialize_string_to_i64" ) ]
56- last_op_id : i64 ,
57- updated_buckets : Vec < BucketChecksum > ,
58- removed_buckets : Vec < String > ,
59- #[ serde( default ) ]
60- #[ serde( deserialize_with = "deserialize_optional_string_to_i64" ) ]
61- write_checkpoint : Option < i64 >
62- }
63-
64-
65-
66- #[ derive( Debug ) ]
67- pub enum StreamingSyncLine {
68- CheckpointLine ( Checkpoint ) ,
69- CheckpointDiffLine ( CheckpointDiff ) ,
70- CheckpointCompleteLine ( CheckpointComplete ) ,
71- SyncBucketDataLine ( SyncBucketData ) ,
72- KeepaliveLine ( i32 ) ,
73- Unknown
74- }
75-
76- // Serde does not supporting ignoring unknown fields in externally-tagged enums, so we use our own
77- // serializer.
78-
79- struct StreamingSyncLineVisitor ;
80-
81- impl < ' de > Visitor < ' de > for StreamingSyncLineVisitor {
82- type Value = StreamingSyncLine ;
83-
84- fn expecting ( & self , formatter : & mut fmt:: Formatter ) -> fmt:: Result {
85- formatter. write_str ( "sync data" )
86- }
87-
88- fn visit_map < A > ( self , mut access : A ) -> Result < Self :: Value , A :: Error >
89- where
90- A : MapAccess < ' de > ,
91- {
92- let mut r = StreamingSyncLine :: Unknown ;
93- while let Some ( ( key, value) ) = access. next_entry :: < String , json:: Value > ( ) ? {
94- if !matches ! ( r, StreamingSyncLine :: Unknown ) {
95- // Generally, we don't expect to receive multiple in one line.
96- // But if it does happen, we keep the first one.
97- continue ;
98- }
99- match key. as_str ( ) {
100- "checkpoint" => {
101- r = StreamingSyncLine :: CheckpointLine (
102- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
103- ) ;
104- }
105- "checkpoint_diff" => {
106- r = StreamingSyncLine :: CheckpointDiffLine (
107- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
108- ) ;
109- }
110- "checkpoint_complete" => {
111- r = StreamingSyncLine :: CheckpointCompleteLine (
112- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
113- ) ;
114- }
115- "data" => {
116- r = StreamingSyncLine :: SyncBucketDataLine (
117- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
118- ) ;
119- }
120- "token_expires_in" => {
121- r = StreamingSyncLine :: KeepaliveLine (
122- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
123- ) ;
124- }
125- _ => { }
126- }
127- }
128-
129- Ok ( r)
130- }
131- }
132-
133- impl < ' de > Deserialize < ' de > for StreamingSyncLine {
134- fn deserialize < D > ( deserializer : D ) -> Result < Self , D :: Error >
135- where
136- D : Deserializer < ' de > ,
137- {
138- deserializer. deserialize_map ( StreamingSyncLineVisitor )
139- }
140- }
141-
142-
143- #[ cfg( test) ]
144- mod tests {
145- use core:: assert_matches:: assert_matches;
146- use super :: * ;
147-
148- #[ test]
149- fn json_parsing_test ( ) {
150- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"token_expires_in": 42}"# ) . unwrap ( ) ;
151- assert_matches ! ( line, StreamingSyncLine :: KeepaliveLine ( 42 ) ) ;
152-
153- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint_complete": {"last_op_id": "123"}}"# ) . unwrap ( ) ;
154- assert_matches ! ( line, StreamingSyncLine :: CheckpointCompleteLine ( CheckpointComplete { last_op_id: 123 } ) ) ;
155-
156- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint_complete": {"last_op_id": "123", "other": "foo"}}"# ) . unwrap ( ) ;
157- assert_matches ! ( line, StreamingSyncLine :: CheckpointCompleteLine ( CheckpointComplete { last_op_id: 123 } ) ) ;
158-
159- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint": {"last_op_id": "123", "buckets": []}}"# ) . unwrap ( ) ;
160- assert_matches ! ( line, StreamingSyncLine :: CheckpointLine ( Checkpoint { last_op_id: 123 , .. } ) ) ;
161-
162- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint": {"last_op_id": "123", "write_checkpoint": "42", "buckets": []}}"# ) . unwrap ( ) ;
163- assert_matches ! ( line, StreamingSyncLine :: CheckpointLine ( Checkpoint { last_op_id: 123 , write_checkpoint: Some ( 42 ) , .. } ) ) ;
164-
165- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint_diff": {"last_op_id": "123", "updated_buckets": [], "removed_buckets": []}}"# ) . unwrap ( ) ;
166- assert_matches ! ( line, StreamingSyncLine :: CheckpointDiffLine ( CheckpointDiff { last_op_id: 123 , .. } ) ) ;
167-
168- // Additional/unknown fields
169- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"token_expires_in": 42, "foo": 1}"# ) . unwrap ( ) ;
170- assert_matches ! ( line, StreamingSyncLine :: KeepaliveLine ( 42 ) ) ;
171- let line: StreamingSyncLine = serde_json:: from_str ( r#"{}"# ) . unwrap ( ) ;
172- assert_matches ! ( line, StreamingSyncLine :: Unknown ) ;
173- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"other":"test"}"# ) . unwrap ( ) ;
174- assert_matches ! ( line, StreamingSyncLine :: Unknown ) ;
175-
176- // Multiple - keep the first one
177- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"token_expires_in": 42, "checkpoint_complete": {"last_op_id": "123"}}"# ) . unwrap ( ) ;
178- assert_matches ! ( line, StreamingSyncLine :: KeepaliveLine ( 42 ) ) ;
179-
180- // Test error handling
181- let line: Result < StreamingSyncLine , _ > = serde_json:: from_str ( r#"{"token_expires_in": "42"}"# ) ;
182- assert ! ( line. is_err( ) ) ;
183- }
184- }
0 commit comments