From 7f9a93e9d9d5590cb4bfa8fcbac3cb53a8ff9908 Mon Sep 17 00:00:00 2001 From: ohad Date: Tue, 4 Nov 2025 13:13:17 -0500 Subject: [PATCH 1/5] Implement a refresh getter for the DirectoryReader to be able to see latest IndexWriter changes --- .../record/lucene/LuceneIndexMaintainer.java | 3 +- .../lucene/directory/FDBDirectoryManager.java | 6 +- .../lucene/directory/FDBDirectoryWrapper.java | 20 ++++++- .../lucene/LuceneIndexMaintenanceTest.java | 55 +++++++++++++++++++ 4 files changed, 81 insertions(+), 3 deletions(-) diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index 070be07355..e34c5bc5a1 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -300,7 +300,8 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr @Nullable final LucenePrimaryKeySegmentIndex segmentIndex = directoryManager.getDirectory(groupingKey, partitionId).getPrimaryKeySegmentIndex(); if (segmentIndex != null) { - final DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId); + // Use refresh to ensure the reader can see the latest deletes + final DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, true); final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey); if (documentIndexEntry != null) { state.context.ensureActive().clear(documentIndexEntry.entryKey); // TODO: Only if valid? 
diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java index 04a8356de7..0118711b5b 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryManager.java @@ -368,7 +368,11 @@ public IndexWriter getIndexWriter(@Nullable Tuple groupingKey, @Nullable Integer } public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId) throws IOException { - return getDirectoryWrapper(groupingKey, partititonId).getWriterReader(); + return getWriterReader(groupingKey, partititonId, false); + } + + public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId, boolean refresh) throws IOException { + return getDirectoryWrapper(groupingKey, partititonId).getWriterReader(refresh); } @Nonnull diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java index 23306cdeed..5f0b633c6b 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java @@ -91,7 +91,7 @@ public class FDBDirectoryWrapper implements AutoCloseable { * predominately used by the {@link com.apple.foundationdb.record.lucene.LucenePrimaryKeySegmentIndex} to find the * segments associated with documents being deleted. 
*/ - private final LazyCloseable writerReader; + private LazyCloseable writerReader; FDBDirectoryWrapper(@Nonnull final IndexMaintainerState state, @@ -205,6 +205,24 @@ public IndexReader getReader() throws IOException { */ @SuppressWarnings("PMD.CloseResource") public DirectoryReader getWriterReader() throws IOException { + return getWriterReader(false); + } + + /** + * Get a {@link DirectoryReader} wrapped around the {@link #getWriter()} to be able to get segments associated with + * documents. This resource will be closed when {@code this} is closed, and should not be closed by callers + * @param refresh if TRUE will try to refresh the reader data from the writer + */ + @SuppressWarnings("PMD.CloseResource") + public DirectoryReader getWriterReader(boolean refresh) throws IOException { + if (refresh) { + final DirectoryReader newReader = DirectoryReader.openIfChanged(writerReader.get()); + if (newReader != null) { + // previous reader instantiated but then writer changed + writerReader.close(); + writerReader = LazyCloseable.supply(() -> newReader); + } + } return writerReader.get(); } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index f7a8a651c6..75883e5d95 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -55,6 +55,7 @@ import com.apple.foundationdb.subspace.Subspace; import com.apple.foundationdb.tuple.Tuple; import com.apple.foundationdb.util.LoggableKeysAndValues; +import com.apple.test.ParameterizedTestUtils; import com.apple.test.RandomizedTestUtils; import com.apple.test.SuperSlow; import com.apple.test.Tags; @@ -900,6 +901,60 @@ void sampledDelete(boolean isSynthetic, boolean isGrouped, 
long seed) throws IOE assertThat(partitionCounts, Matchers.contains(5, 3, 4))); } + static Stream multiUpdate() { + return ParameterizedTestUtils.cartesianProduct( + ParameterizedTestUtils.booleans("isSynthetic"), + ParameterizedTestUtils.booleans("isGrouped"), + Stream.of(0, 10), + Stream.of(0, 1, 4), + Stream.of(5365)); + } + + @ParameterizedTest + @MethodSource("multiUpdate") + void multipleUpdatesInTransaction(boolean isSynthetic, boolean isGrouped, int highWatermark, int updateCount, long seed) throws IOException { + final int documentCount = 25; + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) + .setIsGrouped(isGrouped) + .setIsSynthetic(isSynthetic) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(highWatermark) + .build(); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0 + .build(); + + // save records + try (FDBRecordContext context = openContext(contextProps)) { + final FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.saveRecordsToAllGroups(documentCount, context); + commit(context); + } + + try (FDBRecordContext context = openContext(contextProps)) { + final FDBRecordStore store = dataModel.createOrOpenRecordStore(context); + dataModel.sampleRecordsUnderTest().forEach(rec -> { + for (int i = 0; i < updateCount; i++) { + // update some documents multiple times + rec.updateOtherValue(store).join(); + } + }); + commit(context); + } + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + + if (highWatermark > 0) { + // ensure each partition has all records + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> 
+ assertThat(partitionCounts.stream().mapToInt(i -> i).sum(), Matchers.equalTo(documentCount))); + } + + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + dataModel.validate(() -> openContext(contextProps)); + } + static Stream changingEncryptionKey() { return Stream.concat(Stream.of(Arguments.of(true, true, 288513), Arguments.of(false, false, 792025)), From ee7eea9bcd896c2ab899ae97e95b82582809545a Mon Sep 17 00:00:00 2001 From: ohad Date: Fri, 7 Nov 2025 09:33:35 -0500 Subject: [PATCH 2/5] Keep old readers and close all together, only refresh reader if fail to find doc. --- .../record/lucene/LuceneIndexMaintainer.java | 28 +++++++++++++++++-- .../lucene/directory/FDBDirectoryWrapper.java | 19 +++++++++++-- .../lucene/LuceneIndexMaintenanceTest.java | 2 +- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index e34c5bc5a1..c91cd9f130 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -300,9 +300,7 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr @Nullable final LucenePrimaryKeySegmentIndex segmentIndex = directoryManager.getDirectory(groupingKey, partitionId).getPrimaryKeySegmentIndex(); if (segmentIndex != null) { - // Use refresh to ensure the reader can see the latest deletes - final DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, true); - final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey); + final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = 
getDocumentIndexEntryWithRetry(segmentIndex, groupingKey, partitionId, primaryKey); if (documentIndexEntry != null) { state.context.ensureActive().clear(documentIndexEntry.entryKey); // TODO: Only if valid? long valid = indexWriter.tryDeleteDocument(documentIndexEntry.indexReader, documentIndexEntry.docId); @@ -353,6 +351,30 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr return 0; } + /** + * Try to find the document for the given record in the segment index. + * This method would first try to find the document using the existing reader. If it can't, it will refresh the reader + * and try again. The issue is that when the documents have been updated in memory (e.g. in the same transaction), the + * writer may cache the changes in NRT and the reader (created earlier) can't see them. Refreshing the reader from the + * writer can alleviate this. If the index can't find the document with the refresh reader, null is returned. + * @param groupingKey + * @param partitionId + * @param primaryKey + * @return + * @throws IOException + */ + private LucenePrimaryKeySegmentIndex.DocumentIndexEntry getDocumentIndexEntryWithRetry(LucenePrimaryKeySegmentIndex segmentIndex, final Tuple groupingKey, final Integer partitionId, final Tuple primaryKey) throws IOException { + // Use refresh to ensure the reader can see the latest deletes + DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, false); + LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey); + if (documentIndexEntry != null) { + return documentIndexEntry; + } else { + directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, true); + return segmentIndex.findDocument(directoryReader, primaryKey); + } + } + @Override public CompletableFuture mergeIndex() { return rebalancePartitions() diff --git 
a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java index 5f0b633c6b..1000b00343 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java @@ -30,6 +30,8 @@ import com.apple.foundationdb.record.provider.foundationdb.IndexMaintainerState; import com.apple.foundationdb.subspace.Subspace; import com.apple.foundationdb.tuple.Tuple; +import com.apple.foundationdb.util.CloseException; +import com.apple.foundationdb.util.CloseableUtils; import com.google.common.annotations.VisibleForTesting; import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.ConcurrentMergeScheduler; @@ -50,6 +52,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadLocalRandom; /** @@ -92,7 +96,12 @@ public class FDBDirectoryWrapper implements AutoCloseable { * segments associated with documents being deleted. */ private LazyCloseable writerReader; - + /** + * WriterReaders that were replaced (through {@link #getWriterReader(boolean)} with {@code refresh==true}). + * These readers should all be closed, but they may still be in use while this class is in circulation, so their + * closure is postponed until this class' {@link #close()} call. 
+ */ + private Queue> readersToClose; FDBDirectoryWrapper(@Nonnull final IndexMaintainerState state, @Nonnull final Tuple key, @@ -109,6 +118,7 @@ public class FDBDirectoryWrapper implements AutoCloseable { this.analyzerWrapper = analyzerWrapper; writer = LazyCloseable.supply(() -> createIndexWriter(exceptionAtCreation)); writerReader = LazyCloseable.supply(() -> DirectoryReader.open(writer.get())); + readersToClose = new ConcurrentLinkedQueue<>(); } @VisibleForTesting @@ -219,7 +229,7 @@ public DirectoryReader getWriterReader(boolean refresh) throws IOException { final DirectoryReader newReader = DirectoryReader.openIfChanged(writerReader.get()); if (newReader != null) { // previous reader instantiated but then writer changed - writerReader.close(); + readersToClose.add(writerReader); writerReader = LazyCloseable.supply(() -> newReader); } } @@ -377,6 +387,11 @@ public IndexWriter getWriter() throws IOException { @SuppressWarnings("PMD.CloseResource") public synchronized void close() throws IOException { IOUtils.close(writer, writerReader, directory); + try { + CloseableUtils.closeAll(readersToClose.toArray(new LazyCloseable[0])); + } catch (CloseException e) { + throw new IOException(e); + } } public void mergeIndex() throws IOException { diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index 75883e5d95..67f165eaae 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -913,7 +913,7 @@ static Stream multiUpdate() { @ParameterizedTest @MethodSource("multiUpdate") void multipleUpdatesInTransaction(boolean isSynthetic, boolean isGrouped, int highWatermark, int updateCount, long seed) throws IOException { - final 
int documentCount = 25; + final int documentCount = 15; final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) .setIsGrouped(isGrouped) .setIsSynthetic(isSynthetic) From 19089136daa4864073c2c5145d8bed0660add83e Mon Sep 17 00:00:00 2001 From: ohad Date: Fri, 7 Nov 2025 11:58:10 -0500 Subject: [PATCH 3/5] Cleanup --- .../record/lucene/LuceneIndexMaintainer.java | 14 ++++++++------ .../record/lucene/LuceneIndexMaintenanceTest.java | 1 - 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index c91cd9f130..dbf76dc47c 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -357,19 +357,21 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr * and try again. The issue is that when the documents have been updated in memory (e.g. in the same transaction), the * writer may cache the changes in NRT and the reader (created earlier) can't see them. Refreshing the reader from the * writer can alleviate this. If the index can't find the document with the refresh reader, null is returned. - * @param groupingKey - * @param partitionId - * @param primaryKey - * @return - * @throws IOException + * Note that the refresh of the reader will do so at the {@link com.apple.foundationdb.record.lucene.directory.FDBDirectoryWrapper} + * and so has impact on the entire directory. 
+ * @param groupingKey the grouping key for the index + * @param partitionId the partition ID for the index + * @param primaryKey the record primary key to look for + * @return segment index entry if the record was found, null if none + * @throws IOException in case of error */ private LucenePrimaryKeySegmentIndex.DocumentIndexEntry getDocumentIndexEntryWithRetry(LucenePrimaryKeySegmentIndex segmentIndex, final Tuple groupingKey, final Integer partitionId, final Tuple primaryKey) throws IOException { - // Use refresh to ensure the reader can see the latest deletes DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, false); LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey); if (documentIndexEntry != null) { return documentIndexEntry; } else { + // Use refresh to ensure the reader can see the latest deletes directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, true); return segmentIndex.findDocument(directoryReader, primaryKey); } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index 67f165eaae..a507f250b4 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -928,7 +928,6 @@ void multipleUpdatesInTransaction(boolean isSynthetic, boolean isGrouped, int hi // save records try (FDBRecordContext context = openContext(contextProps)) { - final FDBRecordStore store = dataModel.createOrOpenRecordStore(context); dataModel.saveRecordsToAllGroups(documentCount, context); commit(context); } From 71513582c77c7caedf9d336b7952b8e8e03823e1 Mon Sep 17 00:00:00 2001 From: ohad Date: Fri, 7 
Nov 2025 12:24:29 -0500 Subject: [PATCH 4/5] Cleanup --- .../apple/foundationdb/record/lucene/LuceneIndexMaintainer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index dbf76dc47c..f0401e86ed 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -365,6 +365,7 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr * @return segment index entry if the record was found, null if none * @throws IOException in case of error */ + @SuppressWarnings("PMD.CloseResource") private LucenePrimaryKeySegmentIndex.DocumentIndexEntry getDocumentIndexEntryWithRetry(LucenePrimaryKeySegmentIndex segmentIndex, final Tuple groupingKey, final Integer partitionId, final Tuple primaryKey) throws IOException { DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, false); LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey); From 1a60d94a7a6c0378e8fc49f5c8d7603219d690b5 Mon Sep 17 00:00:00 2001 From: ohad Date: Fri, 7 Nov 2025 14:26:17 -0500 Subject: [PATCH 5/5] Cleanup --- .../record/lucene/directory/FDBDirectoryWrapper.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java index 1000b00343..0381ee0059 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java +++ 
b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java @@ -20,6 +20,7 @@ package com.apple.foundationdb.record.lucene.directory; +import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.record.lucene.LuceneAnalyzerWrapper; import com.apple.foundationdb.record.lucene.LuceneEvents; import com.apple.foundationdb.record.lucene.LuceneLoggerInfoStream; @@ -62,6 +63,7 @@ * {@link FDBDirectory} contains cached information from FDB, it is important for cache coherency that all writers * (etc.) accessing that directory go through the same wrapper object so that they share a common cache. */ +@API(API.Status.INTERNAL) public class FDBDirectoryWrapper implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(FDBDirectoryWrapper.class); @@ -209,15 +211,6 @@ public IndexReader getReader() throws IOException { } } - /** - * Get a {@link DirectoryReader} wrapped around the {@link #getWriter()} to be able to get segments associated with - * documents. This resource will be closed when {@code this} is closed, and should not be closed by callers - */ - @SuppressWarnings("PMD.CloseResource") - public DirectoryReader getWriterReader() throws IOException { - return getWriterReader(false); - } - /** * Get a {@link DirectoryReader} wrapped around the {@link #getWriter()} to be able to get segments associated with * documents. This resource will be closed when {@code this} is closed, and should not be closed by callers