@@ -52,43 +52,61 @@ export default class UpdateQueue extends Loggable {
5252 if ( this . pendingUpdates [ id ] && this . pendingUpdates [ id ] . length > 0 ) {
5353 this . inprogressUpdates [ id ] = true ;
5454
55- try {
56- let doc = await this . readDB . get < T > ( id ) ;
57- logger . debug ( `Retrieved doc: ${ id } ` ) ;
58- while ( this . pendingUpdates [ id ] . length !== 0 ) {
59- const update = this . pendingUpdates [ id ] . splice ( 0 , 1 ) [ 0 ] ;
60- if ( typeof update === 'function' ) {
61- doc = { ...doc , ...update ( doc ) } ;
62- } else {
63- doc = {
64- ...doc ,
65- ...update ,
66- } ;
55+ const MAX_RETRIES = 5 ;
56+ for ( let i = 0 ; i < MAX_RETRIES ; i ++ ) {
57+ try {
58+ const doc = await this . readDB . get < T > ( id ) ;
59+ logger . debug ( `Retrieved doc: ${ id } ` ) ;
60+
61+ // Create a new doc object to apply updates to for this attempt
62+ let updatedDoc = { ...doc } ;
63+
64+ // Note: This loop is not fully safe if updates are functions that depend on a specific doc state
65+ // that might change between retries. But for simple object merges, it's okay.
66+ const updatesToApply = [ ...this . pendingUpdates [ id ] ] ;
67+ for ( const update of updatesToApply ) {
68+ if ( typeof update === 'function' ) {
69+ updatedDoc = { ...updatedDoc , ...update ( updatedDoc ) } ;
70+ } else {
71+ updatedDoc = {
72+ ...updatedDoc ,
73+ ...update ,
74+ } ;
75+ }
6776 }
68- }
69- // for (const k in doc) {
70- // console.log(`${k}: ${typeof k}`);
71- // }
72- // console.log(`Applied updates to doc: ${JSON.stringify(doc)}`);
73- await this . writeDB . put < T > ( doc ) ;
74- logger . debug ( `Put doc: ${ id } ` ) ;
7577
76- if ( this . pendingUpdates [ id ] . length === 0 ) {
77- this . inprogressUpdates [ id ] = false ;
78- delete this . inprogressUpdates [ id ] ;
79- } else {
80- return this . applyUpdates < T > ( id ) ;
81- }
82- return doc ;
83- } catch ( e ) {
84- // Clean up queue state before re-throwing
85- delete this . inprogressUpdates [ id ] ;
86- if ( this . pendingUpdates [ id ] ) {
87- delete this . pendingUpdates [ id ] ;
78+ await this . writeDB . put < T > ( updatedDoc ) ;
79+ logger . debug ( `Put doc: ${ id } ` ) ;
80+
81+ // Success! Remove the updates we just applied.
82+ this . pendingUpdates [ id ] . splice ( 0 , updatesToApply . length ) ;
83+
84+ if ( this . pendingUpdates [ id ] . length === 0 ) {
85+ this . inprogressUpdates [ id ] = false ;
86+ delete this . inprogressUpdates [ id ] ;
87+ } else {
88+ // More updates came in, run again.
89+ return this . applyUpdates < T > ( id ) ;
90+ }
91+ return updatedDoc as any ; // success, exit loop and function
92+ } catch ( e : any ) {
93+ if ( e . name === 'conflict' && i < MAX_RETRIES - 1 ) {
94+ logger . warn ( `Conflict on update for doc ${ id } , retry #${ i + 1 } ` ) ;
95+ await new Promise ( ( res ) => setTimeout ( res , 50 * Math . random ( ) ) ) ;
96+ // continue to next iteration of the loop
97+ } else {
98+ // Max retries reached or a non-conflict error
99+ delete this . inprogressUpdates [ id ] ;
100+ if ( this . pendingUpdates [ id ] ) {
101+ delete this . pendingUpdates [ id ] ;
102+ }
103+ logger . error ( `Error on attemped update (retry ${ i } ): ${ JSON . stringify ( e ) } ` ) ;
104+ throw e ; // Let caller handle
105+ }
88106 }
89- logger . error ( `Error on attemped update: ${ JSON . stringify ( e ) } ` ) ;
90- throw e ; // Let caller handle (e.g., putCardRecord's 404 handling)
91107 }
108+ // This should be unreachable, but it satisfies the compiler that a value is always returned or an error thrown.
109+ throw new Error ( `UpdateQueue failed for doc ${ id } after ${ MAX_RETRIES } retries.` ) ;
92110 } else {
93111 throw new Error ( `Empty Updates Queue Triggered` ) ;
94112 }
0 commit comments