Skip to content

Commit 6ae725a

Browse files
authored
feat: create LocalLockStrategy using LRU and async-mutex (#4610)
Signed-off-by: nikolay <n.atanasow94@gmail.com>
1 parent 5a88a01 commit 6ae725a

File tree

6 files changed

+324
-5
lines changed

6 files changed

+324
-5
lines changed

docs/configuration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ Unless you need to set a non-default value, it is recommended to only populate o
6868
| `IP_RATE_LIMIT_STORE` | null | Specifies the rate limit store to use for IP-based rate limiting: valid values are "LRU", "REDIS", with the possibility to be extended with a custom implementation (see [Store Selection](rate-limiting.md#store-selection)). If unset, falls back to Redis when `REDIS_ENABLED=true`, otherwise uses in-memory LRU. |
6969
| `JUMBO_TX_ENABLED` | "true" | Controls how large transactions are handled during `eth_sendRawTransaction`. When set to `true`, transactions up to 128KB can be sent directly to consensus nodes without using Hedera File Service (HFS), as long as contract bytecode doesn't exceed 24KB. When set to `false`, all transactions containing contract deployments use the traditional HFS approach. This feature leverages the increased transaction size limit to simplify processing of standard Ethereum transactions. |
7070
| `LIMIT_DURATION` | "60000" | The maximum duration in ms applied to IP-method based rate limits. |
71+
| `LOCAL_LOCK_MAX_ENTRIES` | "1000" | Maximum number of lock entries stored in memory. Prevents unbounded memory growth. |
72+
| `LOCAL_LOCK_MAX_LOCK_TIME` | "30000" | Timer to auto-release if lock not manually released (in ms). |
7173
| `MAX_GAS_ALLOWANCE_HBAR` | "0" | The maximum amount, in hbars, that the JSON-RPC Relay is willing to pay to complete the transaction in case the senders don't provide enough funds. Please note, in case of fully subsidized transactions, the sender must set the gas price to `0` and the JSON-RPC Relay must configure the `MAX_GAS_ALLOWANCE_HBAR` parameter high enough to cover the entire transaction cost. |
7274
| `MAX_TRANSACTION_FEE_THRESHOLD` | "15000000" | Used to set the max transaction fee. This is the HAPI fee which is paid by the relay operator account. |
7375
| `MIRROR_NODE_AGENT_CACHEABLE_DNS` | "true" | Flag to set if the mirror node agent should cacheable DNS lookups, using better-lookup library. |

packages/config-service/src/services/globalConfig.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,16 @@ const _CONFIG = {
363363
required: false,
364364
defaultValue: null,
365365
},
366+
LOCAL_LOCK_MAX_ENTRIES: {
367+
type: 'number',
368+
required: false,
369+
defaultValue: 1000,
370+
},
371+
LOCAL_LOCK_MAX_LOCK_TIME: {
372+
type: 'number',
373+
required: false,
374+
defaultValue: 30000,
375+
},
366376
LOG_LEVEL: {
367377
type: 'string',
368378
required: false,
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services';
4+
import { Mutex } from 'async-mutex';
5+
import { randomUUID } from 'crypto';
6+
import { LRUCache } from 'lru-cache';
7+
import { Logger } from 'pino';
8+
9+
/**
10+
* Represents the internal state for a lock associated with a given address.
11+
*/
12+
export interface LockState {
13+
mutex: Mutex;
14+
sessionKey: string | null;
15+
acquiredAt: number | null;
16+
lockTimeoutId: NodeJS.Timeout | null;
17+
}
18+
19+
/**
20+
* Implements a local, in-memory locking strategy.
21+
*
22+
* Each unique "address" gets its own mutex to ensure only one session can hold
23+
* the lock at a time. Locks are auto-expiring and stored in an LRU cache.
24+
*/
25+
export class LocalLockStrategy {
26+
/**
27+
* LRU cache of lock states, keyed by address.
28+
*/
29+
private localLockStates = new LRUCache<string, LockState>({
30+
max: ConfigService.get('LOCAL_LOCK_MAX_ENTRIES'),
31+
});
32+
33+
/**
34+
* Logger.
35+
*
36+
* @private
37+
*/
38+
private readonly logger: Logger;
39+
40+
/**
41+
* Creates a new LocalLockStrategy instance.
42+
*
43+
* @param logger - The logger
44+
*/
45+
constructor(logger: Logger) {
46+
this.logger = logger;
47+
}
48+
49+
/**
50+
* Acquire a lock for a specific address.
51+
* Waits until the lock is available (blocking if another session holds it).
52+
*
53+
* @param address - The key representing the resource to lock
54+
* @returns A session key identifying the current lock owner
55+
*/
56+
async acquireLock(address: string): Promise<string> {
57+
if (this.logger.isLevelEnabled('debug')) {
58+
this.logger.debug(`Acquiring lock for address ${address}.`);
59+
}
60+
61+
const sessionKey = randomUUID();
62+
const state = this.getOrCreateState(address);
63+
64+
// Acquire the mutex (this will block until available)
65+
await state.mutex.acquire();
66+
67+
// Record lock ownership metadata
68+
state.sessionKey = sessionKey;
69+
state.acquiredAt = Date.now();
70+
71+
// Start a 30-second timer to auto-release if lock not manually released
72+
state.lockTimeoutId = setTimeout(() => {
73+
this.forceReleaseExpiredLock(address, sessionKey);
74+
}, ConfigService.get('LOCAL_LOCK_MAX_LOCK_TIME'));
75+
76+
return sessionKey;
77+
}
78+
79+
/**
80+
* Release a previously acquired lock, if the session key matches the current owner.
81+
*
82+
* @param address - The locked resource key
83+
* @param sessionKey - The session key of the lock holder
84+
*/
85+
async releaseLock(address: string, sessionKey: string): Promise<void> {
86+
if (this.logger.isLevelEnabled('debug')) {
87+
const holdTime = Date.now() - state.acquiredAt!;
88+
this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey} held for ${holdTime}ms.`);
89+
}
90+
91+
const state = this.localLockStates.get(address);
92+
93+
// Ensure only the lock owner can release
94+
if (state?.sessionKey === sessionKey) {
95+
await this.doRelease(state);
96+
}
97+
}
98+
99+
/**
100+
* Retrieve an existing lock state for the given address, or create a new one if it doesn't exist.
101+
*
102+
* @param address - Unique identifier for the lock
103+
* @returns The LockState object associated with the address
104+
*/
105+
private getOrCreateState(address: string): LockState {
106+
address = address.toLowerCase();
107+
if (!this.localLockStates.has(address)) {
108+
this.localLockStates.set(address, {
109+
mutex: new Mutex(),
110+
sessionKey: null,
111+
acquiredAt: null,
112+
lockTimeoutId: null,
113+
});
114+
}
115+
116+
return this.localLockStates.get(address)!;
117+
}
118+
119+
/**
120+
* Internal helper to perform cleanup and release the mutex.
121+
*
122+
* @param state - The LockState instance to reset and release
123+
*/
124+
private async doRelease(state: LockState): Promise<void> {
125+
// Clear timeout first
126+
clearTimeout(state.lockTimeoutId!);
127+
128+
// Reset state
129+
state.sessionKey = null;
130+
state.lockTimeoutId = null;
131+
state.acquiredAt = null;
132+
133+
// Release the mutex lock
134+
state.mutex.release();
135+
}
136+
137+
/**
138+
* Forcefully release a lock that has exceeded its maximum execution time.
139+
* Used by the timeout set during `acquireLock`.
140+
*
141+
* @param address - The resource key associated with the lock
142+
* @param sessionKey - The session key to verify ownership before releasing
143+
*/
144+
private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise<void> {
145+
const state = this.localLockStates.get(address);
146+
147+
if (state?.sessionKey === sessionKey) {
148+
await this.doRelease(state);
149+
150+
if (this.logger.isLevelEnabled('debug')) {
151+
const holdTime = Date.now() - state.acquiredAt!;
152+
this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`);
153+
}
154+
}
155+
}
156+
}

packages/relay/src/lib/services/lockService/LockService.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22

3-
import { LockStrategy } from '../../types/lock';
3+
import { LockStrategy } from '../../types';
44

55
/**
66
* Service that manages transaction ordering through distributed locking.

packages/relay/src/lib/services/lockService/LockStrategyFactory.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import { Logger } from 'pino';
44
import { RedisClientType } from 'redis';
55

6-
import { LockStrategy } from '../../types/lock';
6+
import { LockStrategy } from '../../types';
7+
import { LocalLockStrategy } from './LocalLockStrategy';
78

89
/**
910
* Factory for creating LockStrategy instances.
@@ -20,13 +21,13 @@ export class LockStrategyFactory {
2021
* @param logger - Logger instance for the lock strategy.
2122
* @returns A LockStrategy implementation.
2223
*/
23-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
24+
2425
static create(redisClient: RedisClientType | undefined, logger: Logger): LockStrategy {
2526
// TODO: Remove placeholder errors once strategies are implemented
2627
if (redisClient) {
27-
throw new Error('Redis lock strategy not yet implemented');
28+
// throw new Error('Redis lock strategy not yet implemented');
2829
}
2930

30-
throw new Error('Local lock strategy not yet implemented');
31+
return new LocalLockStrategy(logger);
3132
}
3233
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
import { expect } from 'chai';
4+
import { pino } from 'pino';
5+
import sinon from 'sinon';
6+
7+
import { LocalLockStrategy, LockState } from '../../../../src/lib/services/lockService/LocalLockStrategy';
8+
import { withOverriddenEnvsInMochaTest } from '../../../helpers';
9+
10+
describe('LocalLockStrategy', function () {
11+
this.timeout(10000);
12+
13+
let lockStrategy: LocalLockStrategy;
14+
15+
beforeEach(() => {
16+
lockStrategy = new LocalLockStrategy(pino({ level: 'silent' }));
17+
});
18+
19+
afterEach(() => {
20+
sinon.restore();
21+
});
22+
23+
function getStateEntry(address: string): LockState | null {
24+
return lockStrategy['localLockStates'].get(address);
25+
}
26+
27+
it('should acquire and release a lock successfully', async () => {
28+
const address = 'test-address';
29+
30+
const sessionKey = await lockStrategy.acquireLock(address);
31+
expect(sessionKey).to.be.a('string');
32+
33+
const lockEntryAfterAcquisition = getStateEntry(address);
34+
expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null;
35+
36+
await lockStrategy.releaseLock(address, sessionKey);
37+
const lockEntryAfterRelease = getStateEntry(address);
38+
expect(lockEntryAfterRelease.sessionKey).to.be.null;
39+
});
40+
41+
it('should not allow a non-owner to release a lock', async () => {
42+
const address = 'test-non-owner';
43+
const sessionKey = await lockStrategy.acquireLock(address);
44+
45+
const lockEntryAfterAcquisition = getStateEntry(address);
46+
expect(lockEntryAfterAcquisition.sessionKey).to.equal(sessionKey);
47+
48+
const wrongKey = 'fake-session';
49+
const doReleaseSpy = sinon.spy<any, any>(lockStrategy as any, 'doRelease');
50+
await lockStrategy.releaseLock(address, wrongKey);
51+
52+
const lockEntryAfterFakeRelease = getStateEntry(address);
53+
expect(lockEntryAfterFakeRelease.sessionKey).to.equal(sessionKey);
54+
expect(doReleaseSpy.called).to.be.false;
55+
56+
await lockStrategy.releaseLock(address, sessionKey);
57+
58+
const lockEntryAfterRelease = getStateEntry(address);
59+
expect(lockEntryAfterRelease.sessionKey).to.be.null;
60+
});
61+
62+
it('should block a second acquire until the first is released', async () => {
63+
const address = 'test-sequential';
64+
65+
const sessionKey1 = await lockStrategy.acquireLock(address);
66+
let secondAcquired = false;
67+
68+
const acquire2 = (async () => {
69+
const key2 = await lockStrategy.acquireLock(address);
70+
secondAcquired = true;
71+
await lockStrategy.releaseLock(address, key2);
72+
})();
73+
74+
// Wait 100ms to ensure second acquire is blocked
75+
await new Promise((res) => setTimeout(res, 100));
76+
expect(secondAcquired).to.be.false;
77+
78+
// Now release first
79+
await lockStrategy.releaseLock(address, sessionKey1);
80+
81+
// Wait for second acquire to complete
82+
await acquire2;
83+
expect(secondAcquired).to.be.true;
84+
});
85+
86+
withOverriddenEnvsInMochaTest({ LOCAL_LOCK_MAX_LOCK_TIME: 200 }, () => {
87+
it('should auto-release after max lock time', async () => {
88+
const address = 'test-auto-release';
89+
90+
const releaseSpy = sinon.spy<any, any>(lockStrategy as any, 'doRelease');
91+
await lockStrategy.acquireLock(address);
92+
93+
// Wait beyond auto-release timeout
94+
await new Promise((res) => setTimeout(res, 300));
95+
96+
expect(releaseSpy.called).to.be.true;
97+
const args = releaseSpy.getCall(0).args[0];
98+
expect(args.sessionKey).to.be.null;
99+
});
100+
});
101+
102+
it('should reuse existing lock state for same address', async () => {
103+
const address = 'test-reuse';
104+
105+
const state1 = lockStrategy['getOrCreateState'](address);
106+
const state2 = lockStrategy['getOrCreateState'](address);
107+
108+
expect(state1).to.equal(state2);
109+
});
110+
111+
it('should create a new lock state for new addresses', async () => {
112+
const stateA = lockStrategy['getOrCreateState']('a');
113+
const stateB = lockStrategy['getOrCreateState']('b');
114+
115+
expect(stateA).to.not.equal(stateB);
116+
});
117+
118+
it('should clear timeout and reset state on release', async () => {
119+
const address = 'test-reset';
120+
const sessionKey = await lockStrategy.acquireLock(address);
121+
const state = lockStrategy['localLockStates'].get(address);
122+
123+
expect(state.sessionKey).to.equal(sessionKey);
124+
expect(state.lockTimeoutId).to.not.be.null;
125+
126+
await lockStrategy.releaseLock(address, sessionKey);
127+
128+
expect(state.sessionKey).to.be.null;
129+
expect(state.lockTimeoutId).to.be.null;
130+
expect(state.acquiredAt).to.be.null;
131+
});
132+
133+
it('should ignore forceReleaseExpiredLock if session key does not match', async () => {
134+
const address = 'test-force-mismatch';
135+
const sessionKey = await lockStrategy.acquireLock(address);
136+
137+
const state = lockStrategy['localLockStates'].get(address);
138+
expect(state.sessionKey).to.equal(sessionKey);
139+
140+
// Modify session key to simulate ownership change
141+
state.sessionKey = 'different-key';
142+
143+
const doReleaseSpy = sinon.spy<any, any>(lockStrategy as any, 'doRelease');
144+
await lockStrategy['forceReleaseExpiredLock'](address, sessionKey);
145+
146+
expect(doReleaseSpy.called).to.be.false;
147+
148+
await lockStrategy.releaseLock(address, 'different-key');
149+
});
150+
});

0 commit comments

Comments
 (0)