Skip to content

Commit e18996e

Browse files
committed
optimize remaining list joins so they're not quadratic
1 parent a53f8da commit e18996e

File tree

4 files changed

+51
-58
lines changed

4 files changed

+51
-58
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ proc blockPresenceHandler*(
308308
peerCtx.setPresence(presence)
309309

310310
let
311-
peerHave = peerCtx.peerHave.toHashSet
311+
peerHave = peerCtx.peerHave
312312
dontWantCids = peerHave - ourWantList
313313

314314
if dontWantCids.len > 0:
@@ -332,24 +332,23 @@ proc blockPresenceHandler*(
332332
proc scheduleTasks(
333333
self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
334334
) {.async: (raises: [CancelledError]).} =
335-
let cids = blocksDelivery.mapIt(it.blk.cid)
336-
337335
# schedule any new peers to provide blocks to
338336
for p in self.peers:
339-
for c in cids: # for each cid
337+
for blockDelivery in blocksDelivery: # for each cid
340338
# schedule a peer if it wants at least one cid
341339
# and we have it in our local store
342-
if c in p.peerWantsCids:
340+
if blockDelivery.address in p.wantedBlocks:
341+
let cid = blockDelivery.blk.cid
343342
try:
344-
if await (c in self.localStore):
343+
if await (cid in self.localStore):
345344
# TODO: the try/except should go away once blockstore tracks exceptions
346345
self.scheduleTask(p)
347346
break
348347
except CancelledError as exc:
349-
warn "Checking local store canceled", cid = c, err = exc.msg
348+
warn "Checking local store canceled", cid = cid, err = exc.msg
350349
return
351350
except CatchableError as exc:
352-
error "Error checking local store for cid", cid = c, err = exc.msg
351+
error "Error checking local store for cid", cid = cid, err = exc.msg
353352
raiseAssert "Unexpected error checking local store for cid"
354353

355354
proc cancelBlocks(
@@ -513,14 +512,12 @@ proc wantListHandler*(
513512

514513
try:
515514
for e in wantList.entries:
516-
let idx = peerCtx.peerWants.findIt(it.address == e.address)
517-
518515
logScope:
519516
peer = peerCtx.id
520517
address = e.address
521518
wantType = $e.wantType
522519

523-
if idx < 0: # Adding new entry to peer wants
520+
if e.address notin peerCtx.wantedBlocks: # Adding new entry to peer wants
524521
let
525522
have =
526523
try:
@@ -556,25 +553,20 @@ proc wantListHandler*(
556553

557554
codex_block_exchange_want_have_lists_received.inc()
558555
of WantType.WantBlock:
559-
peerCtx.peerWants.add(e)
556+
peerCtx.wantedBlocks.incl(e.address)
560557
schedulePeer = true
561558
codex_block_exchange_want_block_lists_received.inc()
562559
else: # Updating existing entry in peer wants
563560
# peer doesn't want this block anymore
564561
if e.cancel:
565562
trace "Canceling want for block", address = e.address
566-
peerCtx.peerWants.del(idx)
563+
peerCtx.wantedBlocks.excl(e.address)
567564
trace "Canceled block request",
568-
address = e.address, len = peerCtx.peerWants.len
565+
address = e.address, len = peerCtx.wantedBlocks.len
569566
else:
567+
trace "Peer has requested a block more than once", address = e.address
570568
if e.wantType == WantType.WantBlock:
571569
schedulePeer = true
572-
# peer might want to ask for the same cid with
573-
# different want params
574-
trace "Updating want for block", address = e.address
575-
peerCtx.peerWants[idx] = e # update entry
576-
trace "Updated block request",
577-
address = e.address, len = peerCtx.peerWants.len
578570

579571
if presence.len > 0:
580572
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
@@ -639,20 +631,16 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
639631
self.peers.remove(peer)
640632

641633
proc localLookup(
642-
self: BlockExcEngine, e: WantListEntry
634+
self: BlockExcEngine, address: BlockAddress
643635
): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} =
644-
if e.address.leaf:
645-
(await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
636+
if address.leaf:
637+
(await self.localStore.getBlockAndProof(address.treeCid, address.index)).map(
646638
(blkAndProof: (Block, CodexProof)) =>
647-
BlockDelivery(
648-
address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some
649-
)
639+
BlockDelivery(address: address, blk: blkAndProof[0], proof: blkAndProof[1].some)
650640
)
651641
else:
652-
(await self.localStore.getBlock(e.address)).map(
653-
(blk: Block) => BlockDelivery(
654-
address: e.address, blk: blk, proof: CodexProof.none
655-
)
642+
(await self.localStore.getBlock(address)).map(
643+
(blk: Block) => BlockDelivery(address: address, blk: blk, proof: CodexProof.none)
656644
)
657645

658646
iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] =
@@ -674,40 +662,41 @@ proc taskHandler*(
674662

675663
# Blocks that are in flight have already been picked up by other tasks and
676664
# should not be re-sent.
677-
var wantedBlocks = peerCtx.peerWants.filterIt(
678-
it.wantType == WantType.WantBlock and not peerCtx.isInFlight(it.address)
679-
)
680-
681-
wantedBlocks.sort(SortOrder.Descending)
665+
var
666+
wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isInFlight(it))
667+
sent: HashSet[BlockAddress]
682668

683669
for wantedBlock in wantedBlocks:
684-
peerCtx.addInFlight(wantedBlock.address)
670+
peerCtx.addInFlight(wantedBlock)
685671

686672
try:
687-
for batch in wantedBlocks.splitBatches(self.maxBlocksPerMessage):
673+
for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage):
688674
var blockDeliveries: seq[BlockDelivery]
689675
for wantedBlock in batch:
690676
# I/O is blocking so looking up blocks sequentially is fine.
691677
without blockDelivery =? await self.localLookup(wantedBlock), err:
692678
error "Error getting block from local store",
693-
err = err.msg, address = wantedBlock.address
694-
peerCtx.removeInFlight(wantedBlock.address)
679+
err = err.msg, address = wantedBlock
680+
peerCtx.removeInFlight(wantedBlock)
695681
continue
696682
blockDeliveries.add(blockDelivery)
683+
sent.incl(wantedBlock)
697684

698685
if blockDeliveries.len == 0:
699686
continue
700687

701688
await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries)
702689
codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64)
703-
# Drops the batch from want list. Note that the send might still fail down the line
704-
# and we will have removed them anyway, at which point we rely on the requester
705-
# performing a retry for the request to succeed.
706-
peerCtx.peerWants.keepItIf(it.address notin blockDeliveries.mapIt(it.address))
690+
# Drops the batch from the peer's set of wanted blocks; i.e. assumes that after
691+
# we send the blocks, the peer no longer wants them, so we don't need to
692+
# re-send them. Note that the send might still fail down the line and we will
693+
# have removed those anyway. At that point, we rely on the requester performing
694+
# a retry for the request to succeed.
695+
peerCtx.wantedBlocks.keepItIf(it notin sent)
707696
finally:
708697
# Better safe than sorry: if an exception does happen, we don't want to keep
709698
# those in flight as it'll effectively prevent the blocks from ever being sent.
710-
peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks.mapIt(it.address))
699+
peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks)
711700

