@@ -308,8 +308,7 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr
@Nullable final LucenePrimaryKeySegmentIndex segmentIndex = directoryManager.getDirectory(groupingKey, partitionId).getPrimaryKeySegmentIndex();

if (segmentIndex != null) {
final DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId);
final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey);
final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = getDocumentIndexEntryWithRetry(segmentIndex, groupingKey, partitionId, primaryKey);
if (documentIndexEntry != null) {
state.context.ensureActive().clear(documentIndexEntry.entryKey); // TODO: Only if valid?
long valid = indexWriter.tryDeleteDocument(documentIndexEntry.indexReader, documentIndexEntry.docId);
@@ -360,6 +359,33 @@ int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) thr
return 0;
}

/**
* Try to find the document for the given record in the segment index.
* This method would first try to find the document using teh existing reader. If it can't, it will refresh the reader
Suggested change (review comment from a Contributor):
* This method would first try to find the document using teh existing reader. If it can't, it will refresh the reader
* This method would first try to find the document using the existing reader. If it can't, it will refresh the reader

* and try again. The issue is that when the documents have been updated in memory (e.g. in the same transaction), the
* writer may cache the changes in NRT and the reader (created earlier) can't see them. Refreshing the reader from the
* writer can alleviate this. If the index can't find the document with the refresh reader, null is returned.
* Note that the refresh of the reader will do so at the {@link com.apple.foundationdb.record.lucene.directory.FDBDirectoryWrapper}
* and so has impact on the entire directory.
Review comment (Contributor): Please consider rewording.

* @param groupingKey the grouping key for the index
* @param partitionId the partition ID for the index
* @param primaryKey the record primary key to look for
* @return segment index entry if the record was found, null if none
* @throws IOException in case of error
*/
@SuppressWarnings("PMD.CloseResource")
private LucenePrimaryKeySegmentIndex.DocumentIndexEntry getDocumentIndexEntryWithRetry(LucenePrimaryKeySegmentIndex segmentIndex, final Tuple groupingKey, final Integer partitionId, final Tuple primaryKey) throws IOException {
DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, false);
LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey);
if (documentIndexEntry != null) {
return documentIndexEntry;
} else {
// Use refresh to ensure the reader can see the latest deletes
directoryReader = directoryManager.getWriterReader(groupingKey, partitionId, true);
return segmentIndex.findDocument(directoryReader, primaryKey);
}
}
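For readers unfamiliar with Lucene's near-real-time (NRT) behavior, the following stand-alone sketch (illustrative only, not part of this change; all names are local to the example) shows why a reader opened from an IndexWriter misses documents buffered after it was opened, and how DirectoryReader.openIfChanged produces a reader that can see them:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.ByteBuffersDirectory;

public class NrtRefreshSketch {
    public static void main(String[] args) throws Exception {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            DirectoryReader reader = DirectoryReader.open(writer); // NRT reader over the writer

            Document doc = new Document();
            doc.add(new StringField("pk", "42", Field.Store.YES));
            writer.addDocument(doc); // buffered by the writer; invisible to the reader opened above

            TermQuery byPk = new TermQuery(new Term("pk", "42"));
            System.out.println(new IndexSearcher(reader).count(byPk)); // 0: the reader pre-dates the update

            DirectoryReader refreshed = DirectoryReader.openIfChanged(reader);
            if (refreshed != null) { // null would mean the reader is already current
                reader.close();      // in this PR, closing superseded readers is deferred to FDBDirectoryWrapper.close()
                reader = refreshed;
            }
            System.out.println(new IndexSearcher(reader).count(byPk)); // 1: the refreshed reader sees the buffered document
            reader.close();
        }
    }
}

The getDocumentIndexEntryWithRetry method above applies the same idea: it retries the primary-key lookup only after asking the directory manager for a writer reader refreshed from the writer.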

@Override
public CompletableFuture<Void> mergeIndex() {
return rebalancePartitions()
@@ -368,7 +368,11 @@ public IndexWriter getIndexWriter(@Nullable Tuple groupingKey, @Nullable Integer
}

public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId) throws IOException {
return getDirectoryWrapper(groupingKey, partititonId).getWriterReader();
return getWriterReader(groupingKey, partititonId, false);
}

public DirectoryReader getWriterReader(@Nullable Tuple groupingKey, @Nullable Integer partititonId, boolean refresh) throws IOException {
return getDirectoryWrapper(groupingKey, partititonId).getWriterReader(refresh);
}
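The two-argument overload above delegates with refresh set to false, so existing call sites keep their behavior, while the new three-argument overload lets a caller request a reader refreshed from the writer. A hedged sketch of the two call shapes (variable names are illustrative):

DirectoryReader cached = directoryManager.getWriterReader(groupingKey, partitionId);        // existing behavior, no refresh
DirectoryReader current = directoryManager.getWriterReader(groupingKey, partitionId, true); // refreshed from the writer when needed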

@Nonnull
@@ -20,6 +20,7 @@

package com.apple.foundationdb.record.lucene.directory;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.lucene.LuceneAnalyzerWrapper;
import com.apple.foundationdb.record.lucene.LuceneEvents;
import com.apple.foundationdb.record.lucene.LuceneLoggerInfoStream;
@@ -30,6 +31,8 @@
import com.apple.foundationdb.record.provider.foundationdb.IndexMaintainerState;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.util.CloseException;
import com.apple.foundationdb.util.CloseableUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.ConcurrentMergeScheduler;
@@ -50,6 +53,8 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;

/**
@@ -58,6 +63,7 @@
* {@link FDBDirectory} contains cached information from FDB, it is important for cache coherency that all writers
* (etc.) accessing that directory go through the same wrapper object so that they share a common cache.
*/
@API(API.Status.INTERNAL)
public class FDBDirectoryWrapper implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(FDBDirectoryWrapper.class);

@@ -91,8 +97,13 @@ public class FDBDirectoryWrapper implements AutoCloseable {
* predominately used by the {@link com.apple.foundationdb.record.lucene.LucenePrimaryKeySegmentIndex} to find the
* segments associated with documents being deleted.
*/
private final LazyCloseable<DirectoryReader> writerReader;

private LazyCloseable<DirectoryReader> writerReader;
/**
* WriterReaders that were replaced (through {@link #getWriterReader(boolean)} with a {@code refresh==true}).
* These readers should all be closed, but they may still be in use while this class is in circulation, so their
* closure is postponed until this class' {@link #close()} call.
*/
private Queue<LazyCloseable<DirectoryReader>> readersToClose;

FDBDirectoryWrapper(@Nonnull final IndexMaintainerState state,
@Nonnull final Tuple key,
Expand All @@ -109,6 +120,7 @@ public class FDBDirectoryWrapper implements AutoCloseable {
this.analyzerWrapper = analyzerWrapper;
writer = LazyCloseable.supply(() -> createIndexWriter(exceptionAtCreation));
writerReader = LazyCloseable.supply(() -> DirectoryReader.open(writer.get()));
readersToClose = new ConcurrentLinkedQueue<>();
}

@VisibleForTesting
@@ -202,9 +214,18 @@ public IndexReader getReader() throws IOException {
/**
* Get a {@link DirectoryReader} wrapped around the {@link #getWriter()} to be able to get segments associated with
* documents. This resource will be closed when {@code this} is closed, and should not be closed by callers
* @param refresh if TRUE will try to refresh the reader data from the writer
*/
@SuppressWarnings("PMD.CloseResource")
public DirectoryReader getWriterReader() throws IOException {
public DirectoryReader getWriterReader(boolean refresh) throws IOException {
if (refresh) {
final DirectoryReader newReader = DirectoryReader.openIfChanged(writerReader.get());
if (newReader != null) {
// previous reader instantiated but then writer changed
readersToClose.add(writerReader);
writerReader = LazyCloseable.supply(() -> newReader);
}
}
return writerReader.get();
}

@@ -359,6 +380,11 @@ public IndexWriter getWriter() throws IOException {
@SuppressWarnings("PMD.CloseResource")
public synchronized void close() throws IOException {
IOUtils.close(writer, writerReader, directory);
try {
CloseableUtils.closeAll(readersToClose.toArray(new LazyCloseable<?>[0]));
} catch (CloseException e) {
throw new IOException(e);
}
}
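To illustrate why superseded readers are parked in readersToClose rather than closed during the refresh, here is a hedged fragment (the method and variable names are hypothetical, not from this change):

// A caller that obtained the reader before a refresh may still be using it.
void concurrentReaderUse(FDBDirectoryWrapper wrapper, Term primaryKeyTerm) throws IOException {
    DirectoryReader before = wrapper.getWriterReader(false); // caller A holds the current reader
    DirectoryReader after = wrapper.getWriterReader(true);   // caller B forces a refresh
    // If the refresh had closed `before` eagerly, the search below could fail with
    // org.apache.lucene.store.AlreadyClosedException. Parking the superseded reader in
    // readersToClose keeps it usable until the wrapper itself is closed.
    int hits = new IndexSearcher(before).count(new TermQuery(primaryKeyTerm));
    System.out.println(hits + " hit(s) via the pre-refresh reader; the refreshed reader sees " + after.numDocs() + " docs");
}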

public void mergeIndex() throws IOException {
@@ -1099,6 +1099,59 @@ void randomlyRemoveAllRecords() throws IOException {
assertThat(partitionCounts, contains(0)));
}

static Stream<Arguments> multiUpdate() {
return ParameterizedTestUtils.cartesianProduct(
ParameterizedTestUtils.booleans("isSynthetic"),
ParameterizedTestUtils.booleans("isGrouped"),
Stream.of(0, 10),
Stream.of(0, 1, 4),
Stream.of(5365));
}

@ParameterizedTest
@MethodSource("multiUpdate")
void multipleUpdatesInTransaction(boolean isSynthetic, boolean isGrouped, int highWatermark, int updateCount, long seed) throws IOException {
final int documentCount = 15;
final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
.setIsGrouped(isGrouped)
.setIsSynthetic(isSynthetic)
.setPrimaryKeySegmentIndexEnabled(true)
.setPartitionHighWatermark(highWatermark)
.build();

final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2)
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
.build();

// save records
try (FDBRecordContext context = openContext(contextProps)) {
dataModel.saveRecordsToAllGroups(documentCount, context);
commit(context);
}

try (FDBRecordContext context = openContext(contextProps)) {
final FDBRecordStore store = dataModel.createOrOpenRecordStore(context);
dataModel.sampleRecordsUnderTest().forEach(rec -> {
for (int i = 0; i < updateCount; i++) {
// update some documents multiple times
rec.updateOtherValue(store).join();
}
});
commit(context);
}
explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);

if (highWatermark > 0) {
// ensure each partition has all records
dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) ->
assertThat(partitionCounts.stream().mapToInt(i -> i).sum(), Matchers.equalTo(documentCount)));
}

explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
dataModel.validate(() -> openContext(contextProps));
}

static Stream<Arguments> changingEncryptionKey() {
return Stream.concat(Stream.of(Arguments.of(true, true, 288513),
Arguments.of(false, false, 792025)),