Skip to content

Commit 64ba6f8

Browse files
committed
working on new ID based join iterator
1 parent 2e49e1f commit 64ba6f8

File tree

2 files changed

+195
-65
lines changed

2 files changed

+195
-65
lines changed

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbDupRecordIterator.java

Lines changed: 146 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@
4040
*/
4141
class LmdbDupRecordIterator implements RecordIterator {
4242

43+
private boolean explicit;
44+
private boolean exhausted;
45+
private boolean wasEmpty;
46+
4347
@FunctionalInterface
4448
interface FallbackSupplier {
4549
RecordIterator get(long[] quadReuse, ByteBuffer minKeyBuf, ByteBuffer maxKeyBuf,
@@ -97,79 +101,160 @@ RecordIterator get(long[] quadReuse, ByteBuffer minKeyBuf, ByteBuffer maxKeyBuf,
97101

98102
void initialize(DupIndex index, long subj, long pred, boolean explicit, Txn txnRef, long[] quadReuse)
99103
throws IOException {
100-
if (!closed) {
101-
throw new IllegalStateException("Cannot initialize LMDB dup iterator while it is open");
102-
}
103104

104-
this.index = index;
105-
this.dupDbi = index.getDupDB(explicit);
106-
this.txnRef = txnRef;
107-
this.txnLockManager = txnRef.lockManager();
105+
this.exhausted = false;
108106

109-
this.prefixSubj = subj;
110-
this.prefixPred = pred;
107+
try {
111108

112-
if (quadReuse != null && quadReuse.length >= 4) {
113-
this.quad = quadReuse;
114-
} else if (this.quad == null || this.quad.length < 4) {
115-
this.quad = new long[4];
116-
}
117-
this.quad[0] = subj;
118-
this.quad[1] = pred;
119-
this.quad[2] = -1L;
120-
this.quad[3] = -1L;
109+
// TODO: Find out why this is slower than if we called close()
110+
// close();
111+
if (closed) {
112+
this.index = index;
113+
this.explicit = explicit;
114+
this.dupDbi = index.getDupDB(explicit);
115+
this.txnRef = txnRef;
116+
assert this.txnRef != null;
117+
this.txnLockManager = txnRef.lockManager();
118+
this.prefixSubj = subj;
119+
this.prefixPred = pred;
120+
assert this.keyData == null;
121+
assert this.valueData == null;
122+
assert this.prefixKeyBuf == null;
123+
this.keyData = pool.getVal();
124+
this.valueData = pool.getVal();
125+
this.prefixKeyBuf = pool.getKeyBuffer();
126+
127+
prefixKeyBuf.clear();
128+
Varint.writeUnsigned(prefixKeyBuf, prefixSubj);
129+
Varint.writeUnsigned(prefixKeyBuf, prefixPred);
130+
prefixKeyBuf.flip();
131+
132+
if (quadReuse != null && quadReuse.length >= 4) {
133+
this.quad = quadReuse;
134+
} else if (this.quad == null || this.quad.length < 4) {
135+
this.quad = new long[4];
136+
}
121137

122-
if (this.keyData == null) {
123-
this.keyData = pool.getVal();
124-
}
125-
if (this.valueData == null) {
126-
this.valueData = pool.getVal();
127-
}
138+
this.quad[0] = subj;
139+
this.quad[1] = pred;
140+
this.quad[2] = -1L;
141+
this.quad[3] = -1L;
142+
143+
} else {
144+
// TODO
145+
assert this.index == index;
146+
assert this.explicit == explicit;
147+
assert this.txnRef == txnRef;
148+
assert this.txnRef != null;
149+
assert this.txnLockManager != null;
150+
assert this.keyData != null;
151+
assert this.valueData != null;
152+
assert this.prefixKeyBuf != null;
153+
154+
if (this.prefixSubj == subj && this.prefixPred == pred) {
155+
assert this.quad[0] == this.prefixSubj;
156+
assert this.quad[1] == this.prefixPred;
157+
158+
if (wasEmpty) {
159+
exhausted = true;
160+
return;
161+
}
128162

129-
if (this.prefixKeyBuf == null) {
130-
this.prefixKeyBuf = pool.getKeyBuffer();
131-
}
132-
prefixKeyBuf.clear();
133-
Varint.writeUnsigned(prefixKeyBuf, prefixSubj);
134-
Varint.writeUnsigned(prefixKeyBuf, prefixPred);
135-
prefixKeyBuf.flip();
163+
// We can do a lot more reuse here!
164+
this.quad[2] = -1L;
165+
this.quad[3] = -1L;
166+
} else {
167+
this.prefixSubj = subj;
168+
this.prefixPred = pred;
169+
this.quad[0] = subj;
170+
this.quad[1] = pred;
171+
this.quad[2] = -1L;
172+
this.quad[3] = -1L;
173+
174+
prefixKeyBuf.clear();
175+
Varint.writeUnsigned(prefixKeyBuf, prefixSubj);
176+
Varint.writeUnsigned(prefixKeyBuf, prefixPred);
177+
prefixKeyBuf.flip();
178+
}
136179

137-
this.dupBuf = null;
138-
this.dupPos = 0;
139-
this.dupLimit = 0;
140-
this.lastResult = MDB_SUCCESS;
141-
this.closed = false;
180+
}
142181

143-
RecordIterator fallbackIterator = null;
182+
this.dupBuf = null;
183+
this.dupPos = 0;
184+
this.dupLimit = 0;
185+
this.lastResult = MDB_SUCCESS;
186+
this.wasEmpty = false;
144187

145-
long readStamp;
146-
try {
147-
readStamp = txnLockManager.readLock();
148-
} catch (InterruptedException e) {
149-
throw new SailException(e);
150-
}
151-
try {
152-
this.txnRefVersion = txnRef.version();
153-
this.txn = txnRef.get();
188+
if (closed) {
189+
long readStamp;
190+
try {
191+
readStamp = txnLockManager.readLock();
192+
} catch (InterruptedException e) {
193+
throw new SailException(e);
194+
}
195+
try {
196+
this.txnRefVersion = txnRef.version();
197+
this.txn = txnRef.get();
154198

155-
cursor = openCursor(txn, dupDbi, txnRef.isReadOnly());
199+
cursor = openCursor(txn, dupDbi, txnRef.isReadOnly());
156200

157-
boolean positioned = positionOnPrefix();
158-
if (positioned) {
159-
positioned = primeDuplicateBlock();
160-
}
161-
if (!positioned) {
162-
closeInternal(false);
163-
// TODO: We must be empty????
201+
boolean positioned = positionOnPrefix();
202+
if (positioned) {
203+
positioned = primeDuplicateBlock();
204+
}
205+
if (!positioned) {
206+
this.exhausted = true;
207+
this.wasEmpty = true;
208+
// closeInternal(false);
209+
// TODO: We must be empty????
210+
}
211+
} finally {
212+
txnLockManager.unlockRead(readStamp);
213+
}
214+
} else {
215+
long readStamp;
216+
try {
217+
readStamp = txnLockManager.readLock();
218+
} catch (InterruptedException e) {
219+
throw new SailException(e);
220+
}
221+
try {
222+
223+
if (txnRefVersion != txnRef.version()) {
224+
this.txn = txnRef.get();
225+
E(mdb_cursor_renew(txn, cursor));
226+
txnRefVersion = txnRef.version();
227+
} else {
228+
E(mdb_cursor_renew(txn, cursor));
229+
}
230+
231+
boolean positioned = positionOnPrefix();
232+
if (positioned) {
233+
positioned = primeDuplicateBlock();
234+
}
235+
if (!positioned) {
236+
this.exhausted = true;
237+
this.wasEmpty = true;
238+
// closeInternal(false);
239+
// TODO: We must be empty????
240+
}
241+
} finally {
242+
txnLockManager.unlockRead(readStamp);
243+
}
164244
}
245+
165246
} finally {
166-
txnLockManager.unlockRead(readStamp);
247+
assert this.txnRef != null;
248+
this.closed = false;
249+
167250
}
168251

169252
}
170253

171254
@Override
172255
public long[] next() {
256+
if (exhausted)
257+
return null;
173258
long readStamp;
174259
try {
175260
readStamp = txnLockManager.readLock();
@@ -187,7 +272,8 @@ public long[] next() {
187272
E(mdb_cursor_renew(txn, cursor));
188273
txnRefVersion = txnRef.version();
189274
if (!positionOnPrefix() || !primeDuplicateBlock()) {
190-
closeInternal(false);
275+
// closeInternal(false);
276+
exhausted = true;
191277
return null;
192278
}
193279
}
@@ -219,12 +305,14 @@ public long[] next() {
219305
do {
220306
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
221307
if (lastResult != MDB_SUCCESS) {
222-
closeInternal(false);
308+
// closeInternal(false);
309+
exhausted = true;
223310
return null;
224311
}
225312
// Ensure we're still within the requested (subj,pred) prefix
226313
if (!currentKeyHasPrefix() && !adjustCursorToPrefix()) {
227-
closeInternal(false);
314+
// closeInternal(false);
315+
exhausted = true;
228316
return null;
229317
}
230318
} while (!primeDuplicateBlock()); // skip any keys without a duplicate block (defensive)

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/join/LmdbIdBGPQueryEvaluationStep.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -571,8 +571,11 @@ public long[] next() throws QueryEvaluationException {
571571
if (next != null) {
572572
return next;
573573
}
574-
currentRight.close();
575-
if (currentRight != LmdbIdJoinIterator.EMPTY_RECORD_ITERATOR) {
574+
if (reusableRight == null && currentRight != LmdbIdJoinIterator.EMPTY_RECORD_ITERATOR) {
575+
reusableRight = currentRight;
576+
} else if (reusableRight != currentRight
577+
&& currentRight != LmdbIdJoinIterator.EMPTY_RECORD_ITERATOR) {
578+
reusableRight.close();
576579
reusableRight = currentRight;
577580
}
578581
currentRight = null;
@@ -600,12 +603,51 @@ public long[] next() throws QueryEvaluationException {
600603

601604
@Override
602605
public void close() {
603-
if (currentRight != null) {
604-
currentRight.close();
605-
currentRight = null;
606-
}
607-
left.close();
606+
// capture references and null out fields early to avoid double-closing if re-entered
607+
RecordIterator toCloseCurrent = currentRight;
608+
RecordIterator toCloseLeft = left;
609+
RecordIterator toCloseReusable = reusableRight;
610+
611+
currentRight = null;
608612
reusableRight = null;
613+
614+
RuntimeException first = null;
615+
616+
// Close currentRight
617+
try {
618+
if (toCloseCurrent != null) {
619+
toCloseCurrent.close();
620+
}
621+
} catch (RuntimeException e) {
622+
first = e;
623+
} finally {
624+
// Always attempt to close left
625+
try {
626+
if (toCloseLeft != null) {
627+
toCloseLeft.close();
628+
}
629+
} catch (RuntimeException e) {
630+
if (first == null) {
631+
first = e;
632+
}
633+
} finally {
634+
// Always attempt to close reusableRight
635+
try {
636+
if (toCloseReusable != null) {
637+
toCloseReusable.close();
638+
}
639+
} catch (RuntimeException e) {
640+
if (first == null) {
641+
first = e;
642+
}
643+
}
644+
}
645+
}
646+
647+
// Rethrow the first failure after attempting all closes
648+
if (first != null) {
649+
throw first;
650+
}
609651
}
610652
}
611653

0 commit comments

Comments
 (0)