1- import type { JSONValue } from '@aws-lambda-powertools/commons' ;
1+ import type {
2+ JSONValue ,
3+ MiddyLikeRequest ,
4+ } from '@aws-lambda-powertools/commons' ;
25import type { AnyFunction , IdempotencyHandlerOptions } from './types' ;
36import { IdempotencyRecordStatus } from './types' ;
47import {
@@ -34,7 +37,7 @@ export class IdempotencyHandler<Func extends AnyFunction> {
3437 *
3538 * This is the argument that is used for the idempotency.
3639 */
37- readonly #functionPayloadToBeHashed: JSONValue ;
40+ #functionPayloadToBeHashed: JSONValue ;
3841 /**
3942 * Reference to the function to be made idempotent.
4043 */
@@ -68,9 +71,17 @@ export class IdempotencyHandler<Func extends AnyFunction> {
6871 } ) ;
6972 }
7073
74+ /**
75+ * Takes an idempotency key and returns the idempotency record from the persistence layer.
76+ *
77+ * If the idempotency record is not COMPLETE, then it will throw an error based on the status of the record.
78+ *
79+ * @param idempotencyRecord The idempotency record stored in the persistence layer
80+ * @returns The result of the function if the idempotency record is in a terminal state
81+ */
7182 public static determineResultFromIdempotencyRecord (
7283 idempotencyRecord : IdempotencyRecord
73- ) : Promise < unknown > | unknown {
84+ ) : JSONValue {
7485 if ( idempotencyRecord . getStatus ( ) === IdempotencyRecordStatus . EXPIRED ) {
7586 throw new IdempotencyInconsistentStateError (
7687 'Item has expired during processing and may not longer be valid.'
@@ -96,50 +107,55 @@ export class IdempotencyHandler<Func extends AnyFunction> {
96107 return idempotencyRecord . getResponse ( ) ;
97108 }
98109
110+ /**
111+ * Execute the handler and return the result.
112+ *
113+ * If the handler fails, the idempotency record will be deleted.
114+ * If it succeeds, the idempotency record will be updated with the result.
115+ *
116+ * @returns The result of the function execution
117+ */
99118 public async getFunctionResult ( ) : Promise < ReturnType < Func > > {
100119 let result ;
101120 try {
102121 result = await this . #functionToMakeIdempotent( ...this . #functionArguments) ;
103- } catch ( e ) {
104- try {
105- await this . #persistenceStore. deleteRecord (
106- this . #functionPayloadToBeHashed
107- ) ;
108- } catch ( e ) {
109- throw new IdempotencyPersistenceLayerError (
110- 'Failed to delete record from idempotency store' ,
111- e as Error
112- ) ;
113- }
114- throw e ;
115- }
116- try {
117- await this . #persistenceStore. saveSuccess (
118- this . #functionPayloadToBeHashed,
119- result
120- ) ;
121- } catch ( e ) {
122- throw new IdempotencyPersistenceLayerError (
123- 'Failed to update success record to idempotency store' ,
124- e as Error
125- ) ;
122+ } catch ( error ) {
123+ await this . #deleteInProgressRecord( ) ;
124+ throw error ;
126125 }
126+ await this . #saveSuccessfullResult( result ) ;
127127
128128 return result ;
129129 }
130130
131131 /**
132- * Main entry point for the handler
132+ * Entry point to handle the idempotency logic.
133+ *
134+ * Before the handler is executed, we need to check if there is already an
135+ * execution in progress for the given idempotency key. If there is, we
136+ * need to determine its status and return the appropriate response or
137+ * throw an error.
138+ *
139+ * If there is no execution in progress, we need to save a record to the
140+ * idempotency store to indicate that an execution is in progress.
133141 *
134142 * In some rare cases, when the persistent state changes in small time
135143 * window, we might get an `IdempotencyInconsistentStateError`. In such
136144 * cases we can safely retry the handling a few times.
137145 */
138146 public async handle ( ) : Promise < ReturnType < Func > > {
147+ // early return if we should skip idempotency completely
148+ if ( this . shouldSkipIdempotency ( ) ) {
149+ return await this . #functionToMakeIdempotent( ...this . #functionArguments) ;
150+ }
151+
139152 let e ;
140153 for ( let retryNo = 0 ; retryNo <= MAX_RETRIES ; retryNo ++ ) {
141154 try {
142- return await this . processIdempotency ( ) ;
155+ const result = await this . #saveInProgressOrReturnExistingResult( ) ;
156+ if ( result ) return result as ReturnType < Func > ;
157+
158+ return await this . getFunctionResult ( ) ;
143159 } catch ( error ) {
144160 if (
145161 error instanceof IdempotencyInconsistentStateError &&
@@ -156,60 +172,183 @@ export class IdempotencyHandler<Func extends AnyFunction> {
156172 throw e ;
157173 }
158174
159- public async processIdempotency ( ) : Promise < ReturnType < Func > > {
160- // early return if we should skip idempotency completely
175+ /**
176+ * Handle the idempotency operations needed after the handler has returned.
177+ *
178+ * When the handler returns successfully, we need to update the record in the
179+ * idempotency store to indicate that the execution has completed and
180+ * store its result.
181+ *
182+ * To avoid duplication of code, we expose this method so that it can be
183+ * called from the `after` phase of the Middy middleware.
184+ *
185+ * @param response The response returned by the handler.
186+ */
187+ public async handleMiddyAfter ( response : unknown ) : Promise < void > {
188+ await this . #saveSuccessfullResult( response as ReturnType < Func > ) ;
189+ }
190+
191+ /**
192+ * Handle the idempotency operations needed after the handler has returned.
193+ *
194+ * Before the handler is executed, we need to check if there is already an
195+ * execution in progress for the given idempotency key. If there is, we
196+ * need to determine its status and return the appropriate response or
197+ * throw an error.
198+ *
199+ * If there is no execution in progress, we need to save a record to the
200+ * idempotency store to indicate that an execution is in progress.
201+ *
202+ * In some rare cases, when the persistent state changes in small time
203+ * window, we might get an `IdempotencyInconsistentStateError`. In such
204+ * cases we can safely retry the handling a few times.
205+ *
206+ * @param request The request object passed to the handler.
207+ * @param callback Callback function to cleanup pending middlewares when returning early.
208+ */
209+ public async handleMiddyBefore (
210+ request : MiddyLikeRequest ,
211+ callback : ( request : MiddyLikeRequest ) => Promise < void >
212+ ) : Promise < ReturnType < Func > | void > {
213+ for ( let retryNo = 0 ; retryNo <= MAX_RETRIES ; retryNo ++ ) {
214+ try {
215+ const result = await this . #saveInProgressOrReturnExistingResult( ) ;
216+ if ( result ) {
217+ await callback ( request ) ;
218+
219+ return result as ReturnType < Func > ;
220+ }
221+ break ;
222+ } catch ( error ) {
223+ if (
224+ error instanceof IdempotencyInconsistentStateError &&
225+ retryNo < MAX_RETRIES
226+ ) {
227+ // Retry
228+ continue ;
229+ }
230+ // Retries exhausted or other error
231+ throw error ;
232+ }
233+ }
234+ }
235+
236+ /**
237+ * Handle the idempotency operations needed when an error is thrown in the handler.
238+ *
239+ * When an error is thrown in the handler, we need to delete the record from the
240+ * idempotency store.
241+ *
242+ * To avoid duplication of code, we expose this method so that it can be
243+ * called from the `onError` phase of the Middy middleware.
244+ */
245+ public async handleMiddyOnError ( ) : Promise < void > {
246+ await this . #deleteInProgressRecord( ) ;
247+ }
248+
249+ /**
250+ * Setter for the payload to be hashed to generate the idempotency key.
251+ *
252+ * This is useful if you want to use a different payload than the one
253+ * used to instantiate the `IdempotencyHandler`, for example when using
254+ * it within a Middy middleware.
255+ *
256+ * @param functionPayloadToBeHashed The payload to be hashed to generate the idempotency key
257+ */
258+ public setFunctionPayloadToBeHashed (
259+ functionPayloadToBeHashed : JSONValue
260+ ) : void {
261+ this . #functionPayloadToBeHashed = functionPayloadToBeHashed ;
262+ }
263+
264+ /**
265+ * Avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
266+ */
267+ public shouldSkipIdempotency ( ) : boolean {
268+ if ( ! this . #idempotencyConfig. isEnabled ( ) ) return true ;
269+
161270 if (
162- IdempotencyHandler . shouldSkipIdempotency (
163- this . #idempotencyConfig. eventKeyJmesPath ,
164- this . #idempotencyConfig. throwOnNoIdempotencyKey ,
165- this . #functionPayloadToBeHashed
166- )
271+ this . #idempotencyConfig. eventKeyJmesPath !== '' &&
272+ ! this . #idempotencyConfig. throwOnNoIdempotencyKey
167273 ) {
168- return await this . #functionToMakeIdempotent( ...this . #functionArguments) ;
274+ const selection = search (
275+ this . #functionPayloadToBeHashed,
276+ this . #idempotencyConfig. eventKeyJmesPath
277+ ) ;
278+
279+ return selection === undefined || selection === null ;
280+ } else {
281+ return false ;
169282 }
283+ }
170284
285+ /**
286+ * Delete an in progress record from the idempotency store.
287+ *
288+ * This is called when the handler throws an error.
289+ */
290+ #deleteInProgressRecord = async ( ) : Promise < void > => {
171291 try {
172- await this . #persistenceStore. saveInProgress (
173- this . #functionPayloadToBeHashed,
174- this . #idempotencyConfig. lambdaContext ?. getRemainingTimeInMillis ( )
292+ await this . #persistenceStore. deleteRecord (
293+ this . #functionPayloadToBeHashed
175294 ) ;
176295 } catch ( e ) {
177- if ( e instanceof IdempotencyItemAlreadyExistsError ) {
178- const idempotencyRecord : IdempotencyRecord =
179- await this . #persistenceStore. getRecord (
180- this . #functionPayloadToBeHashed
181- ) ;
296+ throw new IdempotencyPersistenceLayerError (
297+ 'Failed to delete record from idempotency store' ,
298+ e as Error
299+ ) ;
300+ }
301+ } ;
182302
183- return IdempotencyHandler . determineResultFromIdempotencyRecord (
184- idempotencyRecord
185- ) as ReturnType < Func > ;
186- } else {
187- throw new IdempotencyPersistenceLayerError (
188- 'Failed to save in progress record to idempotency store' ,
189- e as Error
303+ /**
304+ * Save an in progress record to the idempotency store or return an existing result.
305+ *
306+ * If the record already exists, return the result from the record.
307+ */
308+ #saveInProgressOrReturnExistingResult =
309+ async ( ) : Promise < JSONValue | void > => {
310+ try {
311+ await this . #persistenceStore. saveInProgress (
312+ this . #functionPayloadToBeHashed,
313+ this . #idempotencyConfig. lambdaContext ?. getRemainingTimeInMillis ( )
190314 ) ;
191- }
192- }
315+ } catch ( e ) {
316+ if ( e instanceof IdempotencyItemAlreadyExistsError ) {
317+ const idempotencyRecord : IdempotencyRecord =
318+ await this . #persistenceStore. getRecord (
319+ this . #functionPayloadToBeHashed
320+ ) ;
193321
194- return this . getFunctionResult ( ) ;
195- }
322+ return IdempotencyHandler . determineResultFromIdempotencyRecord (
323+ idempotencyRecord
324+ ) ;
325+ } else {
326+ throw new IdempotencyPersistenceLayerError (
327+ 'Failed to save in progress record to idempotency store' ,
328+ e as Error
329+ ) ;
330+ }
331+ }
332+ } ;
196333
197334 /**
198- * avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
199- * static so {@link makeHandlerIdempotent} middleware can use it
200- * TOOD: refactor so middy uses IdempotencyHandler internally wihtout reimplementing the logic
201- * @param eventKeyJmesPath
202- * @param throwOnNoIdempotencyKey
203- * @param fullFunctionPayload
204- * @private
205- */
206- public static shouldSkipIdempotency (
207- eventKeyJmesPath : string ,
208- throwOnNoIdempotencyKey : boolean ,
209- fullFunctionPayload : JSONValue
210- ) : boolean {
211- return ( eventKeyJmesPath &&
212- ! throwOnNoIdempotencyKey &&
213- ! search ( fullFunctionPayload , eventKeyJmesPath ) ) as boolean ;
214- }
335+ * Save a successful result to the idempotency store.
336+ *
337+ * This is called when the handler returns successfully.
338+ *
339+ * @param result The result returned by the handler.
340+ */
341+ #saveSuccessfullResult = async ( result : ReturnType < Func > ) : Promise < void > => {
342+ try {
343+ await this . #persistenceStore. saveSuccess (
344+ this . #functionPayloadToBeHashed,
345+ result
346+ ) ;
347+ } catch ( e ) {
348+ throw new IdempotencyPersistenceLayerError (
349+ 'Failed to update success record to idempotency store' ,
350+ e as Error
351+ ) ;
352+ }
353+ } ;
215354}
0 commit comments