Skip to content

Commit 3bc7cf5

Browse files
committed
Implement sync streams
1 parent 6913958 commit 3bc7cf5

File tree

9 files changed

+295
-1
lines changed

9 files changed

+295
-1
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ let packageName = "PowerSync"
77

88
// Set this to the absolute path of your Kotlin SDK checkout if you want to use a local Kotlin
99
// build. Also see docs/LocalBuild.md for details
10-
let localKotlinSdkOverride: String? = nil
10+
let localKotlinSdkOverride: String? = "/Users/simon/src/powersync-kotlin"
1111

1212
// Set this to the absolute path of your powersync-sqlite-core checkout if you want to use a
1313
// local build of the core extension.

Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,11 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol,
329329
func close() async throws {
330330
try await kotlinDatabase.close()
331331
}
332+
333+
func syncStream(name: String, params: JsonParam?) -> any SyncStream {
334+
let rawStream = kotlinDatabase.syncStream(name: name, parameters: params?.mapValues { $0.toKotlinMap() });
335+
return KotlinSyncStream(kotlinStream: rawStream)
336+
}
332337

333338
/// Tries to convert Kotlin PowerSyncExceptions to Swift Exceptions
334339
private func wrapPowerSyncException<R: Sendable>(

Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,27 @@ extension KotlinSyncStatusDataProtocol {
7272
)
7373
)
7474
}
75+
76+
var syncStreams: [SyncStreamStatus]? {
77+
return base.syncStreams?.map(mapSyncStreamStatus)
78+
}
79+
80+
func forStream(stream: SyncStreamDescription) -> SyncStreamStatus? {
81+
let name = stream.name
82+
// To match parameters, first check if we already have access to a Kotlin map for parameters.
83+
let parameters = if let kotlinStream = stream as? any HasKotlinStreamDescription {
84+
// Fast path: Reuse Kotlin map
85+
kotlinStream.kotlinParameters
86+
} else {
87+
// We don't? Ok, map to Kotlin.
88+
stream.parameters?.mapValues { $0.toValue() }
89+
}
90+
91+
guard let kotlinStatus = syncStatusForStream(status: base, name: stream.name, parameters: parameters) else {
92+
return nil
93+
}
94+
return mapSyncStreamStatus(kotlinStatus)
95+
}
7596

