-
Notifications
You must be signed in to change notification settings - Fork 115
Introduce KeySpacePath.importData to import previously exported data #3578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
178cd61
938d240
bee1806
44d609b
b810497
9132184
f63ae18
714faf3
c255044
f30cb54
287ff3f
2cac9f3
77d85a2
afc98fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -316,6 +316,43 @@ public RecordCursor<DataInKeySpacePath> exportAllData(@Nonnull FDBRecordContext | |
| 1); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public CompletableFuture<Void> importData(@Nonnull FDBRecordContext context, | ||
| @Nonnull Iterable<DataInKeySpacePath> dataToImport) { | ||
| return toTupleAsync(context).thenCompose(targetTuple -> { | ||
| List<CompletableFuture<Void>> importFutures = new ArrayList<>(); | ||
|
|
||
| for (DataInKeySpacePath dataItem : dataToImport) { | ||
| CompletableFuture<Void> importFuture = dataItem.getPath().toTupleAsync(context).thenCompose(itemPathTuple -> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm assuming that the directory layer implementation of
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the directory layer will create any entries that don't exist, but I should ensure there are tests of that.
The general assumption is that anything that you're interacting with in the DirectoryLayer should almost always already exist, and most likely already be in the cache. And the DirectoryLayer ensures that both the logical values (
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a test where the directory-layer entry won't exist: |
||
| // Validate that this data belongs under this path | ||
| if (!TupleHelpers.isPrefix(targetTuple, itemPathTuple)) { | ||
| throw new RecordCoreIllegalImportDataException( | ||
| "Data item path does not belong under target path", | ||
| "target", targetTuple, | ||
| "item", itemPathTuple); | ||
| } | ||
|
|
||
| // Reconstruct the key using the path and remainder | ||
| Tuple keyTuple = itemPathTuple; | ||
| if (dataItem.getRemainder() != null) { | ||
| keyTuple = keyTuple.addAll(dataItem.getRemainder()); | ||
| } | ||
|
|
||
| // Store the data | ||
| byte[] keyBytes = keyTuple.pack(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add some fdb timer metrics for future use (imported_count)?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I think a timer around |
||
| byte[] valueBytes = dataItem.getValue(); | ||
| context.ensureActive().set(keyBytes, valueBytes); | ||
|
|
||
| return AsyncUtil.DONE; | ||
| }); | ||
| importFutures.add(importFuture); | ||
| } | ||
|
|
||
| return AsyncUtil.whenAll(importFutures); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Returns this path properly wrapped in whatever implementation the directory the path is contained in dictates. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| /* | ||
| * RecordCoreIllegalImportDataException.java | ||
| * | ||
| * This source file is part of the FoundationDB open source project | ||
| * | ||
| * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.apple.foundationdb.record.provider.foundationdb.keyspace; | ||
|
|
||
| import com.apple.foundationdb.record.RecordCoreArgumentException; | ||
|
|
||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** | ||
| * Thrown if the data being imported into {@link KeySpacePath#importData} does not belong in that path. | ||
| */ | ||
| public class RecordCoreIllegalImportDataException extends RecordCoreArgumentException { | ||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| public RecordCoreIllegalImportDataException(@Nonnull final String msg, @Nonnull final Object... keyValue) { | ||
| super(msg, keyValue); | ||
| } | ||
| } | ||
|
Check warning on line 36 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/keyspace/RecordCoreIllegalImportDataException.java
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the assumption here that resource control be handled externally? Meaning, if the given stream of data is too large, someone else is going to trim it and retry? In that sense, would it make sense to provide a single import method (for a single item) and return the future, as there seems no efficiency by working on the whole collection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the idea would be that the calling process would limit
dataToImport, and decrease that limit in response to failures.I hadn't really considered the possibility of limiting in response to the number that were imported, our existing code does have some dynamic response (e.g.
ThrottlingRetryingIteratorandIndexingThrottle), so it is probably a good idea to support that here. It could do something like add the number imported to the exception, but that doesn't seem great.There is some performance advantage to doing it as an iterable, namely
toTupleAsyncmay have a cost if it isDirectoryLayerDirectory. That should be minimal, but it could go away completely by putting it onResolvedKeySpacePath.It's unfortunate that would put
exportAllDataandimportDataon different classes.I can try moving this if you think it's worthwhile at this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot/misread one other advantage: This imports all the data concurrently.
The set should be incredibly efficient, so the primary thing you are gaining in performance is resolving the path for each data item.
If a client wants to do imports one-at-a-time they can always provide
dataToImportwith just one element, and call it multiple times.I feel like that is a good enough reason to stick with the current implementation, at least for now.
That being said, it may make sense to use a
MapPipelinedCursorso that we are only resolving X at a time.