Skip to content

Commit f731d70

Browse files
committed
Initial rust sync support.
1 parent 89f5181 commit f731d70

File tree

14 files changed

+859
-306
lines changed

14 files changed

+859
-306
lines changed

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>
6060

6161
Task<NonQueryResult> Execute(string query, object[]? parameters = null);
6262

63-
Task<T[]> GetAll<T>(string sql, params object[]? parameters);
63+
Task<T[]> GetAll<T>(string sql, object[]? parameters = null);
6464

65-
Task<T?> GetOptional<T>(string sql, params object[]? parameters);
65+
Task<T?> GetOptional<T>(string sql, object[]? parameters = null);
6666

67-
Task<T> Get<T>(string sql, params object[]? parameters);
67+
Task<T> Get<T>(string sql, object[]? parameters = null);
6868

6969
Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null);
7070

@@ -236,7 +236,7 @@ protected async Task UpdateHasSynced()
236236
);
237237

238238
DateTime? lastCompleteSync = null;
239-
239+
240240
// TODO: Will be altered/extended when reporting individual sync priority statuses is supported
241241
foreach (var result in results)
242242
{

PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,9 @@ public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
113113
/// Get a unique client ID.
114114
/// </summary>
115115
Task<string> GetClientId();
116+
117+
/// <summary>
118+
/// Invokes the `powersync_control` function for the sync client.
119+
/// </summary>
120+
Task<string> Control(string op, object? payload);
116121
}

PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -351,16 +351,18 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
351351

352352
if (seqAfter != seqBefore)
353353
{
354-
logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter, seqBefore);
354+
logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter,
355+
seqBefore);
355356
return false;
356357
}
357358

358359
var response = await tx.Execute(
359-
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
360-
[opId]
361-
);
360+
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
361+
[opId]
362+
);
362363

363-
logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}", JsonConvert.SerializeObject(response));
364+
logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}",
365+
JsonConvert.SerializeObject(response));
364366
return true;
365367
});
366368
}
@@ -388,33 +390,33 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
388390
var last = all[all.Length - 1];
389391

390392
return new CrudBatch(
391-
Crud: all,
392-
HaveMore: true,
393-
CompleteCallback: async (string? writeCheckpoint) =>
394-
{
395-
await db.WriteTransaction(async tx =>
393+
Crud: all,
394+
HaveMore: true,
395+
CompleteCallback: async (string? writeCheckpoint) =>
396396
{
397-
await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]);
398-
399-
if (!string.IsNullOrEmpty(writeCheckpoint))
397+
await db.WriteTransaction(async tx =>
400398
{
401-
var crudResult = await tx.GetAll<object>("SELECT 1 FROM ps_crud LIMIT 1");
402-
if (crudResult?.Length > 0)
399+
await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]);
400+
401+
if (!string.IsNullOrEmpty(writeCheckpoint))
402+
{
403+
var crudResult = await tx.GetAll<object>("SELECT 1 FROM ps_crud LIMIT 1");
404+
if (crudResult?.Length > 0)
405+
{
406+
await tx.Execute(
407+
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
408+
[writeCheckpoint]);
409+
}
410+
}
411+
else
403412
{
404413
await tx.Execute(
405414
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
406-
[writeCheckpoint]);
415+
[GetMaxOpId()]);
407416
}
408-
}
409-
else
410-
{
411-
await tx.Execute(
412-
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
413-
[GetMaxOpId()]);
414-
}
415-
});
416-
}
417-
);
417+
});
418+
}
419+
);
418420
}
419421

420422
public async Task<CrudEntry?> NextCrudItem()
@@ -434,4 +436,15 @@ public async Task SetTargetCheckpoint(Checkpoint checkpoint)
434436
// No Op
435437
await Task.CompletedTask;
436438
}
439+
440+
record ControlResult(string? r);
441+
442+
public async Task<string> Control(string op, object? payload = null)
443+
{
444+
return await db.WriteTransaction(async tx =>
445+
{
446+
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload ?? ""]);
447+
return result.r!;
448+
});
449+
}
437450
}

PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,12 @@ public class SyncDataBucketJSON
2424

