Skip to content

Commit ec1a733

Browse files
author
Amine
committed
feat: added archival management to AttachmentQueue via AttachmentContext
1 parent eedd224 commit ec1a733

File tree

5 files changed

+99
-23
lines changed

5 files changed

+99
-23
lines changed

packages/attachments/src/AttachmentContext.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@ export class AttachmentContext {
1717
/** Logger instance for diagnostic information */
1818
logger: ILogger;
1919

20+
/** Maximum number of archived attachments to keep before cleanup */
21+
archivedCacheLimit: number = 100;
22+
2023
/**
2124
* Creates a new AttachmentContext instance.
2225
*
2326
* @param db - PowerSync database instance
2427
* @param tableName - Name of the table storing attachment records. Default: 'attachments'
2528
* @param logger - Logger instance for diagnostic output
2629
*/
27-
constructor(db: AbstractPowerSyncDatabase, tableName: string = 'attachments', logger: ILogger) {
30+
constructor(db: AbstractPowerSyncDatabase, tableName: string = 'attachments', logger: ILogger, archivedCacheLimit: number) {
2831
this.db = db;
2932
this.tableName = tableName;
3033
this.logger = logger;
34+
this.archivedCacheLimit = archivedCacheLimit;
3135
}
3236

3337
/**
@@ -191,6 +195,53 @@ export class AttachmentContext {
191195
);
192196
}
193197

198+
async clearQueue(): Promise<void> {
199+
await this.db.writeTransaction((tx) =>
200+
tx.execute(
201+
/* sql */
202+
`
203+
DELETE FROM ${this.tableName}
204+
`
205+
)
206+
);
207+
}
208+
209+
async deleteArchivedAttachments(callback?: (attachments: AttachmentRecord[]) => Promise<void>): Promise<boolean> {
210+
const limit = 1000;
211+
212+
const results = await this.db.getAll(
213+
/* sql */
214+
`
215+
SELECT * FROM ${this.tableName} WHERE state = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?
216+
`,
217+
[
218+
AttachmentState.ARCHIVED,
219+
limit,
220+
this.archivedCacheLimit,
221+
],
222+
);
223+
224+
const archivedAttachments = results.map(attachmentFromSql);
225+
if (archivedAttachments.length === 0) return false;
226+
227+
this.logger.info(`Deleting ${archivedAttachments.length} archived attachments. Archived attachment exceeds cache archiveCacheLimit of ${this.archivedCacheLimit}.`);
228+
229+
await callback?.(archivedAttachments);
230+
231+
const ids = archivedAttachments.map(attachment => attachment.id);
232+
233+
await this.db.executeBatch(
234+
/* sql */
235+
`
236+
DELETE FROM ${this.tableName} WHERE id IN (?)
237+
`,
238+
[ids]
239+
);
240+
241+
this.logger.info(`Deleted ${archivedAttachments.length} archived attachments`);
242+
return archivedAttachments.length < limit;
243+
}
244+
194245
/**
195246
* Saves multiple attachment records in a single transaction.
196247
*

packages/attachments/src/AttachmentQueue.ts

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,18 @@ export class AttachmentQueue {
9797
archivedCacheLimit?: number;
9898
errorHandler?: AttachmentErrorHandler;
9999
}) {
100-
this.context = new AttachmentContext(db, tableName, logger ?? db.logger);
101100
this.remoteStorage = remoteStorage;
102101
this.localStorage = localStorage;
103102
this.watchAttachments = watchAttachments;
104103
this.tableName = tableName;
105-
this.syncingService = new SyncingService(this.context, localStorage, remoteStorage, logger ?? db.logger, errorHandler);
106-
this.attachmentService = new AttachmentService(tableName, db);
107104
this.syncIntervalMs = syncIntervalMs;
108105
this.syncThrottleDuration = syncThrottleDuration;
109-
this.watchActiveAttachments = this.attachmentService.watchActiveAttachments({ throttleMs: this.syncThrottleDuration });
110-
this.downloadAttachments = downloadAttachments;
111106
this.archivedCacheLimit = archivedCacheLimit;
107+
this.downloadAttachments = downloadAttachments;
108+
this.context = new AttachmentContext(db, tableName, logger ?? db.logger, archivedCacheLimit);
109+
this.attachmentService = new AttachmentService(db, logger ?? db.logger, tableName);
110+
this.syncingService = new SyncingService(this.context, localStorage, remoteStorage, logger ?? db.logger, errorHandler);
111+
this.watchActiveAttachments = this.attachmentService.watchActiveAttachments({ throttleMs: this.syncThrottleDuration });
112112
}
113113

114114
/**
@@ -346,6 +346,18 @@ export class AttachmentQueue {
346346
});
347347
}
348348

349+
async expireCache(): Promise<void> {
350+
let isDone = false;
351+
while (!isDone) {
352+
isDone = await this.syncingService.deleteArchivedAttachments();
353+
}
354+
}
355+
356+
async clearQueue(): Promise<void> {
357+
await this.context.clearQueue();
358+
await this.localStorage.clear();
359+
}
360+
349361
/**
350362
* Verifies the integrity of all attachment records and repairs inconsistencies.
351363
*
@@ -372,15 +384,23 @@ export class AttachmentQueue {
372384
const newLocalUri = this.localStorage.getLocalUri(attachment.filename);
373385
const newExists = await this.localStorage.fileExists(newLocalUri);
374386
if (newExists) {
375-
// The file exists but the localUri is broken, lets update it.
387+
// The file exists locally but the localUri is broken, we update it.
376388
updates.push({
377389
...attachment,
378390
localUri: newLocalUri
379391
});
380392
} else {
381-
// no new exists
382-
if (attachment.state === AttachmentState.QUEUED_UPLOAD || attachment.state === AttachmentState.SYNCED) {
383-
// The file must have been removed from the local storage before upload was completed
393+
// the file doesn't exist locally.
394+
if (attachment.state === AttachmentState.SYNCED) {
395+
// the file has been successfully synced to remote storage but is missing
396+
// we download it again
397+
updates.push({
398+
...attachment,
399+
state: AttachmentState.QUEUED_DOWNLOAD,
400+
localUri: undefined
401+
});
402+
} else {
403+
// the file wasn't successfully synced to remote storage, we archive it
384404
updates.push({
385405
...attachment,
386406
state: AttachmentState.ARCHIVED,

packages/attachments/src/AttachmentService.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
1-
import { AbstractPowerSyncDatabase, DifferentialWatchedQuery } from '@powersync/common';
1+
import { AbstractPowerSyncDatabase, DifferentialWatchedQuery, ILogger } from '@powersync/common';
22
import { AttachmentRecord, AttachmentState } from './Schema.js';
3+
import { AttachmentContext } from './AttachmentContext.js';
34

45
/**
56
* Service for querying and watching attachment records in the database.
67
*/
78
export class AttachmentService {
89
constructor(
10+
private db: AbstractPowerSyncDatabase,
11+
private logger: ILogger,
912
private tableName: string = 'attachments',
10-
private db: AbstractPowerSyncDatabase
1113
) {}
1214

1315
/**
1416
* Creates a differential watch query for active attachments requiring synchronization.
1517
* @returns Watch query that emits changes for queued uploads, downloads, and deletes
1618
*/
1719
watchActiveAttachments({ throttleMs }: { throttleMs?: number } = {}): DifferentialWatchedQuery<AttachmentRecord> {
20+
this.logger.info('Watching active attachments...');
1821
const watch = this.db
1922
.query<AttachmentRecord>({
2023
sql: /* sql */ `

packages/attachments/src/SyncingService.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,17 +170,18 @@ export class SyncingService {
170170
* Performs cleanup of archived attachments by removing their local files and records.
171171
* Errors during local file deletion are logged but do not prevent record deletion.
172172
*/
173-
async deleteArchivedAttachments(): Promise<void> {
174-
const archivedAttachments = await this.context.getArchivedAttachments();
175-
for (const attachment of archivedAttachments) {
176-
if (attachment.localUri) {
177-
try {
178-
await this.localStorage.deleteFile(attachment.localUri);
179-
} catch (error) {
180-
this.logger.error('Error deleting local file for archived attachment', error);
173+
async deleteArchivedAttachments(): Promise<boolean> {
174+
return await this.context.deleteArchivedAttachments(async (archivedAttachments) => {
175+
for (const attachment of archivedAttachments) {
176+
if (attachment.localUri) {
177+
try {
178+
await this.localStorage.deleteFile(attachment.localUri);
179+
} catch (error) {
180+
this.logger.error('Error deleting local file for archived attachment', error);
181+
}
181182
}
183+
await this.context.deleteAttachment(attachment.id);
182184
}
183-
await this.context.deleteAttachment(attachment.id);
184-
}
185+
});
185186
}
186187
}

packages/attachments/tests/attachments.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ beforeEach(() => {
9999
remoteStorage: mockRemoteStorage,
100100
localStorage: mockLocalStorage,
101101
syncIntervalMs: INTERVAL_MILLISECONDS,
102+
archivedCacheLimit: 0,
102103
});
103104
})
104105

@@ -280,7 +281,7 @@ describe('attachment queue', () => {
280281
5
281282
);
282283

283-
// await queue.syncStorage(); // <-- explicitly delete
284+
// await new Promise(resolve => setTimeout(resolve, 1500));
284285

285286
// File should be deleted too
286287
expect(await mockLocalStorage.fileExists(record.localUri!)).toBe(false);

0 commit comments

Comments
 (0)