7697
private func mapPriorityStatus(_ status: PowerSyncKotlin.PriorityStatusEntry) -> PriorityStatusEntry {
7798
var lastSyncedAt: Date?
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import Foundation
2+
import PowerSyncKotlin
3+
4+
class KotlinStreamDescription<T: PowerSyncKotlin.SyncStreamDescription> {
5+
let inner: T
6+
let name: String
7+
let parameters: JsonParam?
8+
let kotlinParameters: [String: Any?]?
9+
10+
init(inner: T) {
11+
self.inner = inner
12+
self.name = inner.name
13+
self.kotlinParameters = inner.parameters
14+
self.parameters = inner.parameters?.mapValues { JsonValue.fromValue(raw: $0) }
15+
}
16+
}
17+
18+
protocol HasKotlinStreamDescription {
19+
associatedtype Description: PowerSyncKotlin.SyncStreamDescription
20+
21+
var stream: KotlinStreamDescription<Description> { get }
22+
}
23+
24+
extension HasKotlinStreamDescription {
25+
var kotlinParameters: [String: Any?]? {
26+
self.stream.kotlinParameters
27+
}
28+
}
29+
30+
class KotlinSyncStream: SyncStream, HasKotlinStreamDescription,
31+
// `PowerSyncKotlin.SyncStream` cannot be marked as Sendable, but is thread-safe.
32+
@unchecked Sendable
33+
{
34+
let stream: KotlinStreamDescription<PowerSyncKotlin.SyncStream>
35+
36+
init(kotlinStream: PowerSyncKotlin.SyncStream) {
37+
self.stream = KotlinStreamDescription(inner: kotlinStream);
38+
}
39+
40+
var name: String {
41+
stream.name
42+
}
43+
44+
var parameters: JsonParam? {
45+
stream.parameters
46+
}
47+
48+
func subscribe(ttl: TimeInterval?, priority: BucketPriority?) async throws -> any SyncStreamSubscription {
49+
let kotlinTtl: Optional<KotlinDouble> = if let ttl {
50+
KotlinDouble(value: ttl)
51+
} else {
52+
nil
53+
}
54+
let kotlinPriority: Optional<KotlinInt> = if let priority {
55+
KotlinInt(value: priority.priorityCode)
56+
} else {
57+
nil
58+
}
59+
60+
let kotlinSubscription = try await syncStreamSubscribeSwift(
61+
stream: stream.inner,
62+
ttl: kotlinTtl,
63+
priority: kotlinPriority,
64+
);
65+
return KotlinSyncStreamSubscription(kotlinStream: kotlinSubscription)
66+
}
67+
68+
func unsubscribeAll() async throws {
69+
try await stream.inner.unsubscribeAll()
70+
}
71+
}
72+
73+
class KotlinSyncStreamSubscription: SyncStreamSubscription, HasKotlinStreamDescription,
74+
// `PowerSyncKotlin.SyncStreamSubscription` cannot be marked as Sendable, but is thread-safe.
75+
@unchecked Sendable
76+
{
77+
let stream: KotlinStreamDescription<PowerSyncKotlin.SyncStreamSubscription>
78+
79+
init(kotlinStream: PowerSyncKotlin.SyncStreamSubscription) {
80+
self.stream = KotlinStreamDescription(inner: kotlinStream)
81+
}
82+
83+
var name: String {
84+
stream.name
85+
}
86+
var parameters: JsonParam? {
87+
stream.parameters
88+
}
89+
90+
func waitForFirstSync() async throws {
91+
try await stream.inner.waitForFirstSync()
92+
}
93+
94+
func unsubscribe() async throws {
95+
try await stream.inner.unsubscribe()
96+
}
97+
}
98+
99+
func mapSyncStreamStatus(_ status: PowerSyncKotlin.SyncStreamStatus) -> SyncStreamStatus {
100+
let progress = status.progress.map { ProgressNumbers(source: $0) }
101+
let subscription = status.subscription
102+
103+
return SyncStreamStatus(
104+
progress: progress,
105+
subscription: SyncSubscriptionDescription(
106+
name: subscription.name,
107+
parameters: subscription.parameters?.mapValues { JsonValue.fromValue(raw: $0) },
108+
active: subscription.active,
109+
isDefault: subscription.isDefault,
110+
hasExplicitSubscription: subscription.hasExplicitSubscription,
111+
expiresAt: subscription.expiresAt.map { Double($0.epochSeconds) },
112+
lastSyncedAt: subscription.lastSyncedAt.map { Double($0.epochSeconds) }
113+
)
114+
)
115+
}
116+
117+
struct ProgressNumbers: ProgressWithOperations {
118+
let totalOperations: Int32
119+
let downloadedOperations: Int32
120+
121+
init(source: PowerSyncKotlin.ProgressWithOperations) {
122+
self.totalOperations = source.totalOperations
123+
self.downloadedOperations = source.downloadedOperations
124+
}
125+
}

Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ public protocol PowerSyncDatabaseProtocol: Queries, Sendable {
230230
/// Using soft clears is recommended where it's not a security issue that old data could be reconstructed from
231231
/// the database.
232232
func disconnectAndClear(clearLocal: Bool, soft: Bool) async throws
233+
234+
/// Create a ``SyncStream`` instance for the given name and parameters.
235+
///
236+
/// Use ``SyncStream/subscribe`` on the returned instance to subscribe to the stream.
237+
func syncStream(name: String, params: JsonParam?) -> any SyncStream
233238

234239
/// Close the database, releasing resources.
235240
/// Also disconnects any active connection.

Sources/PowerSync/Protocol/db/JsonParam.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,27 @@ public enum JsonValue: Codable, Sendable {
5050
return anyDict
5151
}
5252
}
53+
54+
/// Converts a raw Swift value into a ``JsonValue``.
55+
///
56+
/// The value must be one of the types returned by ``JsonValue/toValue()``.
57+
static func fromValue(raw: Any?) -> Self {
58+
if let string = raw as? String {
59+
return Self.string(string)
60+
} else if let int = raw as? Int {
61+
return Self.int(int)
62+
} else if let double = raw as? Double {
63+
return Self.double(double)
64+
} else if let bool = raw as? Bool {
65+
return Self.bool(bool)
66+
} else if let array = raw as? [Any?] {
67+
return Self.array(array.map(fromValue))
68+
} else if let object = raw as? [String: Any?] {
69+
return Self.object(object.mapValues(fromValue))
70+
} else {
71+
return Self.null
72+
}
73+
}
5374
}
5475

