11import Combine
22import Foundation
3- import OSLog
4-
5- /// A watched attachment record item.
6- /// This is usually returned from watching all relevant attachment IDs.
7- public struct WatchedAttachmentItem {
8- /// Id for the attachment record
9- public let id : String
10-
11- /// File extension used to determine an internal filename for storage if no `filename` is provided
12- public let fileExtension : String ?
13-
14- /// Filename to store the attachment with
15- public let filename : String ?
16-
17- /// Metadata for the attachment (optional)
18- public let metaData : String ?
19-
20- /// Initializes a new `WatchedAttachmentItem`
21- /// - Parameters:
22- /// - id: Attachment record ID
23- /// - fileExtension: Optional file extension
24- /// - filename: Optional filename
25- /// - metaData: Optional metadata
26- public init (
27- id: String ,
28- fileExtension: String ? = nil ,
29- filename: String ? = nil ,
30- metaData: String ? = nil
31- ) {
32- self . id = id
33- self . fileExtension = fileExtension
34- self . filename = filename
35- self . metaData = metaData
36-
37- precondition ( fileExtension != nil || filename != nil , " Either fileExtension or filename must be provided. " )
38- }
39- }
403
414/// Class used to implement the attachment queue
425/// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments.
@@ -151,9 +114,10 @@ public actor AttachmentQueue {
151114
152115 /// Starts the attachment sync process
153116 public func startSync( ) async throws {
154- if closed {
155- throw PowerSyncAttachmentError . closed ( " Cannot start syncing on closed attachment queue " )
156- }
117+ try guardClosed ( )
118+
119+ // Stop any active syncing before starting new Tasks
120+ try await stopSyncing ( )
157121
158122 // Ensure the directory where attachments are downloaded exists
159123 try await localStorage. makeDir ( path: attachmentsDirectory)
@@ -165,48 +129,63 @@ public actor AttachmentQueue {
165129 }
166130 }
167131
168- await syncingService. startPeriodicSync ( period: syncInterval)
132+ try await syncingService. startSync ( period: syncInterval)
169133
170134 syncStatusTask = Task {
171135 do {
172- // Create a task for watching connectivity changes
173- let connectivityTask = Task {
174- var previousConnected = db. currentStatus. connected
175-
176- for await status in db. currentStatus. asFlow ( ) {
177- if !previousConnected && status. connected {
178- await syncingService. triggerSync ( )
136+ try await withThrowingTaskGroup ( of: Void . self) { group in
137+ // Add connectivity monitoring task
138+ group. addTask {
139+ var previousConnected = self . db. currentStatus. connected
140+ for await status in self . db. currentStatus. asFlow ( ) {
141+ if !previousConnected && status. connected {
142+ try await self . syncingService. triggerSync ( )
143+ }
144+ previousConnected = status. connected
179145 }
180- previousConnected = status. connected
181146 }
182- }
183147
184- // Create a task for watching attachment changes
185- let watchTask = Task {
186- for try await items in self . watchedAttachments {
187- try await self . processWatchedAttachments ( items: items)
148+ // Add attachment watching task
149+ group. addTask {
150+ for try await items in self . watchedAttachments {
151+ try await self . processWatchedAttachments ( items: items)
152+ }
188153 }
189- }
190154
191- // Wait for both tasks to complete (they shouldn't unless cancelled )
192- await connectivityTask . value
193- try await watchTask . value
155+ // Wait for any task to complete (which should only happen on cancellation )
156+ try await group . next ( )
157+ }
194158 } catch {
195159 if !( error is CancellationError ) {
196- logger. error ( " Error in sync job: \( error. localizedDescription) " , tag: logTag)
160+ logger. error ( " Error in attachment sync job: \( error. localizedDescription) " , tag: logTag)
197161 }
198162 }
199163 }
200164 }
201165
166+ /// Stops active syncing tasks. Syncing can be resumed with ``startSync()``
167+ public func stopSyncing( ) async throws {
168+ try guardClosed ( )
169+
170+ syncStatusTask? . cancel ( )
171+ // Wait for the task to actually complete
172+ do {
173+ _ = try await syncStatusTask? . value
174+ } catch {
175+ // Task completed with error (likely cancellation)
176+ // This is okay
177+ }
178+ syncStatusTask = nil
179+
180+ try await syncingService. stopSync ( )
181+ }
182+
202183 /// Closes the attachment queue and cancels all sync tasks
203184 public func close( ) async throws {
204- if closed {
205- return
206- }
185+ try guardClosed ( )
207186
208- syncStatusTask ? . cancel ( )
209- await syncingService. close ( )
187+ try await stopSyncing ( )
188+ try await syncingService. close ( )
210189 closed = true
211190 }
212191
@@ -219,7 +198,7 @@ public actor AttachmentQueue {
219198 attachmentId: String ,
220199 fileExtension: String ?
221200 ) -> String {
222- return " \( attachmentId) . \( fileExtension ?? " " ) "
201+ return " \( attachmentId) . \( fileExtension ?? " attachment " ) "
223202 }
224203
225204 /// Processes watched attachment items and updates sync state
@@ -230,10 +209,10 @@ public actor AttachmentQueue {
230209 try await attachmentsService. withLock { context in
231210 let currentAttachments = try await context. getAttachments ( )
232211 var attachmentUpdates = [ Attachment] ( )
233-
212+
234213 for item in items {
235214 let existingQueueItem = currentAttachments. first { $0. id == item. id }
236-
215+
237216 if existingQueueItem == nil {
238217 if !self . downloadAttachments {
239218 continue
@@ -246,7 +225,7 @@ public actor AttachmentQueue {
246225 attachmentId: item. id,
247226 fileExtension: item. fileExtension
248227 )
249-
228+
250229 attachmentUpdates. append (
251230 Attachment (
252231 id: item. id,
@@ -267,29 +246,29 @@ public actor AttachmentQueue {
267246 // and has been synced. If it's missing and hasSynced is false then
268247 // it must be an upload operation
269248 let newState = existingQueueItem!. localUri == nil ?
270- AttachmentState . queuedDownload :
271- AttachmentState . queuedUpload
272-
249+ AttachmentState . queuedDownload :
250+ AttachmentState . queuedUpload
251+
273252 attachmentUpdates. append (
274253 existingQueueItem!. with ( state: newState)
275254 )
276255 }
277256 }
278257 }
279-
280-
258+
281259 /**
282260 * Archive any items not specified in the watched items except for items pending delete.
283261 */
284262 for attachment in currentAttachments {
285- if attachment. state != AttachmentState . queuedDelete &&
286- items. first ( where: { $0. id == attachment. id } ) == nil {
263+ if attachment. state != AttachmentState . queuedDelete,
264+ items. first ( where: { $0. id == attachment. id } ) == nil
265+ {
287266 attachmentUpdates. append (
288267 attachment. with ( state: AttachmentState . archived)
289268 )
290269 }
291270 }
292-
271+
293272 if !attachmentUpdates. isEmpty {
294273 try await context. saveAttachments ( attachments: attachmentUpdates)
295274 }
@@ -319,11 +298,11 @@ public actor AttachmentQueue {
319298
320299 // Write the file to the filesystem
321300 let fileSize = try await localStorage. saveFile ( filePath: localUri, data: data)
322-
301+
323302 return try await attachmentsService. withLock { context in
324303 // Start a write transaction. The attachment record and relevant local relationship
325304 // assignment should happen in the same transaction.
326- return try await self . db. writeTransaction { tx in
305+ try await self . db. writeTransaction { tx in
327306 let attachment = Attachment (
328307 id: id,
329308 filename: filename,
@@ -385,7 +364,7 @@ public actor AttachmentQueue {
385364
386365 /// Removes all archived items
387366 public func expireCache( ) async throws {
388- try await attachmentsService. withLock { context in
367+ try await attachmentsService. withLock { context in
389368 var done = false
390369 repeat {
391370 done = try await self . syncingService. deleteArchivedAttachments ( context)
@@ -401,4 +380,10 @@ public actor AttachmentQueue {
401380 try await self . localStorage. rmDir ( path: self . attachmentsDirectory)
402381 }
403382 }
383+
384+ private func guardClosed( ) throws {
385+ if closed {
386+ throw PowerSyncAttachmentError . closed ( " Attachment queue is closed " )
387+ }
388+ }
404389}
0 commit comments