Skip to content

Commit 2205e3e

Browse files
committed
feat: use SafeAsyncIterator for getBlocks; limit memory usage for fetchBatched when used as prefetcher
1 parent d94bfe6 commit 2205e3e

File tree

6 files changed

+108
-43
lines changed

6 files changed

+108
-43
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ proc downloadInternal(
256256

257257
proc requestBlocks*(
258258
self: BlockExcEngine, addresses: seq[BlockAddress]
259-
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
259+
): SafeAsyncIter[Block] =
260260
var handles: seq[BlockHandle]
261261
# Adds all blocks to pendingBlocks before calling the first downloadInternal. This will
262262
# ensure that we don't send incomplete want lists.
@@ -267,20 +267,27 @@ proc requestBlocks*(
267267
for address in addresses:
268268
self.trackedFutures.track(self.downloadInternal(address))
269269

270-
# TODO: we can reduce latency and improve download times
271-
# by returning blocks out of order as futures complete.
272-
var blocks: seq[?!Block]
273-
for handle in handles:
274-
try:
275-
blocks.add(success await handle)
276-
except CancelledError as err:
277-
warn "Block request cancelled", addresses, err = err.msg
278-
raise err
279-
except CatchableError as err:
280-
error "Error getting blocks from exchange engine", addresses, err = err.msg
281-
blocks.add(Block.failure err)
270+
var completed: int = 0
271+
272+
proc isFinished(): bool =
273+
completed == handles.len
274+
275+
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
276+
# Be it success or failure, we're completing this future.
277+
let value =
278+
try:
279+
success await handles[completed]
280+
except CancelledError as err:
281+
warn "Block request cancelled", addresses, err = err.msg
282+
raise err
283+
except CatchableError as err:
284+
error "Error getting blocks from exchange engine", addresses, err = err.msg
285+
failure err
282286

283-
return blocks
287+
inc(completed)
288+
return value
289+
290+
return SafeAsyncIter[Block].new(genNext, isFinished)
284291

285292
proc requestBlock*(
286293
self: BlockExcEngine, address: BlockAddress
@@ -368,28 +375,42 @@ proc cancelBlocks(
368375
## Tells neighboring peers that we're no longer interested in a block.
369376
##
370377

378+
let addrSet = toHashSet(addrs)
379+
var pendingCancellations: Table[PeerId, HashSet[BlockAddress]]
380+
371381
if self.peers.len == 0:
372382
return
373383

374384
trace "Sending block request cancellations to peers",
375385
addrs, peers = self.peers.peerIds
376386

377-
proc processPeer(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} =
387+
proc processPeer(
388+
entry: tuple[peerId: PeerId, addresses: HashSet[BlockAddress]]
389+
): Future[PeerId] {.async: (raises: [CancelledError]).} =
378390
await self.network.request.sendWantCancellations(
379-
peer = peerCtx.id, addresses = addrs.filterIt(it in peerCtx)
391+
peer = entry.peerId, addresses = entry.addresses.toSeq
380392
)
381393

382-
return peerCtx
394+
return entry.peerId
383395

384396
try:
385-
let (succeededFuts, failedFuts) = await allFinishedFailed[BlockExcPeerCtx](
386-
toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map(
387-
processPeer
388-
)
397+
# Does the peer have any of the blocks we're canceling?
398+
for peerCtx in self.peers.peers.values:
399+
let intersection = peerCtx.peerHave.intersection(addrSet)
400+
if intersection.len > 0:
401+
pendingCancellations[peerCtx.id] = intersection
402+
403+
# If so, dispatches cancellations.
404+
# FIXME: we're still spamming peers - the fact that the peer has the block does
405+
# not mean we've requested it.
406+
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
407+
toSeq(pendingCancellations.pairs).map(processPeer)
389408
)
390409

391-
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerCtx: BlockExcPeerCtx):
392-
peerCtx.cleanPresence(addrs)
410+
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerId: PeerId):
411+
let ctx = self.peers.get(peerId)
412+
if not ctx.isNil:
413+
ctx.cleanPresence(addrs)
393414

394415
if failedFuts.len > 0:
395416
warn "Failed to send block request cancellations to peers", peers = failedFuts.len
@@ -539,6 +560,8 @@ proc wantListHandler*(
539560
price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
540561

541562
if e.cancel:
563+
# This is sort of expected if we sent the block to the peer, as we have removed
564+
# it from the peer's wantlist ourselves.
542565
trace "Received cancelation for untracked block, skipping",
543566
address = e.address
544567
continue

codex/node.nim

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import ./indexingstrategy
4444
import ./utils
4545
import ./errors
4646
import ./logutils
47-
import ./utils/asynciter
47+
import ./utils/safeasynciter
4848
import ./utils/trackedfutures
4949

5050
export logutils
@@ -194,20 +194,38 @@ proc fetchBatched*(
194194
if not (await address in self.networkStore) or fetchLocal:
195195
address
196196

197-
let
198-
blockResults = await self.networkStore.getBlocks(addresses)
199-
blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
200-
numOfFailedBlocks = blockResults.len - blocks.len
197+
proc successful(
198+
blk: ?!bt.Block
199+
): Future[bool] {.async: (raises: [CancelledError]).} =
200+
return blk.isSuccess()
201201

202-
if numOfFailedBlocks > 0:
203-
return
204-
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
202+
let blockResults = await self.networkStore.getBlocks(addresses)
205203

206-
if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption:
204+
var
205+
successfulBlocks = 0
206+
failedBlocks = 0
207+
blockData: seq[bt.Block]
208+
209+
for res in blockResults:
210+
without blk =? await res:
211+
inc(failedBlocks)
212+
continue
213+
214+
inc(successfulBlocks)
215+
216+
# Only retains block data in memory if there's
217+
# a callback.
218+
if not onBatch.isNil:
219+
blockData.add(blk)
220+
221+
if failedBlocks > 0:
222+
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
223+
224+
if not onBatch.isNil and batchErr =? (await onBatch(blockData)).errorOption:
207225
return failure(batchErr)
208226

209227
if not iter.finished:
210-
await sleepAsync(1.millis)
228+
await idleAsync()
211229

212230
success()
213231

codex/stores/blockstore.nim

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ method getBlock*(
6565

6666
raiseAssert("getBlock by addr not implemented!")
6767

68+
method getBlocks*(
    self: BlockStore, addresses: seq[BlockAddress]
): Future[SafeAsyncIter[Block]] {.base, async: (raises: [CancelledError]).} =
  ## Gets a set of blocks from the blockstore. Blocks might
  ## be returned in any order.
  ##
  ## This is the root declaration of the method on `BlockStore`, hence the
  ## `base` pragma. The body is only a placeholder that asserts, so every
  ## concrete store is expected to provide its own override.

  raiseAssert("getBlocks not implemented!")
75+
6876
method getBlockAndProof*(
6977
self: BlockStore, treeCid: Cid, index: Natural
7078
): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} =

codex/stores/networkstore.nim

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,23 @@ type NetworkStore* = ref object of BlockStore
3131
engine*: BlockExcEngine # blockexc decision engine
3232
localStore*: BlockStore # local block store
3333

34-
proc getBlocks*(
34+
method getBlocks*(
3535
self: NetworkStore, addresses: seq[BlockAddress]
36-
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
36+
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
3737
var
38-
localBlocks: seq[?!Block]
38+
localAddresses: seq[BlockAddress]
3939
remoteAddresses: seq[BlockAddress]
4040

41-
# We can resolve local blocks sequentially as for now those are blocking anyway. Still:
42-
# TODO: implement getBlocks for local store so we can delegate it here.
4341
for address in addresses:
4442
if not (await address in self.localStore):
4543
remoteAddresses.add(address)
4644
else:
47-
localBlocks.add(await self.localStore.getBlock(address))
45+
localAddresses.add(address)
4846

49-
let remoteBlocks = await self.engine.requestBlocks(remoteAddresses)
50-
51-
return localBlocks.concat(remoteBlocks)
47+
return chain(
48+
await self.localStore.getBlocks(localAddresses),
49+
self.engine.requestBlocks(remoteAddresses),
50+
)
5251

5352
method getBlock*(
5453
self: NetworkStore, address: BlockAddress

codex/stores/repostore/store.nim

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,21 @@ logScope:
3838
# BlockStore API
3939
###########################################################
4040

41+
method getBlocks*(
    self: RepoStore, addresses: seq[BlockAddress]
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
  ## Builds a lazy iterator over `addresses`: every step looks up one
  ## address through `self.getBlock`, walking the sequence front to back.
  var cursor = 0

  proc exhausted(): bool =
    # Done once the cursor has moved past the last address.
    cursor == addresses.len

  proc fetchNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
    # The cursor only advances after the lookup has returned, so a lookup
    # that raises leaves it pointing at the same address.
    let fetched = await self.getBlock(addresses[cursor])
    cursor.inc
    return fetched

  return SafeAsyncIter[Block].new(fetchNext, exhausted)
55+
4156
method getBlock*(
4257
self: RepoStore, cid: Cid
4358
): Future[?!Block] {.async: (raises: [CancelledError]).} =

tests/codex/utils/testsafeasynciter.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,10 @@ asyncchecksuite "Test SafeAsyncIter":
404404

405405
expect CancelledError:
406406
for fut in iter2:
407-
without i =? (await fut), err:
407+
if i =? (await fut):
408408
collected.add(i)
409+
else:
410+
fail()
409411

410412
check:
411413
# We expect only values "0" and "1" to be collected

0 commit comments

Comments
 (0)