Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,26 @@ default RecordCursor<DataInKeySpacePath> exportAllData(@Nonnull FDBRecordContext
@Nonnull ScanProperties scanProperties) {
throw new UnsupportedOperationException("exportAllData is not supported");
}

/**
* Imports the provided data exported via {@link #exportAllData} into this {@code KeySpacePath}.
* <p>
* This will validate that any data provided in {@code dataToImport} has a path that should be in this path,
* or one of the sub-directories, if not, the future will complete exceptionally with
* {@link RecordCoreIllegalImportDataException}.
* If there is any data already existing under this path, the new data will overwrite if the keys are the same.
* This will use the logical values in the {@link DataInKeySpacePath#getPath()} and
* {@link DataInKeySpacePath#getRemainder()} to determine the key, rather
* than the raw key, meaning that this will work even if the data was exported from a different cluster.
* Note, this will not correct for any cluster-specific data, other than {@link DirectoryLayerDirectory} data;
* for example, if you have versionstamps, that data will not align on the destination.
* </p>
* @param context the transaction context in which to save the data
* @param dataToImport the data to be saved to the database
* @return a future to be completed once all data has been important.
*/
@API(API.Status.EXPERIMENTAL)
@Nonnull
CompletableFuture<Void> importData(@Nonnull FDBRecordContext context,
@Nonnull Iterable<DataInKeySpacePath> dataToImport);
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,43 @@ public RecordCursor<DataInKeySpacePath> exportAllData(@Nonnull FDBRecordContext
1);
}

@Nonnull
@Override
public CompletableFuture<Void> importData(@Nonnull FDBRecordContext context,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the assumption here that resource control be handled externally? Meaning, if the given stream of data is too large, someone else is going to trim it and retry? In that sense, would it make sense to provide a single import method (for a single item) and return the future, as there seems no efficiency by working on the whole collection?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the idea would be that the calling process would limit dataToImport, and decrease that limit in response to failures.
I hadn't really considered the possibility of limiting in response to the number that were imported, our existing code does have some dynamic response (e.g. ThrottlingRetryingIterator and IndexingThrottle), so it is probably a good idea to support that here. It could do something like add the number imported to the exception, but that doesn't seem great.
There is some performance advantage to doing it as an iterable, namely toTupleAsync may have a cost if it is DirectoryLayerDirectory. That should be minimal, but it could go away completely by putting it on ResolvedKeySpacePath.
It's unfortunate that would put exportAllData and importData on different classes.

I can try moving this if you think it's worthwhile at this point.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot/misread one other advantage: This imports all the data concurrently.
The set should be incredibly efficient, so the primary thing you are gaining in performance is resolving the path for each data item.
If a client wants to do imports one-at-a-time they can always provide dataToImport with just one element, and call it multiple times.

I feel like that is a good enough reason to stick with the current implementation, at least for now.
That being said, it may make sense to use a MapPipelinedCursor so that we are only resolving X at a time.

@Nonnull Iterable<DataInKeySpacePath> dataToImport) {
return toTupleAsync(context).thenCompose(targetTuple -> {
List<CompletableFuture<Void>> importFutures = new ArrayList<>();

for (DataInKeySpacePath dataItem : dataToImport) {
CompletableFuture<Void> importFuture = dataItem.getPath().toTupleAsync(context).thenCompose(itemPathTuple -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming that the directory layer implementation of Directory is creating a new entry in case the requested value is not found, right? This is done in the same transaction and is rolled back in case of failure, right? Can there be cases where multiple entries are created for the same resolved value? Do we care?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the directory layer will create any entries that don't exist, but I should ensure there are tests of that.
The DirectoryLayer creates entries in separate transactions, borrowing the read version. I believe it does this to:

  1. Isolate transaction conflict risk
  2. Once it is created it is cached

The general assumption is that anything that you're interacting with in the DirectoryLayer should almost always already exist, and most likely already be in the cache.

And the DirectoryLayer ensures that both the logical values (String) and resolved values (Long) are unique.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a test where the directory-layer entry won't exist: importDataWithDirectoryLayer is using a uuid for the string value, so when copying between clusters it won't exist on the destination.

// Validate that this data belongs under this path
if (!TupleHelpers.isPrefix(targetTuple, itemPathTuple)) {
throw new RecordCoreIllegalImportDataException(
"Data item path does not belong under target path",
"target", targetTuple,
"item", itemPathTuple);
}

// Reconstruct the key using the path and remainder
Tuple keyTuple = itemPathTuple;
if (dataItem.getRemainder() != null) {
keyTuple = keyTuple.addAll(dataItem.getRemainder());
}

// Store the data
byte[] keyBytes = keyTuple.pack();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some fdb timer metrics for future use (imported_count)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think a timer around importFuture makes sense.

byte[] valueBytes = dataItem.getValue();
context.ensureActive().set(keyBytes, valueBytes);

return AsyncUtil.DONE;
});
importFutures.add(importFuture);
}

return AsyncUtil.whenAll(importFutures);
});
}

