Skip to content

Commit 1584f34

Browse files
authored
Use the LuceneSerializer to encode stored fields and field infos, too. (#3669)
Since existing fields will be straight protobuf without the serializer's prefix byte, recognize that, too.
1 parent 190700a commit 1584f34

File tree

9 files changed

+195
-30
lines changed

9 files changed

+195
-30
lines changed

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneLogMessageKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public enum LuceneLogMessageKeys {
4747
ENCODED_DATA_SIZE,
4848
ENCRYPTED_EVENTUALLY,
4949
ENCRYPTION_SUPPOSED,
50+
FIELD_PROTOBUF_ENCODED,
5051
FILE_ACTUAL_TOTAL_SIZE,
5152
FILE_COUNT,
5253
FILE_ID,

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneRecordContextProperties.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public final class LuceneRecordContextProperties {
4949

5050
public static final RecordLayerPropertyKey<SerializationKeyManager> LUCENE_INDEX_KEY_MANAGER = new RecordLayerPropertyKey<>("com.apple.foundationdb.record.lucene.keyManager", null, SerializationKeyManager.class);
5151

52+
/**
53+
* Whether {@code StoredField} and {@code FieldInfo} are also encoded, allowing compression and encryption.
54+
*/
55+
public static final RecordLayerPropertyKey<Boolean> LUCENE_FIELD_PROTOBUF_PREFIX_ENABLED = RecordLayerPropertyKey.booleanPropertyKey("com.apple.foundationdb.record.lucene.fieldProtobufPrefixEnabled", false);
56+
5257
/**
5358
* An {@link ExecutorService} to use for parallel execution in {@link LuceneRecordCursor}.
5459
*/

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/codec/LuceneOptimizedStoredFieldsReader.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package com.apple.foundationdb.record.lucene.codec;
2222

23-
import com.apple.foundationdb.KeyValue;
2423
import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings;
2524
import com.apple.foundationdb.record.RecordCoreException;
2625
import com.apple.foundationdb.record.lucene.LuceneExceptions;
@@ -65,10 +64,10 @@ public LuceneOptimizedStoredFieldsReader(final FDBDirectory directory, final Seg
6564

6665
public static List<byte[]> getPrimaryKeys(final String segmentName, final FDBDirectory directory) throws IOException {
6766
try {
68-
final List<KeyValue> rawStoredFields = directory.readAllStoredFields(segmentName);
67+
final List<byte[]> rawStoredFields = directory.readAllStoredFields(segmentName);
6968
List<byte[]> primaryKeys = new ArrayList<>();
70-
for (final KeyValue rawStoredField : rawStoredFields) {
71-
final var storedFields = LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(rawStoredField.getValue());
69+
for (final byte[] rawStoredField : rawStoredFields) {
70+
final var storedFields = LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(rawStoredField);
7271
primaryKeys.add(storedFields.getPrimaryKey().toByteArray());
7372
}
7473
return primaryKeys;

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectory.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import java.util.concurrent.atomic.AtomicLong;
9393
import java.util.concurrent.atomic.AtomicReference;
9494
import java.util.function.Supplier;
95+
import java.util.stream.Collectors;
9596
import java.util.stream.Stream;
9697
import java.util.zip.CRC32;
9798

@@ -229,7 +230,8 @@ private FDBDirectory(@Nonnull Subspace subspace, @Nullable Map<String, String> i
229230
this.fileSequenceCounter = new AtomicLong(-1);
230231
this.serializer = new LuceneSerializer(Objects.requireNonNullElse(agilityContext.getPropertyValue(LuceneRecordContextProperties.LUCENE_INDEX_COMPRESSION_ENABLED), false),
231232
Objects.requireNonNullElse(agilityContext.getPropertyValue(LuceneRecordContextProperties.LUCENE_INDEX_ENCRYPTION_ENABLED), false),
232-
agilityContext.getPropertyValue(LuceneRecordContextProperties.LUCENE_INDEX_KEY_MANAGER));
233+
agilityContext.getPropertyValue(LuceneRecordContextProperties.LUCENE_INDEX_KEY_MANAGER),
234+
Objects.requireNonNullElse(agilityContext.getPropertyValue(LuceneRecordContextProperties.LUCENE_FIELD_PROTOBUF_PREFIX_ENABLED), false));
233235
this.fileReferenceMapSupplier = Suppliers.memoize(this::loadFileReferenceCacheForMemoization);
234236
this.sharedCacheManager = sharedCacheManager;
235237
this.sharedCacheKey = sharedCacheKey;
@@ -338,11 +340,12 @@ public void setFieldInfoId(final String filename, final long id, final ByteStrin
338340
writeFDBLuceneFileReference(filename, reference);
339341
}
340342

341-
void writeFieldInfos(long id, byte[] value) {
343+
void writeFieldInfos(long id, byte[] rawBytes) {
342344
if (id == 0) {
343345
throw new RecordCoreArgumentException("FieldInfo id should never be 0");
344346
}
345347
byte[] key = fieldInfosSubspace.pack(id);
348+
byte[] value = serializer.encodeFieldProtobuf(rawBytes);
346349
agilityContext.recordSize(LuceneEvents.SizeEvents.LUCENE_WRITE, key.length + value.length);
347350
if (LOGGER.isTraceEnabled()) {
348351
LOGGER.trace(getLogMessage("Write lucene stored field infos data",
@@ -357,7 +360,9 @@ Stream<NonnullPair<Long, byte[]>> getAllFieldInfosStream() {
357360
LuceneEvents.Waits.WAIT_LUCENE_READ_FIELD_INFOS,
358361
agilityContext.apply(aContext -> aContext.ensureActive().getRange(fieldInfosSubspace.range()).asList()))
359362
.stream()
360-
.map(keyValue -> NonnullPair.of(fieldInfosSubspace.unpack(keyValue.getKey()).getLong(0), keyValue.getValue()));
363+
.map(keyValue -> NonnullPair.of(
364+
fieldInfosSubspace.unpack(keyValue.getKey()).getLong(0),
365+
serializer.decodeFieldProtobuf(keyValue.getValue())));
361366
}
362367

363368
public CompletableFuture<Integer> getFieldInfosCount() {
@@ -444,10 +449,11 @@ public int writeData(final long id, final int block, @Nonnull final byte[] value
444449
* Write stored fields document to the DB.
445450
* @param segmentName the segment name writing to
446451
* @param docID the document ID to write
447-
* @param value the bytes value of the stored fields
452+
* @param rawBytes the bytes value of the stored fields
448453
*/
449-
public void writeStoredFields(@Nonnull String segmentName, int docID, @Nonnull final byte[] value) {
454+
public void writeStoredFields(@Nonnull String segmentName, int docID, @Nonnull final byte[] rawBytes) {
450455
byte[] key = storedFieldsSubspace.pack(Tuple.from(segmentName, docID));
456+
byte[] value = serializer.encodeFieldProtobuf(rawBytes);
451457
agilityContext.recordSize(LuceneEvents.SizeEvents.LUCENE_WRITE_STORED_FIELDS, key.length + value.length);
452458
if (LOGGER.isTraceEnabled()) {
453459
LOGGER.trace(getLogMessage("Write lucene stored fields data",
@@ -542,7 +548,7 @@ private CompletableFuture<byte[]> readData(long id, int block) {
542548
}
543549

544550
@Nonnull
545-
public byte[] readStoredFields(String segmentName, int docId) throws IOException {
551+
public byte[] readStoredFields(String segmentName, int docId) {
546552
final byte[] key = storedFieldsSubspace.pack(Tuple.from(segmentName, docId));
547553
final byte[] rawBytes = asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_STORED_FIELDS,
548554
agilityContext.instrument(LuceneEvents.Events.LUCENE_READ_STORED_FIELDS,
@@ -553,11 +559,11 @@ public byte[] readStoredFields(String segmentName, int docId) throws IOException
553559
.addLogInfo(LuceneLogMessageKeys.DOC_ID, docId)
554560
.addLogInfo(LogMessageKeys.KEY, ByteArrayUtil2.loggable(key));
555561
}
556-
return rawBytes;
562+
return Objects.requireNonNull(serializer.decodeFieldProtobuf(rawBytes));
557563
}
558564

559565
@Nonnull
560-
public List<KeyValue> readAllStoredFields(String segmentName) {
566+
public List<byte[]> readAllStoredFields(String segmentName) {
561567
final Range range = storedFieldsSubspace.range(Tuple.from(segmentName));
562568
final List<KeyValue> list = asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_ALL_STORED_FIELDS,
563569
agilityContext.getRange(range.begin, range.end));
@@ -567,7 +573,7 @@ public List<KeyValue> readAllStoredFields(String segmentName) {
567573
.addLogInfo(LogMessageKeys.RANGE_START, ByteArrayUtil2.loggable(range.begin))
568574
.addLogInfo(LogMessageKeys.RANGE_END, ByteArrayUtil2.loggable(range.end));
569575
}
570-
return list;
576+
return list.stream().map(KeyValue::getValue).map(serializer::decodeFieldProtobuf).collect(Collectors.toList());
571577
}
572578

573579
/**
@@ -1055,7 +1061,8 @@ private KeyValueLogMessage getKeyValueLogMessage(final @Nonnull String staticMsg
10551061
return KeyValueLogMessage.build(staticMsg, keysAndValues)
10561062
.addKeyAndValue(LogMessageKeys.SUBSPACE, subspace)
10571063
.addKeyAndValue(LuceneLogMessageKeys.COMPRESSION_SUPPOSED, serializer.isCompressionEnabled())
1058-
.addKeyAndValue(LuceneLogMessageKeys.ENCRYPTION_SUPPOSED, serializer.isEncryptionEnabled());
1064+
.addKeyAndValue(LuceneLogMessageKeys.ENCRYPTION_SUPPOSED, serializer.isEncryptionEnabled())
1065+
.addKeyAndValue(LuceneLogMessageKeys.FIELD_PROTOBUF_ENCODED, serializer.isFieldProtobufPrefixEnabled());
10591066
}
10601067

10611068
/**

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/LuceneSerializer.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,15 @@ public class LuceneSerializer {
5757
private final boolean encryptionEnabled;
5858
@Nullable
5959
private final SerializationKeyManager keyManager;
60+
private final boolean fieldProtobufPrefixEnabled;
6061

6162
public LuceneSerializer(boolean compressionEnabled,
62-
boolean encryptionEnabled, @Nullable SerializationKeyManager keyManager) {
63+
boolean encryptionEnabled, @Nullable SerializationKeyManager keyManager,
64+
boolean fieldProtobufPrefixEnabled) {
6365
this.compressionEnabled = compressionEnabled;
6466
this.encryptionEnabled = encryptionEnabled;
6567
this.keyManager = keyManager;
68+
this.fieldProtobufPrefixEnabled = fieldProtobufPrefixEnabled;
6669
}
6770

6871
public boolean isCompressionEnabled() {
@@ -73,6 +76,10 @@ public boolean isEncryptionEnabled() {
7376
return encryptionEnabled;
7477
}
7578

79+
public boolean isFieldProtobufPrefixEnabled() {
80+
return fieldProtobufPrefixEnabled;
81+
}
82+
7683
@Nullable
7784
public SerializationKeyManager getKeyManager() {
7885
return keyManager;
@@ -131,7 +138,7 @@ public byte[] decode(@Nullable byte[] data) {
131138
return null;
132139
}
133140

134-
if (data.length < 2) {
141+
if (data.length < 1) {
135142
throw new RecordCoreException("Invalid data")
136143
.addLogInfo(LuceneLogMessageKeys.DATA_VALUE, data);
137144
}
@@ -270,4 +277,46 @@ private void decryptIfNeeded(@Nonnull CompressedAndEncryptedSerializerState stat
270277
}
271278
encodedDataInput.reset(decrypted);
272279
}
280+
281+
@Nullable
282+
public byte[] encodeFieldProtobuf(@Nullable byte[] bytes) {
283+
if (fieldProtobufPrefixEnabled) {
284+
return encode(bytes);
285+
} else {
286+
return bytes;
287+
}
288+
}
289+
290+
@Nullable
291+
public byte[] decodeFieldProtobuf(@Nullable byte[] bytes) {
292+
if (bytes == null) {
293+
return null;
294+
}
295+
296+
if (isProtobufMessageWithoutPrefix(bytes)) {
297+
return bytes;
298+
}
299+
return decode(bytes);
300+
}
301+
302+
// This can be removed once it is guaranteed that all indexes are using the encoded format.
303+
// Only works for Protobuf messages all of whose fields are themselves length-delimited,
304+
// such as LuceneStoredFields (StoredField or bytes) or FieldInfos (FieldInfo).
305+
private boolean isProtobufMessageWithoutPrefix(@Nonnull byte[] bytes) {
306+
if (bytes.length < 1) {
307+
return true; // No room for prefix; empty message.
308+
}
309+
final int byte0 = bytes[0];
310+
final int fieldTypeOrPrefixFlags = byte0 & 7;
311+
// Either Protobuf LEN or ENCODING_COMPRESSED.
312+
if (fieldTypeOrPrefixFlags != 2) {
313+
return false;
314+
}
315+
final int fieldNumberOrKeyNumber = byte0 >> 3;
316+
// ENCODING_COMPRESSED will never have a key; 0 is not a valid field number.
317+
if (fieldNumberOrKeyNumber == 0) {
318+
return false;
319+
}
320+
return true;
321+
}
273322
}

fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneQueryTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ protected RecordLayerPropertyStorage.Builder addDefaultProps(final RecordLayerPr
235235
return super.addDefaultProps(props)
236236
.addProp(LuceneRecordContextProperties.LUCENE_EXECUTOR_SERVICE, (Supplier<ExecutorService>)() -> executorService)
237237
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_ENCRYPTION_ENABLED, (Supplier<Boolean>)() -> keyManager != null)
238-
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_KEY_MANAGER, (Supplier<SerializationKeyManager>)() -> keyManager);
238+
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_KEY_MANAGER, (Supplier<SerializationKeyManager>)() -> keyManager)
239+
.addProp(LuceneRecordContextProperties.LUCENE_FIELD_PROTOBUF_PREFIX_ENABLED, (Supplier<Boolean>)() -> keyManager != null);
239240
}
240241

241242
@SuppressWarnings("SameParameterValue")

fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,8 @@ void manyDocuments(boolean isGrouped,
347347
.addProp(LuceneRecordContextProperties.LUCENE_MAX_DOCUMENTS_TO_MOVE_DURING_REPARTITIONING, dataModel.nextInt(1000) + repartitionCount)
348348
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
349349
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_COMPRESSION_ENABLED, compressed)
350-
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_ENCRYPTION_ENABLED, encrypted);
350+
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_ENCRYPTION_ENABLED, encrypted)
351+
.addProp(LuceneRecordContextProperties.LUCENE_FIELD_PROTOBUF_PREFIX_ENABLED, encrypted);
351352
if (encrypted) {
352353
contextPropsBuilder.addProp(LuceneRecordContextProperties.LUCENE_INDEX_KEY_MANAGER, new RollingTestKeyManager(seed));
353354
}
@@ -917,7 +918,8 @@ void changingEncryptionKey(boolean isSynthetic, boolean isGrouped, long seed) th
917918

918919
final RecordLayerPropertyStorage.Builder contextPropsBuilder = RecordLayerPropertyStorage.newBuilder()
919920
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_COMPRESSION_ENABLED, true)
920-
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_ENCRYPTION_ENABLED, true);
921+
.addProp(LuceneRecordContextProperties.LUCENE_INDEX_ENCRYPTION_ENABLED, true)
922+
.addProp(LuceneRecordContextProperties.LUCENE_FIELD_PROTOBUF_PREFIX_ENABLED, true);
921923
final KeyGenerator keyGen = KeyGenerator.getInstance("AES");
922924
keyGen.init(128);
923925
final SecretKey key1 = keyGen.generateKey();

0 commit comments

Comments
 (0)