5576
/// A typealias representing a top-level JSON object with string keys and `JSONValue` values.

Sources/PowerSync/Protocol/sync/SyncStatusData.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ public protocol SyncStatusData: Sendable {
4747
/// - Parameter priority: The priority for which the status is requested.
4848
/// - Returns: A `PriorityStatusEntry` representing the synchronization status for the given priority.
4949
func statusForPriority(_ priority: BucketPriority) -> PriorityStatusEntry
50+
51+
/// All sync streams currently being tracked in the database.
52+
///
53+
/// This returns null when the database is currently being opened and we don't have reliable information about
54+
/// included streams yet.
55+
var syncStreams: [SyncStreamStatus]? { get }
56+
57+
/// Status information for the given stream, if it's a stream that is currently tracked by the sync client.
58+
func forStream(stream: SyncStreamDescription) -> SyncStreamStatus?
5059
}
5160

5261
/// A protocol extending `SyncStatusData` to include flow-based updates for synchronization status.
@@ -55,3 +64,11 @@ public protocol SyncStatus: SyncStatusData, Sendable {
5564
/// - Returns: An `AsyncStream` that emits updates whenever the synchronization status changes.
5665
func asFlow() -> AsyncStream<SyncStatusData>
5766
}
67+
68+
/// Current information about a ``SyncStreamSubscription``.
69+
public struct SyncStreamStatus {
70+
/// If the sync status is currently downloading, information about download progress related to this stream.
71+
let progress: ProgressWithOperations?
72+
/// The ``SyncSubscriptionDescription`` providing information about the subscription.
73+
let subscription: SyncSubscriptionDescription
74+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import Foundation
2+
3+
/// Information uniquely identifying a sync stream that can be subscribed to.
4+
public protocol SyncStreamDescription: Sendable {
5+
/// The name of the sync stream as it appeaers in the stream definition for the PowerSync service.
6+
var name: String { get }
7+
/// The parameters used to subscribe to the stream, if any.
8+
///
9+
/// The same stream can be subscribed to multiple times with different parameters.
10+
var parameters: JsonParam? { get }
11+
}
12+
13+
/// A handle to a ``SyncStreamDescription`` that allows subscribing to the stream.
14+
///
15+
/// To obtain an instance of ``SyncStream``, call ``PowerSyncDatabase/syncStream``.
16+
public protocol SyncStream: SyncStreamDescription {
17+
/// Creates a new subscription on this stream.
18+
///
19+
/// As long as a subscription is active on the stream, the sync client will request it from the sync service.
20+
///
21+
/// This call is generally quite cheap and can be issued frequently, e.g. when a view needing data from the stream is activated.
22+
func subscribe(ttl: TimeInterval?, priority: BucketPriority?) async throws -> any SyncStreamSubscription
23+
24+
/// Unsubscribes all existing subscriptions on this stream.
25+
///
26+
/// This is a potentially unsafe method since it interferes with other subscriptions. A better option is to call
27+
/// ``SyncStreamSubscription/unsubscribe``.
28+
func unsubscribeAll() async throws
29+
}
30+
31+
extension SyncStream {
32+
33+
public func subscribe() async throws -> any SyncStreamSubscription {
34+
return try await subscribe(ttl: nil, priority: nil)
35+
}
36+
}
37+
38+
/// A ``SyncStream`` that has an active subscription.
39+
public protocol SyncStreamSubscription: SyncStreamDescription {
40+
/// An asynchronous function that completes once data on this stream has been synced.
41+
func waitForFirstSync() async throws
42+
/// Removes this subscription.
43+
///
44+
/// Once all ``SyncStreamSubscription``s for a ``SyncStream`` have been unsubscribed, the `ttl`
45+
/// for that stream thats running. When it expires without subscribing again, the stream will be evicted.
46+
func unsubscribe() async throws
47+
}
48+
49+
/// Information about a subscribed sync stream.
50+
///
51+
/// This includes the ``SyncStreamDescription`` along with information about the current sync status.
52+
public struct SyncSubscriptionDescription: SyncStreamDescription {
53+
public let name: String
54+
public let parameters: JsonParam?
55+
/// Whether this stream is active, meaning that the subscription has been acknowledged by the sync service.
56+
public let active: Bool
57+
/// Whether this stream subscription is included by default, regardless of whether the stream has explicitly
58+
/// been subscribed to or not.
59+
///
60+
/// Default streams are created by applying `auto_subscribe: true` in their definition on the sync service.
61+
///
62+
/// It's possible for both ``SyncSubscriptionDescription/isDefault`` and
63+
/// ``SyncSubscriptionDescription/hasExplicitSubscription`` to be true at the same time. This
64+
/// happens when a default stream was subscribed to explicitly.
65+
public let isDefault: Bool
66+
/// Whether this stream has been subscribed to explicitly.
67+
///
68+
/// It's possible for both ``SyncSubscriptionDescription/isDefault`` and
69+
/// ``SyncSubscriptionDescription/hasExplicitSubscription`` to be true at the same time. This
70+
/// happens when a default stream was subscribed to explicitly.
71+
public let hasExplicitSubscription: Bool
72+
/// For sync streams that have a time-to-live, the current time at which the stream would expire if not subscribed to
73+
/// again.
74+
public let expiresAt: TimeInterval?
75+
/// If ``SyncSubscriptionDescription/hasSynced`` is true, the last time data from this stream has been synced.
76+
public let lastSyncedAt: TimeInterval?
77+
78+
/// Whether this stream has been synced at least once.
79+
public var hasSynced: Bool {
80+
get {
81+
return self.expiresAt != nil
82+
}
83+
}
84+
}

Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,4 +624,20 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
624624
XCTAssertEqual(result[0], JoinOutput(name: "Test User", description: "task 1", comment: "comment 1"))
625625
XCTAssertEqual(result[1], JoinOutput(name: "Test User", description: "task 2", comment: "comment 2"))
626626
}
627+
628+
func testSubscriptionsUpdateStateWhileOffline() async throws {
629+
var streams = database.currentStatus.asFlow().makeAsyncIterator()
630+
let initialStatus = await streams.next(); // Ignore initial
631+
XCTAssertEqual(initialStatus?.syncStreams?.count, 0)
632+
633+
// Subscribing while offline should add the stream to the subscriptions reported in the status.
634+
let subscription = try await database.syncStream(name: "foo", params: ["foo": JsonValue.string("bar")]).subscribe()
635+
let updatedStatus = await streams.next();
636+
637+
XCTAssertEqual(updatedStatus?.syncStreams?.count, 1)
638+
let status = updatedStatus?.forStream(stream: subscription)
639+
XCTAssertNotNil(status)
640+
641+
XCTAssertNil(status?.progress)
642+
}
627643
}

0 commit comments

Comments
 (0)