Skip to content

Commit b364581

Browse files
Improve compacting & checksum pre-calculations (#396)
* Reduce batch size for bucket checksum pre-calculations.
* Avoid repeatedly re-compacting the same buckets.
* Fix projection.
* Fix compact tests.
* Changeset.
* Fix another test.
* Update packages/service-core/src/storage/SyncRulesBucketStorage.ts

Co-authored-by: stevensJourney <51082125+stevensJourney@users.noreply.github.com>

---------

Co-authored-by: stevensJourney <51082125+stevensJourney@users.noreply.github.com>
1 parent 0ace0d3 commit b364581

File tree

8 files changed

+81
-19
lines changed

8 files changed

+81
-19
lines changed

.changeset/mean-zoos-retire.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': patch
3+
'@powersync/service-core-tests': patch
4+
'@powersync/service-core': patch
5+
---
6+
7+
Avoid re-compacting recently compacted buckets.

.changeset/shaggy-queens-cheat.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': patch
3+
'@powersync/service-core-tests': patch
4+
'@powersync/service-core': patch
5+
---
6+
7+
Reduce batch size for checksum pre-calculations to reduce timeouts.

modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ export class MongoChecksums {
169169
// Limit the number of buckets we query for at a time.
170170
const bucketBatchLimit = this.options?.bucketBatchLimit ?? DEFAULT_BUCKET_BATCH_LIMIT;
171171

172-
if (batch.length < bucketBatchLimit) {
172+
if (batch.length <= bucketBatchLimit) {
173173
// Single batch - no need for splitting the batch and merging results
174174
return await this.computePartialChecksumsInternal(batch);
175175
}

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ export interface MongoCompactOptions extends storage.CompactOptions {}
6161
const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
6262
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
6363
const DEFAULT_MOVE_BATCH_QUERY_LIMIT = 10_000;
64+
const DEFAULT_MIN_BUCKET_CHANGES = 10;
6465

6566
/** This default is primarily for tests. */
6667
const DEFAULT_MEMORY_LIMIT_MB = 64;
@@ -73,6 +74,7 @@ export class MongoCompactor {
7374
private moveBatchLimit: number;
7475
private moveBatchQueryLimit: number;
7576
private clearBatchLimit: number;
77+
private minBucketChanges: number;
7678
private maxOpId: bigint;
7779
private buckets: string[] | undefined;
7880
private signal?: AbortSignal;
@@ -88,6 +90,7 @@ export class MongoCompactor {
8890
this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
8991
this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
9092
this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
93+
this.minBucketChanges = options?.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES;
9194
this.maxOpId = options?.maxOpId ?? 0n;
9295
this.buckets = options?.compactBuckets;
9396
this.signal = options?.signal;
@@ -113,14 +116,26 @@ export class MongoCompactor {
113116

114117
private async compactDirtyBuckets() {
115118
while (!this.signal?.aborted) {
116-
// Process all buckets with 1 or more changes since last time
117-
const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 });
119+
// Process all buckets with at least `minBucketChanges` changes (default 10) since last time.
120+
// We exclude the last 100 compacted buckets, to avoid repeatedly re-compacting the same buckets over and over
121+
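// if they are modified while compacting.
// NOTE(review): `recentlyCompacted` is declared inside this `while` loop, so it is reset to [] on every
// iteration and the `exclude` list passed to dirtyBucketBatch() below is always empty. It likely needs to
// be declared before the loop for the exclusion to take effect - confirm.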
122+
const TRACK_RECENTLY_COMPACTED_NUMBER = 100;
123+
124+
let recentlyCompacted: string[] = [];
125+
const buckets = await this.dirtyBucketBatch({
126+
minBucketChanges: this.minBucketChanges,
127+
exclude: recentlyCompacted
128+
});
118129
if (buckets.length == 0) {
119130
// All done
120131
break;
121132
}
122-
for (let bucket of buckets) {
133+
for (let { bucket } of buckets) {
123134
await this.compactSingleBucket(bucket);
135+
recentlyCompacted.push(bucket);
136+
}
137+
if (recentlyCompacted.length > TRACK_RECENTLY_COMPACTED_NUMBER) {
138+
recentlyCompacted = recentlyCompacted.slice(-TRACK_RECENTLY_COMPACTED_NUMBER);
124139
}
125140
}
126141
}
@@ -482,10 +497,20 @@ export class MongoCompactor {
482497
break;
483498
}
484499
const start = Date.now();
485-
logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
500+
logger.info(`Calculating checksums for batch of ${buckets.length} buckets`);
486501

487-
await this.updateChecksumsBatch(buckets);
488-
logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
502+
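// Filter batch by estimated bucket size, to reduce possibility of timeouts.
// Buckets are taken in order until the summed estimatedCount exceeds 50_000; only those buckets
// (checkBuckets) are processed in this pass.
// NOTE(review): `count` below is still incremented by buckets.length rather than checkBuckets.length,
// which looks like it over-counts whenever the batch is truncated here - confirm.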
503+
let checkBuckets: typeof buckets = [];
504+
let totalCountEstimate = 0;
505+
for (let bucket of buckets) {
506+
checkBuckets.push(bucket);
507+
totalCountEstimate += bucket.estimatedCount;
508+
if (totalCountEstimate > 50_000) {
509+
break;
510+
}
511+
}
512+
await this.updateChecksumsBatch(checkBuckets.map((b) => b.bucket));
513+
logger.info(`Updated checksums for batch of ${checkBuckets.length} buckets in ${Date.now() - start}ms`);
489514
count += buckets.length;
490515
}
491516
return { buckets: count };
@@ -497,7 +522,10 @@ export class MongoCompactor {
497522
* This cannot be used to iterate on its own - the client is expected to process these buckets and
498523
* set estimate_since_compact.count: 0 when done, before fetching the next batch.
499524
*/
500-
private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise<string[]> {
525+
private async dirtyBucketBatch(options: {
526+
minBucketChanges: number;
527+
exclude?: string[];
528+
}): Promise<{ bucket: string; estimatedCount: number }[]> {
501529
if (options.minBucketChanges <= 0) {
502530
throw new ReplicationAssertionError('minBucketChanges must be >= 1');
503531
}
@@ -506,22 +534,28 @@ export class MongoCompactor {
506534
.find(
507535
{
508536
'_id.g': this.group_id,
509-
'estimate_since_compact.count': { $gte: options.minBucketChanges }
537+
'estimate_since_compact.count': { $gte: options.minBucketChanges },
538+
'_id.b': { $nin: options.exclude ?? [] }
510539
},
511540
{
512541
projection: {
513-
_id: 1
542+
_id: 1,
543+
estimate_since_compact: 1,
544+
compacted_state: 1
514545
},
515546
sort: {
516547
'estimate_since_compact.count': -1
517548
},
518-
limit: 5_000,
549+
limit: 200,
519550
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
520551
}
521552
)
522553
.toArray();
523554

524-
return dirtyBuckets.map((bucket) => bucket._id.b);
555+
return dirtyBuckets.map((bucket) => ({
556+
bucket: bucket._id.b,
557+
estimatedCount: bucket.estimate_since_compact!.count + (bucket.compacted_state?.count ?? 0)
558+
}));
525559
}
526560

527561
private async updateChecksumsBatch(buckets: string[]) {

modules/module-mongodb-storage/test/src/storage_compacting.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ bucket_definitions:
6262
clearBatchLimit: 200,
6363
moveBatchLimit: 10,
6464
moveBatchQueryLimit: 10,
65+
minBucketChanges: 1,
6566
maxOpId: checkpoint,
6667
signal: null as any
6768
});

packages/service-core-tests/src/tests/register-compacting-tests.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ bucket_definitions:
8080
await bucketStorage.compact({
8181
clearBatchLimit: 2,
8282
moveBatchLimit: 1,
83-
moveBatchQueryLimit: 1
83+
moveBatchQueryLimit: 1,
84+
minBucketChanges: 1
8485
});
8586

8687
const batchAfter = await test_utils.oneFromAsync(
@@ -207,7 +208,8 @@ bucket_definitions:
207208
await bucketStorage.compact({
208209
clearBatchLimit: 2,
209210
moveBatchLimit: 1,
210-
moveBatchQueryLimit: 1
211+
moveBatchQueryLimit: 1,
212+
minBucketChanges: 1
211213
});
212214

213215
const batchAfter = await test_utils.oneFromAsync(
@@ -300,7 +302,8 @@ bucket_definitions:
300302
await bucketStorage.compact({
301303
clearBatchLimit: 2,
302304
moveBatchLimit: 1,
303-
moveBatchQueryLimit: 1
305+
moveBatchQueryLimit: 1,
306+
minBucketChanges: 1
304307
});
305308

306309
const batchAfter = await test_utils.oneFromAsync(
@@ -412,7 +415,8 @@ bucket_definitions:
412415
await bucketStorage.compact({
413416
clearBatchLimit: 100,
414417
moveBatchLimit: 100,
415-
moveBatchQueryLimit: 100 // Larger limit for a larger window of operations
418+
moveBatchQueryLimit: 100, // Larger limit for a larger window of operations
419+
minBucketChanges: 1
416420
});
417421

418422
const batchAfter = await test_utils.fromAsync(
@@ -498,7 +502,8 @@ bucket_definitions:
498502
await bucketStorage.compact({
499503
clearBatchLimit: 2,
500504
moveBatchLimit: 1,
501-
moveBatchQueryLimit: 1
505+
moveBatchQueryLimit: 1,
506+
minBucketChanges: 1
502507
});
503508

504509
const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
@@ -572,7 +577,8 @@ bucket_definitions:
572577
await bucketStorage.compact({
573578
clearBatchLimit: 20,
574579
moveBatchLimit: 10,
575-
moveBatchQueryLimit: 10
580+
moveBatchQueryLimit: 10,
581+
minBucketChanges: 1
576582
});
577583

578584
const checkpoint2 = result2!.flushed_op;

packages/service-core-tests/src/tests/register-sync-tests.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,9 @@ bucket_definitions:
11411141
await batch.commit('0/2');
11421142
});
11431143

1144-
await bucketStorage.compact();
1144+
await bucketStorage.compact({
1145+
minBucketChanges: 1
1146+
});
11451147

11461148
const lines2 = await getCheckpointLines(iter, { consume: true });
11471149

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ export interface CompactOptions {
217217
/** Minimum of 1 */
218218
moveBatchQueryLimit?: number;
219219

220+
/**
221+
* Minimum of 1, default of 10.
222+
*/
223+
minBucketChanges?: number;
224+
220225
/**
221226
* Internal/testing use: Cache size for compacting parameters.
222227
*/

0 commit comments

Comments
 (0)