-
Notifications
You must be signed in to change notification settings - Fork 93
feat: create LocalLockStrategy using LRU and async-mutex
#4610
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
046823c
9f64cd0
11e6bf5
7fc9246
e534446
c04db62
5345dc9
9e2d66e
d91d411
88a3795
0bcf7a7
16031ce
5d0eee4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import { Mutex } from 'async-mutex'; | ||
| import { randomUUID } from 'crypto'; | ||
| import { LRUCache } from 'lru-cache'; | ||
| import { Logger } from 'pino'; | ||
|
|
||
| /** | ||
| * Represents the internal state for a lock associated with a given address. | ||
| */ | ||
| interface LockState { | ||
| mutex: Mutex; | ||
| sessionKey: string | null; | ||
| acquiredAt: number | null; | ||
| maxLockTime: NodeJS.Timeout | null; | ||
quiet-node marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /** | ||
| * Implements a local, in-memory locking strategy. | ||
| * | ||
| * Each unique "address" gets its own mutex to ensure only one session can hold | ||
| * the lock at a time. Locks are auto-expiring and stored in an LRU cache. | ||
| */ | ||
| export class LocalLockStrategy { | ||
| /** | ||
| * Maximum number of lock entries stored in memory. | ||
| * Prevents unbounded memory growth. | ||
| */ | ||
| public static LOCAL_LOCK_MAX_ENTRIES: number = 1_000; // Max 1000 addresses | ||
|
|
||
| /** | ||
| * Time-to-live for each lock entry in the cache (in milliseconds). | ||
| */ | ||
| public static LOCAL_LOCK_TTL: number = 300_000; // 5 minutes | ||
natanasow marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Seconds for auto-release if lock not manually released | ||
| */ | ||
| public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 30 secs | ||
|
|
||
| /** | ||
| * LRU cache of lock states, keyed by address. | ||
| */ | ||
| private localLockStates = new LRUCache<string, LockState>({ | ||
| max: LocalLockStrategy.LOCAL_LOCK_MAX_ENTRIES, | ||
| ttl: LocalLockStrategy.LOCAL_LOCK_TTL, | ||
| }); | ||
|
|
||
| /** | ||
| * Logger. | ||
| * | ||
| * @private | ||
| */ | ||
| private readonly logger: Logger; | ||
|
|
||
| /** | ||
| * Creates a new LocalLockStrategy instance. | ||
| * | ||
| * @param logger - The logger | ||
| */ | ||
| constructor(logger: Logger) { | ||
| this.logger = logger; | ||
| } | ||
|
|
||
| /** | ||
| * Acquire a lock for a specific address. | ||
| * Waits until the lock is available (blocking if another session holds it). | ||
| * | ||
| * @param address - The key representing the resource to lock | ||
| * @returns A session key identifying the current lock owner | ||
| */ | ||
| async acquireLock(address: string): Promise<string> { | ||
| const sessionKey = randomUUID(); | ||
| const state = this.getOrCreateState(address); | ||
|
|
||
| // Acquire the mutex (this will block until available) | ||
| await state.mutex.acquire(); | ||
|
|
||
| // Record lock ownership metadata | ||
| state.sessionKey = sessionKey; | ||
| state.acquiredAt = Date.now(); | ||
|
|
||
| // Start a 30-second timer to auto-release if lock not manually released | ||
| state.maxLockTime = setTimeout(() => { | ||
| this.forceReleaseExpiredLock(address, sessionKey); | ||
| }, LocalLockStrategy.LOCAL_LOCK_MAX_LOCK_TIME); | ||
|
|
||
| return sessionKey; | ||
| } | ||
|
|
||
| /** | ||
| * Release a previously acquired lock, if the session key matches the current owner. | ||
| * | ||
| * @param address - The locked resource key | ||
| * @param sessionKey - The session key of the lock holder | ||
| */ | ||
| async releaseLock(address: string, sessionKey: string): Promise<void> { | ||
| const state = this.localLockStates.get(address); | ||
|
|
||
| // Ensure only the lock owner can release | ||
| if (state?.sessionKey !== sessionKey) { | ||
| return; // Not the owner — safely ignore | ||
| } | ||
|
|
||
| // Perform cleanup and release | ||
| await this.doRelease(state); | ||
| } | ||
quiet-node marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Retrieve an existing lock state for the given address, or create a new one if it doesn't exist. | ||
| * | ||
| * @param address - Unique identifier for the lock | ||
| * @returns The LockState object associated with the address | ||
| */ | ||
| private getOrCreateState(address: string): LockState { | ||
| if (!this.localLockStates.has(address)) { | ||
quiet-node marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| this.localLockStates.set(address, { | ||
| mutex: new Mutex(), | ||
| sessionKey: null, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not pass the session key here when creating the state?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They are defined here, but yeah, the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we pass the sessionKey here and only set it when address doesn’t exist yet, then how would we assign the sessionKey when the address already exists in localLockStates? Keep in mind we never delete entries from localLockStates we only reset their fields. So to set the sessionKey for an existing address, we’d have to assign it again anyway. That feels unnecessary, and I think assigning the sessionKey after mutex.acquire() as we currently do makes more sense. |
||
| acquiredAt: null, | ||
| maxLockTime: null, | ||
| }); | ||
| } | ||
|
|
||
| return this.localLockStates.get(address)!; | ||
| } | ||
|
|
||
| /** | ||
| * Internal helper to perform cleanup and release the mutex. | ||
| * | ||
| * @param state - The LockState instance to reset and release | ||
| */ | ||
| private async doRelease(state: LockState): Promise<void> { | ||
| // Clear timeout first | ||
| clearTimeout(state.maxLockTime!); | ||
|
|
||
| // Reset state | ||
| state.sessionKey = null; | ||
| state.maxLockTime = null; | ||
| state.acquiredAt = null; | ||
|
|
||
| // Release the mutex lock | ||
| state.mutex.release(); | ||
| } | ||
|
|
||
| /** | ||
| * Forcefully release a lock that has exceeded its maximum execution time. | ||
| * Used by the timeout set during `acquireLock`. | ||
| * | ||
| * @param address - The resource key associated with the lock | ||
| * @param sessionKey - The session key to verify ownership before releasing | ||
| */ | ||
| private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise<void> { | ||
| const state = this.localLockStates.get(address); | ||
|
|
||
| // Ensure the session still owns the lock before force-releasing | ||
| if (!state || state.sessionKey !== sessionKey) { | ||
| return; // Already released or lock reassigned | ||
| } | ||
|
|
||
| if (this.logger.isLevelEnabled('debug')) { | ||
| const holdTime = Date.now() - state.acquiredAt!; | ||
| this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`); | ||
| } | ||
|
|
||
| await this.doRelease(state); | ||
| } | ||
quiet-node marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.