712701
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
713702
## process tasks

codex/blockexchange/peers/peercontext.nim

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export payments, nitro
2828
type BlockExcPeerCtx* = ref object of RootObj
2929
id*: PeerId
3030
blocks*: Table[BlockAddress, Presence] # remote peer have list including price
31-
peerWants*: seq[WantListEntry] # remote peers want lists
31+
wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
3232
exchanged*: int # times peer has exchanged with us
3333
lastExchange*: Moment # last time peer has exchanged with us
3434
lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
@@ -37,7 +37,7 @@ type BlockExcPeerCtx* = ref object of RootObj
3737
blocksInFlight*: HashSet[BlockAddress] # blocks in flight towards peer
3838

3939
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
40-
self.lastRefresh + 15.seconds < Moment.now()
40+
self.lastRefresh + 5.minutes < Moment.now()
4141

4242
proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool =
4343
address in self.blocksInFlight
@@ -51,14 +51,11 @@ proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
5151
proc refreshed*(self: BlockExcPeerCtx) =
5252
self.lastRefresh = Moment.now()
5353

54-
proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
55-
toSeq(self.blocks.keys)
56-
57-
proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
58-
self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet
59-
60-
proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
61-
self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet
54+
proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] =
55+
# XXX: this is ugly and inefficient, but since those will typically
56+
# be used in "joins", it's better to pay the price here and have
57+
# a linear join than to not do it and have a quadratic join.
58+
toHashSet(self.blocks.keys.toSeq)
6259

6360
proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
6461
address in self.blocks

codex/blockexchange/peers/peerctxstore.nim

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,23 @@ func len*(self: PeerCtxStore): int =
6262
self.peers.len
6363

6464
func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
65-
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it == address))
65+
toSeq(self.peers.values).filterIt(address in it.peerHave)
6666

6767
func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
68+
# FIXME: this is way slower and can end up leading to unexpected performance loss.
6869
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it.cidOrTreeCid == cid))
6970

7071
func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
71-
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it == address))
72+
toSeq(self.peers.values).filterIt(address in it.wantedBlocks)
7273

7374
func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
74-
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid))
75+
# FIXME: this is way slower and can end up leading to unexpected performance loss.
76+
toSeq(self.peers.values).filterIt(it.wantedBlocks.anyIt(it.cidOrTreeCid == cid))
7577

7678
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
7779
var res: PeersForBlock = (@[], @[])
7880
for peer in self:
79-
if peer.peerHave.anyIt(it == address):
81+
if address in peer.peerHave:
8082
res.with.add(peer)
8183
else:
8284
res.without.add(peer)

codex/blockexchange/protobuf/message.nim

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ type
2525

2626
WantListEntry* = object
2727
address*: BlockAddress
28+
# XXX: I think explicit priority is pointless as the peer will request
29+
# the blocks in the order it wants to receive them, and all we have to
30+
# do is process those in the same order as we send them back. It also
31+
# complicates things for no reason at the moment, as the priority is
32+
# always set to 0.
2833
priority*: int32 # The priority (normalized). default to 1
2934
cancel*: bool # Whether this revokes an entry
3035
wantType*: WantType # Note: defaults to enum 0, ie Block

0 commit comments

Comments
 (0)