From 046823caf189993253c601a886de1420549e0f5e Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:09:35 +0200 Subject: [PATCH 01/13] chore: add local lock strategy Signed-off-by: nikolay --- .../transactionService/TransactionService.ts | 17 +- .../services/lockService/LocalLockStrategy.ts | 170 ++++++++++++++++++ .../lib/services/lockService/LockService.ts | 23 ++- .../lockService/LockStrategyFactory.ts | 9 +- .../lockService/LocalLockStrategy.spec.ts | 148 +++++++++++++++ 5 files changed, 360 insertions(+), 7 deletions(-) create mode 100644 packages/relay/src/lib/services/lockService/LocalLockStrategy.ts create mode 100644 packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts diff --git a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts index b44d810d3d..bf959a7476 100644 --- a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts +++ b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts @@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck'; import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types'; import { CacheService } from '../../cacheService/cacheService'; import HAPIService from '../../hapiService/hapiService'; -import { ICommonService, TransactionPoolService } from '../../index'; +import { ICommonService, LockService, LockStrategyFactory, TransactionPoolService } from '../../index'; import { ITransactionService } from './ITransactionService'; export class TransactionService implements ITransactionService { @@ -66,8 +66,18 @@ export class TransactionService implements ITransactionService { */ private readonly precheck: Precheck; + /** + * Service responsible for managing pending transactions. + */ private readonly transactionPoolService: TransactionPoolService; + /** + * Service that provides mechanisms for acquiring and releasing locks + * to ensure thread-safe and concurrent operation handling across + * asynchronous processes. + */ + private readonly lockService: LockService; + /** * The ID of the chain, as a hex string, as it would be returned in a JSON-RPC call. * @private @@ -96,6 +106,7 @@ export class TransactionService implements ITransactionService { this.mirrorNodeClient = mirrorNodeClient; this.precheck = new Precheck(mirrorNodeClient, chain, transactionPoolService); this.transactionPoolService = transactionPoolService; + this.lockService = new LockService(LockStrategyFactory.create(undefined, logger), logger); } /** @@ -472,6 +483,7 @@ export class TransactionService implements ITransactionService { * @param {EthersTransaction} parsedTx - The parsed Ethereum transaction object. * @param {number} networkGasPriceInWeiBars - The current network gas price in wei bars. * @param {RequestDetails} requestDetails - Details of the request for logging and tracking purposes. + * @param {string} sessionKey - The key that is used as identifier in the lock service * @returns {Promise} A promise that resolves to the transaction hash if successful, or a JsonRpcError if an error occurs. */ async sendRawTransactionProcessor( @@ -484,6 +496,8 @@ export class TransactionService implements ITransactionService { const originalCallerAddress = parsedTx.from?.toString() || ''; + const sessionKey = await this.lockService.acquireLock(parsedTx.from!); + this.eventEmitter.emit('eth_execution', { method: constants.ETH_SEND_RAW_TRANSACTION, }); @@ -497,6 +511,7 @@ export class TransactionService implements ITransactionService { // Remove the transaction from the transaction pool after submission await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.serialized); + await this.lockService.releaseLock(parsedTx.from!, sessionKey); sendRawTransactionError = error; diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts new file mode 100644 index 0000000000..8734a8ee18 --- /dev/null +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -0,0 +1,170 @@ +// SPDX-License-Identifier: Apache-2.0 + +import { Mutex } from 'async-mutex'; +import { randomUUID } from 'crypto'; +import { LRUCache } from 'lru-cache'; +import { Logger } from 'pino'; + +import { LockStrategy } from '../../types'; + +/** + * Represents the internal state for a lock associated with a given address. + */ +interface LockState { + mutex: Mutex; + sessionKey: string | null; + acquiredAt: number | null; + maxLockTime: NodeJS.Timeout | null; +} + +/** + * Implements a local, in-memory locking strategy. + * + * Each unique "address" gets its own mutex to ensure only one session can hold + * the lock at a time. Locks are auto-expiring and stored in an LRU cache. + */ +export class LocalLockStrategy { + /** + * Maximum number of lock entries stored in memory. + * Prevents unbounded memory growth. + */ + public static LOCAL_LOCK_MAX_ENTRIES: number = 1_000; // Max 1000 addresses + + /** + * Time-to-live for each lock entry in the cache (in milliseconds). + */ + public static LOCAL_LOCK_TTL: number = 300_000; // 5 minutes + + /** + * Seconds for auto-release if lock not manually released + */ + public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 60 secs + + /** + * LRU cache of lock states, keyed by address. + */ + private localLockStates = new LRUCache({ + max: LocalLockStrategy.LOCAL_LOCK_MAX_ENTRIES, + ttl: LocalLockStrategy.LOCAL_LOCK_TTL, + }); + + /** + * Logger + * + * @private + */ + private readonly logger: Logger; + + /** + * Creates a new LocalLockStrategy instance. + * + * @param logger - The logger + */ + constructor(logger: Logger) { + this.logger = logger; + } + + /** + * Acquire a lock for a specific address. + * Waits until the lock is available (blocking if another session holds it). + * + * @param address - The key representing the resource to lock + * @returns A session key identifying the current lock owner + */ + async acquireLock(address: string): Promise { + const sessionKey = randomUUID(); + const state = this.getOrCreateState(address); + + // Acquire the mutex (this will block until available) + await state.mutex.acquire(); + + // Record lock ownership metadata + state.sessionKey = sessionKey; + state.acquiredAt = Date.now(); + + // Start a 30-second timer to auto-release if lock not manually released + state.maxLockTime = setTimeout(() => { + this.forceReleaseExpiredLock(address, sessionKey); + }, LocalLockStrategy.LOCAL_LOCK_MAX_LOCK_TIME); + + return sessionKey; + } + + /** + * Release a previously acquired lock, if the session key matches the current owner. + * + * @param address - The locked resource key + * @param sessionKey - The session key of the lock holder + */ + async releaseLock(address: string, sessionKey: string): Promise { + const state = this.localLockStates.get(address); + + // Ensure only the lock owner can release + if (state?.sessionKey !== sessionKey) { + return; // Not the owner — safely ignore + } + + // Perform cleanup and release + await this.doRelease(state); + } + + /** + * Retrieve an existing lock state for the given address, or create a new one if it doesn't exist. + * + * @param address - Unique identifier for the lock + * @returns The LockState object associated with the address + */ + private getOrCreateState(address: string): LockState { + if (!this.localLockStates.has(address)) { + this.localLockStates.set(address, { + mutex: new Mutex(), + sessionKey: null, + acquiredAt: null, + maxLockTime: null, + }); + } + + return this.localLockStates.get(address)!; + } + + /** + * Internal helper to perform cleanup and release the mutex. + * + * @param state - The LockState instance to reset and release + */ + private async doRelease(state: LockState): Promise { + // Clear timeout first + clearTimeout(state.maxLockTime!); + + // Reset state + state.sessionKey = null; + state.maxLockTime = null; + state.acquiredAt = null; + + // Release the mutex lock + state.mutex.release(); + } + + /** + * Forcefully release a lock that has exceeded its maximum execution time. + * Used by the timeout set during `acquireLock`. + * + * @param address - The resource key associated with the lock + * @param sessionKey - The session key to verify ownership before releasing + */ + private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise { + const state = this.localLockStates.get(address); + + // Ensure the session still owns the lock before force-releasing + if (!state || state.sessionKey !== sessionKey) { + return; // Already released or lock reassigned + } + + if (this.logger.isLevelEnabled('debug')) { + const holdTime = Date.now() - state.acquiredAt!; + this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`); + } + + await this.doRelease(state); + } +} diff --git a/packages/relay/src/lib/services/lockService/LockService.ts b/packages/relay/src/lib/services/lockService/LockService.ts index 744875b4b4..949a08198e 100644 --- a/packages/relay/src/lib/services/lockService/LockService.ts +++ b/packages/relay/src/lib/services/lockService/LockService.ts @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 -import { LockStrategy } from '../../types/lock'; +import { Logger } from 'pino'; + +import { LockStrategy } from '../../types'; /** * Service that manages transaction ordering through distributed locking. @@ -12,13 +14,22 @@ export class LockService { */ private readonly strategy: LockStrategy; + /** + * Logger + * + * @private + */ + private readonly logger: Logger; + /** * Creates a new LockService instance. * * @param strategy - The lock strategy implementation to use. + * @param logger - The logger */ - constructor(strategy: LockStrategy) { + constructor(strategy: LockStrategy, logger: Logger) { this.strategy = strategy; + this.logger = logger; } /** @@ -29,6 +40,10 @@ export class LockService { * @returns A promise that resolves to a unique session key. */ async acquireLock(address: string): Promise { + if (this.logger.isLevelEnabled('debug')) { + this.logger.debug(`Acquiring lock for address ${address}.`); + } + return await this.strategy.acquireLock(address); } @@ -40,6 +55,10 @@ export class LockService { * @param sessionKey - The session key obtained during lock acquisition. */ async releaseLock(address: string, sessionKey: string): Promise { + if (this.logger.isLevelEnabled('debug')) { + this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey}.`); + } + await this.strategy.releaseLock(address, sessionKey); } } diff --git a/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts b/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts index 72a5756c03..138e0178c1 100644 --- a/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts +++ b/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts @@ -3,7 +3,8 @@ import { Logger } from 'pino'; import { RedisClientType } from 'redis'; -import { LockStrategy } from '../../types/lock'; +import { LockStrategy } from '../../types'; +import { LocalLockStrategy } from './LocalLockStrategy'; /** * Factory for creating LockStrategy instances. @@ -20,13 +21,13 @@ export class LockStrategyFactory { * @param logger - Logger instance for the lock strategy. * @returns A LockStrategy implementation. */ - // eslint-disable-next-line @typescript-eslint/no-unused-vars + static create(redisClient: RedisClientType | undefined, logger: Logger): LockStrategy { // TODO: Remove placeholder errors once strategies are implemented if (redisClient) { - throw new Error('Redis lock strategy not yet implemented'); + // throw new Error('Redis lock strategy not yet implemented'); } - throw new Error('Local lock strategy not yet implemented'); + return new LocalLockStrategy(logger); } } diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts new file mode 100644 index 0000000000..df18c743bf --- /dev/null +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -0,0 +1,148 @@ +// SPDX-License-Identifier: Apache-2.0 + +import { expect } from 'chai'; +import sinon from 'sinon'; + +import { LocalLockStrategy } from '../../../../src/lib/services/lockService/LocalLockStrategy'; + +describe('LocalLockStrategy', function () { + this.timeout(10000); + + let lockStrategy: LocalLockStrategy; + + beforeEach(() => { + lockStrategy = new LocalLockStrategy(); + }); + + afterEach(() => { + sinon.restore(); + }); + + function getStateEntry(address) { + // @ts-ignore + return lockStrategy.localLockStates.get(address); + } + + it('should acquire and release a lock successfully', async () => { + const address = 'test-address'; + + const sessionKey = await lockStrategy.acquireLock(address); + expect(sessionKey).to.be.a('string'); + + const lockEntryAfterAcquisition = getStateEntry(address); + expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null; + + await lockStrategy.releaseLock(address, sessionKey); + const lockEntryAfterRelease = getStateEntry(address); + expect(lockEntryAfterRelease.sessionKey).to.be.null; + }); + + it('should not allow a non-owner to release a lock', async () => { + const address = 'test-non-owner'; + const sessionKey = await lockStrategy.acquireLock(address); + + const lockEntryAfterAcquisition = getStateEntry(address); + expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null; + + const wrongKey = 'fake-session'; + await lockStrategy.releaseLock(address, wrongKey); + + const lockEntryAfterFakeRelease = getStateEntry(address); + expect(lockEntryAfterFakeRelease.sessionKey).to.not.be.null; + + await lockStrategy.releaseLock(address, sessionKey); + + const lockEntryAfterRelease = getStateEntry(address); + expect(lockEntryAfterRelease.sessionKey).to.be.null; + }); + + it('should block a second acquire until the first is released', async () => { + const address = 'test-sequential'; + + const sessionKey1 = await lockStrategy.acquireLock(address); + let secondAcquired = false; + + const acquire2 = (async () => { + const key2 = await lockStrategy.acquireLock(address); + secondAcquired = true; + await lockStrategy.releaseLock(address, key2); + })(); + + // Wait 100ms to ensure second acquire is blocked + await new Promise((res) => setTimeout(res, 100)); + expect(secondAcquired).to.be.false; + + // Now release first + await lockStrategy.releaseLock(address, sessionKey1); + + // Wait for second acquire to complete + await acquire2; + expect(secondAcquired).to.be.true; + }); + + it('should auto-release after max lock time', async () => { + const address = 'test-auto-release'; + + // Shorten auto-release time for test + (LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms + + const releaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); + const sessionKey = await lockStrategy.acquireLock(address); + + // Wait beyond auto-release timeout + await new Promise((res) => setTimeout(res, 300)); + + expect(releaseSpy.called).to.be.true; + const args = releaseSpy.getCall(0).args[0]; + expect(args.sessionKey).to.be.null; + }); + + it('should reuse existing lock state for same address', async () => { + const address = 'test-reuse'; + + const state1 = (lockStrategy as any).getOrCreateState(address); + const state2 = (lockStrategy as any).getOrCreateState(address); + + expect(state1).to.equal(state2); + }); + + it('should create a new lock state for new addresses', async () => { + const stateA = (lockStrategy as any).getOrCreateState('a'); + const stateB = (lockStrategy as any).getOrCreateState('b'); + + expect(stateA).to.not.equal(stateB); + }); + + it('should clear timeout and reset state on release', async () => { + const address = 'test-reset'; + const sessionKey = await lockStrategy.acquireLock(address); + const state = (lockStrategy as any).localLockStates.get(address); + + expect(state.sessionKey).to.equal(sessionKey); + expect(state.maxLockTime).to.not.be.null; + + await lockStrategy.releaseLock(address, sessionKey); + + expect(state.sessionKey).to.be.null; + expect(state.maxLockTime).to.be.null; + expect(state.acquiredAt).to.be.null; + }); + + it('should ignore forceReleaseExpiredLock if session key does not match', async () => { + const address = 'test-force-mismatch'; + const sessionKey = await lockStrategy.acquireLock(address); + + const state = (lockStrategy as any).localLockStates.get(address); + expect(state.sessionKey).to.equal(sessionKey); + + // Modify session key to simulate ownership change + state.sessionKey = 'different-key'; + + const spy = sinon.spy(lockStrategy as any, 'doRelease'); + await (lockStrategy as any).forceReleaseExpiredLock(address, sessionKey); + + expect(spy.called).to.be.false; + + await lockStrategy.releaseLock(address, 'different-key'); + }); +}); From 9f64cd0a6c0c4b656c209720b1c5553662d99c80 Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:15:13 +0200 Subject: [PATCH 02/13] chore: fix comment Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 8734a8ee18..4252a2b0ff 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -49,7 +49,7 @@ export class LocalLockStrategy { }); /** - * Logger + * Logger. * * @private */ From 11e6bf5f44b529465188a3e936165c99521068ee Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:15:47 +0200 Subject: [PATCH 03/13] chore: remove unused var Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 4252a2b0ff..02b2aff833 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -5,8 +5,6 @@ import { randomUUID } from 'crypto'; import { LRUCache } from 'lru-cache'; import { Logger } from 'pino'; -import { LockStrategy } from '../../types'; - /** * Represents the internal state for a lock associated with a given address. */ From 7fc92460d8f0f6b41717fad711407d2a44437c1c Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:17:03 +0200 Subject: [PATCH 04/13] chore: fix comment Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 02b2aff833..f5848afa26 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -36,7 +36,7 @@ export class LocalLockStrategy { /** * Seconds for auto-release if lock not manually released */ - public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 60 secs + public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 30 secs /** * LRU cache of lock states, keyed by address. From e5344460a9ad6ba82c4c17ffa37f06e1e6419295 Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:25:26 +0200 Subject: [PATCH 05/13] chore: add test Signed-off-by: nikolay --- .../tests/lib/services/lockService/LocalLockStrategy.spec.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts index df18c743bf..ad8ade6f94 100644 --- a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 import { expect } from 'chai'; +import { pino } from 'pino'; import sinon from 'sinon'; import { LocalLockStrategy } from '../../../../src/lib/services/lockService/LocalLockStrategy'; @@ -11,7 +12,7 @@ describe('LocalLockStrategy', function () { let lockStrategy: LocalLockStrategy; beforeEach(() => { - lockStrategy = new LocalLockStrategy(); + lockStrategy = new LocalLockStrategy(pino({ level: 'silent' })); }); afterEach(() => { From c04db62d475f9f5974b4b5c10bf07a739cb0108a Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:26:15 +0200 Subject: [PATCH 06/13] chore: eslint fix Signed-off-by: nikolay --- .../tests/lib/services/lockService/LocalLockStrategy.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts index ad8ade6f94..bd344b1f61 100644 --- a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -88,7 +88,7 @@ describe('LocalLockStrategy', function () { (LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms const releaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); - const sessionKey = await lockStrategy.acquireLock(address); + await lockStrategy.acquireLock(address); // Wait beyond auto-release timeout await new Promise((res) => setTimeout(res, 300)); From 5345dc9a7d025229e95fce76b1a97c697aba103a Mon Sep 17 00:00:00 2001 From: nikolay Date: Mon, 17 Nov 2025 15:37:10 +0200 Subject: [PATCH 07/13] chore: remove local lock ttl Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index f5848afa26..a5888dd2b1 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -28,11 +28,6 @@ export class LocalLockStrategy { */ public static LOCAL_LOCK_MAX_ENTRIES: number = 1_000; // Max 1000 addresses - /** - * Time-to-live for each lock entry in the cache (in milliseconds). - */ - public static LOCAL_LOCK_TTL: number = 300_000; // 5 minutes - /** * Seconds for auto-release if lock not manually released */ @@ -43,7 +38,6 @@ export class LocalLockStrategy { */ private localLockStates = new LRUCache({ max: LocalLockStrategy.LOCAL_LOCK_MAX_ENTRIES, - ttl: LocalLockStrategy.LOCAL_LOCK_TTL, }); /** From 9e2d66e80047a28acc866e13652d366e496f1267 Mon Sep 17 00:00:00 2001 From: nikolay Date: Mon, 17 Nov 2025 20:08:22 +0200 Subject: [PATCH 08/13] chore: revert transaction service Signed-off-by: nikolay --- .../transactionService/TransactionService.ts | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts index bf959a7476..b44d810d3d 100644 --- a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts +++ b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts @@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck'; import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types'; import { CacheService } from '../../cacheService/cacheService'; import HAPIService from '../../hapiService/hapiService'; -import { ICommonService, LockService, LockStrategyFactory, TransactionPoolService } from '../../index'; +import { ICommonService, TransactionPoolService } from '../../index'; import { ITransactionService } from './ITransactionService'; export class TransactionService implements ITransactionService { @@ -66,18 +66,8 @@ export class TransactionService implements ITransactionService { */ private readonly precheck: Precheck; - /** - * Service responsible for managing pending transactions. - */ private readonly transactionPoolService: TransactionPoolService; - /** - * Service that provides mechanisms for acquiring and releasing locks - * to ensure thread-safe and concurrent operation handling across - * asynchronous processes. - */ - private readonly lockService: LockService; - /** * The ID of the chain, as a hex string, as it would be returned in a JSON-RPC call. * @private @@ -106,7 +96,6 @@ export class TransactionService implements ITransactionService { this.mirrorNodeClient = mirrorNodeClient; this.precheck = new Precheck(mirrorNodeClient, chain, transactionPoolService); this.transactionPoolService = transactionPoolService; - this.lockService = new LockService(LockStrategyFactory.create(undefined, logger), logger); } /** @@ -483,7 +472,6 @@ export class TransactionService implements ITransactionService { * @param {EthersTransaction} parsedTx - The parsed Ethereum transaction object. * @param {number} networkGasPriceInWeiBars - The current network gas price in wei bars. * @param {RequestDetails} requestDetails - Details of the request for logging and tracking purposes. - * @param {string} sessionKey - The key that is used as identifier in the lock service * @returns {Promise} A promise that resolves to the transaction hash if successful, or a JsonRpcError if an error occurs. */ async sendRawTransactionProcessor( @@ -496,8 +484,6 @@ export class TransactionService implements ITransactionService { const originalCallerAddress = parsedTx.from?.toString() || ''; - const sessionKey = await this.lockService.acquireLock(parsedTx.from!); - this.eventEmitter.emit('eth_execution', { method: constants.ETH_SEND_RAW_TRANSACTION, }); @@ -511,7 +497,6 @@ export class TransactionService implements ITransactionService { // Remove the transaction from the transaction pool after submission await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.serialized); - await this.lockService.releaseLock(parsedTx.from!, sessionKey); sendRawTransactionError = error; From d91d4112f01a704577f0935dc0cd85a67adb69a3 Mon Sep 17 00:00:00 2001 From: nikolay Date: Tue, 18 Nov 2025 09:56:39 +0200 Subject: [PATCH 09/13] chore: resolve comments Signed-off-by: nikolay --- docs/configuration.md | 2 + .../src/services/globalConfig.ts | 10 ++++ .../services/lockService/LocalLockStrategy.ts | 57 ++++++++----------- .../lib/services/lockService/LockService.ts | 19 +------ .../lockService/LocalLockStrategy.spec.ts | 29 +++++----- 5 files changed, 54 insertions(+), 63 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 9744f2a6ea..d0239e4477 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -68,6 +68,8 @@ Unless you need to set a non-default value, it is recommended to only populate o | `IP_RATE_LIMIT_STORE` | null | Specifies the rate limit store to use for IP-based rate limiting: valid values are "LRU", "REDIS", with the possibility to be extended with a custom implementation (see [Store Selection](rate-limiting.md#store-selection)). If unset, falls back to Redis when `REDIS_ENABLED=true`, otherwise uses in-memory LRU. | | `JUMBO_TX_ENABLED` | "true" | Controls how large transactions are handled during `eth_sendRawTransaction`. When set to `true`, transactions up to 128KB can be sent directly to consensus nodes without using Hedera File Service (HFS), as long as contract bytecode doesn't exceed 24KB. When set to `false`, all transactions containing contract deployments use the traditional HFS approach. This feature leverages the increased transaction size limit to simplify processing of standard Ethereum transactions. | | `LIMIT_DURATION` | "60000" | The maximum duration in ms applied to IP-method based rate limits. | +| `LOCAL_LOCK_MAX_ENTRIES` | "1000" | Maximum number of lock entries stored in memory. Prevents unbounded memory growth. | +| `LOCAL_LOCK_MAX_LOCK_TIME` | "30000" | Timer to auto-release if lock not manually released (in ms). | | `MAX_GAS_ALLOWANCE_HBAR` | "0" | The maximum amount, in hbars, that the JSON-RPC Relay is willing to pay to complete the transaction in case the senders don't provide enough funds. Please note, in case of fully subsidized transactions, the sender must set the gas price to `0` and the JSON-RPC Relay must configure the `MAX_GAS_ALLOWANCE_HBAR` parameter high enough to cover the entire transaction cost. | | `MAX_TRANSACTION_FEE_THRESHOLD` | "15000000" | Used to set the max transaction fee. This is the HAPI fee which is paid by the relay operator account. | | `MIRROR_NODE_AGENT_CACHEABLE_DNS` | "true" | Flag to set if the mirror node agent should cacheable DNS lookups, using better-lookup library. | diff --git a/packages/config-service/src/services/globalConfig.ts b/packages/config-service/src/services/globalConfig.ts index 346cae7998..dad1b6fbca 100644 --- a/packages/config-service/src/services/globalConfig.ts +++ b/packages/config-service/src/services/globalConfig.ts @@ -363,6 +363,16 @@ const _CONFIG = { required: false, defaultValue: null, }, + LOCAL_LOCK_MAX_ENTRIES: { + type: 'number', + required: false, + defaultValue: 1000, + }, + LOCAL_LOCK_MAX_LOCK_TIME: { + type: 'number', + required: false, + defaultValue: 30000, + }, LOG_LEVEL: { type: 'string', required: false, diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index a5888dd2b1..5b7b9deb29 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -1,5 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 +import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services'; import { Mutex } from 'async-mutex'; import { randomUUID } from 'crypto'; import { LRUCache } from 'lru-cache'; @@ -12,7 +13,7 @@ interface LockState { mutex: Mutex; sessionKey: string | null; acquiredAt: number | null; - maxLockTime: NodeJS.Timeout | null; + lockTimeoutId: NodeJS.Timeout | null; } /** @@ -22,22 +23,11 @@ interface LockState { * the lock at a time. Locks are auto-expiring and stored in an LRU cache. */ export class LocalLockStrategy { - /** - * Maximum number of lock entries stored in memory. - * Prevents unbounded memory growth. - */ - public static LOCAL_LOCK_MAX_ENTRIES: number = 1_000; // Max 1000 addresses - - /** - * Seconds for auto-release if lock not manually released - */ - public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 30 secs - /** * LRU cache of lock states, keyed by address. */ private localLockStates = new LRUCache({ - max: LocalLockStrategy.LOCAL_LOCK_MAX_ENTRIES, + max: ConfigService.get('LOCAL_LOCK_MAX_ENTRIES'), }); /** @@ -64,6 +54,10 @@ export class LocalLockStrategy { * @returns A session key identifying the current lock owner */ async acquireLock(address: string): Promise { + if (this.logger.isLevelEnabled('debug')) { + this.logger.debug(`Acquiring lock for address ${address}.`); + } + const sessionKey = randomUUID(); const state = this.getOrCreateState(address); @@ -75,9 +69,9 @@ export class LocalLockStrategy { state.acquiredAt = Date.now(); // Start a 30-second timer to auto-release if lock not manually released - state.maxLockTime = setTimeout(() => { + state.lockTimeoutId = setTimeout(() => { this.forceReleaseExpiredLock(address, sessionKey); - }, LocalLockStrategy.LOCAL_LOCK_MAX_LOCK_TIME); + }, ConfigService.get('LOCAL_LOCK_MAX_LOCK_TIME')); return sessionKey; } @@ -89,15 +83,16 @@ export class LocalLockStrategy { * @param sessionKey - The session key of the lock holder */ async releaseLock(address: string, sessionKey: string): Promise { + if (this.logger.isLevelEnabled('debug')) { + this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey}.`); + } + const state = this.localLockStates.get(address); // Ensure only the lock owner can release - if (state?.sessionKey !== sessionKey) { - return; // Not the owner — safely ignore + if (state?.sessionKey === sessionKey) { + await this.doRelease(state); } - - // Perform cleanup and release - await this.doRelease(state); } /** @@ -107,12 +102,13 @@ export class LocalLockStrategy { * @returns The LockState object associated with the address */ private getOrCreateState(address: string): LockState { + address = address.toLowerCase(); if (!this.localLockStates.has(address)) { this.localLockStates.set(address, { mutex: new Mutex(), sessionKey: null, acquiredAt: null, - maxLockTime: null, + lockTimeoutId: null, }); } @@ -126,11 +122,11 @@ export class LocalLockStrategy { */ private async doRelease(state: LockState): Promise { // Clear timeout first - clearTimeout(state.maxLockTime!); + clearTimeout(state.lockTimeoutId!); // Reset state state.sessionKey = null; - state.maxLockTime = null; + state.lockTimeoutId = null; state.acquiredAt = null; // Release the mutex lock @@ -147,16 +143,13 @@ export class LocalLockStrategy { private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise { const state = this.localLockStates.get(address); - // Ensure the session still owns the lock before force-releasing - if (!state || state.sessionKey !== sessionKey) { - return; // Already released or lock reassigned - } + if (state?.sessionKey === sessionKey) { + await this.doRelease(state); - if (this.logger.isLevelEnabled('debug')) { - const holdTime = Date.now() - state.acquiredAt!; - this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`); + if (this.logger.isLevelEnabled('debug')) { + const holdTime = Date.now() - state.acquiredAt!; + this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`); + } } - - await this.doRelease(state); } } diff --git a/packages/relay/src/lib/services/lockService/LockService.ts b/packages/relay/src/lib/services/lockService/LockService.ts index 949a08198e..d53661617c 100644 --- a/packages/relay/src/lib/services/lockService/LockService.ts +++ b/packages/relay/src/lib/services/lockService/LockService.ts @@ -14,22 +14,13 @@ export class LockService { */ private readonly strategy: LockStrategy; - /** - * Logger - * - * @private - */ - private readonly logger: Logger; - /** * Creates a new LockService instance. * * @param strategy - The lock strategy implementation to use. - * @param logger - The logger */ - constructor(strategy: LockStrategy, logger: Logger) { + constructor(strategy: LockStrategy) { this.strategy = strategy; - this.logger = logger; } /** @@ -40,10 +31,6 @@ export class LockService { * @returns A promise that resolves to a unique session key. */ async acquireLock(address: string): Promise { - if (this.logger.isLevelEnabled('debug')) { - this.logger.debug(`Acquiring lock for address ${address}.`); - } - return await this.strategy.acquireLock(address); } @@ -55,10 +42,6 @@ export class LockService { * @param sessionKey - The session key obtained during lock acquisition. */ async releaseLock(address: string, sessionKey: string): Promise { - if (this.logger.isLevelEnabled('debug')) { - this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey}.`); - } - await this.strategy.releaseLock(address, sessionKey); } } diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts index bd344b1f61..7e55fb971b 100644 --- a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -5,6 +5,7 @@ import { pino } from 'pino'; import sinon from 'sinon'; import { LocalLockStrategy } from '../../../../src/lib/services/lockService/LocalLockStrategy'; +import { withOverriddenEnvsInMochaTest } from '../../../helpers'; describe('LocalLockStrategy', function () { this.timeout(10000); @@ -81,21 +82,23 @@ describe('LocalLockStrategy', function () { expect(secondAcquired).to.be.true; }); - it('should auto-release after max lock time', async () => { - const address = 'test-auto-release'; + withOverriddenEnvsInMochaTest({ LOCAL_LOCK_MAX_LOCK_TIME: 200 }, () => { + it('should auto-release after max lock time', async () => { + const address = 'test-auto-release'; - // Shorten auto-release time for test - (LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms + // Shorten auto-release time for test + // (LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms - const releaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); - await lockStrategy.acquireLock(address); + const releaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); + await lockStrategy.acquireLock(address); - // Wait beyond auto-release timeout - await new Promise((res) => setTimeout(res, 300)); + // Wait beyond auto-release timeout + await new Promise((res) => setTimeout(res, 300)); - expect(releaseSpy.called).to.be.true; - const args = releaseSpy.getCall(0).args[0]; - expect(args.sessionKey).to.be.null; + expect(releaseSpy.called).to.be.true; + const args = releaseSpy.getCall(0).args[0]; + expect(args.sessionKey).to.be.null; + }); }); it('should reuse existing lock state for same address', async () => { @@ -120,12 +123,12 @@ describe('LocalLockStrategy', function () { const state = (lockStrategy as any).localLockStates.get(address); expect(state.sessionKey).to.equal(sessionKey); - expect(state.maxLockTime).to.not.be.null; + expect(state.lockTimeoutId).to.not.be.null; await lockStrategy.releaseLock(address, sessionKey); expect(state.sessionKey).to.be.null; - expect(state.maxLockTime).to.be.null; + expect(state.lockTimeoutId).to.be.null; expect(state.acquiredAt).to.be.null; }); From 88a3795115ac261b117b1f4d35969f32bf5e65a7 Mon Sep 17 00:00:00 2001 From: nikolay Date: Tue, 18 Nov 2025 09:57:14 +0200 Subject: [PATCH 10/13] chore: remove unsused import Signed-off-by: nikolay --- packages/relay/src/lib/services/lockService/LockService.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/relay/src/lib/services/lockService/LockService.ts b/packages/relay/src/lib/services/lockService/LockService.ts index d53661617c..ccc34c9850 100644 --- a/packages/relay/src/lib/services/lockService/LockService.ts +++ b/packages/relay/src/lib/services/lockService/LockService.ts @@ -1,7 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 -import { Logger } from 'pino'; - import { LockStrategy } from '../../types'; /** From 0bcf7a72428768d912f269726e775c1ea84847e1 Mon Sep 17 00:00:00 2001 From: nikolay Date: Tue, 18 Nov 2025 16:04:50 +0200 Subject: [PATCH 11/13] chore: resolve comments Signed-off-by: nikolay --- .../lockService/LocalLockStrategy.spec.ts | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts index 7e55fb971b..e5f937f9e2 100644 --- a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -21,8 +21,7 @@ describe('LocalLockStrategy', function () { }); function getStateEntry(address) { - // @ts-ignore - return lockStrategy.localLockStates.get(address); + return lockStrategy['localLockStates'].get(address); } it('should acquire and release a lock successfully', async () => { @@ -44,13 +43,15 @@ describe('LocalLockStrategy', function () { const sessionKey = await lockStrategy.acquireLock(address); const lockEntryAfterAcquisition = getStateEntry(address); - expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null; + expect(lockEntryAfterAcquisition.sessionKey).to.equal(sessionKey); const wrongKey = 'fake-session'; + const doReleaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); await lockStrategy.releaseLock(address, wrongKey); const lockEntryAfterFakeRelease = getStateEntry(address); - expect(lockEntryAfterFakeRelease.sessionKey).to.not.be.null; + expect(lockEntryAfterFakeRelease.sessionKey).to.equal(sessionKey); + expect(doReleaseSpy.called).to.be.false; await lockStrategy.releaseLock(address, sessionKey); @@ -86,9 +87,6 @@ describe('LocalLockStrategy', function () { it('should auto-release after max lock time', async () => { const address = 'test-auto-release'; - // Shorten auto-release time for test - // (LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms - const releaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); await lockStrategy.acquireLock(address); @@ -104,15 +102,15 @@ describe('LocalLockStrategy', function () { it('should reuse existing lock state for same address', async () => { const address = 'test-reuse'; - const state1 = (lockStrategy as any).getOrCreateState(address); - const state2 = (lockStrategy as any).getOrCreateState(address); + const state1 = lockStrategy['getOrCreateState'](address); + const state2 = lockStrategy['getOrCreateState'](address); expect(state1).to.equal(state2); }); it('should create a new lock state for new addresses', async () => { - const stateA = (lockStrategy as any).getOrCreateState('a'); - const stateB = (lockStrategy as any).getOrCreateState('b'); + const stateA = lockStrategy['getOrCreateState']('a'); + const stateB = lockStrategy['getOrCreateState']('b'); expect(stateA).to.not.equal(stateB); }); @@ -120,7 +118,7 @@ describe('LocalLockStrategy', function () { it('should clear timeout and reset state on release', async () => { const address = 'test-reset'; const sessionKey = await lockStrategy.acquireLock(address); - const state = (lockStrategy as any).localLockStates.get(address); + const state = lockStrategy['localLockStates'].get(address); expect(state.sessionKey).to.equal(sessionKey); expect(state.lockTimeoutId).to.not.be.null; @@ -136,16 +134,16 @@ describe('LocalLockStrategy', function () { const address = 'test-force-mismatch'; const sessionKey = await lockStrategy.acquireLock(address); - const state = (lockStrategy as any).localLockStates.get(address); + const state = lockStrategy['localLockStates'].get(address); expect(state.sessionKey).to.equal(sessionKey); // Modify session key to simulate ownership change state.sessionKey = 'different-key'; - const spy = sinon.spy(lockStrategy as any, 'doRelease'); - await (lockStrategy as any).forceReleaseExpiredLock(address, sessionKey); + const doReleaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); + await lockStrategy['forceReleaseExpiredLock'](address, sessionKey); - expect(spy.called).to.be.false; + expect(doReleaseSpy.called).to.be.false; await lockStrategy.releaseLock(address, 'different-key'); }); From 16031ce7bfdbfa9d06e9c2fb33b5199f12100063 Mon Sep 17 00:00:00 2001 From: nikolay Date: Wed, 19 Nov 2025 11:32:14 +0200 Subject: [PATCH 12/13] chore: resolve comments Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 2 +- .../tests/lib/services/lockService/LocalLockStrategy.spec.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 5b7b9deb29..953cc6effa 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -9,7 +9,7 @@ import { Logger } from 'pino'; /** * Represents the internal state for a lock associated with a given address. */ -interface LockState { +export interface LockState { mutex: Mutex; sessionKey: string | null; acquiredAt: number | null; diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts index e5f937f9e2..cdd83d741d 100644 --- a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -4,7 +4,7 @@ import { expect } from 'chai'; import { pino } from 'pino'; import sinon from 'sinon'; -import { LocalLockStrategy } from '../../../../src/lib/services/lockService/LocalLockStrategy'; +import { LocalLockStrategy, LockState } from '../../../../src/lib/services/lockService/LocalLockStrategy'; import { withOverriddenEnvsInMochaTest } from '../../../helpers'; describe('LocalLockStrategy', function () { @@ -20,7 +20,7 @@ describe('LocalLockStrategy', function () { sinon.restore(); }); - function getStateEntry(address) { + function getStateEntry(address: string): LockState | null { return lockStrategy['localLockStates'].get(address); } From 5d0eee48a18b1a864f4dbf8b0fbc28c4993e9407 Mon Sep 17 00:00:00 2001 From: nikolay Date: Wed, 19 Nov 2025 20:55:29 +0200 Subject: [PATCH 13/13] chore: edit logging Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 953cc6effa..5b7c681e99 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -84,7 +84,8 @@ export class LocalLockStrategy { */ async releaseLock(address: string, sessionKey: string): Promise { if (this.logger.isLevelEnabled('debug')) { - this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey}.`); + const holdTime = Date.now() - state.acquiredAt!; + this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey} held for ${holdTime}ms.`); } const state = this.localLockStates.get(address);