Skip to content

Commit 313d6ba

Browse files
committed
fix: refresh timestamp before issuing request to prevent flood of knowledge updates
1 parent 03a1cc7 commit 313d6ba

File tree

3 files changed

+175
-4
lines changed

3 files changed

+175
-4
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,26 +164,34 @@ proc sendWantBlock(
164164
proc refreshBlockKnowledge(
165165
self: BlockExcEngine, peer: BlockExcPeerCtx
166166
) {.async: (raises: [CancelledError]).} =
167-
# broadcast our want list, the other peer will do the same
168167
if self.pendingBlocks.wantListLen > 0:
169168
let cids = toSeq(self.pendingBlocks.wantList)
170169
trace "Sending our want list to a peer", peer = peer.id, length = cids.len
171170
await self.network.request.sendWantList(peer.id, cids, full = true)
172171

173172
proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
174-
for peer in self.peers.peers.values:
173+
for peer in self.peers.peers.values.toSeq:
175174
# We refresh block knowledge if:
176175
# 1. the peer hasn't been refreshed in a while;
177-
# 2. the list of blocks we care about has actually changed.
176+
# 2. the list of blocks we care about has changed.
178177
#
179178
# Note that because of (2), it is important that we update our
180179
# want list in the coarsest way possible instead of over many
181180
# small updates.
182181
#
183182
# In dynamic swarms, staleness will dominate latency.
184183
if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
185-
await self.refreshBlockKnowledge(peer)
184+
# FIXME: we update the lastRefresh before actually refreshing because otherwise
185+
# a slow peer will be bombarded with requests. If the request does fail or the
186+
# peer does not reply, a retrying block will eventually issue this again. This
187+
# is a complex and convoluted flow - ideally we should simply be tracking this
188+
# request and retrying it on the absence of a response, eventually disconnecting
189+
# the peer if it consistently fails to respond.
186190
peer.refreshed()
191+
# TODO: optimize this by keeping track of what was sent and sending deltas.
192+
# This should allow us to run much more frequent refreshes, and be way more
193+
# efficient about it.
194+
await self.refreshBlockKnowledge(peer)
187195

188196
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
189197
Rng.instance.sample(peers)
@@ -220,6 +228,9 @@ proc downloadInternal(
220228
else:
221229
self.pendingBlocks.setInFlight(address, false)
222230
if peers.without.len > 0:
231+
# We have peers connected, but none of them have the block. This
232+
# could be because our knowledge about what they have has run stale.
233+
# Tries to refresh it.
223234
await self.refreshBlockKnowledge()
224235
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
225236

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import std/assertions
2+
import std/enumerate
3+
import std/sugar
4+
5+
import pkg/chronos
6+
import pkg/libp2p
7+
8+
import pkg/codex/manifest
9+
import pkg/codex/merkletree
10+
import pkg/codex/blockexchange
11+
import pkg/codex/blockexchange/network/network {.all.}
12+
import pkg/codex/blockexchange/protobuf/[message, blockexc]
13+
import pkg/codex/blocktype
14+
import pkg/codex/rng
15+
16+
import ../../helpers
17+
18+
type
19+
## Fake network in which one real peer can talk to
20+
## k fake peers.
21+
FakeNetwork* = ref object
22+
fakePeers*: Table[PeerId, FakePeer]
23+
sender*: BlockExcNetwork
24+
25+
FakePeer* = ref object
26+
id*: PeerId
27+
fakeNetwork*: FakeNetwork
28+
pendingRequests*: seq[BlockAddress]
29+
blocks*: Table[BlockAddress, Block]
30+
proofs*: Table[BlockAddress, CodexProof]
31+
32+
Dataset* = object
33+
blocks*: seq[Block]
34+
proofs*: seq[CodexProof]
35+
manifest*: Manifest
36+
37+
proc makePeerId(): PeerId =
38+
let
39+
gen = Rng.instance()
40+
secKey = PrivateKey.random(gen[]).tryGet()
41+
42+
return PeerId.init(secKey.getPublicKey().tryGet()).tryGet()
43+
44+
proc newDataset*(
45+
nBlocks: int = 5, blockSize: NBytes = 1024.NBytes
46+
): Future[Dataset] {.async.} =
47+
let
48+
blocks = await makeRandomBlocks(blockSize.int * nBlocks, blockSize)
49+
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
50+
treeCid = tree.rootCid.tryGet()
51+
52+
return Dataset(
53+
blocks: blocks,
54+
proofs: (0 ..< blocks.len).mapIt(tree.getProof(it).tryGet()).toSeq,
55+
manifest: manifest,
56+
)
57+
58+
proc storeDataset*(self: FakePeer, dataset: Dataset, slice: HSlice[int, int] = 1 .. 0) =
59+
let actualSlice =
60+
if slice.len == 0:
61+
0 ..< dataset.blocks.len
62+
else:
63+
slice
64+
65+
for index in actualSlice:
66+
let address = BlockAddress.init(dataset.manifest.treeCid, index.Natural)
67+
self.proofs[address] = dataset.proofs[index]
68+
self.blocks[address] = dataset.blocks[index]
69+
70+
proc blockPresences(self: FakePeer, addresses: seq[BlockAddress]): seq[BlockPresence] =
71+
collect:
72+
for address in addresses:
73+
if self.blocks.hasKey(address):
74+
BlockPresence(address: address, `type`: BlockPresenceType.Have)
75+
76+
proc getPeer(self: FakeNetwork, id: PeerId): FakePeer =
77+
try:
78+
return self.fakePeers[id]
79+
except KeyError as exc:
80+
raise newException(Defect, "peer not found")
81+
82+
proc newInstrumentedNetwork(self: FakeNetwork): BlockExcNetwork =
83+
var sender = BlockExcNetwork()
84+
85+
proc sendWantList(
86+
id: PeerId,
87+
addresses: seq[BlockAddress],
88+
priority: int32 = 0,
89+
cancel: bool = false,
90+
wantType: WantType = WantType.WantHave,
91+
full: bool = false,
92+
sendDontHave: bool = false,
93+
) {.async: (raises: [CancelledError]).} =
94+
var peer = self.getPeer(id)
95+
case wantType
96+
# WantHaves are replied to immediately.
97+
of WantType.WantHave:
98+
let haves = peer.blockPresences(addresses)
99+
if haves.len > 0:
100+
await sender.handlers.onPresence(id, haves)
101+
102+
# WantBlocks are deferred till `sendPendingBlocks` is called.
103+
of WantType.WantBlock:
104+
let blockAddresses = addresses.filterIt(peer.blocks.hasKey(it)).toSeq
105+
if blockAddresses.len > 0:
106+
for blockAddress in blockAddresses:
107+
peer.pendingRequests.add(blockAddress)
108+
109+
proc sendBlocksDelivery(
110+
id: PeerId, blocksDelivery: seq[BlockDelivery]
111+
) {.async: (raises: [CancelledError]).} =
112+
var peer = self.getPeer(id)
113+
for delivery in blocksDelivery:
114+
peer.blocks[delivery.address] = delivery.blk
115+
if delivery.proof.isSome:
116+
peer.proofs[delivery.address] = delivery.proof.get
117+
118+
sender.request = BlockExcRequest(
119+
sendWantList: sendWantList,
120+
sendBlocksDelivery: sendBlocksDelivery,
121+
sendWantCancellations: proc(
122+
id: PeerId, addresses: seq[BlockAddress]
123+
) {.async: (raises: [CancelledError]).} =
124+
discard,
125+
)
126+
127+
return sender
128+
129+
proc sendPendingBlocks*(self: FakePeer) {.async.} =
130+
## Replies to any pending block requests.
131+
let blocks = collect:
132+
for blockAddress in self.pendingRequests:
133+
if not self.blocks.hasKey(blockAddress):
134+
continue
135+
136+
let proof =
137+
if blockAddress in self.proofs:
138+
self.proofs[blockAddress].some
139+
else:
140+
CodexProof.none
141+
142+
BlockDelivery(address: blockAddress, blk: self.blocks[blockAddress], proof: proof)
143+
144+
await self.fakeNetwork.sender.handlers.onBlocksDelivery(self.id, blocks)
145+
146+
proc newPeer*(self: FakeNetwork): FakePeer =
147+
## Adds a new `FakePeer` to a `FakeNetwork`.
148+
let peer = FakePeer(id: makePeerId(), fakeNetwork: self)
149+
self.fakePeers[peer.id] = peer
150+
return peer
151+
152+
proc new*(_: type FakeNetwork): FakeNetwork =
153+
let fakeNetwork = FakeNetwork()
154+
fakeNetwork.sender = fakeNetwork.newInstrumentedNetwork()
155+
return fakeNetwork

tests/codex/blockexchange/engine/testengine.nim

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,11 @@ asyncchecksuite "Block Download":
520520
expect CancelledError:
521521
discard (await pending).tryGet()
522522

523+
# test "Should not keep looking up providers for the same dataset repeatedly":
524+
# let
525+
# blocks = await makeRandomBlocks(datasetSize = 4096, blockSize = 128'nb)
526+
# manifest = await storeDataGetManifest(store, blocks)
527+
523528
asyncchecksuite "Task Handler":
524529
var
525530
rng: Rng

0 commit comments

Comments
 (0)