Skip to content

Commit 229c539

Browse files
committed
add retry logic for updateQ
1 parent b69adf5 commit 229c539

File tree

1 file changed

+49
-33
lines changed

1 file changed

+49
-33
lines changed

packages/db/src/impl/couch/updateQueue.ts

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -52,42 +52,58 @@ export default class UpdateQueue extends Loggable {
5252
if (this.pendingUpdates[id] && this.pendingUpdates[id].length > 0) {
5353
this.inprogressUpdates[id] = true;
5454

55-
try {
56-
let doc = await this.readDB.get<T>(id);
57-
logger.debug(`Retrieved doc: ${id}`);
58-
while (this.pendingUpdates[id].length !== 0) {
59-
const update = this.pendingUpdates[id].splice(0, 1)[0];
60-
if (typeof update === 'function') {
61-
doc = { ...doc, ...update(doc) };
62-
} else {
63-
doc = {
64-
...doc,
65-
...update,
66-
};
55+
const MAX_RETRIES = 5;
56+
for (let i = 0; i < MAX_RETRIES; i++) {
57+
try {
58+
const doc = await this.readDB.get<T>(id);
59+
logger.debug(`Retrieved doc: ${id}`);
60+
61+
// Create a new doc object to apply updates to for this attempt
62+
let updatedDoc = { ...doc };
63+
64+
// Note: This loop is not fully safe if updates are functions that depend on a specific doc state
65+
// that might change between retries. But for simple object merges, it's okay.
66+
const updatesToApply = [...this.pendingUpdates[id]];
67+
for (const update of updatesToApply) {
68+
if (typeof update === 'function') {
69+
updatedDoc = { ...updatedDoc, ...update(updatedDoc) };
70+
} else {
71+
updatedDoc = {
72+
...updatedDoc,
73+
...update,
74+
};
75+
}
6776
}
68-
}
69-
// for (const k in doc) {
70-
// console.log(`${k}: ${typeof k}`);
71-
// }
72-
// console.log(`Applied updates to doc: ${JSON.stringify(doc)}`);
73-
await this.writeDB.put<T>(doc);
74-
logger.debug(`Put doc: ${id}`);
7577

76-
if (this.pendingUpdates[id].length === 0) {
77-
this.inprogressUpdates[id] = false;
78-
delete this.inprogressUpdates[id];
79-
} else {
80-
return this.applyUpdates<T>(id);
81-
}
82-
return doc;
83-
} catch (e) {
84-
// Clean up queue state before re-throwing
85-
delete this.inprogressUpdates[id];
86-
if (this.pendingUpdates[id]) {
87-
delete this.pendingUpdates[id];
78+
await this.writeDB.put<T>(updatedDoc);
79+
logger.debug(`Put doc: ${id}`);
80+
81+
// Success! Remove the updates we just applied.
82+
this.pendingUpdates[id].splice(0, updatesToApply.length);
83+
84+
if (this.pendingUpdates[id].length === 0) {
85+
this.inprogressUpdates[id] = false;
86+
delete this.inprogressUpdates[id];
87+
} else {
88+
// More updates came in, run again.
89+
return this.applyUpdates<T>(id);
90+
}
91+
return updatedDoc as any; // success, exit loop and function
92+
} catch (e: any) {
93+
if (e.name === 'conflict' && i < MAX_RETRIES - 1) {
94+
logger.warn(`Conflict on update for doc ${id}, retry #${i + 1}`);
95+
await new Promise((res) => setTimeout(res, 50 * Math.random()));
96+
// continue to next iteration of the loop
97+
} else {
98+
// Max retries reached or a non-conflict error
99+
delete this.inprogressUpdates[id];
100+
if (this.pendingUpdates[id]) {
101+
delete this.pendingUpdates[id];
102+
}
103+
logger.error(`Error on attemped update (retry ${i}): ${JSON.stringify(e)}`);
104+
throw e; // Let caller handle
105+
}
88106
}
89-
logger.error(`Error on attemped update: ${JSON.stringify(e)}`);
90-
throw e; // Let caller handle (e.g., putCardRecord's 404 handling)
91107
}
92108
} else {
93109
throw new Error(`Empty Updates Queue Triggered`);

0 commit comments

Comments
 (0)