Skip to content

Commit c324166

Browse files
committed
Enhance GSM view with PerasCertDiffusion information
Conflicts: ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs
1 parent e624834 commit c324166

File tree

4 files changed

+101
-28
lines changed
  • ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus
  • ouroboros-consensus
    • src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion
    • test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion

4 files changed

+101
-28
lines changed

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
7070
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
7171
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
7272
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound)
73+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
74+
( ObjectDiffusionInboundStateView
75+
, bracketObjectDiffusionInbound
76+
)
7377
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
7478
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound)
7579
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
@@ -213,6 +217,7 @@ data Handlers m addr blk = Handlers
213217
, hPerasCertDiffusionClient ::
214218
NodeToNodeVersion ->
215219
ControlMessageSTM m ->
220+
ObjectDiffusionInboundStateView m ->
216221
ConnectionId addr ->
217222
PerasCertDiffusionInboundPipelined blk m ()
218223
, hPerasCertDiffusionServer ::
@@ -315,7 +320,7 @@ mkHandlers
315320
(mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool)
316321
(getMempoolWriter getMempool)
317322
version
318-
, hPerasCertDiffusionClient = \version controlMessageSTM peer ->
323+
, hPerasCertDiffusionClient = \version controlMessageSTM state peer ->
319324
objectDiffusionInbound
320325
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers))
321326
( perasCertDiffusionMaxFifoLength miniProtocolParameters
@@ -325,6 +330,7 @@ mkHandlers
325330
(makePerasCertPoolWriterFromChainDB $ getChainDB)
326331
version
327332
controlMessageSTM
333+
state
328334
, hPerasCertDiffusionServer = \version peer ->
329335
objectDiffusionOutbound
330336
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers))
@@ -870,17 +876,22 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke
870876
}
871877
channel = do
872878
labelThisThread "PerasCertDiffusionClient"
873-
((), trailing) <-
874-
runPipelinedPeerWithLimits
875-
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
876-
(cPerasCertDiffusionCodec (mkCodecs version))
877-
blPerasCertDiffusion
878-
timeLimitsObjectDiffusion
879-
channel
880-
( objectDiffusionInboundPeerPipelined
881-
(hPerasCertDiffusionClient version controlMessageSTM them)
882-
)
883-
return (NoInitiatorResult, trailing)
879+
bracketObjectDiffusionInbound
880+
version
881+
(getPerasCertDiffusionHandles kernel)
882+
them
883+
$ \state -> do
884+
((), trailing) <-
885+
runPipelinedPeerWithLimits
886+
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
887+
(cPerasCertDiffusionCodec (mkCodecs version))
888+
blPerasCertDiffusion
889+
timeLimitsObjectDiffusion
890+
channel
891+
( objectDiffusionInboundPeerPipelined
892+
(hPerasCertDiffusionClient version controlMessageSTM state them)
893+
)
894+
return (NoInitiatorResult, trailing)
884895

