Skip to content
162 changes: 162 additions & 0 deletions packages/relay/src/lib/services/lockService/LocalLockStrategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// SPDX-License-Identifier: Apache-2.0

import { Mutex } from 'async-mutex';
import { randomUUID } from 'crypto';
import { LRUCache } from 'lru-cache';
import { Logger } from 'pino';

/**
* Represents the internal state for a lock associated with a given address.
*/
interface LockState {
mutex: Mutex;
sessionKey: string | null;
acquiredAt: number | null;
maxLockTime: NodeJS.Timeout | null;
}

/**
* Implements a local, in-memory locking strategy.
*
* Each unique "address" gets its own mutex to ensure only one session can hold
* the lock at a time. Locks are auto-expiring and stored in an LRU cache.
*/
export class LocalLockStrategy {
/**
* Maximum number of lock entries stored in memory.
* Prevents unbounded memory growth.
*/
public static LOCAL_LOCK_MAX_ENTRIES: number = 1_000; // Max 1000 addresses

/**
* Seconds for auto-release if lock not manually released
*/
public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 30 secs

/**
* LRU cache of lock states, keyed by address.
*/
private localLockStates = new LRUCache<string, LockState>({
max: LocalLockStrategy.LOCAL_LOCK_MAX_ENTRIES,
});

/**
* Logger.
*
* @private
*/
private readonly logger: Logger;

/**
* Creates a new LocalLockStrategy instance.
*
* @param logger - The logger
*/
constructor(logger: Logger) {
this.logger = logger;
}

/**
* Acquire a lock for a specific address.
* Waits until the lock is available (blocking if another session holds it).
*
* @param address - The key representing the resource to lock
* @returns A session key identifying the current lock owner
*/
async acquireLock(address: string): Promise<string> {
const sessionKey = randomUUID();
const state = this.getOrCreateState(address);

// Acquire the mutex (this will block until available)
await state.mutex.acquire();

// Record lock ownership metadata
state.sessionKey = sessionKey;
state.acquiredAt = Date.now();

// Start a 30-second timer to auto-release if lock not manually released
state.maxLockTime = setTimeout(() => {
this.forceReleaseExpiredLock(address, sessionKey);
}, LocalLockStrategy.LOCAL_LOCK_MAX_LOCK_TIME);

return sessionKey;
}

/**
* Release a previously acquired lock, if the session key matches the current owner.
*
* @param address - The locked resource key
* @param sessionKey - The session key of the lock holder
*/
async releaseLock(address: string, sessionKey: string): Promise<void> {
const state = this.localLockStates.get(address);

// Ensure only the lock owner can release
if (state?.sessionKey !== sessionKey) {
return; // Not the owner — safely ignore
}

// Perform cleanup and release
await this.doRelease(state);
}

/**
* Retrieve an existing lock state for the given address, or create a new one if it doesn't exist.
*
* @param address - Unique identifier for the lock
* @returns The LockState object associated with the address
*/
private getOrCreateState(address: string): LockState {
if (!this.localLockStates.has(address)) {
this.localLockStates.set(address, {
mutex: new Mutex(),
sessionKey: null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass the session key here when creating the state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are defined here, but yeah, the sessionKey might be passed as a parameter. Will edit it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pass the sessionKey here and only set it when address doesn’t exist yet, then how would we assign the sessionKey when the address already exists in localLockStates? Keep in mind we never delete entries from localLockStates we only reset their fields.

So to set the sessionKey for an existing address, we’d have to assign it again anyway. That feels unnecessary, and I think assigning the sessionKey after mutex.acquire() as we currently do makes more sense.

acquiredAt: null,
maxLockTime: null,
});
}

return this.localLockStates.get(address)!;
}

/**
* Internal helper to perform cleanup and release the mutex.
*
* @param state - The LockState instance to reset and release
*/
private async doRelease(state: LockState): Promise<void> {
// Clear timeout first
clearTimeout(state.maxLockTime!);

// Reset state
state.sessionKey = null;
state.maxLockTime = null;
state.acquiredAt = null;

// Release the mutex lock
state.mutex.release();
}

/**
* Forcefully release a lock that has exceeded its maximum execution time.
* Used by the timeout set during `acquireLock`.
*
* @param address - The resource key associated with the lock
* @param sessionKey - The session key to verify ownership before releasing
*/
private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise<void> {
const state = this.localLockStates.get(address);

// Ensure the session still owns the lock before force-releasing
if (!state || state.sessionKey !== sessionKey) {
return; // Already released or lock reassigned
}

if (this.logger.isLevelEnabled('debug')) {
const holdTime = Date.now() - state.acquiredAt!;
this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`);
}

await this.doRelease(state);
}
}
23 changes: 21 additions & 2 deletions packages/relay/src/lib/services/lockService/LockService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0

import { LockStrategy } from '../../types/lock';
import { Logger } from 'pino';

import { LockStrategy } from '../../types';

/**
* Service that manages transaction ordering through distributed locking.
Expand All @@ -12,13 +14,22 @@ export class LockService {
*/
private readonly strategy: LockStrategy;

/**
* Logger
*
* @private
*/
private readonly logger: Logger;

/**
* Creates a new LockService instance.
*
* @param strategy - The lock strategy implementation to use.
* @param logger - The logger
*/
constructor(strategy: LockStrategy) {
constructor(strategy: LockStrategy, logger: Logger) {
this.strategy = strategy;
this.logger = logger;
}

/**
Expand All @@ -29,6 +40,10 @@ export class LockService {
* @returns A promise that resolves to a unique session key.
*/
async acquireLock(address: string): Promise<string> {
if (this.logger.isLevelEnabled('debug')) {
this.logger.debug(`Acquiring lock for address ${address}.`);
}

return await this.strategy.acquireLock(address);
}

Expand All @@ -40,6 +55,10 @@ export class LockService {
* @param sessionKey - The session key obtained during lock acquisition.
*/
async releaseLock(address: string, sessionKey: string): Promise<void> {
if (this.logger.isLevelEnabled('debug')) {
this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey}.`);
}

await this.strategy.releaseLock(address, sessionKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import { Logger } from 'pino';
import { RedisClientType } from 'redis';

import { LockStrategy } from '../../types/lock';
import { LockStrategy } from '../../types';
import { LocalLockStrategy } from './LocalLockStrategy';

/**
* Factory for creating LockStrategy instances.
Expand All @@ -20,13 +21,13 @@ export class LockStrategyFactory {
* @param logger - Logger instance for the lock strategy.
* @returns A LockStrategy implementation.
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars

static create(redisClient: RedisClientType | undefined, logger: Logger): LockStrategy {
// TODO: Remove placeholder errors once strategies are implemented
if (redisClient) {
throw new Error('Redis lock strategy not yet implemented');
// throw new Error('Redis lock strategy not yet implemented');
}

throw new Error('Local lock strategy not yet implemented');
return new LocalLockStrategy(logger);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// SPDX-License-Identifier: Apache-2.0

import { expect } from 'chai';
import { pino } from 'pino';
import sinon from 'sinon';

import { LocalLockStrategy } from '../../../../src/lib/services/lockService/LocalLockStrategy';

describe('LocalLockStrategy', function () {
this.timeout(10000);

let lockStrategy: LocalLockStrategy;

beforeEach(() => {
lockStrategy = new LocalLockStrategy(pino({ level: 'silent' }));
});

afterEach(() => {
sinon.restore();
});

function getStateEntry(address) {
// @ts-ignore
return lockStrategy.localLockStates.get(address);
}

it('should acquire and release a lock successfully', async () => {
const address = 'test-address';

const sessionKey = await lockStrategy.acquireLock(address);
expect(sessionKey).to.be.a('string');

const lockEntryAfterAcquisition = getStateEntry(address);
expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null;

await lockStrategy.releaseLock(address, sessionKey);
const lockEntryAfterRelease = getStateEntry(address);
expect(lockEntryAfterRelease.sessionKey).to.be.null;
});

it('should not allow a non-owner to release a lock', async () => {
const address = 'test-non-owner';
const sessionKey = await lockStrategy.acquireLock(address);

const lockEntryAfterAcquisition = getStateEntry(address);
expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null;

const wrongKey = 'fake-session';
await lockStrategy.releaseLock(address, wrongKey);

const lockEntryAfterFakeRelease = getStateEntry(address);
expect(lockEntryAfterFakeRelease.sessionKey).to.not.be.null;

await lockStrategy.releaseLock(address, sessionKey);

const lockEntryAfterRelease = getStateEntry(address);
expect(lockEntryAfterRelease.sessionKey).to.be.null;
});

it('should block a second acquire until the first is released', async () => {
const address = 'test-sequential';

const sessionKey1 = await lockStrategy.acquireLock(address);
let secondAcquired = false;

const acquire2 = (async () => {
const key2 = await lockStrategy.acquireLock(address);
secondAcquired = true;
await lockStrategy.releaseLock(address, key2);
})();

// Wait 100ms to ensure second acquire is blocked
await new Promise((res) => setTimeout(res, 100));
expect(secondAcquired).to.be.false;

// Now release first
await lockStrategy.releaseLock(address, sessionKey1);

// Wait for second acquire to complete
await acquire2;
expect(secondAcquired).to.be.true;
});

it('should auto-release after max lock time', async () => {
const address = 'test-auto-release';

// Shorten auto-release time for test
(LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms

const releaseSpy = sinon.spy<any, any>(lockStrategy as any, 'doRelease');
await lockStrategy.acquireLock(address);

// Wait beyond auto-release timeout
await new Promise((res) => setTimeout(res, 300));

expect(releaseSpy.called).to.be.true;
const args = releaseSpy.getCall(0).args[0];
expect(args.sessionKey).to.be.null;
});

it('should reuse existing lock state for same address', async () => {
const address = 'test-reuse';

const state1 = (lockStrategy as any).getOrCreateState(address);
const state2 = (lockStrategy as any).getOrCreateState(address);

expect(state1).to.equal(state2);
});

it('should create a new lock state for new addresses', async () => {
const stateA = (lockStrategy as any).getOrCreateState('a');
const stateB = (lockStrategy as any).getOrCreateState('b');

expect(stateA).to.not.equal(stateB);
});

it('should clear timeout and reset state on release', async () => {
const address = 'test-reset';
const sessionKey = await lockStrategy.acquireLock(address);
const state = (lockStrategy as any).localLockStates.get(address);

expect(state.sessionKey).to.equal(sessionKey);
expect(state.maxLockTime).to.not.be.null;

await lockStrategy.releaseLock(address, sessionKey);

expect(state.sessionKey).to.be.null;
expect(state.maxLockTime).to.be.null;
expect(state.acquiredAt).to.be.null;
});

it('should ignore forceReleaseExpiredLock if session key does not match', async () => {
const address = 'test-force-mismatch';
const sessionKey = await lockStrategy.acquireLock(address);

const state = (lockStrategy as any).localLockStates.get(address);
expect(state.sessionKey).to.equal(sessionKey);

// Modify session key to simulate ownership change
state.sessionKey = 'different-key';

const spy = sinon.spy<any, any>(lockStrategy as any, 'doRelease');
await (lockStrategy as any).forceReleaseExpiredLock(address, sessionKey);

expect(spy.called).to.be.false;

await lockStrategy.releaseLock(address, 'different-key');
});
});