|
25 | 25 | import com.apple.foundationdb.ReadTransaction; |
26 | 26 | import com.apple.foundationdb.StreamingMode; |
27 | 27 | import com.apple.foundationdb.Transaction; |
| 28 | +import com.apple.foundationdb.async.AsyncIterable; |
28 | 29 | import com.apple.foundationdb.async.AsyncUtil; |
29 | 30 | import com.apple.foundationdb.subspace.Subspace; |
| 31 | +import com.apple.foundationdb.tuple.ByteArrayUtil; |
30 | 32 | import com.apple.foundationdb.tuple.Tuple; |
31 | 33 | import com.christianheina.langx.half4j.Half; |
32 | 34 | import com.google.common.collect.ImmutableList; |
33 | 35 |
|
34 | 36 | import javax.annotation.Nonnull; |
| 37 | +import javax.annotation.Nullable; |
| 38 | +import java.util.List; |
35 | 39 | import java.util.concurrent.CompletableFuture; |
36 | 40 |
|
37 | 41 | /** |
@@ -66,27 +70,35 @@ protected CompletableFuture<Node<NodeReferenceWithVector>> fetchNodeInternal(@No |
66 | 70 |
|
67 | 71 | return AsyncUtil.collect(readTransaction.getRange(Range.startsWith(rangeKey), |
68 | 72 | ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL), readTransaction.getExecutor()) |
69 | | - .thenApply(keyValues -> { |
70 | | - final OnReadListener onReadListener = getOnReadListener(); |
71 | | - |
72 | | - final ImmutableList.Builder<NodeReferenceWithVector> nodeReferencesWithVectorBuilder = ImmutableList.builder(); |
73 | | - for (final KeyValue keyValue : keyValues) { |
74 | | - final byte[] key = keyValue.getKey(); |
75 | | - final byte[] value = keyValue.getValue(); |
76 | | - onReadListener.onKeyValueRead(key, value); |
77 | | - final Tuple neighborKeyTuple = getDataSubspace().unpack(key); |
78 | | - final Tuple neighborValueTuple = Tuple.fromBytes(value); |
79 | | - |
80 | | - final Tuple neighborPrimaryKey = neighborKeyTuple.getNestedTuple(2); // neighbor primary key |
81 | | - final Vector<Half> neighborVector = StorageAdapter.vectorFromTuple(neighborValueTuple); // the entire value is the vector |
82 | | - nodeReferencesWithVectorBuilder.add(new NodeReferenceWithVector(neighborPrimaryKey, neighborVector)); |
83 | | - } |
84 | | - |
85 | | - final Node<NodeReferenceWithVector> node = |
86 | | - getNodeFactory().create(primaryKey, null, nodeReferencesWithVectorBuilder.build()); |
87 | | - onReadListener.onNodeRead(node); |
88 | | - return node; |
89 | | - }); |
| 73 | + .thenApply(keyValues -> nodeFromRaw(primaryKey, keyValues)); |
| 74 | + } |
| 75 | + |
| 76 | + @Nonnull |
| 77 | + private Node<NodeReferenceWithVector> nodeFromRaw(final @Nonnull Tuple primaryKey, final List<KeyValue> keyValues) { |
| 78 | + final OnReadListener onReadListener = getOnReadListener(); |
| 79 | + |
| 80 | + final ImmutableList.Builder<NodeReferenceWithVector> nodeReferencesWithVectorBuilder = ImmutableList.builder(); |
| 81 | + for (final KeyValue keyValue : keyValues) { |
| 82 | + nodeReferencesWithVectorBuilder.add(neighborFromRaw(keyValue.getKey(), keyValue.getValue())); |
| 83 | + } |
| 84 | + |
| 85 | + final Node<NodeReferenceWithVector> node = |
| 86 | + getNodeFactory().create(primaryKey, null, nodeReferencesWithVectorBuilder.build()); |
| 87 | + onReadListener.onNodeRead(node); |
| 88 | + return node; |
| 89 | + } |
| 90 | + |
| 91 | + @Nonnull |
| 92 | + private NodeReferenceWithVector neighborFromRaw(final @Nonnull byte[] key, final byte[] value) { |
| 93 | + final OnReadListener onReadListener = getOnReadListener(); |
| 94 | + |
| 95 | + onReadListener.onKeyValueRead(key, value); |
| 96 | + final Tuple neighborKeyTuple = getDataSubspace().unpack(key); |
| 97 | + final Tuple neighborValueTuple = Tuple.fromBytes(value); |
| 98 | + |
| 99 | + final Tuple neighborPrimaryKey = neighborKeyTuple.getNestedTuple(2); // neighbor primary key |
| 100 | + final Vector<Half> neighborVector = StorageAdapter.vectorFromTuple(neighborValueTuple); // the entire value is the vector |
| 101 | + return new NodeReferenceWithVector(neighborPrimaryKey, neighborVector); |
90 | 102 | } |
91 | 103 |
|
92 | 104 | @Override |
@@ -122,4 +134,43 @@ private byte[] getNeighborKey(final int layer, |
122 | 134 | @Nonnull final Tuple neighborPrimaryKey) { |
123 | 135 | return getDataSubspace().pack(Tuple.from(layer, node.getPrimaryKey(), neighborPrimaryKey)); |
124 | 136 | } |
| 137 | + |
| 138 | + @Override |
| 139 | + public Iterable<Node<NodeReferenceWithVector>> scanLayer(@Nonnull final ReadTransaction readTransaction, int layer, |
| 140 | + @Nullable final Tuple lastPrimaryKey, int maxNumRead) { |
| 141 | + final byte[] layerPrefix = getDataSubspace().pack(Tuple.from(layer)); |
| 142 | + final Range range = |
| 143 | + lastPrimaryKey == null |
| 144 | + ? Range.startsWith(layerPrefix) |
| 145 | + : new Range(ByteArrayUtil.strinc(getDataSubspace().pack(Tuple.from(layer, lastPrimaryKey))), |
| 146 | + ByteArrayUtil.strinc(layerPrefix)); |
| 147 | + final AsyncIterable<KeyValue> itemsIterable = |
| 148 | + readTransaction.getRange(range, |
| 149 | + maxNumRead, false, StreamingMode.ITERATOR); |
| 150 | + int numRead = 0; |
| 151 | + Tuple nodePrimaryKey = null; |
| 152 | + ImmutableList.Builder<Node<NodeReferenceWithVector>> nodeBuilder = ImmutableList.builder(); |
| 153 | + ImmutableList.Builder<NodeReferenceWithVector> neighborsBuilder = ImmutableList.builder(); |
| 154 | + for (final KeyValue item: itemsIterable) { |
| 155 | + final NodeReferenceWithVector neighbor = |
| 156 | + neighborFromRaw(item.getKey(), item.getValue()); |
| 157 | + final Tuple primaryKeyFromNodeReference = neighbor.getPrimaryKey(); |
| 158 | + if (nodePrimaryKey == null) { |
| 159 | + nodePrimaryKey = primaryKeyFromNodeReference; |
| 160 | + } else { |
| 161 | + if (!nodePrimaryKey.equals(primaryKeyFromNodeReference)) { |
| 162 | + nodeBuilder.add(getNodeFactory().create(nodePrimaryKey, null, neighborsBuilder.build())); |
| 163 | + } |
| 164 | + } |
| 165 | + neighborsBuilder.add(neighbor); |
| 166 | + numRead ++; |
| 167 | + } |
| 168 | + |
| 169 | + // there may be a rest |
| 170 | + if (numRead > 0 && numRead < maxNumRead) { |
| 171 | + nodeBuilder.add(getNodeFactory().create(nodePrimaryKey, null, neighborsBuilder.build())); |
| 172 | + } |
| 173 | + |
| 174 | + return nodeBuilder.build(); |
| 175 | + } |
125 | 176 | } |
0 commit comments