/**
* Returns this path properly wrapped in whatever implementation the directory the path is contained in dictates.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,11 @@ public RecordCursor<DataInKeySpacePath> exportAllData(@Nonnull FDBRecordContext
@Nonnull ScanProperties scanProperties) {
return inner.exportAllData(context, continuation, scanProperties);
}

@Nonnull
@Override
public CompletableFuture<Void> importData(@Nonnull FDBRecordContext context,
@Nonnull Iterable<DataInKeySpacePath> dataToImport) {
return inner.importData(context, dataToImport);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* RecordCoreIllegalImportDataException.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.provider.foundationdb.keyspace;

import com.apple.foundationdb.record.RecordCoreArgumentException;

import javax.annotation.Nonnull;

/**
* Thrown if the data being imported into {@link KeySpacePath#importData} does not belong in that path.
*/
public class RecordCoreIllegalImportDataException extends RecordCoreArgumentException {
private static final long serialVersionUID = 1L;

public RecordCoreIllegalImportDataException(@Nonnull final String msg, @Nonnull final Object... keyValue) {
super(msg, keyValue);
}
}

Check warning on line 36 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/keyspace/RecordCoreIllegalImportDataException.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/keyspace/RecordCoreIllegalImportDataException.java#L30-L36

`RecordCoreIllegalImportDataException` has inheritance depth of 5 which is deeper than maximum of 2 https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3578%2FScottDugas%2Fkeyspace-import%3AHEAD&id=3C3B0BC4FA83FF44B4E2CE5FBF7DBFF5
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -69,7 +68,7 @@ class KeySpacePathDataExportTest {
final FDBDatabaseExtension dbExtension = new FDBDatabaseExtension();

@Test
void exportAllDataFromSimplePath() throws ExecutionException, InterruptedException {
void exportAllDataFromSimplePath() {
KeySpace root = new KeySpace(
new KeySpaceDirectory("root", KeyType.STRING, UUID.randomUUID().toString())
.addSubdirectory(new KeySpaceDirectory("level1", KeyType.LONG)));
Expand All @@ -83,13 +82,14 @@ void exportAllDataFromSimplePath() throws ExecutionException, InterruptedExcepti

// Add data at different levels
for (int i = 0; i < 5; i++) {
Tuple key = basePath.add("level1", (long) i).toTuple(context);
final KeySpacePath path = basePath.add("level1", (long)i);
Tuple key = path.toTuple(context);
tr.set(key.pack(), Tuple.from("value" + i).pack());

// Add some sub-data under each key
for (int j = 0; j < 3; j++) {
Tuple subKey = key.add("sub" + j);
tr.set(subKey.pack(), Tuple.from("subvalue" + i + "_" + j).pack());
tr.set(path.toSubspace(context).pack(Tuple.from("sub" + j)),
Tuple.from("subvalue" + i + "_" + j).pack());
}
}
context.commit();
Expand All @@ -103,16 +103,19 @@ void exportAllDataFromSimplePath() throws ExecutionException, InterruptedExcepti
// Should have 5 main entries + 15 sub-entries = 20 total
assertEquals(20, allData.size());

assertThat(allData)
.allSatisfy(data ->
assertThat(data.getPath().getDirectoryName()).isEqualTo("level1"));

// Verify the data is sorted by key
for (int i = 1; i < allData.size(); i++) {
assertTrue(getKey(allData.get(i - 1), context).compareTo(getKey(allData.get(i), context)) < 0);
}
assertThat(allData.stream().map(data -> getKey(data, context)).collect(Collectors.toList()))
.isSorted();
}
}

// `toTuple` does not include the remainder, I'm not sure if that is intentional, or an oversight.
private Tuple getKey(final DataInKeySpacePath dataInKeySpacePath, final FDBRecordContext context) throws ExecutionException, InterruptedException {
final ResolvedKeySpacePath resolvedKeySpacePath = dataInKeySpacePath.getPath().toResolvedPathAsync(context).get();
private Tuple getKey(final DataInKeySpacePath dataInKeySpacePath, final FDBRecordContext context) {
final ResolvedKeySpacePath resolvedKeySpacePath = dataInKeySpacePath.getPath().toResolvedPathAsync(context).join();
if (dataInKeySpacePath.getRemainder() != null) {
return resolvedKeySpacePath.toTuple().addAll(dataInKeySpacePath.getRemainder());
} else {
Expand Down Expand Up @@ -524,9 +527,7 @@ private static void exportWithContinuations(final KeySpacePath pathToExport,
final RecordCursor<DataInKeySpacePath> cursor = pathToExport.exportAllData(context, continuation.toBytes(),
scanProperties);
final AtomicReference<RecordCursorResult<Tuple>> tupleResult = new AtomicReference<>();
final List<Tuple> batch = cursor.map(dataInPath -> {
return Tuple.fromBytes(dataInPath.getValue());
}).asList(tupleResult).join();
final List<Tuple> batch = cursor.map(dataInPath -> Tuple.fromBytes(dataInPath.getValue())).asList(tupleResult).join();
actual.add(batch);
continuation = tupleResult.get().getContinuation();
}
Expand Down Expand Up @@ -578,7 +579,7 @@ void exportAllDataThroughKeySpacePathWrapper() {
}

@Test
void exportAllDataThroughKeySpacePathWrapperResolvedPaths() {
void exportAllDataThroughKeySpacePathWrapperRemainders() {
final FDBDatabase database = dbExtension.getDatabase();
final EnvironmentKeySpace keySpace = EnvironmentKeySpace.setupSampleData(database);

Expand Down
Loading
Loading