Skip to content

Commit a55b490

Browse files
hmottestadkenwenzel
authored andcommitted
GH-5433 new lock manager that doesn't use a lock object and doesn't support lock tracking
1 parent 2fcabe5 commit a55b490

File tree

8 files changed

+348
-80
lines changed

8 files changed

+348
-80
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.common.concurrent.locks;
12+
13+
import java.lang.invoke.VarHandle;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.LongAdder;
16+
import java.util.concurrent.locks.StampedLock;
17+
18+
/**
19+
* A lightweight read/write lock manager that avoids allocating per-lock objects. Readers are tracked using two
20+
* {@link LongAdder LongAdders} while writers rely on a {@link StampedLock}. The read-lock method returns a constant
21+
* stamp ({@link #READ_LOCK_STAMP}) and writers receive the stamp produced by the underlying {@link StampedLock}.
22+
*/
23+
public class StampedLongAdderLockManager {
24+
25+
/**
26+
* Stamp returned to callers holding a read lock. Passing any other value to {@link #unlockRead(long)} is considered
27+
* an illegal monitor state.
28+
*/
29+
public static final long READ_LOCK_STAMP = Long.MIN_VALUE;
30+
31+
private final StampedLock stampedLock = new StampedLock();
32+
private final LongAdder readersLocked = new LongAdder();
33+
private final LongAdder readersUnlocked = new LongAdder();
34+
35+
// milliseconds to wait when trying to acquire the write lock interruptibly
36+
private final int tryWriteLockMillis;
37+
38+
// Number of spin attempts before temporarily releasing the write lock when readers are active.
39+
private final int writePreference;
40+
41+
public StampedLongAdderLockManager() {
42+
this(1, 100);
43+
}
44+
45+
public StampedLongAdderLockManager(int writePreference, int tryWriteLockMillis) {
46+
this.writePreference = Math.max(1, writePreference);
47+
this.tryWriteLockMillis = Math.max(1, tryWriteLockMillis);
48+
}
49+
50+
public boolean isWriterActive() {
51+
return stampedLock.isWriteLocked();
52+
}
53+
54+
public boolean isReaderActive() {
55+
return readersUnlocked.sum() != readersLocked.sum();
56+
}
57+
58+
public void waitForActiveWriter() throws InterruptedException {
59+
while (stampedLock.isWriteLocked() && !isReaderActive()) {
60+
spinWait();
61+
}
62+
}
63+
64+
public void waitForActiveReaders() throws InterruptedException {
65+
while (isReaderActive()) {
66+
spinWait();
67+
}
68+
}
69+
70+
public long readLock() throws InterruptedException {
71+
readersLocked.increment();
72+
while (stampedLock.isWriteLocked()) {
73+
try {
74+
spinWaitAtReadLock();
75+
} catch (InterruptedException e) {
76+
readersUnlocked.increment();
77+
throw e;
78+
}
79+
}
80+
return READ_LOCK_STAMP;
81+
}
82+
83+
public long tryReadLock() {
84+
readersLocked.increment();
85+
if (!stampedLock.isWriteLocked()) {
86+
return READ_LOCK_STAMP;
87+
}
88+
readersUnlocked.increment();
89+
return 0L;
90+
}
91+
92+
public void unlockRead(long stamp) {
93+
if (stamp != READ_LOCK_STAMP) {
94+
throw new IllegalMonitorStateException("Trying to release a stamp that is not a read lock");
95+
}
96+
97+
VarHandle.acquireFence();
98+
readersUnlocked.increment();
99+
}
100+
101+
public long writeLock() throws InterruptedException {
102+
long writeStamp = writeLockInterruptibly();
103+
boolean lockAcquired = false;
104+
105+
try {
106+
int attempts = 0;
107+
do {
108+
if (Thread.interrupted()) {
109+
throw new InterruptedException();
110+
}
111+
112+
if (!hasActiveReaders()) {
113+
lockAcquired = true;
114+
break;
115+
}
116+
117+
if (attempts++ > writePreference) {
118+
attempts = 0;
119+
120+
stampedLock.unlockWrite(writeStamp);
121+
writeStamp = 0;
122+
123+
yieldWait();
124+
125+
writeStamp = writeLockInterruptibly();
126+
} else {
127+
spinWait();
128+
}
129+
130+
} while (!lockAcquired);
131+
} finally {
132+
if (!lockAcquired && writeStamp != 0) {
133+
stampedLock.unlockWrite(writeStamp);
134+
}
135+
}
136+
137+
VarHandle.releaseFence();
138+
return writeStamp;
139+
}
140+
141+
public long tryWriteLock() {
142+
long writeStamp = stampedLock.tryWriteLock();
143+
if (writeStamp == 0) {
144+
return 0L;
145+
}
146+
147+
if (!hasActiveReaders()) {
148+
VarHandle.releaseFence();
149+
return writeStamp;
150+
}
151+
152+
stampedLock.unlockWrite(writeStamp);
153+
return 0L;
154+
}
155+
156+
public void unlockWrite(long stamp) {
157+
if (stamp == 0) {
158+
throw new IllegalMonitorStateException("Trying to release a write lock that is not locked");
159+
}
160+
stampedLock.unlockWrite(stamp);
161+
}
162+
163+
private boolean hasActiveReaders() {
164+
return readersUnlocked.sum() != readersLocked.sum();
165+
}
166+
167+
private long writeLockInterruptibly() throws InterruptedException {
168+
long writeStamp;
169+
do {
170+
if (Thread.interrupted()) {
171+
throw new InterruptedException();
172+
}
173+
writeStamp = stampedLock.tryWriteLock(tryWriteLockMillis, TimeUnit.MILLISECONDS);
174+
} while (writeStamp == 0);
175+
return writeStamp;
176+
}
177+
178+
private void spinWait() throws InterruptedException {
179+
Thread.onSpinWait();
180+
if (Thread.interrupted()) {
181+
throw new InterruptedException();
182+
}
183+
}
184+
185+
private void spinWaitAtReadLock() throws InterruptedException {
186+
Thread.onSpinWait();
187+
if (Thread.interrupted()) {
188+
throw new InterruptedException();
189+
}
190+
}
191+
192+
private void yieldWait() throws InterruptedException {
193+
Thread.yield();
194+
if (Thread.interrupted()) {
195+
throw new InterruptedException();
196+
}
197+
}
198+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.common.concurrent.locks;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertFalse;
15+
import static org.junit.jupiter.api.Assertions.assertTrue;
16+
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
class StampedLongAdderLockManagerTest {
27+
28+
@Test
29+
void writeLockWaitsForReaders() throws Exception {
30+
StampedLongAdderLockManager manager = new StampedLongAdderLockManager();
31+
long readStamp = manager.readLock();
32+
assertTrue(manager.isReaderActive());
33+
34+
ExecutorService executor = Executors.newSingleThreadExecutor();
35+
try {
36+
CountDownLatch attemptingWrite = new CountDownLatch(1);
37+
AtomicBoolean acquiredWrite = new AtomicBoolean(false);
38+
39+
Future<Long> writeFuture = executor.submit(() -> {
40+
attemptingWrite.countDown();
41+
long stamp = manager.writeLock();
42+
acquiredWrite.set(true);
43+
return stamp;
44+
});
45+
46+
assertTrue(attemptingWrite.await(500, TimeUnit.MILLISECONDS), "write attempt did not start in time");
47+
TimeUnit.MILLISECONDS.sleep(100);
48+
assertFalse(acquiredWrite.get(), "write lock acquired while read lock active");
49+
50+
manager.unlockRead(readStamp);
51+
long writeStamp = writeFuture.get(2, TimeUnit.SECONDS);
52+
assertTrue(acquiredWrite.get());
53+
assertTrue(manager.isWriterActive());
54+
manager.unlockWrite(writeStamp);
55+
assertFalse(manager.isWriterActive());
56+
} finally {
57+
executor.shutdownNow();
58+
executor.awaitTermination(1, TimeUnit.SECONDS);
59+
}
60+
}
61+
62+
@Test
63+
void readLockWaitsForWriters() throws Exception {
64+
StampedLongAdderLockManager manager = new StampedLongAdderLockManager();
65+
long writeStamp = manager.writeLock();
66+
assertTrue(manager.isWriterActive());
67+
68+
ExecutorService executor = Executors.newSingleThreadExecutor();
69+
try {
70+
CountDownLatch attemptingRead = new CountDownLatch(1);
71+
AtomicBoolean acquiredRead = new AtomicBoolean(false);
72+
73+
Future<Long> readFuture = executor.submit(() -> {
74+
attemptingRead.countDown();
75+
long stamp = manager.readLock();
76+
acquiredRead.set(true);
77+
return stamp;
78+
});
79+
80+
assertTrue(attemptingRead.await(500, TimeUnit.MILLISECONDS), "read attempt did not start in time");
81+
TimeUnit.MILLISECONDS.sleep(100);
82+
assertFalse(acquiredRead.get(), "read lock acquired while write lock active");
83+
84+
manager.unlockWrite(writeStamp);
85+
long readStamp = readFuture.get(2, TimeUnit.SECONDS);
86+
assertTrue(acquiredRead.get());
87+
assertEquals(StampedLongAdderLockManager.READ_LOCK_STAMP, readStamp);
88+
assertTrue(manager.isReaderActive());
89+
manager.unlockRead(readStamp);
90+
assertFalse(manager.isReaderActive());
91+
} finally {
92+
executor.shutdownNow();
93+
executor.awaitTermination(1, TimeUnit.SECONDS);
94+
}
95+
}
96+
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323
import java.io.Closeable;
2424
import java.io.IOException;
2525
import java.nio.ByteBuffer;
26-
import java.util.concurrent.locks.StampedLock;
2726

28-
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
29-
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
27+
import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager;
3028
import org.eclipse.rdf4j.sail.SailException;
3129
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
3230
import org.lwjgl.PointerBuffer;
@@ -63,7 +61,7 @@ class LmdbContextIdIterator implements Closeable {
6361

6462
private boolean fetchNext = false;
6563

66-
private final ReadWriteLockManager txnLockManager;
64+
private final StampedLongAdderLockManager txnLockManager;
6765

6866
private final Thread ownerThread = Thread.currentThread();
6967

@@ -76,9 +74,9 @@ class LmdbContextIdIterator implements Closeable {
7674
this.txnRef = txnRef;
7775
this.txnLockManager = txnRef.lockManager();
7876

79-
Lock lock;
77+
long readStamp;
8078
try {
81-
lock = txnLockManager.getReadLock();
79+
readStamp = txnLockManager.readLock();
8280
} catch (InterruptedException e) {
8381
throw new SailException(e);
8482
}
@@ -92,14 +90,14 @@ class LmdbContextIdIterator implements Closeable {
9290
cursor = pp.get(0);
9391
}
9492
} finally {
95-
lock.release();
93+
txnLockManager.unlockRead(readStamp);
9694
}
9795
}
9896

9997
public long[] next() {
100-
Lock lock;
98+
long readStamp;
10199
try {
102-
lock = txnLockManager.getReadLock();
100+
readStamp = txnLockManager.readLock();
103101
} catch (InterruptedException e) {
104102
throw new SailException(e);
105103
}
@@ -155,21 +153,21 @@ public long[] next() {
155153
} catch (IOException e) {
156154
throw new SailException(e);
157155
} finally {
158-
lock.release();
156+
txnLockManager.unlockRead(readStamp);
159157
}
160158
}
161159

162160
private void closeInternal(boolean maybeCalledAsync) {
163161
if (!closed) {
164-
Lock lock;
162+
long writeStamp = 0L;
163+
boolean writeLocked = false;
165164
if (maybeCalledAsync && ownerThread != Thread.currentThread()) {
166165
try {
167-
lock = txnLockManager.getWriteLock();
166+
writeStamp = txnLockManager.writeLock();
167+
writeLocked = true;
168168
} catch (InterruptedException e) {
169169
throw new SailException(e);
170170
}
171-
} else {
172-
lock = null;
173171
}
174172
try {
175173
if (!closed) {
@@ -182,8 +180,8 @@ private void closeInternal(boolean maybeCalledAsync) {
182180
}
183181
} finally {
184182
closed = true;
185-
if (lock != null) {
186-
lock.release();
183+
if (writeLocked) {
184+
txnLockManager.unlockWrite(writeStamp);
187185
}
188186
}
189187
}

0 commit comments

Comments
 (0)