@@ -3,7 +3,7 @@ import Foundation
33
44/// Class used to implement the attachment queue
55/// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments.
6- public actor AttachmentQueue {
6+ public class AttachmentQueue {
77 /// Default name of the attachments table
88 public static let defaultTableName = " attachments "
99
@@ -67,12 +67,14 @@ public actor AttachmentQueue {
6767 logger: self . logger,
6868 getLocalUri: { [ weak self] filename in
6969 guard let self = self else { return filename }
70- return await self . getLocalUri ( filename)
70+ return self . getLocalUri ( filename)
7171 } ,
7272 errorHandler: self . errorHandler,
7373 syncThrottle: self . syncThrottleDuration
7474 )
7575
76+ private let lock : LockActor
77+
7678 /// Initializes the attachment queue
7779 /// - Parameters match the stored properties
7880 public init (
@@ -103,73 +105,81 @@ public actor AttachmentQueue {
103105 self . subdirectories = subdirectories
104106 self . downloadAttachments = downloadAttachments
105107 self . logger = logger ?? db. logger
106-
107- attachmentsService = AttachmentService (
108+ self . attachmentsService = AttachmentService (
108109 db: db,
109110 tableName: attachmentsQueueTableName,
110111 logger: self . logger,
111112 maxArchivedCount: archivedCacheLimit
112113 )
114+ self . lock = LockActor ( )
113115 }
114116
115117 /// Starts the attachment sync process
116118 public func startSync( ) async throws {
117- try guardClosed ( )
119+ try await lock. withLock {
120+ try guardClosed ( )
118121
119- // Stop any active syncing before starting new Tasks
120- try await stopSyncing ( )
122+ // Stop any active syncing before starting new Tasks
123+ try await _stopSyncing ( )
121124
122- // Ensure the directory where attachments are downloaded exists
123- try await localStorage. makeDir ( path: attachmentsDirectory)
125+ // Ensure the directory where attachments are downloaded exists
126+ try await localStorage. makeDir ( path: attachmentsDirectory)
124127
125- if let subdirectories = subdirectories {
126- for subdirectory in subdirectories {
127- let path = URL ( fileURLWithPath: attachmentsDirectory) . appendingPathComponent ( subdirectory) . path
128- try await localStorage. makeDir ( path: path)
128+ if let subdirectories = subdirectories {
129+ for subdirectory in subdirectories {
130+ let path = URL ( fileURLWithPath: attachmentsDirectory) . appendingPathComponent ( subdirectory) . path
131+ try await localStorage. makeDir ( path: path)
132+ }
129133 }
130- }
131-
132- // Verify initial state
133- try await attachmentsService. withLock { context in
134- try await self . verifyAttachments ( context: context)
135- }
136134
137- try await syncingService. startSync ( period: syncInterval)
138-
139- syncStatusTask = Task {
140- do {
141- try await withThrowingTaskGroup ( of: Void . self) { group in
142- // Add connectivity monitoring task
143- group. addTask {
144- var previousConnected = self . db. currentStatus. connected
145- for await status in self . db. currentStatus. asFlow ( ) {
146- if !previousConnected && status. connected {
147- try await self . syncingService. triggerSync ( )
135+ // Verify initial state
136+ try await attachmentsService. withLock { context in
137+ try await self . verifyAttachments ( context: context)
138+ }
139+
140+ try await syncingService. startSync ( period: syncInterval)
141+
142+ syncStatusTask = Task {
143+ do {
144+ try await withThrowingTaskGroup ( of: Void . self) { group in
145+ // Add connectivity monitoring task
146+ group. addTask {
147+ var previousConnected = self . db. currentStatus. connected
148+ for await status in self . db. currentStatus. asFlow ( ) {
149+ if !previousConnected && status. connected {
150+ try await self . syncingService. triggerSync ( )
151+ }
152+ previousConnected = status. connected
148153 }
149- previousConnected = status. connected
150154 }
151- }
152155
153- // Add attachment watching task
154- group. addTask {
155- for try await items in try self . watchAttachments ( ) {
156- try await self . processWatchedAttachments ( items: items)
156+ // Add attachment watching task
157+ group. addTask {
158+ for try await items in try self . watchAttachments ( ) {
159+ try await self . processWatchedAttachments ( items: items)
160+ }
157161 }
158- }
159162
160- // Wait for any task to complete (which should only happen on cancellation)
161- try await group. next ( )
162- }
163- } catch {
164- if !( error is CancellationError ) {
165- logger. error ( " Error in attachment sync job: \( error. localizedDescription) " , tag: logTag)
163+ // Wait for any task to complete (which should only happen on cancellation)
164+ try await group. next ( )
165+ }
166+ } catch {
167+ if !( error is CancellationError ) {
168+ logger. error ( " Error in attachment sync job: \( error. localizedDescription) " , tag: logTag)
169+ }
166170 }
167171 }
168172 }
169173 }
170174
171175 /// Stops active syncing tasks. Syncing can be resumed with ``startSync()``
172176 public func stopSyncing( ) async throws {
177+ try await lock. withLock {
178+ try await _stopSyncing ( )
179+ }
180+ }
181+
182+ private func _stopSyncing( ) async throws {
173183 try guardClosed ( )
174184
175185 syncStatusTask? . cancel ( )
@@ -187,11 +197,13 @@ public actor AttachmentQueue {
187197
188198 /// Closes the attachment queue and cancels all sync tasks
189199 public func close( ) async throws {
190- try guardClosed ( )
200+ try await lock. withLock {
201+ try guardClosed ( )
191202
192- try await stopSyncing ( )
193- try await syncingService. close ( )
194- closed = true
203+ try await _stopSyncing ( )
204+ try await syncingService. close ( )
205+ closed = true
206+ }
195207 }
196208
197209 /// Resolves the filename for a new attachment
@@ -226,7 +238,7 @@ public actor AttachmentQueue {
226238 // This item is assumed to be coming from an upstream sync
227239 // Locally created new items should be persisted using saveFile before
228240 // this point.
229- let filename = await self . resolveNewAttachmentFilename (
241+ let filename = self . resolveNewAttachmentFilename (
230242 attachmentId: item. id,
231243 fileExtension: item. fileExtension
232244 )
@@ -385,29 +397,30 @@ public actor AttachmentQueue {
385397 try await self . localStorage. rmDir ( path: self . attachmentsDirectory)
386398 }
387399 }
388-
400+
389401 /// Verifies attachment records are present in the filesystem
390402 private func verifyAttachments( context: AttachmentContext ) async throws {
391403 let attachments = try await context. getAttachments ( )
392404 var updates : [ Attachment ] = [ ]
393-
405+
394406 for attachment in attachments {
395407 guard let localUri = attachment. localUri else {
396408 continue
397409 }
398-
410+
399411 let exists = try await localStorage. fileExists ( filePath: localUri)
400412 if attachment. state == AttachmentState . synced ||
401413 attachment. state == AttachmentState . queuedUpload &&
402- !exists {
414+ !exists
415+ {
403416 // The file must have been removed from the local storage
404417 updates. append ( attachment. with (
405418 state: . archived,
406419 localUri: . some( nil ) // Clears the value
407420 ) )
408421 }
409422 }
410-
423+
411424 try await context. saveAttachments ( attachments: updates)
412425 }
413426
0 commit comments