885896
aPerasCertDiffusionServer ::
886897
NodeToNodeVersion ->

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import Data.Functor ((<&>))
5050
import Data.Hashable (Hashable)
5151
import Data.List.NonEmpty (NonEmpty)
5252
import qualified Data.List.NonEmpty as NE
53-
import Data.Maybe (isJust, mapMaybe)
53+
import Data.Maybe (isJust, isNothing, mapMaybe)
5454
import Data.Proxy
5555
import Data.Set (Set)
5656
import qualified Data.Text as Text
@@ -82,8 +82,16 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck
8282
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
8383
( SomeHeaderInFutureCheck
8484
)
85+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
86+
( ObjectDiffusionInboundHandleCollection (..)
87+
, newObjectDiffusionInboundHandleCollection
88+
)
89+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
90+
( PerasCertDiffusionInboundHandleCollection
91+
)
8592
import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..))
8693
import qualified Ouroboros.Consensus.Node.GSM as GSM
94+
import Ouroboros.Consensus.Node.GSM.PeerState (gsmPeerIsIdle, maybeChainSyncState, mkGsmPeerStates)
8795
import Ouroboros.Consensus.Node.Genesis
8896
( GenesisNodeKernelArgs (..)
8997
, LoEAndGDDConfig (..)
@@ -175,6 +183,9 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel
175183
-- from it with 'GSM.gsmStateToLedgerJudgement'.
176184
, getChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
177185
-- ^ The kill handle and exposed state for each ChainSync client.
186+
, getPerasCertDiffusionHandles ::
187+
ObjectDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk
188+
-- ^ The exposed state for each Peras CertDiffusion client.
178189
, getPeerSharingRegistry :: PeerSharingRegistry addrNTN m
179190
-- ^ Read the current peer sharing registry, used for interacting with
180191
-- the PeerSharing protocol
@@ -235,6 +246,7 @@ initNodeKernel
235246
args@NodeKernelArgs
236247
{ registry
237248
, cfg
249+
, featureFlags
238250
, tracers
239251
, chainDB
240252
, initChainDB
@@ -257,6 +269,7 @@ initNodeKernel
257269
, mempool
258270
, peerSharingRegistry
259271
, varChainSyncHandles
272+
, varPerasCertDiffusionHandles
260273
, varGsmState
261274
} = st
262275

@@ -275,24 +288,34 @@ initNodeKernel
275288
GSM.GsmView
276289
{ GSM.antiThunderingHerd = Just gsmAntiThunderingHerd
277290
, GSM.getCandidateOverSelection = do
278-
weights <- ChainDB.getPerasWeightSnapshot chainDB
279-
pure $ \(headers, _lst) state ->
280-
case AF.intersectionPoint headers (csCandidate state) of
281-
Nothing -> GSM.CandidateDoesNotIntersect
282-
Just{} ->
283-
GSM.WhetherCandidateIsBetter $ -- precondition requires intersection
284-
preferAnchoredCandidate
285-
(configBlock cfg)
286-
(forgetFingerprint weights)
287-
headers
288-
(csCandidate state)
289-
, GSM.peerIsIdle = csIdling
291+
weights <- forgetFingerprint <$> ChainDB.getPerasWeightSnapshot chainDB
292+
pure $ \(headers, _lst) peerState -> do
293+
case csCandidate <$> maybeChainSyncState peerState of
294+
Just candidate
295+
-- The candidate does not intersect with our current chain.
296+
-- This is a precondition for 'WhetherCandidateIsBetter'.
297+
| isNothing (AF.intersectionPoint headers candidate) ->
298+
GSM.CandidateDoesNotIntersect
299+
-- The candidate is better than our current chain.
300+
| preferAnchoredCandidate (configBlock cfg) weights headers candidate ->
301+
GSM.WhetherCandidateIsBetter True
302+
-- The candidate is not better than our current chain.
303+
| otherwise ->
304+
GSM.WhetherCandidateIsBetter False
305+
Nothing ->
306+
-- We don't have an established ChainSync connection with this peer.
307+
-- We conservatively assume that its candidate is not better than ours.
308+
GSM.WhetherCandidateIsBetter False
309+
, GSM.peerIsIdle = gsmPeerIsIdle featureFlags
290310
, GSM.durationUntilTooOld =
291311
gsmDurationUntilTooOld
292312
<&> \wd (_headers, lst) ->
293313
GSM.getDurationUntilTooOld wd (getTipSlot lst)
294314
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
295-
, GSM.getPeerStates = traverse (readTVar . cschState) =<< cschcMap varChainSyncHandles
315+
, GSM.getPeerStates =
316+
mkGsmPeerStates
317+
varChainSyncHandles
318+
varPerasCertDiffusionHandles
296319
, GSM.getCurrentSelection = do
297320
headers <- ChainDB.getCurrentChainWithTime chainDB
298321
extLedgerState <- ChainDB.getCurrentLedger chainDB
@@ -369,6 +392,7 @@ initNodeKernel
369392
, getFetchMode = readFetchMode blockFetchInterface
370393
, getGsmState = readTVar varGsmState
371394
, getChainSyncHandles = varChainSyncHandles
395+
, getPerasCertDiffusionHandles = varPerasCertDiffusionHandles
372396
, getPeerSharingRegistry = peerSharingRegistry
373397
, getTracers = tracers
374398
, setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a
@@ -419,6 +443,8 @@ data InternalState m addrNTN addrNTC blk = IS
419443
BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m
420444
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (HeaderWithTime blk) blk m
421445
, varChainSyncHandles :: ChainSyncClientHandleCollection (ConnectionId addrNTN) m blk
446+
, varPerasCertDiffusionHandles ::
447+
PerasCertDiffusionInboundHandleCollection (ConnectionId addrNTN) m blk
422448
, varGsmState :: StrictTVar m GSM.GsmState
423449
, mempool :: Mempool m blk
424450
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
@@ -457,6 +483,8 @@ initInternalState
457483
newTVarIO gsmState
458484

459485
varChainSyncHandles <- atomically newChainSyncClientHandleCollection
486+
varPerasCertDiffusionHandles <- atomically newObjectDiffusionInboundHandleCollection
487+
460488
mempool <-
461489
openMempool
462490
registry

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ import Data.Word (Word64)
4545
import GHC.Generics (Generic)
4646
import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt)
4747
import NoThunks.Class (NoThunks (..))
48+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
49+
( ObjectDiffusionInboundStateView (..)
50+
)
4851
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
52+
import Ouroboros.Consensus.MiniProtocol.Util.Idling qualified as Idling
4953
import Ouroboros.Consensus.Util.NormalForm.Invariant (noThunksInvariant)
5054
import Ouroboros.Network.ControlMessage
5155
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
@@ -68,6 +72,8 @@ data TraceObjectDiffusionInbound objectId object
6872
TraceObjectDiffusionInboundRecvControlMessage ControlMessage
6973
| TraceObjectDiffusionInboundCanRequestMoreObjects Int
7074
| TraceObjectDiffusionInboundCannotRequestMoreObjects Int
75+
| TraceObjectDiffusionInboundStartedIdling
76+
| TraceObjectDiffusionInboundStoppedIdling
7177
deriving (Eq, Show)
7278

7379
data ObjectDiffusionInboundError objectId object
@@ -152,13 +158,15 @@ objectDiffusionInbound ::
152158
ObjectPoolWriter objectId object m ->
153159
NodeToNodeVersion ->
154160
ControlMessageSTM m ->
161+
ObjectDiffusionInboundStateView m ->
155162
ObjectDiffusionInboundPipelined objectId object m ()
156163
objectDiffusionInbound
157164
tracer
158165
(maxFifoLength, maxNumIdsToReq, maxNumObjectsToReq)
159166
ObjectPoolWriter{..}
160167
_version
161-
controlMessageSTM =
168+
controlMessageSTM
169+
state =
162170
ObjectDiffusionInboundPipelined $!
163171
checkState initialInboundSt & go Zero
164172
where
@@ -272,6 +280,13 @@ objectDiffusionInbound
272280
-- blocking call.
273281
traceWith tracer $
274282
TraceObjectDiffusionInboundCannotRequestMoreObjects (natToInt n)
283+
-- Before blocking, signal to the protocol client that we are idling
284+
--
285+
-- NOTE this change of state should be made explicit:
286+
-- https://github.com/tweag/cardano-peras/issues/144
287+
Idling.idlingStart (odisvIdling state)
288+
traceWith tracer $
289+
TraceObjectDiffusionInboundStartedIdling
275290
pure $! checkState st & goReqObjectIdsBlocking
276291

277292
-- We have pipelined some requests, so there are some replies in flight.
@@ -438,7 +453,16 @@ objectDiffusionInbound
438453
(numToAckOnNextReq st)
439454
numIdsToRequest
440455
( \neCollectedIds ->
441-
checkState st' & goCollect Zero (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
456+
WithEffect $ do
457+
-- We just got some new object id's, so we are no longer idling
458+
--
459+
-- NOTE this change of state should be made explicit:
460+
-- https://github.com/tweag/cardano-peras/issues/144
461+
Idling.idlingStop (odisvIdling state)
462+
traceWith tracer $
463+
TraceObjectDiffusionInboundStoppedIdling
464+
pure $
465+
checkState st' & goCollect Zero (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
442466
)
443467

444468
goReqObjectsAndObjectIdsPipelined ::

ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,15 @@ import NoThunks.Class (NoThunks)
3737
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
3838
( objectDiffusionInbound
3939
)
40+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.State
41+
( ObjectDiffusionInboundStateView (..)
42+
)
4043
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
4144
( ObjectPoolReader (..)
4245
, ObjectPoolWriter (..)
4346
)
4447
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound)
48+
import qualified Ouroboros.Consensus.MiniProtocol.Util.Idling as Idling
4549
import Ouroboros.Consensus.Util.IOLike
4650
( IOLike
4751
, MonadDelay (..)
@@ -283,6 +287,11 @@ prop_smoke_object_diffusion
283287
controlMessage <- uncheckedNewTVarM Continue
284288

285289
let
290+
inboundState =
291+
ObjectDiffusionInboundStateView
292+
{ odisvIdling = Idling.noIdling
293+
}
294+
286295
inbound =
287296
objectDiffusionInbound
288297
tracer
@@ -293,6 +302,7 @@ prop_smoke_object_diffusion
293302
inboundPoolWriter
294303
nodeToNodeVersion
295304
(readTVar controlMessage)
305+
inboundState
296306

297307
outbound =
298308
objectDiffusionOutbound

0 commit comments

Comments
 (0)