2525
public class SyncDataBucket(
2626
string bucket,
27-
OplogEntry[] data,
28-
bool hasMore = false
29-
// string? after = null,
30-
// string? nextAfter = null
27+
OplogEntry[] data
3128
)
3229
{
3330
public string Bucket { get; private set; } = bucket;
3431
public OplogEntry[] Data { get; private set; } = data;
3532

36-
// public string? After { get; private set; } = after;
37-
// public string? NextAfter { get; private set; } = nextAfter;
38-
3933
public static SyncDataBucket FromRow(SyncDataBucketJSON row)
4034
{
4135
var dataEntries = row.Data != null
@@ -48,9 +42,6 @@ public static SyncDataBucket FromRow(SyncDataBucketJSON row)
4842
return new SyncDataBucket(
4943
row.Bucket,
5044
dataEntries
51-
// row.HasMore ?? false,
52-
// row.After,
53-
// row.NextAfter
5445
);
5546
}
5647

@@ -64,9 +55,6 @@ public string ToJSON()
6455
var jsonObject = new SyncDataBucketJSON
6556
{
6657
Bucket = Bucket,
67-
// HasMore = HasMore,
68-
// After = After,
69-
// NextAfter = NextAfter,
7058
Data = dataObjects
7159
};
7260

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
using Newtonsoft.Json.Linq;
2+
using Newtonsoft.Json;
3+
4+
namespace PowerSync.Common.Client.Sync.Stream;
5+
6+
/// <summary>
7+
/// An internal instruction emitted by the sync client in the core extension in response to the
8+
/// SDK passing sync data into the extension.
9+
/// </summary>
10+
public abstract class Instruction
11+
{
12+
13+
public static Instruction[] ParseInstructions(string rawResponse)
14+
{
15+
var jsonArray = JArray.Parse(rawResponse);
16+
List<Instruction> instructions = [];
17+
18+
foreach (JObject item in jsonArray)
19+
{
20+
var instruction = ParseInstruction(item);
21+
if (instruction == null)
22+
{
23+
throw new JsonSerializationException("Failed to parse instruction from JSON.");
24+
}
25+
instructions.Add(instruction);
26+
}
27+
28+
return instructions.ToArray();
29+
}
30+
31+
public static Instruction? ParseInstruction(JObject json)
32+
{
33+
if (json.ContainsKey("LogLine"))
34+
return json["LogLine"]!.ToObject<LogLine>();
35+
if (json.ContainsKey("UpdateSyncStatus"))
36+
return json["UpdateSyncStatus"]!.ToObject<UpdateSyncStatus>();
37+
if (json.ContainsKey("EstablishSyncStream"))
38+
return json["EstablishSyncStream"]!.ToObject<EstablishSyncStream>();
39+
if (json.ContainsKey("FetchCredentials"))
40+
return json["FetchCredentials"]!.ToObject<FetchCredentials>();
41+
if (json.ContainsKey("CloseSyncStream"))
42+
return new CloseSyncStream();
43+
if (json.ContainsKey("FlushFileSystem"))
44+
return new FlushFileSystem();
45+
if (json.ContainsKey("DidCompleteSync"))
46+
return new DidCompleteSync();
47+
48+
throw new JsonSerializationException("Unknown Instruction type.");
49+
}
50+
}
51+
52+
public class LogLine : Instruction
53+
{
54+
[JsonProperty("severity")]
55+
public string Severity { get; set; } = null!; // "DEBUG", "INFO", "WARNING"
56+
57+
[JsonProperty("line")]
58+
public string Line { get; set; } = null!;
59+
}
60+
61+
public class EstablishSyncStream : Instruction
62+
{
63+
[JsonProperty("request")]
64+
public StreamingSyncRequest Request { get; set; } = null!;
65+
}
66+
67+
public class UpdateSyncStatus : Instruction
68+
{
69+
[JsonProperty("status")]
70+
public CoreSyncStatus Status { get; set; } = null!;
71+
}
72+
73+
public class CoreSyncStatus
74+
{
75+
[JsonProperty("connected")]
76+
public bool Connected { get; set; }
77+
78+
[JsonProperty("connecting")]
79+
public bool Connecting { get; set; }
80+
81+
[JsonProperty("priority_status")]
82+
public List<SyncPriorityStatus> PriorityStatus { get; set; } = [];
83+
84+
[JsonProperty("downloading")]
85+
public DownloadProgress? Downloading { get; set; }
86+
}
87+
88+
public class SyncPriorityStatus
89+
{
90+
[JsonProperty("priority")]
91+
public int Priority { get; set; }
92+
93+
[JsonProperty("last_synced_at")]
94+
public long LastSyncedAt { get; set; }
95+
96+
[JsonProperty("has_synced")]
97+
public bool? HasSynced { get; set; }
98+
}
99+
100+
public class DownloadProgress
101+
{
102+
[JsonProperty("buckets")]
103+
public Dictionary<string, BucketProgress> Buckets { get; set; } = null!;
104+
}
105+
106+
public class BucketProgress
107+
{
108+
[JsonProperty("priority")]
109+
public int Priority { get; set; }
110+
111+
[JsonProperty("at_last")]
112+
public int AtLast { get; set; }
113+
114+
[JsonProperty("since_last")]
115+
public int SinceLast { get; set; }
116+
117+
[JsonProperty("target_count")]
118+
public int TargetCount { get; set; }
119+
}
120+
121+
public class FetchCredentials : Instruction
122+
{
123+
[JsonProperty("did_expire")]
124+
public bool DidExpire { get; set; }
125+
}
126+
127+
public class CloseSyncStream : Instruction { }
128+
public class FlushFileSystem : Instruction { }
129+
public class DidCompleteSync : Instruction { }

PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,37 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
124124
return JsonConvert.DeserializeObject<T>(responseData)!;
125125
}
126126

127+
/// <summary>
128+
/// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line.
129+
/// </summary>
130+
public async Task<Stream> PostStreamRaw(SyncStreamOptions options)
131+
{
132+
var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
133+
var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
134+
135+
if (response.Content == null)
136+
{
137+
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
138+
}
139+
140+
if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
141+
{
142+
InvalidateCredentials();
143+
}
144+
145+
if (!response.IsSuccessStatusCode)
146+
{
147+
var errorText = await response.Content.ReadAsStringAsync();
148+
throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}");
149+
}
150+
151+
return await response.Content.ReadAsStreamAsync();
152+
}
153+
154+
155+
/// <summary>
156+
/// Originally used for the C# streaming sync implementation.
157+
/// </summary>
127158
public async IAsyncEnumerable<StreamingSyncLine?> PostStream(SyncStreamOptions options)
128159
{
129160
using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);

0 commit comments

Comments
 (0)