Skip to content

Commit 97fd68e

Browse files
committed
feat: drop peer on activity timeout
1 parent ad8e250 commit 97fd68e

File tree

5 files changed

+280
-200
lines changed

5 files changed

+280
-200
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 88 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ declareCounter(codex_block_exchange_blocks_sent, "codex blockexchange blocks sen
6464
declareCounter(
6565
codex_block_exchange_blocks_received, "codex blockexchange blocks received"
6666
)
67+
declareCounter(
68+
codex_block_exchange_spurious_blocks_received,
69+
"codex blockexchange unrequested/duplicate blocks received",
70+
)
6771

6872
const
6973
DefaultMaxPeersPerRequest* = 10
@@ -80,13 +84,15 @@ const
8084
type
8185
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
8286
TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
87+
PeerSelector* =
88+
proc(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx {.gcsafe, raises: [].}
8389

8490
BlockExcEngine* = ref object of RootObj
8591
localStore*: BlockStore # Local block store for this instance
86-
network*: BlockExcNetwork # Petwork interface
92+
network*: BlockExcNetwork # Network interface
8793
peers*: PeerCtxStore # Peers we're currently actively exchanging with
8894
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx]
89-
# Peers we're currently processing tasks for
95+
selectPeer*: PeerSelector # Peers we're currently processing tasks for
9096
concurrentTasks: int # Number of concurrent peers we're serving at any given time
9197
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
9298
blockexcRunning: bool # Indicates if the blockexc task is running
@@ -205,8 +211,19 @@ proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
205211
else:
206212
trace "Not searching for new peers, rate limit not expired", cid = cid
207213

208-
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
209-
Rng.instance.sample(peers)
214+
proc evictPeer(self: BlockExcEngine, peer: PeerId) =
215+
## Cleanup disconnected peer
216+
##
217+
218+
trace "Evicting disconnected/departed peer", peer
219+
220+
let peerCtx = self.peers.get(peer)
221+
if not peerCtx.isNil:
222+
for address in peerCtx.blocksRequested:
223+
self.pendingBlocks.clearRequest(address, peer.some)
224+
225+
# drop the peer from the peers table
226+
self.peers.remove(peer)
210227

211228
proc downloadInternal(
212229
self: BlockExcEngine, address: BlockAddress
@@ -245,23 +262,34 @@ proc downloadInternal(
245262
# control what happens and how many neighbors we get like this.
246263
self.searchForNewPeers(address.cidOrTreeCid)
247264

248-
# We wait for a bit and then retry. Since we've shipped our wantlists to
249-
# connected peers, they might reply and we might request the block in the
250-
# meantime as part of `blockPresenceHandler`.
265+
# We now wait for a bit and then retry. If the handle gets completed in the
266+
# meantime (cause the presence handler might have requested the block and
267+
# received it in the meantime), we are done.
251268
await handle or sleepAsync(self.pendingBlocks.retryInterval)
252-
# If we already got the block, we're done. Otherwise, we'll go for another
253-
# cycle, potentially refreshing knowledge on peers again, and looking up
254-
# the DHT again.
255269
if handle.finished:
256270
break
271+
# If we still don't have the block, we'll go for another cycle.
257272
trace "No peers for block, will retry shortly"
258273
continue
259274

260-
let scheduledPeer = peers.with.randomPeer
261-
self.pendingBlocks.setInFlight(address, true)
262-
scheduledPeer.blockRequested(address)
263-
await self.sendWantBlock(@[address], scheduledPeer)
275+
# Once again, it might happen that the block was requested to a peer
276+
# in the meantime. If so, we don't need to do anything. Otherwise,
277+
# we'll be the ones placing the request.
278+
let scheduledPeer =
279+
if not self.pendingBlocks.isRequested(address):
280+
let peer = self.selectPeer(peers.with)
281+
self.pendingBlocks.markRequested(address, peer.id)
282+
peer.blockRequested(address)
283+
trace "Request block from block retry loop"
284+
await self.sendWantBlock(@[address], peer)
285+
peer
286+
else:
287+
let peerId = self.pendingBlocks.getRequestPeer(address).get()
288+
self.peers.get(peerId)
289+
290+
assert not scheduledPeer.isNil
264291

292+
# Parks until either the block is received, or the peer times out.
265293
let activityTimer = scheduledPeer.activityTimer()
266294
await handle or activityTimer # TODO: or peerDropped
267295
activityTimer.cancel()
@@ -275,8 +303,11 @@ proc downloadInternal(
275303
break
276304
else:
277305
# If the peer timed out, retries immediately.
278-
trace "Dropping timed out peer.", peer = scheduledPeer.id
279-
# TODO: disconnect peer
306+
trace "Peer timed out during block request", peer = scheduledPeer.id
307+
await self.network.dropPeer(scheduledPeer.id)
308+
# Evicts peer immediately or we may end up picking it again in the
309+
# next retry.
310+
self.evictPeer(scheduledPeer.id)
280311
except CancelledError as exc:
281312
trace "Block download cancelled"
282313
if not handle.finished:
@@ -286,7 +317,7 @@ proc downloadInternal(
286317
if not handle.finished:
287318
handle.fail(exc)
288319
finally:
289-
self.pendingBlocks.setInFlight(address, false)
320+
self.pendingBlocks.clearRequest(address)
290321

291322
proc requestBlocks*(
292323
self: BlockExcEngine, addresses: seq[BlockAddress]
@@ -375,19 +406,21 @@ proc blockPresenceHandler*(
375406

376407
let ourWantCids = ourWantList.filterIt(
377408
it in peerHave and not self.pendingBlocks.retriesExhausted(it) and
378-
not self.pendingBlocks.isInFlight(it)
409+
not self.pendingBlocks.isRequested(it)
379410
).toSeq
380411

381412
for address in ourWantCids:
382-
self.pendingBlocks.setInFlight(address, true)
383413
self.pendingBlocks.decRetries(address)
414+
self.pendingBlocks.markRequested(address, peer)
384415
peerCtx.blockRequested(address)
385416

386417
if ourWantCids.len > 0:
387418
trace "Peer has blocks in our wantList", peer, wants = ourWantCids
388419
# FIXME: this will result in duplicate requests for blocks
389420
if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption:
390421
warn "Failed to send wantBlock to peer", peer, err = err.msg
422+
for address in ourWantCids:
423+
self.pendingBlocks.clearRequest(address, peer.some)
391424

392425
proc scheduleTasks(
393426
self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
@@ -417,18 +450,17 @@ proc cancelBlocks(
417450
## Tells neighboring peers that we're no longer interested in a block.
418451
##
419452

420-
let addrSet = toHashSet(addrs)
421-
var pendingCancellations: Table[PeerId, HashSet[BlockAddress]]
453+
let blocksDelivered = toHashSet(addrs)
454+
var scheduledCancellations: Table[PeerId, HashSet[BlockAddress]]
422455

423456
if self.peers.len == 0:
424457
return
425458

426-
trace "Sending block request cancellations to peers",
427-
addrs, peers = self.peers.peerIds
428-
429-
proc processPeer(
459+
proc dispatchCancellations(
430460
entry: tuple[peerId: PeerId, addresses: HashSet[BlockAddress]]
431461
): Future[PeerId] {.async: (raises: [CancelledError]).} =
462+
trace "Sending block request cancellations to peer",
463+
peer = entry.peerId, addresses = entry.addresses.len
432464
await self.network.request.sendWantCancellations(
433465
peer = entry.peerId, addresses = entry.addresses.toSeq
434466
)
@@ -437,21 +469,22 @@ proc cancelBlocks(
437469

438470
try:
439471
for peerCtx in self.peers.peers.values:
440-
# Have we requested any of the blocks we're cancelling to this peer?
441-
let intersection = peerCtx.blocksRequested.intersection(addrSet)
472+
# Do we have pending requests, towards this peer, for any of the blocks
473+
# that were just delivered?
474+
let intersection = peerCtx.blocksRequested.intersection(blocksDelivered)
442475
if intersection.len > 0:
443-
pendingCancellations[peerCtx.id] = intersection
476+
# If so, schedules a cancellation.
477+
scheduledCancellations[peerCtx.id] = intersection
444478

445-
# If so, dispatches cancellations.
446479
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
447-
toSeq(pendingCancellations.pairs).map(processPeer)
480+
toSeq(scheduledCancellations.pairs).map(dispatchCancellations)
448481
)
449482

450483
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerId: PeerId):
451484
let ctx = self.peers.get(peerId)
452485
if not ctx.isNil:
453486
ctx.cleanPresence(addrs)
454-
for address in pendingCancellations[peerId]:
487+
for address in scheduledCancellations[peerId]:
455488
ctx.blockRequestCancelled(address)
456489

457490
if failedFuts.len > 0:
@@ -535,6 +568,12 @@ proc blocksDeliveryHandler*(
535568
address = bd.address
536569

537570
try:
571+
# Unknown peers and unrequested blocks are dropped with a warning.
572+
if peerCtx == nil or not peerCtx.blockReceived(bd.address):
573+
warn "Dropping unrequested or duplicate block received from peer"
574+
codex_block_exchange_spurious_blocks_received.inc()
575+
continue
576+
538577
if err =? self.validateBlockDelivery(bd).errorOption:
539578
warn "Block validation failed", msg = err.msg
540579
continue
@@ -554,9 +593,6 @@ proc blocksDeliveryHandler*(
554593
).errorOption:
555594
warn "Unable to store proof and cid for a block"
556595
continue
557-
558-
if peerCtx != nil:
559-
peerCtx.blockReceived(bd.address)
560596
except CatchableError as exc:
561597
warn "Error handling block delivery", error = exc.msg
562598
continue
@@ -681,7 +717,7 @@ proc paymentHandler*(
681717
else:
682718
context.paymentChannel = self.wallet.acceptChannel(payment).option
683719

684-
proc setupPeer*(
720+
proc peerAddedHandler*(
685721
self: BlockExcEngine, peer: PeerId
686722
) {.async: (raises: [CancelledError]).} =
687723
## Perform initial setup, such as want
@@ -701,15 +737,6 @@ proc setupPeer*(
701737
trace "Sending account to peer", peer
702738
await self.network.request.sendAccount(peer, Account(address: address))
703739

704-
proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
705-
## Cleanup disconnected peer
706-
##
707-
708-
trace "Dropping peer", peer
709-
710-
# drop the peer from the peers table
711-
self.peers.remove(peer)
712-
713740
proc localLookup(
714741
self: BlockExcEngine, address: BlockAddress
715742
): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} =
@@ -775,7 +802,7 @@ proc taskHandler*(
775802
peerCtx.wantedBlocks.keepItIf(it notin sent)
776803
finally:
777804
# Better safe than sorry: if an exception does happen, we don't want to keep
778-
# those in flight as it'll effectively prevent the blocks from ever being sent.
805+
# those as sent, as it'll effectively prevent the blocks from ever being sent again.
779806
peerCtx.blocksSent.keepItIf(it notin wantedBlocks)
780807

781808
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
@@ -792,6 +819,9 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
792819

793820
info "Exiting blockexc task runner"
794821

822+
proc selectRandom*(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
823+
Rng.instance.sample(peers)
824+
795825
proc new*(
796826
T: type BlockExcEngine,
797827
localStore: BlockStore,
@@ -803,6 +833,7 @@ proc new*(
803833
pendingBlocks: PendingBlocksManager,
804834
maxBlocksPerMessage = DefaultMaxBlocksPerMessage,
805835
concurrentTasks = DefaultConcurrentTasks,
836+
selectPeer: PeerSelector = selectRandom,
806837
): BlockExcEngine =
807838
## Create new block exchange engine instance
808839
##
@@ -821,18 +852,6 @@ proc new*(
821852
advertiser: advertiser,
822853
)
823854

824-
proc peerEventHandler(
825-
peerId: PeerId, event: PeerEvent
826-
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
827-
if event.kind == PeerEventKind.Joined:
828-
await self.setupPeer(peerId)
829-
else:
830-
self.dropPeer(peerId)
831-
832-
if not isNil(network.switch):
833-
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
834-
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
835-
836855
proc blockWantListHandler(
837856
peer: PeerId, wantList: WantList
838857
): Future[void] {.async: (raises: []).} =
@@ -858,12 +877,24 @@ proc new*(
858877
): Future[void] {.async: (raises: []).} =
859878
self.paymentHandler(peer, payment)
860879

880+
proc peerAddedHandler(
881+
peer: PeerId
882+
): Future[void] {.async: (raises: [CancelledError]).} =
883+
await self.peerAddedHandler(peer)
884+
885+
proc peerDepartedHandler(
886+
peer: PeerId
887+
): Future[void] {.async: (raises: [CancelledError]).} =
888+
self.evictPeer(peer)
889+
861890
network.handlers = BlockExcHandlers(
862891
onWantList: blockWantListHandler,
863892
onBlocksDelivery: blocksDeliveryHandler,
864893
onPresence: blockPresenceHandler,
865894
onAccount: accountHandler,
866895
onPayment: paymentHandler,
896+
onPeerJoined: peerAddedHandler,
897+
onPeerDeparted: peerDepartedHandler,
867898
)
868899

869900
return self

codex/blockexchange/engine/pendingblocks.nim

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type
4242

4343
BlockReq* = object
4444
handle*: BlockHandle
45-
inFlight*: bool
45+
requested*: ?PeerId
4646
blockRetries*: int
4747
startTime*: int64
4848

@@ -56,7 +56,7 @@ proc updatePendingBlockGauge(p: PendingBlocksManager) =
5656
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
5757

5858
proc getWantHandle*(
59-
self: PendingBlocksManager, address: BlockAddress, inFlight = false
59+
self: PendingBlocksManager, address: BlockAddress, requested: ?PeerId = PeerId.none
6060
): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
6161
## Add an event for a block
6262
##
@@ -66,7 +66,7 @@ proc getWantHandle*(
6666
do:
6767
let blk = BlockReq(
6868
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
69-
inFlight: inFlight,
69+
requested: requested,
7070
blockRetries: self.blockRetries,
7171
startTime: getMonoTime().ticks,
7272
)
@@ -89,9 +89,9 @@ proc getWantHandle*(
8989
return handle
9090

9191
proc getWantHandle*(
92-
self: PendingBlocksManager, cid: Cid, inFlight = false
92+
self: PendingBlocksManager, cid: Cid, requested: ?PeerId = PeerId.none
9393
): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
94-
self.getWantHandle(BlockAddress.init(cid), inFlight)
94+
self.getWantHandle(BlockAddress.init(cid), requested)
9595

9696
proc resolve*(
9797
self: PendingBlocksManager, blocksDelivery: seq[BlockDelivery]
@@ -128,19 +128,37 @@ func retriesExhausted*(self: PendingBlocksManager, address: BlockAddress): bool
128128
self.blocks.withValue(address, pending):
129129
result = pending[].blockRetries <= 0
130130

131-
func setInFlight*(self: PendingBlocksManager, address: BlockAddress, inFlight = true) =
132-
## Set inflight status for a block
131+
func isRequested*(self: PendingBlocksManager, address: BlockAddress): bool =
132+
## Check if a block has been requested to a peer
133133
##
134+
result = false
135+
self.blocks.withValue(address, pending):
136+
result = pending[].requested.isSome
134137

138+
func getRequestPeer*(self: PendingBlocksManager, address: BlockAddress): ?PeerId =
139+
## Returns the peer that requested this block
140+
##
141+
result = PeerId.none
135142
self.blocks.withValue(address, pending):
136-
pending[].inFlight = inFlight
143+
result = pending[].requested
137144

138-
func isInFlight*(self: PendingBlocksManager, address: BlockAddress): bool =
139-
## Check if a block is in flight
145+
proc markRequested*(self: PendingBlocksManager, address: BlockAddress, peer: PeerId) =
146+
## Marks this block as having been requested to a peer
140147
##
141148

149+
if self.isRequested(address):
150+
error "Attempt to request block twice", address = address, peer = peer
151+
152+
self.blocks.withValue(address, pending):
153+
pending[].requested = peer.some
154+
155+
proc clearRequest*(
156+
self: PendingBlocksManager, address: BlockAddress, peer: ?PeerId = PeerId.none
157+
) =
142158
self.blocks.withValue(address, pending):
143-
result = pending[].inFlight
159+
if peer.isSome:
160+
assert peer == pending[].requested
161+
pending[].requested = PeerId.none
144162

145163
func contains*(self: PendingBlocksManager, cid: Cid): bool =
146164
BlockAddress.init(cid) in self.blocks

0 commit comments

Comments
 (0)