From 01ede746a478f56eb43355ce56329ae416b64c03 Mon Sep 17 00:00:00 2001 From: Georgy Lukyanov Date: Mon, 1 Dec 2025 13:14:20 +0100 Subject: [PATCH] Revert "[Peras 4] Add ObjectDiffusion and `PerasCert` diffusion (instance of ObjectDiffusion) (#1679)" This reverts commit c0d8e62e452fdb4a5d8b06ca8bf5618709c1b4c7, reversing changes made to fe2513c5c888aba8f24f3b3136451b523d7a03ab. --- cabal.project | 13 +- ...9_095938_thomas.bagrel_object_diffusion.md | 23 - .../Ouroboros/Consensus/Cardano/Node.hs | 1 - .../Shelley/Ledger/NetworkProtocolVersion.hs | 1 - ...9_095930_thomas.bagrel_object_diffusion.md | 27 - .../Ouroboros/Consensus/Network/NodeToNode.hs | 176 ++----- .../Ouroboros/Consensus/Node.hs | 2 - .../Ouroboros/Consensus/Node/Tracers.hs | 11 - .../Test/ThreadNet/Network.hs | 16 +- .../test/mock-test/Test/ThreadNet/BFT.hs | 1 - ...8_104810_thomas.bagrel_object_diffusion.md | 28 - ouroboros-consensus/ouroboros-consensus.cabal | 7 - .../MiniProtocol/ObjectDiffusion/Inbound.hs | 492 ------------------ .../ObjectDiffusion/ObjectPool/API.hs | 67 --- .../ObjectDiffusion/ObjectPool/PerasCert.hs | 114 ---- .../MiniProtocol/ObjectDiffusion/Outbound.hs | 278 ---------- .../MiniProtocol/ObjectDiffusion/PerasCert.hs | 41 -- .../Ouroboros/Consensus/Node/Serialisation.hs | 40 +- .../test/consensus-test/Main.hs | 4 - .../ObjectDiffusion/PerasCert/Smoke.hs | 152 ------ .../MiniProtocol/ObjectDiffusion/Smoke.hs | 326 ------------ .../Storage/PerasCertDB/StateMachine.hs | 13 +- 22 files changed, 45 insertions(+), 1788 deletions(-) delete mode 100644 ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md delete mode 100644 ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md delete mode 100644 ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md delete mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs delete mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs delete mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs delete mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs delete mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs delete mode 100644 ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs delete mode 100644 ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs diff --git a/cabal.project b/cabal.project index 82cbf035ba..b983c81b56 100644 --- a/cabal.project +++ b/cabal.project @@ -80,19 +80,12 @@ source-repository-package eras/byron/ledger/impl eras/byron/crypto -allow-newer: - -- https://github.com/phadej/vec/issues/121 - , ral:QuickCheck - , fin:QuickCheck - , bin:QuickCheck - --- Using https://github.com/IntersectMBO/ouroboros-network/tree/peras-staging/pr-5202-v2 +-- Backported version of https://github.com/IntersectMBO/ouroboros-network/pull/5161 source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: 0db8669b67982cba755e80bf2e413527def41244 - --sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ= + tag: 1385b53cefb81e79553b6b0252537455833ea9c4 + --sha256: sha256-zZ7WsMfRs1fG16bmvI5vIh4fhQ8RGyEvYGLSWlrxpg0= subdir: - ouroboros-network-protocols ouroboros-network-api ouroboros-network diff --git a/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md b/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md deleted file mode 100644 index 9efe00d438..0000000000 --- a/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md +++ /dev/null @@ -1,23 +0,0 @@ - - - - - -### Breaking - -- Added support for `NodeToNodeV_16` diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs index 98093f86c5..52a7e4f910 100644 --- a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs @@ -430,7 +430,6 @@ instance Map.fromList $ [ (NodeToNodeV_14, CardanoNodeToNodeVersion2) , (NodeToNodeV_15, CardanoNodeToNodeVersion2) - , (NodeToNodeV_16, CardanoNodeToNodeVersion2) ] supportedNodeToClientVersions _ = diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs index 7003a5ce8a..c03e0e5179 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs @@ -48,7 +48,6 @@ instance SupportedNetworkProtocolVersion (ShelleyBlock proto era) where Map.fromList [ (NodeToNodeV_14, ShelleyNodeToNodeVersion1) , (NodeToNodeV_15, ShelleyNodeToNodeVersion1) - , (NodeToNodeV_16, ShelleyNodeToNodeVersion1) ] supportedNodeToClientVersions _ = Map.fromList diff --git a/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md b/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md deleted file mode 100644 index 8ba76bb8ff..0000000000 --- a/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md +++ /dev/null @@ -1,27 +0,0 @@ - - - - - -### Breaking - -- Modify `Ouroboros.Consensus{.Node,.Node.Tracer,.Network.NodeToNode}` to wire-in PerasCertDiffusion similarly to other mini-protocols (e.g. TX-submission) - -### Non-Breaking - -- Update `Test.ThreadNet.Network` in `unstable-diffusion-testlib` accordingly to the changes made in `Ouroboros.Consensus.Network.NodeToNode` diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index 294aace61f..bc66cf78d6 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -54,7 +54,6 @@ import qualified Data.ByteString.Lazy as BSL import Data.Hashable (Hashable) import Data.Int (Int64) import Data.Map.Strict (Map) -import qualified Data.Set as Set import Data.Void (Void) import qualified Network.Mux as Mux import Network.TypedProtocol.Codec @@ -69,10 +68,6 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client ) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient import Ouroboros.Consensus.MiniProtocol.ChainSync.Server -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert import Ouroboros.Consensus.Node.ExitPolicy import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Node.Run @@ -86,6 +81,10 @@ import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.Orphans () import Ouroboros.Network.Block ( Serialised (..) + , decodePoint + , decodeTip + , encodePoint + , encodeTip ) import Ouroboros.Network.BlockFetch import Ouroboros.Network.BlockFetch.Client @@ -125,18 +124,6 @@ import Ouroboros.Network.Protocol.KeepAlive.Client import Ouroboros.Network.Protocol.KeepAlive.Codec import Ouroboros.Network.Protocol.KeepAlive.Server import Ouroboros.Network.Protocol.KeepAlive.Type -import Ouroboros.Network.Protocol.ObjectDiffusion.Codec - ( byteLimitsObjectDiffusion - , codecObjectDiffusion - , codecObjectDiffusionId - , timeLimitsObjectDiffusion - ) -import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound - ( objectDiffusionInboundPeerPipelined - ) -import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound - ( objectDiffusionOutboundPeer - ) import Ouroboros.Network.Protocol.PeerSharing.Client ( PeerSharingClient , peerSharingClientPeer @@ -210,15 +197,6 @@ data Handlers m addr blk = Handlers NodeToNodeVersion -> ConnectionId addr -> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m () - , hPerasCertDiffusionClient :: - NodeToNodeVersion -> - ControlMessageSTM m -> - ConnectionId addr -> - PerasCertDiffusionInboundPipelined blk m () - , hPerasCertDiffusionServer :: - NodeToNodeVersion -> - ConnectionId addr -> - PerasCertDiffusionOutbound blk m () , hKeepAliveClient :: NodeToNodeVersion -> ControlMessageSTM m -> @@ -315,22 +293,6 @@ mkHandlers (mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool) (getMempoolWriter getMempool) version - , hPerasCertDiffusionClient = \version controlMessageSTM peer -> - objectDiffusionInbound - (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers)) - ( perasCertDiffusionMaxFifoLength miniProtocolParameters - , 10 -- TODO: see https://github.com/tweag/cardano-peras/issues/97 - , 10 -- TODO: see https://github.com/tweag/cardano-peras/issues/97 - ) - (makePerasCertPoolWriterFromChainDB $ getChainDB) - version - controlMessageSTM - , hPerasCertDiffusionServer = \version peer -> - objectDiffusionOutbound - (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers)) - (perasCertDiffusionMaxFifoLength miniProtocolParameters) - (makePerasCertPoolReaderFromChainDB $ getChainDB) - version , hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng , hKeepAliveServer = \_version _peer -> keepAliveServer , hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM @@ -342,7 +304,7 @@ mkHandlers -------------------------------------------------------------------------------} -- | Node-to-node protocol codecs needed to run 'Handlers'. -data Codecs blk addr e m bCS bSCS bBF bSBF bTX bPCD bKA bPS = Codecs +data Codecs blk addr e m bCS bSCS bBF bSBF bTX bKA bPS = Codecs { cChainSyncCodec :: Codec (ChainSync (Header blk) (Point blk) (Tip blk)) e m bCS , cChainSyncCodecSerialised :: Codec (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)) e m bSCS @@ -350,7 +312,6 @@ data Codecs blk addr e m bCS bSCS bBF bSBF bTX bPCD bKA bPS = Codecs , cBlockFetchCodecSerialised :: Codec (BlockFetch (Serialised blk) (Point blk)) e m bSBF , cTxSubmission2Codec :: Codec (TxSubmission2 (GenTxId blk) (GenTx blk)) e m bTX - , cPerasCertDiffusionCodec :: Codec (PerasCertDiffusion blk) e m bPCD , cKeepAliveCodec :: Codec KeepAlive e m bKA , cPeerSharingCodec :: Codec (PeerSharing addr) e m bPS } @@ -378,53 +339,49 @@ defaultCodecs :: ByteString ByteString ByteString - ByteString defaultCodecs ccfg version encAddr decAddr nodeToNodeVersion = Codecs { cChainSyncCodec = codecChainSync enc dec - enc - dec - enc - dec + (encodePoint (encodeRawHash p)) + (decodePoint (decodeRawHash p)) + (encodeTip (encodeRawHash p)) + (decodeTip (decodeRawHash p)) , cChainSyncCodecSerialised = codecChainSync enc dec - enc - dec - enc - dec + (encodePoint (encodeRawHash p)) + (decodePoint (decodeRawHash p)) + (encodeTip (encodeRawHash p)) + (decodeTip (decodeRawHash p)) , cBlockFetchCodec = codecBlockFetch enc dec - enc - dec + (encodePoint (encodeRawHash p)) + (decodePoint (decodeRawHash p)) , cBlockFetchCodecSerialised = codecBlockFetch enc dec - enc - dec + (encodePoint (encodeRawHash p)) + (decodePoint (decodeRawHash p)) , cTxSubmission2Codec = codecTxSubmission2 enc dec enc dec - , cPerasCertDiffusionCodec = - codecObjectDiffusion - enc - dec - enc - dec , cKeepAliveCodec = codecKeepAlive_v2 , cPeerSharingCodec = codecPeerSharing (encAddr nodeToNodeVersion) (decAddr nodeToNodeVersion) } where + p :: Proxy blk + p = Proxy + enc :: SerialiseNodeToNode blk a => a -> Encoding enc = encodeNodeToNode ccfg version @@ -444,7 +401,6 @@ identityCodecs :: (AnyMessage (BlockFetch blk (Point blk))) (AnyMessage (BlockFetch (Serialised blk) (Point blk))) (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) - (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing addr)) identityCodecs = @@ -454,7 +410,6 @@ identityCodecs = , cBlockFetchCodec = codecBlockFetchId , cBlockFetchCodecSerialised = codecBlockFetchId , cTxSubmission2Codec = codecTxSubmission2Id - , cPerasCertDiffusionCodec = codecObjectDiffusionId , cKeepAliveCodec = codecKeepAliveId , cPeerSharingCodec = codecPeerSharingId } @@ -477,7 +432,6 @@ data Tracers' peer ntnAddr blk e f = Tracers f (TraceLabelPeer peer (TraceSendRecv (BlockFetch (Serialised blk) (Point blk)))) , tTxSubmission2Tracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission2 (GenTxId blk) (GenTx blk)))) - , tPerasCertDiffusionTracer :: f (TraceLabelPeer peer (TraceSendRecv (PerasCertDiffusion blk))) , tKeepAliveTracer :: f (TraceLabelPeer peer (TraceSendRecv KeepAlive)) , tPeerSharingTracer :: f (TraceLabelPeer peer (TraceSendRecv (PeerSharing ntnAddr))) } @@ -490,7 +444,6 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer ntnAddr blk e f , tBlockFetchTracer = f tBlockFetchTracer , tBlockFetchSerialisedTracer = f tBlockFetchSerialisedTracer , tTxSubmission2Tracer = f tTxSubmission2Tracer - , tPerasCertDiffusionTracer = f tPerasCertDiffusionTracer , tKeepAliveTracer = f tKeepAliveTracer , tPeerSharingTracer = f tPeerSharingTracer } @@ -511,7 +464,6 @@ nullTracers = , tBlockFetchTracer = nullTracer , tBlockFetchSerialisedTracer = nullTracer , tTxSubmission2Tracer = nullTracer - , tPerasCertDiffusionTracer = nullTracer , tKeepAliveTracer = nullTracer , tPeerSharingTracer = nullTracer } @@ -533,7 +485,6 @@ showTracers tr = , tBlockFetchTracer = showTracing tr , tBlockFetchSerialisedTracer = showTracing tr , tTxSubmission2Tracer = showTracing tr - , tPerasCertDiffusionTracer = showTracing tr , tKeepAliveTracer = showTracing tr , tPeerSharingTracer = showTracing tr } @@ -558,7 +509,7 @@ type ServerApp m addr bytes a = -- | Applications for the node-to-node protocols -- -- See 'Network.Mux.Types.MuxApplication' -data Apps m addr bCS bBF bTX bPCD bKA bPS a b = Apps +data Apps m addr bCS bBF bTX bKA bPS a b = Apps { aChainSyncClient :: ClientApp m addr bCS a -- ^ Start a chain sync client that communicates with the given upstream -- node. @@ -574,10 +525,6 @@ data Apps m addr bCS bBF bTX bPCD bKA bPS a b = Apps -- given upstream node. , aTxSubmission2Server :: ServerApp m addr bTX b -- ^ Start a transaction submission v2 server. - , aPerasCertDiffusionClient :: ClientApp m addr bPCD a - -- ^ Start a Peras cert diffusion client. - , aPerasCertDiffusionServer :: ServerApp m addr bPCD b - -- ^ Start a Peras cert diffusion server. , aKeepAliveClient :: ClientApp m addr bKA a -- ^ Start a keep-alive client. , aKeepAliveServer :: ServerApp m addr bKA b @@ -593,7 +540,7 @@ data Apps m addr bCS bBF bTX bPCD bKA bPS a b = Apps -- -- They don't depend on the instantiation of the protocol parameters (which -- block type is used, etc.), hence the use of 'RankNTypes'. -data ByteLimits bCS bBF bTX bPCD bKA bPS = ByteLimits +data ByteLimits bCS bBF bTX bKA bPS = ByteLimits { blChainSync :: forall header point tip. ProtocolSizeLimits @@ -609,11 +556,6 @@ data ByteLimits bCS bBF bTX bPCD bKA bPS = ByteLimits ProtocolSizeLimits (TxSubmission2 txid tx) bTX - , blPerasCertDiffusion :: - forall blk. - ProtocolSizeLimits - (PerasCertDiffusion blk) - bPCD , blKeepAlive :: ProtocolSizeLimits KeepAlive @@ -625,24 +567,22 @@ data ByteLimits bCS bBF bTX bPCD bKA bPS = ByteLimits bPS } -noByteLimits :: ByteLimits bCS bBF bTX bPCD bKA bPS +noByteLimits :: ByteLimits bCS bBF bTX bKA bPS noByteLimits = ByteLimits { blChainSync = byteLimitsChainSync (const 0) , blBlockFetch = byteLimitsBlockFetch (const 0) , blTxSubmission2 = byteLimitsTxSubmission2 (const 0) - , blPerasCertDiffusion = byteLimitsObjectDiffusion (const 0) , blKeepAlive = byteLimitsKeepAlive (const 0) , blPeerSharing = byteLimitsPeerSharing (const 0) } -byteLimits :: ByteLimits ByteString ByteString ByteString ByteString ByteString ByteString +byteLimits :: ByteLimits ByteString ByteString ByteString ByteString ByteString byteLimits = ByteLimits { blChainSync = byteLimitsChainSync size , blBlockFetch = byteLimitsBlockFetch size , blTxSubmission2 = byteLimitsTxSubmission2 size - , blPerasCertDiffusion = byteLimitsObjectDiffusion size , blKeepAlive = byteLimitsKeepAlive size , blPeerSharing = byteLimitsPeerSharing size } @@ -654,7 +594,7 @@ byteLimits = -- | Construct the 'NetworkApplication' for the node-to-node protocols mkApps :: - forall m addrNTN addrNTC blk e bCS bBF bTX bPCD bKA bPS. + forall m addrNTN addrNTC blk e bCS bBF bTX bKA bPS. ( IOLike m , MonadTimer m , Ord addrNTN @@ -669,8 +609,8 @@ mkApps :: NodeKernel m addrNTN addrNTC blk -> StdGen -> Tracers m addrNTN blk e -> - (NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bPCD bKA bPS) -> - ByteLimits bCS bBF bTX bPCD bKA bPS -> + (NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bKA bPS) -> + ByteLimits bCS bBF bTX bKA bPS -> -- Chain-Sync timeouts for chain-sync client (using `Header blk`) as well as -- the server (`SerialisedHeader blk`). (forall header. ProtocolTimeLimitsWithRnd (ChainSync header (Point blk) (Tip blk))) -> @@ -678,7 +618,7 @@ mkApps :: CsClient.CSJConfig -> ReportPeerMetrics m (ConnectionId addrNTN) -> Handlers m addrNTN blk -> - Apps m addrNTN bCS bBF bTX bPCD bKA bPS NodeToNodeInitiatorResult () + Apps m addrNTN bCS bBF bTX bKA bPS NodeToNodeInitiatorResult () mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucketConfig csjConfig ReportPeerMetrics{..} Handlers{..} = Apps{..} where @@ -857,51 +797,6 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke channel (txSubmissionServerPeerPipelined (hTxSubmissionServer version them)) - aPerasCertDiffusionClient :: - NodeToNodeVersion -> - ExpandedInitiatorContext addrNTN m -> - Channel m bPCD -> - m (NodeToNodeInitiatorResult, Maybe bPCD) - aPerasCertDiffusionClient - version - ExpandedInitiatorContext - { eicConnectionId = them - , eicControlMessage = controlMessageSTM - } - channel = do - labelThisThread "PerasCertDiffusionClient" - ((), trailing) <- - runPipelinedPeerWithLimits - (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) - (cPerasCertDiffusionCodec (mkCodecs version)) - blPerasCertDiffusion - timeLimitsObjectDiffusion - channel - ( objectDiffusionInboundPeerPipelined - (hPerasCertDiffusionClient version controlMessageSTM them) - ) - return (NoInitiatorResult, trailing) - - aPerasCertDiffusionServer :: - NodeToNodeVersion -> - ResponderContext addrNTN -> - Channel m bPCD -> - m ((), Maybe bPCD) - aPerasCertDiffusionServer - version - ResponderContext{rcConnectionId = them} - channel = do - labelThisThread "PerasCertDiffusionServer" - runPeerWithLimits - (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) - (cPerasCertDiffusionCodec (mkCodecs version)) - blPerasCertDiffusion - timeLimitsObjectDiffusion - channel - ( objectDiffusionOutboundPeer - (hPerasCertDiffusionServer version them) - ) - aKeepAliveClient :: NodeToNodeVersion -> ExpandedInitiatorContext addrNTN m -> @@ -1005,11 +900,10 @@ initiator :: MiniProtocolParameters -> NodeToNodeVersion -> NodeToNodeVersionData -> - Apps m addr b b b b b b a c -> + Apps m addr b b b b b a c -> OuroborosBundleWithExpandedCtx 'Mux.InitiatorMode addr b m a Void initiator miniProtocolParameters version versionData Apps{..} = nodeToNodeProtocols - Set.empty -- TODO: change for a meaningful value miniProtocolParameters -- TODO: currently consensus is using 'ConnectionId' for its 'peer' type. -- This is currently ok, as we might accept multiple connections from the @@ -1024,9 +918,6 @@ initiator miniProtocolParameters version versionData Apps{..} = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aBlockFetchClient version ctx))) , txSubmissionProtocol = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aTxSubmission2Client version ctx))) - , perasCertDiffusionProtocol = - (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aPerasCertDiffusionClient version ctx))) - , perasVoteDiffusionProtocol = error "perasVoteDiffusionProtocol not implemented" , keepAliveProtocol = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aKeepAliveClient version ctx))) , peerSharingProtocol = @@ -1045,11 +936,10 @@ initiatorAndResponder :: MiniProtocolParameters -> NodeToNodeVersion -> NodeToNodeVersionData -> - Apps m addr b b b b b b a c -> + Apps m addr b b b b b a c -> OuroborosBundleWithExpandedCtx 'Mux.InitiatorResponderMode addr b m a c initiatorAndResponder miniProtocolParameters version versionData Apps{..} = nodeToNodeProtocols - Set.empty -- TODO: change for a meaningful value miniProtocolParameters ( NodeToNodeProtocols { chainSyncProtocol = @@ -1067,12 +957,6 @@ initiatorAndResponder miniProtocolParameters version versionData Apps{..} = (MiniProtocolCb (\initiatorCtx -> aTxSubmission2Client version initiatorCtx)) (MiniProtocolCb (\responderCtx -> aTxSubmission2Server version responderCtx)) ) - , perasCertDiffusionProtocol = - ( InitiatorAndResponderProtocol - (MiniProtocolCb (\initiatorCtx -> aPerasCertDiffusionClient version initiatorCtx)) - (MiniProtocolCb (\responderCtx -> aPerasCertDiffusionServer version responderCtx)) - ) - , perasVoteDiffusionProtocol = error "perasVoteDiffusionProtocol not implemented" , keepAliveProtocol = ( InitiatorAndResponderProtocol (MiniProtocolCb (\initiatorCtx -> aKeepAliveClient version initiatorCtx)) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index ac9fb75450..44123d6f3a 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -655,7 +655,6 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = ByteString ByteString ByteString - ByteString NodeToNodeInitiatorResult () mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics encAddrNTN decAddrNTN version = @@ -697,7 +696,6 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = ByteString ByteString ByteString - ByteString NodeToNodeInitiatorResult () ) -> diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs index 4b8627dbd9..3d025ea91d 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs @@ -42,7 +42,6 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Server import Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server ( TraceLocalTxSubmissionServerEvent (..) ) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert import Ouroboros.Consensus.Node.GSM (TraceGsmEvent) import Ouroboros.Consensus.Protocol.Praos.AgentClient ( KESAgentClientTrace (..) @@ -80,10 +79,6 @@ data Tracers' remotePeer localPeer blk f = Tracers f (TraceLabelPeer remotePeer (TraceTxSubmissionOutbound (GenTxId blk) (GenTx blk))) , localTxSubmissionServerTracer :: f (TraceLocalTxSubmissionServerEvent blk) , mempoolTracer :: f (TraceEventMempool blk) - , perasCertDiffusionInboundTracer :: - f (TraceLabelPeer remotePeer (TracePerasCertDiffusionInbound blk)) - , perasCertDiffusionOutboundTracer :: - f (TraceLabelPeer remotePeer (TracePerasCertDiffusionOutbound blk)) , forgeTracer :: f (TraceLabelCreds (TraceForgeEvent blk)) , blockchainTimeTracer :: f (TraceBlockchainTimeEvent UTCTime) , forgeStateInfoTracer :: f (TraceLabelCreds (ForgeStateInfo blk)) @@ -114,8 +109,6 @@ instance , txOutboundTracer = f txOutboundTracer , localTxSubmissionServerTracer = f localTxSubmissionServerTracer , mempoolTracer = f mempoolTracer - , perasCertDiffusionInboundTracer = f perasCertDiffusionInboundTracer - , perasCertDiffusionOutboundTracer = f perasCertDiffusionOutboundTracer , forgeTracer = f forgeTracer , blockchainTimeTracer = f blockchainTimeTracer , forgeStateInfoTracer = f forgeStateInfoTracer @@ -153,8 +146,6 @@ nullTracers = , txOutboundTracer = nullTracer , localTxSubmissionServerTracer = nullTracer , mempoolTracer = nullTracer - , perasCertDiffusionInboundTracer = nullTracer - , perasCertDiffusionOutboundTracer = nullTracer , forgeTracer = nullTracer , blockchainTimeTracer = nullTracer , forgeStateInfoTracer = nullTracer @@ -194,8 +185,6 @@ showTracers tr = , txOutboundTracer = showTracing tr , localTxSubmissionServerTracer = showTracing tr , mempoolTracer = showTracing tr - , perasCertDiffusionInboundTracer = showTracing tr - , perasCertDiffusionOutboundTracer = showTracing tr , forgeTracer = showTracing tr , blockchainTimeTracer = showTracing tr , forgeStateInfoTracer = showTracing tr diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index f5e9dbecbd..8a58fe3b03 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -83,7 +83,6 @@ import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck as HistoricityCheck import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert (PerasCertDiffusion) import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.ExitPolicy import qualified Ouroboros.Consensus.Node.GSM as GSM @@ -124,8 +123,8 @@ import Ouroboros.Network.NodeToNode ( ConnectionId (..) , ExpandedInitiatorContext (..) , IsBigLedgerPeer (..) + , MiniProtocolParameters (..) , ResponderContext (..) - , defaultMiniProtocolParameters ) import Ouroboros.Network.PeerSelection.Governor ( makePublicPeerSelectionStateVar @@ -1057,7 +1056,13 @@ runThreadNetwork , mempoolCapacityOverride = NoMempoolCapacityBytesOverride , keepAliveRng = kaRng , peerSharingRng = psRng - , miniProtocolParameters = defaultMiniProtocolParameters + , miniProtocolParameters = + MiniProtocolParameters + { chainSyncPipeliningHighMark = 4 + , chainSyncPipeliningLowMark = 2 + , blockFetchPipeliningMax = 10 + , txSubmissionMaxUnacked = 1000 -- TODO ? + } , blockFetchConfiguration = BlockFetchConfiguration { bfcMaxConcurrencyBulkSync = 1 @@ -1183,7 +1188,6 @@ runThreadNetwork Lazy.ByteString Lazy.ByteString (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) - (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing NodeId)) customNodeToNodeCodecs cfg ntnVersion = @@ -1203,9 +1207,6 @@ runThreadNetwork , cTxSubmission2Codec = mapFailureCodec CodecIdFailure $ NTN.cTxSubmission2Codec NTN.identityCodecs - , cPerasCertDiffusionCodec = - mapFailureCodec CodecIdFailure $ - NTN.cPerasCertDiffusionCodec NTN.identityCodecs , cKeepAliveCodec = mapFailureCodec CodecIdFailure $ NTN.cKeepAliveCodec NTN.identityCodecs @@ -1796,7 +1797,6 @@ type LimitedApp' m addr blk = Lazy.ByteString Lazy.ByteString (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) - (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing addr)) NodeToNodeInitiatorResult diff --git a/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs b/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs index 34d0c567a9..a455689110 100644 --- a/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs +++ b/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs @@ -103,7 +103,6 @@ prop_simple_bft_convergence , version = newestVersion (Proxy @MockBftBlock) } - testOutput :: TestOutput MockBftBlock testOutput = runTestNetwork testConfig diff --git a/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md b/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md deleted file mode 100644 index 6d34c05836..0000000000 --- a/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md +++ /dev/null @@ -1,28 +0,0 @@ - - - - - -### Breaking - -- Rely on a new version of `ouroboros-network` with support for ObjectDiffusion mini-protocol -- Add modules `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion{.Inbound,.Outbound}` with implementations of the ObjectDiffusion protocol (quite similar/inspired from TX-submission, except that client = inbound, server = outbound) -- Add module `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API` defining `ObjectPool{Reader,Writer}` interfaces, through which ObjectDiffusion accesses/stores the objects to send/that have been received. -- Add modules `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert` and `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert` containing definitions specific to `PerasCert` diffusion through the ObjectDiffusion mini-protocol -- Modify `Ouroboros.Consensus.Node.Serialisation` to add CBOR serialisation (`SerialiseNodeToNode`) for `Point blk`, `Tip blk`, and `PerasCert blk` -- Add modules `Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke` and `Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke` with smoke tests for the general ObjectDiffusion mini-protocol and for the `PerasCert`-specific instance of it diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index a0d3a83786..50da4ba99b 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -191,11 +191,6 @@ library Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server - Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound - Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API - Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert - Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound - Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert Ouroboros.Consensus.Node.GsmState Ouroboros.Consensus.Node.InitStorage Ouroboros.Consensus.Node.NetworkProtocolVersion @@ -669,8 +664,6 @@ test-suite consensus-test Test.Consensus.MiniProtocol.ChainSync.CSJ Test.Consensus.MiniProtocol.ChainSync.Client Test.Consensus.MiniProtocol.LocalStateQuery.Server - Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke - Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke Test.Consensus.Peras.WeightSnapshot Test.Consensus.Util.MonadSTM.NormalForm Test.Consensus.Util.Versioned diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs deleted file mode 100644 index 4208398f24..0000000000 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs +++ /dev/null @@ -1,492 +0,0 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE CPP #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DeriveAnyClass #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE ImportQualifiedPost #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TypeApplications #-} - --- | Defines the inbound side of the ObjectDiffusion mini-protocol. --- This protocol is inspired from TxSubmission and aims to provide a generic --- way to diffuse objects for Peras, i.e. Peras votes and certificates. --- --- See the design document here: https://tweag.github.io/cardano-peras/peras-design.pdf -module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound - ( objectDiffusionInbound - , TraceObjectDiffusionInbound (..) - , ObjectDiffusionInboundError (..) - , NumObjectsProcessed (..) - ) where - -import Cardano.Prelude (catMaybes, (&)) -import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked -import Control.Exception (assert) -import Control.Monad (when) -import Control.Monad.Class.MonadSTM -import Control.Monad.Class.MonadThrow -import Control.Tracer (Tracer, traceWith) -import Data.Data (Typeable) -import Data.Foldable as Foldable (foldl', toList) -import Data.List qualified as List -import Data.List.NonEmpty qualified as NonEmpty -import Data.Map.Strict (Map) -import Data.Map.Strict qualified as Map -import Data.Sequence.Strict (StrictSeq) -import Data.Sequence.Strict qualified as Seq -import Data.Set (Set) -import Data.Set qualified as Set -import Data.Word (Word64) -import GHC.Generics (Generic) -import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt) -import NoThunks.Class (NoThunks (..)) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API -import Ouroboros.Consensus.Util.NormalForm.Invariant (noThunksInvariant) -import Ouroboros.Network.ControlMessage -import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) -import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound -import Ouroboros.Network.Protocol.ObjectDiffusion.Type - -newtype NumObjectsProcessed - = NumObjectsProcessed - { getNumObjectsProcessed :: Word64 - } - deriving (Eq, Show) - -data TraceObjectDiffusionInbound objectId object - = -- | Number of objects just about to be inserted. - TraceObjectDiffusionInboundCollectedObjects Int - | -- | Just processed object pass/fail breakdown. - TraceObjectDiffusionInboundAddedObjects NumObjectsProcessed - | -- | Received a 'ControlMessage' from the outbound peer governor, and about - -- to act on it. - TraceObjectDiffusionInboundRecvControlMessage ControlMessage - | TraceObjectDiffusionInboundCanRequestMoreObjects Int - | TraceObjectDiffusionInboundCannotRequestMoreObjects Int - deriving (Eq, Show) - -data ObjectDiffusionInboundError objectId object - = ProtocolErrorObjectsDifferentThanRequested (Set objectId) (Set objectId) - | ProtocolErrorObjectIdsNotRequested Int Int - | ProtocolErrorObjectIdsAlreadyKnown (Set objectId) - | ProtocolErrorObjectIdsDuplicate (Map objectId Int) - deriving Show - -instance - (Show objectId, Typeable object, Typeable objectId) => - Exception (ObjectDiffusionInboundError objectId object) - where - displayException (ProtocolErrorObjectsDifferentThanRequested reqButNotRecvd recvButNotReq) = - "The peer replied with different objects than those we asked for: " - ++ "requested but didn't receive " - ++ show reqButNotRecvd - ++ "; received but didn't requested " - ++ show recvButNotReq - displayException (ProtocolErrorObjectIdsNotRequested expected actual) = - "The peer replied with more objectIds than we asked for: expected " - ++ show expected - ++ " , received " - ++ show actual - displayException (ProtocolErrorObjectIdsAlreadyKnown offenders) = - "The peer replied with some objectIds that it has already sent us previously: " - ++ show offenders - displayException (ProtocolErrorObjectIdsDuplicate offenders) = - "The peer replied with a batch of objectIds containing duplicates: " - ++ show offenders - --- | Information maintained internally in the 'objectDiffusionInbound' --- implementation. -data InboundSt objectId object = InboundSt - { numIdsInFlight :: !NumObjectIdsReq - -- ^ The number of object identifiers that we have requested but - -- which have not yet been replied to. We need to track this to keep - -- our requests within the limit on the 'outstandingFifo' size. - , outstandingFifo :: !(StrictSeq objectId) - -- ^ This mirrors the queue of objects that the outbound peer has available - -- for us. Objects are kept in the order in which the outbound peer - -- advertised them to us. This is the same order in which we submit them to - -- the objectPool. It is also the order we acknowledge them. - , canRequestNext :: !(Set objectId) - -- ^ The objectIds that we can request. These are a subset of the - -- 'outstandingFifo' that we have not yet requested or not have in the pool - -- already. This is not ordered to illustrate the fact that we can - -- request objects out of order. - , pendingObjects :: !(Map objectId (Maybe object)) - -- ^ Objects we have successfully downloaded (or decided intentionally to - -- skip download) but have not yet added to the objectPool or acknowledged. - -- - -- Object IDs in this 'Map' are mapped to 'Nothing' if we notice that - -- they are already in the objectPool. That way we can skip requesting them - -- from the outbound peer, but still acknowledge them when the time comes. - , numToAckOnNextReq :: !NumObjectIdsAck - -- ^ The number of objects we can acknowledge on our next request - -- for more object IDs. Their corresponding IDs have already been removed - -- from 'outstandingFifo'. - } - deriving stock (Show, Generic) - deriving anyclass NoThunks - -initialInboundSt :: InboundSt objectId object -initialInboundSt = InboundSt 0 Seq.empty Set.empty Map.empty 0 - -objectDiffusionInbound :: - forall objectId object m. - ( Ord objectId - , Show objectId - , Typeable objectId - , Typeable object - , NoThunks objectId - , NoThunks object - , MonadSTM m - , MonadThrow m - ) => - Tracer m (TraceObjectDiffusionInbound objectId object) -> - -- | Maximum values for outstanding FIFO length, number of IDs to request, - -- and number of objects to request - (NumObjectsOutstanding, NumObjectIdsReq, NumObjectsReq) -> - ObjectPoolWriter objectId object m -> - NodeToNodeVersion -> - ControlMessageSTM m -> - ObjectDiffusionInboundPipelined objectId object m () -objectDiffusionInbound - tracer - (maxFifoLength, maxNumIdsToReq, maxNumObjectsToReq) - ObjectPoolWriter{..} - _version - controlMessageSTM = - ObjectDiffusionInboundPipelined $! - checkState initialInboundSt & go Zero - where - canRequestMoreObjects :: InboundSt k object -> Bool - canRequestMoreObjects !st = - not (Set.null (canRequestNext st)) - - -- Computes how many new IDs we can request so that receiving all of them - -- won't make 'outstandingFifo' exceed 'maxFifoLength'. - numIdsToReq :: InboundSt objectId object -> NumObjectIdsReq - numIdsToReq !st = - maxNumIdsToReq - `min` ( fromIntegral maxFifoLength - - (fromIntegral $ Seq.length $ outstandingFifo st) - - numIdsInFlight st - ) - - -- Updates 'InboundSt' with new object IDs and return the updated 'InboundSt'. - -- - -- Collected object IDs that are already in the objectPool are pre-emptively - -- acknowledged so that we don't need to bother requesting them from the - -- outbound peer. - preAcknowledge :: - InboundSt objectId object -> - (objectId -> Bool) -> - [objectId] -> - InboundSt objectId object - preAcknowledge !st _ collectedIds | null collectedIds = st - preAcknowledge !st poolHasObject collectedIds = - let - -- Divide the collected IDs in two parts: those that are already in the - -- objectPool and those that are not. - (alreadyObtained, notYetObtained) = - List.partition - poolHasObject - collectedIds - - -- The objects that we intentionally don't request, because they are - -- already in the objectPool, will need to be acknowledged. - -- So we extend 'pendingObjects' with those objects (so of course they - -- have no corresponding reply). - pendingObjects' = - foldl' - (\accMap objectId -> Map.insert objectId Nothing accMap) - (pendingObjects st) - alreadyObtained - - -- We initially extend 'outstandingFifo' with the all the collected IDs - -- (to properly mirror the server state). - outstandingFifo' = outstandingFifo st <> Seq.fromList collectedIds - - -- Now check if the update of 'pendingObjects' let us acknowledge a prefix - -- of the 'outstandingFifo', as we do in 'goCollect' -> 'CollectObjects'. - (objectIdsToAck, outstandingFifo'') = - Seq.spanl (`Map.member` pendingObjects') outstandingFifo' - - -- If so we can remove them from the 'pendingObjects' structure. - -- - -- Note that unlike in TX-Submission, we made sure the outstanding FIFO - -- couldn't have duplicate IDs, so we don't have to worry about re-adding - -- the duplicate IDs to 'pendingObjects' for future acknowledgment. - pendingObjects'' = - Foldable.foldl' - (flip Map.delete) - pendingObjects' - objectIdsToAck - - !st' = - st - { canRequestNext = canRequestNext st <> (Set.fromList notYetObtained) - , pendingObjects = pendingObjects'' - , outstandingFifo = outstandingFifo'' - , numToAckOnNextReq = - numToAckOnNextReq st - + fromIntegral (Seq.length objectIdsToAck) - } - in - st' - - go :: - forall (n :: N). - Nat n -> - InboundSt objectId object -> - InboundStIdle n objectId object m () - go n !st = WithEffect $ do - -- Check whether we should continue engaging in the protocol. - ctrlMsg <- atomically controlMessageSTM - traceWith tracer $ - TraceObjectDiffusionInboundRecvControlMessage ctrlMsg - case ctrlMsg of - -- The peer selection governor is asking us to terminate the connection. - Terminate -> - pure $! terminateAfterDrain n - -- Otherwise, we can continue the protocol normally. - _continue -> case n of - -- We didn't pipeline any requests, so there are no replies in flight - -- (nothing to collect) - Zero -> do - if canRequestMoreObjects st - then do - -- There are no replies in flight, but we do know some more objects - -- we can ask for, so lets ask for them and more objectIds in a - -- pipelined way. - traceWith tracer $ - TraceObjectDiffusionInboundCanRequestMoreObjects (natToInt n) - pure $! checkState st & goReqObjectsAndObjectIdsPipelined Zero - else do - -- There's no replies in flight, and we have no more objects we can - -- ask for so the only remaining thing to do is to ask for more - -- objectIds. Since this is the only thing to do now, we make this a - -- blocking call. - traceWith tracer $ - TraceObjectDiffusionInboundCannotRequestMoreObjects (natToInt n) - pure $! checkState st & goReqObjectIdsBlocking - - -- We have pipelined some requests, so there are some replies in flight. - Succ n' -> - if canRequestMoreObjects st - then do - -- We have replies in flight and we should eagerly collect them if - -- available, but there are objects to request too so we - -- should *not* block waiting for replies. - -- So we ask for new objects and objectIds in a pipelined way. - traceWith tracer $ - TraceObjectDiffusionInboundCanRequestMoreObjects (natToInt n) - pure $! - CollectPipelined - (Just (checkState st & goReqObjectsAndObjectIdsPipelined (Succ n'))) - (\collected -> checkState st & goCollect n' collected) - else do - traceWith tracer $ - TraceObjectDiffusionInboundCannotRequestMoreObjects (natToInt n) - -- In this case we can theoretically only collect replies or request - -- new object IDs. - -- - -- But it's important not to pipeline more requests for objectIds now - -- because if we did, then immediately after sending the request (but - -- having not yet received a response to either this or the other - -- pipelined requests), we would directly re-enter this code path, - -- resulting us in filling the pipeline with an unbounded number of - -- requests. - -- - -- So we instead block until we collect a reply. - pure $! - CollectPipelined - Nothing - (\collected -> checkState st & goCollect n' collected) - - goCollect :: - forall (n :: N). - Nat n -> - Collect objectId object -> - InboundSt objectId object -> - InboundStIdle n objectId object m () - goCollect n collect !st = case collect of - CollectObjectIds numIdsRequested collectedIds -> WithEffect $ do - let numCollectedIds = length collectedIds - collectedIdsMap = Map.fromListWith (+) [(x, 1 :: Int) | x <- collectedIds] - - -- Check they didn't send more than we asked for. We don't need to - -- check for a minimum: the blocking case checks for non-zero - -- elsewhere, and for the non-blocking case it is quite normal for - -- them to send us none. - when (numCollectedIds > fromIntegral numIdsRequested) $ - throwIO - ( ProtocolErrorObjectIdsNotRequested @objectId @object - (fromIntegral numIdsRequested) - numCollectedIds - ) - - -- Check that the server didn't send duplicate IDs in its response - when (Map.size collectedIdsMap /= numCollectedIds) $ - throwIO - ( ProtocolErrorObjectIdsDuplicate @objectId @object $ - Map.filter (> 1) $ - collectedIdsMap - ) - - -- Check that the server didn't send IDs that were already in the - -- outstanding FIFO - let alreadyKnownIds = Seq.filter (`Map.member` collectedIdsMap) (outstandingFifo st) - when (not (null alreadyKnownIds)) $ - throwIO - ( ProtocolErrorObjectIdsAlreadyKnown @objectId @object $ - Set.fromList (toList alreadyKnownIds) - ) - - -- We extend our outstanding FIFO with the newly received objectIds by - -- calling 'preAcknowledge' which will also pre-emptively acknowledge the - -- objectIds that we already have in the pool and thus don't need to - -- request. - let !st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested} - poolHasObject <- atomically $ opwHasObject - let !st'' = preAcknowledge st' poolHasObject collectedIds - pure $! checkState st'' & go n - CollectObjects requestedIds collectedObjects -> WithEffect $ do - let requestedIdsSet = Set.fromList requestedIds - obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects) - - -- To start with we have to verify that the objects they have sent us are - -- exactly the objects we asked for, not more, not less. - when (requestedIdsSet /= obtainedIdsSet) $ - let reqButNotRecvd = requestedIdsSet `Set.difference` obtainedIdsSet - recvButNotReq = obtainedIdsSet `Set.difference` requestedIdsSet - in throwIO - ( ProtocolErrorObjectsDifferentThanRequested @objectId @object - reqButNotRecvd - recvButNotReq - ) - - traceWith tracer $ - TraceObjectDiffusionInboundCollectedObjects (length collectedObjects) - - -- We update 'pendingObjects' with the newly obtained objects - let pendingObjects' = - foldl' - (\accMap object -> Map.insert (opwObjectId object) (Just object) accMap) - (pendingObjects st) - collectedObjects - - -- We then find the longest prefix of 'outstandingFifo' for which we have - -- all the corresponding IDs in 'pendingObjects'. - -- We remove this prefix from 'outstandingFifo'. - (objectIdsToAck, outstandingFifo') = - Seq.spanl (`Map.member` pendingObjects') (outstandingFifo st) - - -- And also remove these entries from 'pendingObjects'. - -- - -- Note that unlike in TX-Submission, we made sure the outstanding FIFO - -- couldn't have duplicate IDs, so we don't have to worry about re-adding - -- the duplicate IDs to 'pendingObjects' for future acknowledgment. - pendingObjects'' = - Foldable.foldl' - (flip Map.delete) - pendingObjects' - objectIdsToAck - - -- These are the objects we need to submit to the object pool - objectsToAck = - catMaybes $ - (((Map.!) pendingObjects') <$> toList objectIdsToAck) - - opwAddObjects objectsToAck - traceWith tracer $ - TraceObjectDiffusionInboundAddedObjects - (NumObjectsProcessed (fromIntegral $ length objectsToAck)) - - let !st' = - st - { pendingObjects = pendingObjects'' - , outstandingFifo = outstandingFifo' - , numToAckOnNextReq = - numToAckOnNextReq st - + fromIntegral (Seq.length objectIdsToAck) - } - pure $! checkState st' & go n - - goReqObjectIdsBlocking :: - InboundSt objectId object -> - InboundStIdle 'Z objectId object m () - goReqObjectIdsBlocking !st = - let numIdsToRequest = numIdsToReq st - -- We should only request new object IDs in a blocking way if we have - -- absolutely nothing else we can do. - !st' = - st - { numToAckOnNextReq = 0 - , numIdsInFlight = numIdsToRequest - } - in assert - ( numIdsInFlight st == 0 - && Seq.null (outstandingFifo st) - && Set.null (canRequestNext st) - && Map.null (pendingObjects st) - ) - $ SendMsgRequestObjectIdsBlocking - (numToAckOnNextReq st) - numIdsToRequest - ( \neCollectedIds -> - checkState st' & goCollect Zero (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds)) - ) - - goReqObjectsAndObjectIdsPipelined :: - forall (n :: N). - Nat n -> - InboundSt objectId object -> - InboundStIdle n objectId object m () - goReqObjectsAndObjectIdsPipelined n !st = - -- TODO: This implementation is deliberately naive, we pick in an - -- arbitrary order. We may want to revisit this later. - let (toRequest, canRequestNext') = - Set.splitAt (fromIntegral maxNumObjectsToReq) (canRequestNext st) - !st' = st{canRequestNext = canRequestNext'} - in SendMsgRequestObjectsPipelined - (toList toRequest) - (checkState st' & goReqObjectIdsPipelined (Succ n)) - - goReqObjectIdsPipelined :: - forall (n :: N). - Nat n -> - InboundSt objectId object -> - InboundStIdle n objectId object m () - goReqObjectIdsPipelined n !st = - let numIdsToRequest = numIdsToReq st - in if numIdsToRequest <= 0 - then checkState st & go n - else - let !st' = - st - { numIdsInFlight = - numIdsInFlight st - + numIdsToRequest - , numToAckOnNextReq = 0 - } - in SendMsgRequestObjectIdsPipelined - (numToAckOnNextReq st) - numIdsToRequest - (checkState st' & go (Succ n)) - - -- Ignore all outstanding replies to messages we pipelined ("drain"), and then - -- terminate. - terminateAfterDrain :: - Nat n -> InboundStIdle n objectId object m () - terminateAfterDrain = \case - Zero -> SendMsgDone () - Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n - --- | Helper to ensure that the `InboundSt` is free of unexpected thunks and --- stays strict during the whole process -checkState :: NoThunks s => s -> s -checkState !st = checkInvariant (noThunksInvariant st) st diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs deleted file mode 100644 index a41e0cf675..0000000000 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs +++ /dev/null @@ -1,67 +0,0 @@ --- | API for reading from and writing to object pools in the ObjectDiffusion --- miniprotocol. --- --- The underlying object pool can be any database, such as a 'PerasCertDb' in --- Peras certificate diffusion. --- --- 'ObjectPoolReader' is used on the outbound side of the protocol. Objects in --- the pool are ordered by a strictly increasing ticket number ('ticketNo'), --- which represents their time of arrival. Ticket numbers are local to each --- node, unlike object IDs, which are global. Object IDs are not used for --- ordering, since objects may arrive slightly out of order from peers. --- --- To read from the pool, one requests objects with a ticket number strictly --- greater than the last known one. 'oprZeroTicketNo' provides an initial ticket --- number for the first request. --- --- 'ObjectPoolWriter' is used on the inbound side of the protocol. It allows --- checking whether an object is already present (to avoid re-requesting it) and --- appending new objects. Ticket numbers are not part of the inbound interface, --- but are used internally: newly added objects always receive a ticket number --- strictly greater than those of older ones. --- --- This API design is inspired by 'MempoolSnapshot' from the TX-submission --- miniprotocol, see: --- -module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API - ( ObjectPoolReader (..) - , ObjectPoolWriter (..) - ) where - -import Control.Concurrent.Class.MonadSTM.Strict (STM) -import Data.Map (Map) -import Data.Word (Word64) - --- | Interface used by the outbound side of object diffusion as its source of --- objects to give to the remote side. -data ObjectPoolReader objectId object ticketNo m - = ObjectPoolReader - { oprObjectId :: object -> objectId - -- ^ Return the id of the specified object - , oprZeroTicketNo :: ticketNo - -- ^ Ticket number before the first item in the pool. - , oprObjectsAfter :: ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object))) - -- ^ Get the map of objects available in the pool with a 'ticketNo' greater - -- than the specified one, with the number of returned objects capped to the - -- specified limit. - -- Returns 'Nothing' if there are no such objects available atm in the pool, - -- or 'Just' a monadic action yielding the map of objects otherwise. - -- The monadic action is there to account for the fact that objects might be - -- stored on disk. - -- If the monadic action is executed immediately, the returned map will be - -- non-empty. But if the consumer delays executing the action too much, - -- it is possible that the objects get garbage-collected in the meantime, and - -- thus the returned map could be empty. - } - --- | Interface used by the inbound side of object diffusion when receiving --- objects. -data ObjectPoolWriter objectId object m - = ObjectPoolWriter - { opwObjectId :: object -> objectId - -- ^ Return the id of the specified object - , opwAddObjects :: [object] -> m () - -- ^ Add a batch of objects to the objectPool. - , opwHasObject :: STM m (objectId -> Bool) - -- ^ Check if the object pool contains an object with the given id - } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs deleted file mode 100644 index 26bff506f3..0000000000 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs +++ /dev/null @@ -1,114 +0,0 @@ -{-# LANGUAGE GADTs #-} -{-# LANGUAGE StandaloneDeriving #-} - --- | Instantiate 'ObjectPoolReader' and 'ObjectPoolWriter' using Peras --- certificates from the 'PerasCertDB' (or the 'ChainDB' which is wrapping the --- 'PerasCertDB'). -module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert - ( makePerasCertPoolReaderFromCertDB - , makePerasCertPoolWriterFromCertDB - , makePerasCertPoolReaderFromChainDB - , makePerasCertPoolWriterFromChainDB - ) where - -import Data.Map (Map) -import qualified Data.Map as Map -import GHC.Exception (throw) -import Ouroboros.Consensus.Block -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API -import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) -import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB -import Ouroboros.Consensus.Storage.PerasCertDB.API - ( PerasCertDB - , PerasCertSnapshot - , PerasCertTicketNo - ) -import qualified Ouroboros.Consensus.Storage.PerasCertDB.API as PerasCertDB -import Ouroboros.Consensus.Util.IOLike - --- | TODO: replace by `Data.Map.take` as soon as we move to GHC 9.8 -takeAscMap :: Int -> Map k v -> Map k v -takeAscMap n = Map.fromDistinctAscList . take n . Map.toAscList - -makePerasCertPoolReaderFromSnapshot :: - (IOLike m, StandardHash blk) => - STM m (PerasCertSnapshot blk) -> - ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m -makePerasCertPoolReaderFromSnapshot getCertSnapshot = - ObjectPoolReader - { oprObjectId = getPerasCertRound - , oprZeroTicketNo = PerasCertDB.zeroPerasCertTicketNo - , oprObjectsAfter = \lastKnown limit -> do - certSnapshot <- getCertSnapshot - let certsAfterLastKnown = - PerasCertDB.getCertsAfter certSnapshot lastKnown - let loadCertsAfterLastKnown = - pure (getPerasCert <$> takeAscMap (fromIntegral limit) certsAfterLastKnown) - pure $ - if Map.null certsAfterLastKnown - then Nothing - else Just loadCertsAfterLastKnown - } - -makePerasCertPoolReaderFromCertDB :: - (IOLike m, StandardHash blk) => - PerasCertDB m blk -> ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m -makePerasCertPoolReaderFromCertDB perasCertDB = - makePerasCertPoolReaderFromSnapshot (PerasCertDB.getCertSnapshot perasCertDB) - -makePerasCertPoolWriterFromCertDB :: - (StandardHash blk, IOLike m) => - PerasCertDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m -makePerasCertPoolWriterFromCertDB perasCertDB = - ObjectPoolWriter - { opwObjectId = getPerasCertRound - , opwAddObjects = \certs -> do - validatePerasCerts certs - >>= mapM_ (PerasCertDB.addCert perasCertDB) - , opwHasObject = do - certSnapshot <- PerasCertDB.getCertSnapshot perasCertDB - pure $ PerasCertDB.containsCert certSnapshot - } - -makePerasCertPoolReaderFromChainDB :: - (IOLike m, StandardHash blk) => - ChainDB m blk -> ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m -makePerasCertPoolReaderFromChainDB chainDB = - makePerasCertPoolReaderFromSnapshot (ChainDB.getPerasCertSnapshot chainDB) - -makePerasCertPoolWriterFromChainDB :: - (StandardHash blk, IOLike m) => - ChainDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m -makePerasCertPoolWriterFromChainDB chainDB = - ObjectPoolWriter - { opwObjectId = getPerasCertRound - , opwAddObjects = \certs -> do - validatePerasCerts certs - >>= mapM_ (ChainDB.addPerasCertAsync chainDB) - , opwHasObject = do - certSnapshot <- ChainDB.getPerasCertSnapshot chainDB - pure $ PerasCertDB.containsCert certSnapshot - } - -data PerasCertInboundException - = forall blk. PerasCertValidationError (PerasValidationErr blk) - -deriving instance Show PerasCertInboundException - -instance Exception PerasCertInboundException - --- | Validate a list of 'PerasCert's, throwing a 'PerasCertInboundException' if --- any of them are invalid. -validatePerasCerts :: - (StandardHash blk, MonadThrow m) => - [PerasCert blk] -> - m [ValidatedPerasCert blk] -validatePerasCerts certs = do - let perasCfg = makePerasCfg Nothing - -- TODO replace the mocked-up Nothing with a real - -- 'BlockConfig' when all the plumbing is in place - -- see https://github.com/tweag/cardano-peras/issues/73 - -- see https://github.com/tweag/cardano-peras/issues/120 - case traverse (validatePerasCert perasCfg) certs of - Left validationErr -> throw (PerasCertValidationError validationErr) - Right validatedCerts -> return validatedCerts diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs deleted file mode 100644 index 52024ecdc8..0000000000 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs +++ /dev/null @@ -1,278 +0,0 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE ImportQualifiedPost #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} - --- | Defines the outbound side of the ObjectDiffusion mini-protocol. --- This protocol is inspired from TxSubmission and aims to provide a generic --- way to diffuse objects for Peras, i.e. Peras votes and certificates. --- --- See the design document here: https://tweag.github.io/cardano-peras/peras-design.pdf -module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound - ( objectDiffusionOutbound - , TraceObjectDiffusionOutbound (..) - , ObjectDiffusionOutboundError (..) - ) where - -import Control.Exception (assert) -import Control.Monad (join, unless, when) -import Control.Monad.Class.MonadSTM -import Control.Monad.Class.MonadThrow -import Control.Tracer (Tracer, traceWith) -import Data.Foldable qualified as Foldable -import Data.List.NonEmpty qualified as NonEmpty -import Data.Map (Map) -import Data.Map qualified as Map -import Data.Maybe (fromMaybe) -import Data.Sequence.Strict (StrictSeq) -import Data.Sequence.Strict qualified as Seq -import Data.Set qualified as Set -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API -import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) -import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound -import Ouroboros.Network.Protocol.ObjectDiffusion.Type - -data TraceObjectDiffusionOutbound objectId object - = TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq - | -- | The IDs to be sent in the response - TraceObjectDiffusionOutboundSendMsgReplyObjectIds [objectId] - | -- | The IDs of the objects requested. - TraceObjectDiffusionOutboundRecvMsgRequestObjects - [objectId] - | -- | The objects to be sent in the response. - TraceObjectDiffusionOutboundSendMsgReplyObjects - [object] - | -- | There is a mismatch between the result from the in-memory object pool - -- index and the content we can load so we have to retry the whole monad m - -- action to load new objects. - TraceObjectDiffusionOutboundUnableToLoadObjectsThusRetrying - | -- | Received 'MsgDone' - TraceObjectDiffusionOutboundTerminated - deriving Show - -data ObjectDiffusionOutboundError - = ProtocolErrorAckedTooManyObjectIds - | ProtocolErrorRequestedNothing - | ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq NumObjectsOutstanding - | ProtocolErrorRequestBlocking - | ProtocolErrorRequestNonBlocking - | ProtocolErrorRequestedUnavailableObject - | ProtocolErrorRequestedDuplicateObject - deriving Show - -instance Exception ObjectDiffusionOutboundError where - displayException ProtocolErrorAckedTooManyObjectIds = - "The peer tried to acknowledged more objectIds than are available to do so." - displayException (ProtocolErrorRequestedTooManyObjectIds reqNo maxUnacked) = - "The peer requested " - ++ show reqNo - ++ " objectIds which would put the " - ++ "total in flight over the limit of " - ++ show maxUnacked - displayException ProtocolErrorRequestedNothing = - "The peer requested zero objectIds." - displayException ProtocolErrorRequestBlocking = - "The peer made a blocking request for more objectIds when there are still " - ++ "unacknowledged objectIds. It should have used a non-blocking request." - displayException ProtocolErrorRequestNonBlocking = - "The peer made a non-blocking request for more objectIds when there are " - ++ "no unacknowledged objectIds. It should have used a blocking request." - displayException ProtocolErrorRequestedUnavailableObject = - "The peer requested an object which is not available, either " - ++ "because it was never available or because it was previously requested." - displayException ProtocolErrorRequestedDuplicateObject = - "The peer requested the same object twice." - -data OutboundSt objectId object ticketNo = OutboundSt - { outstandingFifo :: !(StrictSeq object) - , lastTicketNo :: !ticketNo - } - -objectDiffusionOutbound :: - forall objectId object ticketNo m. - (Ord objectId, Ord ticketNo, MonadSTM m, MonadThrow m) => - Tracer m (TraceObjectDiffusionOutbound objectId object) -> - -- | Maximum number of unacknowledged objectIds allowed - NumObjectsOutstanding -> - ObjectPoolReader objectId object ticketNo m -> - NodeToNodeVersion -> - ObjectDiffusionOutbound objectId object m () -objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version = - ObjectDiffusionOutbound (pure (makeBundle $ OutboundSt Seq.empty oprZeroTicketNo)) - where - makeBundle :: OutboundSt objectId object ticketNo -> OutboundStIdle objectId object m () - makeBundle !st = - OutboundStIdle - { recvMsgRequestObjectIds = recvMsgRequestObjectIds st - , recvMsgRequestObjects = recvMsgRequestObjects st - , recvMsgDone = traceWith tracer TraceObjectDiffusionOutboundTerminated - } - - updateStNewObjects :: - OutboundSt objectId object ticketNo -> - Map ticketNo object -> - OutboundSt objectId object ticketNo - updateStNewObjects !OutboundSt{..} newObjectsWithTicketNos = - -- These objects should all be fresh - assert (all (> lastTicketNo) (Map.keys newObjectsWithTicketNos)) $ - let !outstandingFifo' = - Foldable.foldl' - (Seq.|>) - outstandingFifo - newObjectsWithTicketNos - !lastTicketNo' = case Map.lookupMax newObjectsWithTicketNos of - Nothing -> lastTicketNo - Just (ticketNo, _) -> ticketNo - in OutboundSt - { outstandingFifo = outstandingFifo' - , lastTicketNo = lastTicketNo' - } - - recvMsgRequestObjectIds :: - forall blocking. - OutboundSt objectId object ticketNo -> - SingBlockingStyle blocking -> - NumObjectIdsAck -> - NumObjectIdsReq -> - m (OutboundStObjectIds blocking objectId object m ()) - recvMsgRequestObjectIds !st@OutboundSt{..} blocking numIdsToAck numIdsToReq = do - traceWith tracer (TraceObjectDiffusionOutboundRecvMsgRequestObjectIds numIdsToReq) - - when (numIdsToAck > fromIntegral (Seq.length outstandingFifo)) $ - throwIO ProtocolErrorAckedTooManyObjectIds - - when - ( Seq.length outstandingFifo - - fromIntegral numIdsToAck - + fromIntegral numIdsToReq - > fromIntegral maxFifoLength - ) - $ throwIO (ProtocolErrorRequestedTooManyObjectIds numIdsToReq maxFifoLength) - - -- First we update our FIFO to remove the number of objectIds that the - -- inbound peer has acknowledged. - let !outstandingFifo' = Seq.drop (fromIntegral numIdsToAck) outstandingFifo - -- must specify the type here otherwise GHC complains about mismatch objectId types - st' :: OutboundSt objectId object ticketNo - !st' = st{outstandingFifo = outstandingFifo'} - - -- Grab info about any new objects after the last object ticketNo we've - -- seen, up to the number that the peer has requested. - case blocking of - ----------------------------------------------------------------------- - SingBlocking -> do - when (numIdsToReq == 0) $ - throwIO ProtocolErrorRequestedNothing - unless (Seq.null outstandingFifo') $ - throwIO ProtocolErrorRequestBlocking - - -- oprObjectIdsAfter returns early with Nothing if there are no new - -- objects after lastTicketNo. So we retry efficiently using STM until - -- we get 'Just'. - -- But even if we get 'Just', when loading the actual objects (as a - -- non-STM action, taking place in monad m), we might find that the map - -- is empty (because objects were deleted from the pool in the meantime). - -- So we have a outer-level retry loop, this time in monad m. - -- - -- The only case were the outer-level retry loop would spin forever is - -- if the in-memory index of the object pool keeps indicating that - -- there are new objects after 'lastTicketNo', but when we try to load - -- them, they are not there anymore. This would indicate a bug in the - -- object pool implementation. - newObjects <- retryMaybeWith (traceWith tracer TraceObjectDiffusionOutboundUnableToLoadObjectsThusRetrying) - . join - . atomically - $ do - mLoadNewObjects <- oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq) - maybe retry (pure . fmap ensureNonEmpty) mLoadNewObjects - - let !newIds = oprObjectId <$> Map.elems newObjects - st'' = updateStNewObjects st' newObjects - - traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds) - - pure $ - SendMsgReplyObjectIds - (BlockingReply (NonEmpty.fromList $ newIds)) - (makeBundle st'') - - ----------------------------------------------------------------------- - SingNonBlocking -> do - when (numIdsToReq == 0 && numIdsToAck == 0) $ - throwIO ProtocolErrorRequestedNothing - when (Seq.null outstandingFifo') $ - throwIO ProtocolErrorRequestNonBlocking - - -- oprObjectIdsAfter returns early with Nothing if there are no new - -- objects after lastTicketNo. So we use fromMaybe to provide an empty map - -- in that case. - newObjects <- - join . atomically . fmap (fromMaybe $ pure Map.empty) $ - oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq) - - let !newIds = oprObjectId <$> Map.elems newObjects - st'' = updateStNewObjects st' newObjects - - traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds) - - pure (SendMsgReplyObjectIds (NonBlockingReply newIds) (makeBundle st'')) - - recvMsgRequestObjects :: - OutboundSt objectId object ticketNo -> - [objectId] -> - m (OutboundStObjects objectId object m ()) - recvMsgRequestObjects !st@OutboundSt{..} requestedIds = do - traceWith tracer (TraceObjectDiffusionOutboundRecvMsgRequestObjects requestedIds) - - -- All the objects correspond to advertised objectIds are already in the - -- outstandingFifo. So we don't need to read from the object pool here. - - -- I've optimized the search to do only one traversal of 'outstandingFifo'. - -- When the 'requestedIds' is exactly the whole 'outstandingFifo', then this - -- should take O(n * log n) time. - -- - -- TODO: We might need to revisit the underlying 'outstandingFifo' data - -- structure and the search if performance isn't sufficient when we'll use - -- ObjectDiffusion for votes diffusion (and not just cert diffusion). - - let requestedIdsSet = Set.fromList requestedIds - - when (Set.size requestedIdsSet /= length requestedIds) $ - throwIO ProtocolErrorRequestedDuplicateObject - - let requestedObjects = - foldr - ( \obj acc -> - if Set.member (oprObjectId obj) requestedIdsSet - then obj : acc - else acc - ) - [] - outstandingFifo - - when (Set.size requestedIdsSet /= length requestedObjects) $ - throwIO ProtocolErrorRequestedUnavailableObject - - traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjects requestedObjects) - - pure (SendMsgReplyObjects requestedObjects (makeBundle st)) - --- | Retry an action that returns 'Maybe a' until it returns 'Just a'. --- The 'onRetry' action is executed before each retry. -retryMaybeWith :: Monad m => m () -> m (Maybe a) -> m a -retryMaybeWith onRetry action = loop - where - loop = do - mx <- action - case mx of - Just x -> pure x - Nothing -> onRetry >> loop - --- | Convert an empty map to 'Nothing', and a non-empty map to 'Just' that map. --- --- TODO: we could alternatively replace this with 'Data.NonEmpty.Map.nonEmptyMap' -ensureNonEmpty :: Map k v -> Maybe (Map k v) -ensureNonEmpty m - | Map.null m = Nothing - | otherwise = Just m diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs deleted file mode 100644 index ba0ba934a2..0000000000 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs +++ /dev/null @@ -1,41 +0,0 @@ --- | This module defines type aliases for the ObjectDiffusion protocol applied --- to PerasCert diffusion. -module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert - ( TracePerasCertDiffusionInbound - , TracePerasCertDiffusionOutbound - , PerasCertPoolReader - , PerasCertPoolWriter - , PerasCertDiffusionInboundPipelined - , PerasCertDiffusionOutbound - , PerasCertDiffusion - ) where - -import Ouroboros.Consensus.Block -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound -import Ouroboros.Consensus.Storage.PerasCertDB.API -import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound (ObjectDiffusionInboundPipelined) -import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound (ObjectDiffusionOutbound) -import Ouroboros.Network.Protocol.ObjectDiffusion.Type (ObjectDiffusion) - -type TracePerasCertDiffusionInbound blk = - TraceObjectDiffusionInbound PerasRoundNo (PerasCert blk) - -type TracePerasCertDiffusionOutbound blk = - TraceObjectDiffusionOutbound PerasRoundNo (PerasCert blk) - -type PerasCertPoolReader blk m = - ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m - -type PerasCertPoolWriter blk m = - ObjectPoolWriter PerasRoundNo (PerasCert blk) m - -type PerasCertDiffusionInboundPipelined blk m a = - ObjectDiffusionInboundPipelined PerasRoundNo (PerasCert blk) m a - -type PerasCertDiffusionOutbound blk m a = - ObjectDiffusionOutbound PerasRoundNo (PerasCert blk) m a - -type PerasCertDiffusion blk = - ObjectDiffusion PerasRoundNo (PerasCert blk) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs index 6a4fc87229..6520aae47c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs @@ -6,11 +6,8 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE RankNTypes #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE StandaloneKindSignatures #-} -{-# LANGUAGE TypeApplications #-} {-# LANGUAGE UndecidableInstances #-} -- | Serialisation for sending things across the network. @@ -36,8 +33,8 @@ module Ouroboros.Consensus.Node.Serialisation , Some (..) ) where -import Codec.CBOR.Decoding (Decoder, decodeListLenOf) -import Codec.CBOR.Encoding (Encoding, encodeListLen) +import Codec.CBOR.Decoding (Decoder) +import Codec.CBOR.Encoding (Encoding) import Codec.Serialise (Serialise (decode, encode)) import Data.Kind import Data.SOP.BasicFunctors @@ -50,15 +47,7 @@ import Ouroboros.Consensus.Ledger.SupportsMempool import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.TypeFamilyWrappers import Ouroboros.Consensus.Util (Some (..)) -import Ouroboros.Network.Block - ( Tip - , decodePoint - , decodeTip - , encodePoint - , encodeTip - , unwrapCBORinCBOR - , wrapCBORinCBOR - ) +import Ouroboros.Network.Block (unwrapCBORinCBOR, wrapCBORinCBOR) {------------------------------------------------------------------------------- NodeToNode @@ -184,29 +173,6 @@ deriving newtype instance SerialiseNodeToNode blk (GenTxId blk) => SerialiseNodeToNode blk (WrapGenTxId blk) -instance ConvertRawHash blk => SerialiseNodeToNode blk (Point blk) where - encodeNodeToNode _ccfg _version = encodePoint $ encodeRawHash (Proxy @blk) - decodeNodeToNode _ccfg _version = decodePoint $ decodeRawHash (Proxy @blk) - -instance ConvertRawHash blk => SerialiseNodeToNode blk (Tip blk) where - encodeNodeToNode _ccfg _version = encodeTip $ encodeRawHash (Proxy @blk) - decodeNodeToNode _ccfg _version = decodeTip $ decodeRawHash (Proxy @blk) - -instance SerialiseNodeToNode blk PerasRoundNo where - encodeNodeToNode _ccfg _version = encode - decodeNodeToNode _ccfg _version = decode -instance ConvertRawHash blk => SerialiseNodeToNode blk (PerasCert blk) where - -- Consistent with the 'Serialise' instance for 'PerasCert' defined in Ouroboros.Consensus.Block.SupportsPeras - encodeNodeToNode ccfg version PerasCert{..} = - encodeListLen 2 - <> encodeNodeToNode ccfg version pcCertRound - <> encodeNodeToNode ccfg version pcCertBoostedBlock - decodeNodeToNode ccfg version = do - decodeListLenOf 2 - pcCertRound <- decodeNodeToNode ccfg version - pcCertBoostedBlock <- decodeNodeToNode ccfg version - pure $ PerasCert pcCertRound pcCertBoostedBlock - deriving newtype instance SerialiseNodeToClient blk (GenTxId blk) => SerialiseNodeToClient blk (WrapGenTxId blk) diff --git a/ouroboros-consensus/test/consensus-test/Main.hs b/ouroboros-consensus/test/consensus-test/Main.hs index 79d681213a..beddd1f7d2 100644 --- a/ouroboros-consensus/test/consensus-test/Main.hs +++ b/ouroboros-consensus/test/consensus-test/Main.hs @@ -16,8 +16,6 @@ import qualified Test.Consensus.MiniProtocol.BlockFetch.Client (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.CSJ (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.Client (tests) import qualified Test.Consensus.MiniProtocol.LocalStateQuery.Server (tests) -import qualified Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke (tests) -import qualified Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke (tests) import qualified Test.Consensus.Peras.WeightSnapshot (tests) import qualified Test.Consensus.Util.MonadSTM.NormalForm (tests) import qualified Test.Consensus.Util.Versioned (tests) @@ -39,8 +37,6 @@ tests = , Test.Consensus.MiniProtocol.BlockFetch.Client.tests , Test.Consensus.MiniProtocol.ChainSync.CSJ.tests , Test.Consensus.MiniProtocol.ChainSync.Client.tests - , Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke.tests - , Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke.tests , Test.Consensus.MiniProtocol.LocalStateQuery.Server.tests , testGroup "Mempool" diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs deleted file mode 100644 index baee97b57d..0000000000 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs +++ /dev/null @@ -1,152 +0,0 @@ -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE TypeApplications #-} -{-# LANGUAGE UndecidableInstances #-} -{-# OPTIONS_GHC -Wno-orphans #-} - -module Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke (tests) where - -import Control.Tracer (contramap, nullTracer) -import Data.Functor.Identity (Identity (..)) -import qualified Data.List.NonEmpty as NE -import qualified Data.Map as Map -import Network.TypedProtocol.Driver.Simple (runPeer, runPipelinedPeer) -import Ouroboros.Consensus.Block.SupportsPeras -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert -import Ouroboros.Consensus.Storage.PerasCertDB.API - ( AddPerasCertResult (..) - , PerasCertDB - , PerasCertTicketNo - ) -import qualified Ouroboros.Consensus.Storage.PerasCertDB.API as PerasCertDB -import qualified Ouroboros.Consensus.Storage.PerasCertDB.Impl as PerasCertDB -import Ouroboros.Consensus.Util.IOLike -import Ouroboros.Network.Block (Point (..), SlotNo (SlotNo), StandardHash) -import Ouroboros.Network.Point (Block (Block), WithOrigin (..)) -import Ouroboros.Network.Protocol.ObjectDiffusion.Codec -import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound - ( objectDiffusionInboundPeerPipelined - ) -import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound (objectDiffusionOutboundPeer) -import Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke - ( ListWithUniqueIds (..) - , WithId - , genListWithUniqueIds - , genProtocolConstants - , getId - , prop_smoke_object_diffusion - ) -import Test.QuickCheck -import Test.Tasty -import Test.Tasty.QuickCheck (testProperty) -import Test.Util.TestBlock - -tests :: TestTree -tests = - testGroup - "ObjectDiffusion.PerasCert.Smoke" - [ testProperty - "PerasCertDiffusion smoke test" - prop_smoke - ] - -perasTestCfg :: PerasCfg TestBlock -perasTestCfg = makePerasCfg Nothing - -genPoint :: Gen (Point (TestBlock)) -genPoint = - -- Sometimes pick the genesis point - frequency - [ (1, pure $ Point Origin) - , - ( 50 - , do - slotNo <- SlotNo <$> arbitrary - hash <- TestHash . NE.fromList . getNonEmpty <$> arbitrary - pure $ Point (At (Block slotNo hash)) - ) - ] - -genPerasCert :: Gen (PerasCert TestBlock) -genPerasCert = do - pcCertRound <- PerasRoundNo <$> arbitrary - pcCertBoostedBlock <- genPoint - pure $ PerasCert{pcCertRound, pcCertBoostedBlock} - -instance WithId (PerasCert blk) PerasRoundNo where - getId = pcCertRound - -newCertDB :: - (IOLike m, StandardHash blk) => - PerasCfg blk -> - [PerasCert blk] -> - m (PerasCertDB m blk) -newCertDB perasCfg certs = do - db <- PerasCertDB.openDB (PerasCertDB.PerasCertDbArgs @Identity nullTracer) - mapM_ - ( \cert -> do - let validatedCert = - ValidatedPerasCert - { vpcCert = cert - , vpcCertBoost = perasCfgWeightBoost perasCfg - } - result <- PerasCertDB.addCert db validatedCert - case result of - AddedPerasCertToDB -> pure () - PerasCertAlreadyInDB -> throwIO (userError "Expected AddedPerasCertToDB, but cert was already in DB") - ) - certs - pure db - -prop_smoke :: Property -prop_smoke = - forAll genProtocolConstants $ \protocolConstants -> - forAll (genListWithUniqueIds genPerasCert) $ \(ListWithUniqueIds certs) -> - let - runOutboundPeer outbound outboundChannel tracer = - runPeer - ((\x -> "Outbound (Client): " ++ show x) `contramap` tracer) - codecObjectDiffusionId - outboundChannel - (objectDiffusionOutboundPeer outbound) - >> pure () - runInboundPeer inbound inboundChannel tracer = - runPipelinedPeer - ((\x -> "Inbound (Server): " ++ show x) `contramap` tracer) - codecObjectDiffusionId - inboundChannel - (objectDiffusionInboundPeerPipelined inbound) - >> pure () - mkPoolInterfaces :: - forall m. - IOLike m => - m - ( ObjectPoolReader PerasRoundNo (PerasCert TestBlock) PerasCertTicketNo m - , ObjectPoolWriter PerasRoundNo (PerasCert TestBlock) m - , m [PerasCert TestBlock] - ) - mkPoolInterfaces = do - outboundPool <- newCertDB perasTestCfg certs - inboundPool <- newCertDB perasTestCfg [] - - let outboundPoolReader = makePerasCertPoolReaderFromCertDB outboundPool - inboundPoolWriter = makePerasCertPoolWriterFromCertDB inboundPool - getAllInboundPoolContent = atomically $ do - snap <- PerasCertDB.getCertSnapshot inboundPool - let rawContent = - Map.toAscList $ - PerasCertDB.getCertsAfter snap (PerasCertDB.zeroPerasCertTicketNo) - pure $ getPerasCert . snd <$> rawContent - - return (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent) - in - prop_smoke_object_diffusion - protocolConstants - certs - runOutboundPeer - runInboundPeer - mkPoolInterfaces diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs deleted file mode 100644 index bcd952fb31..0000000000 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs +++ /dev/null @@ -1,326 +0,0 @@ -{-# LANGUAGE DerivingVia #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE FunctionalDependencies #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE NumericUnderscores #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE ScopedTypeVariables #-} - --- | Smoke tests for the object diffusion protocol. This uses a trivial object --- pool and checks that a few objects can indeed be transferred from the --- outbound to the inbound peer. -module Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke - ( tests - , WithId (..) - , ListWithUniqueIds (..) - , genListWithUniqueIds - , ProtocolConstants - , genProtocolConstants - , prop_smoke_object_diffusion - ) where - -import Control.Monad (void) -import Control.Monad.IOSim (runSimStrictShutdown) -import Control.ResourceRegistry (forkLinkedThread, withRegistry) -import Control.Tracer (Tracer, nullTracer, traceWith) -import Data.Containers.ListUtils (nubOrdOn) -import Data.Data (Typeable) -import Data.Functor.Contravariant (contramap) -import qualified Data.Map as Map -import Network.TypedProtocol.Channel (Channel, createConnectedChannels) -import Network.TypedProtocol.Codec (AnyMessage) -import Network.TypedProtocol.Driver.Simple (runPeer, runPipelinedPeer) -import NoThunks.Class (NoThunks) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound - ( objectDiffusionInbound - ) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API - ( ObjectPoolReader (..) - , ObjectPoolWriter (..) - ) -import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) -import Ouroboros.Consensus.Util.IOLike - ( IOLike - , MonadDelay (..) - , MonadSTM (..) - , StrictTVar - , modifyTVar - , readTVar - , uncheckedNewTVarM - , writeTVar - ) -import Ouroboros.Network.ControlMessage (ControlMessage (..)) -import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..)) -import Ouroboros.Network.Protocol.ObjectDiffusion.Codec (codecObjectDiffusionId) -import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound - ( ObjectDiffusionInboundPipelined - , objectDiffusionInboundPeerPipelined - ) -import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound - ( ObjectDiffusionOutbound - , objectDiffusionOutboundPeer - ) -import Ouroboros.Network.Protocol.ObjectDiffusion.Type - ( NumObjectIdsReq (..) - , NumObjectsOutstanding (..) - , NumObjectsReq (..) - , ObjectDiffusion - ) -import Test.QuickCheck -import Test.Tasty -import Test.Tasty.QuickCheck -import Test.Util.Orphans.IOLike () - -tests :: TestTree -tests = - testGroup - "ObjectDiffusion.Smoke" - [ testProperty - "ObjectDiffusion smoke test with mock objects" - prop_smoke - ] - -class WithId a idTy | a -> idTy where - getId :: a -> idTy - -newtype ListWithUniqueIds a idTy = ListWithUniqueIds [a] - deriving (Eq, Show, Ord) - -genListWithUniqueIds :: (Ord idTy, WithId a idTy) => Gen a -> Gen (ListWithUniqueIds a idTy) -genListWithUniqueIds genA = ListWithUniqueIds . nubOrdOn getId <$> listOf genA - -instance WithId SmokeObject SmokeObjectId where getId = getSmokeObjectId - -{------------------------------------------------------------------------------- - Mock objectPools --------------------------------------------------------------------------------} - -newtype SmokeObjectId = SmokeObjectId Int - deriving (Eq, Ord, Show, NoThunks) - -newtype SmokeObject = SmokeObject {getSmokeObjectId :: SmokeObjectId} - deriving (Eq, Ord, Show, NoThunks) - -genSmokeObject :: Gen SmokeObject -genSmokeObject = SmokeObject . SmokeObjectId <$> arbitrary - -newtype SmokeObjectPool m = SmokeObjectPool (StrictTVar m [SmokeObject]) - -newObjectPool :: MonadSTM m => [SmokeObject] -> m (SmokeObjectPool m) -newObjectPool initialPoolContent = SmokeObjectPool <$> uncheckedNewTVarM initialPoolContent - -makeObjectPoolReader :: - MonadSTM m => SmokeObjectPool m -> ObjectPoolReader SmokeObjectId SmokeObject Int m -makeObjectPoolReader (SmokeObjectPool poolContentTvar) = - ObjectPoolReader - { oprObjectId = getSmokeObjectId - , oprObjectsAfter = \minTicketNo limit -> do - poolContent <- readTVar poolContentTvar - let newContent = drop minTicketNo $ zip [(1 :: Int) ..] poolContent - if null newContent - then pure Nothing - else - pure . Just . pure $ - Map.fromList (take (fromIntegral limit) newContent) - , oprZeroTicketNo = 0 - } - -makeObjectPoolWriter :: - MonadSTM m => SmokeObjectPool m -> ObjectPoolWriter SmokeObjectId SmokeObject m -makeObjectPoolWriter (SmokeObjectPool poolContentTvar) = - ObjectPoolWriter - { opwObjectId = getSmokeObjectId - , opwAddObjects = \objects -> do - atomically $ modifyTVar poolContentTvar (++ objects) - return () - , opwHasObject = do - poolContent <- readTVar poolContentTvar - pure $ \objectId -> any (\obj -> getSmokeObjectId obj == objectId) poolContent - } - -mkMockPoolInterfaces :: - MonadSTM m => - [SmokeObject] -> - m - ( ObjectPoolReader SmokeObjectId SmokeObject Int m - , ObjectPoolWriter SmokeObjectId SmokeObject m - , m [SmokeObject] - ) -mkMockPoolInterfaces objects = do - outboundPool <- newObjectPool objects - inboundPool@(SmokeObjectPool tvar) <- newObjectPool [] - - let outboundPoolReader = makeObjectPoolReader outboundPool - inboundPoolWriter = makeObjectPoolWriter inboundPool - - return (outboundPoolReader, inboundPoolWriter, atomically $ readTVar tvar) - -{------------------------------------------------------------------------------- - Main properties --------------------------------------------------------------------------------} - --- Protocol constants - -newtype ProtocolConstants - = ProtocolConstants (NumObjectsOutstanding, NumObjectIdsReq, NumObjectsReq) - deriving Show - -genProtocolConstants :: Gen ProtocolConstants -genProtocolConstants = do - maxFifoSize <- choose (5, 20) - maxIdsToReq <- choose (3, maxFifoSize) - maxObjectsToReq <- choose (2, maxIdsToReq) - pure $ - ProtocolConstants - ( NumObjectsOutstanding maxFifoSize - , NumObjectIdsReq maxIdsToReq - , NumObjectsReq maxObjectsToReq - ) - -nodeToNodeVersion :: NodeToNodeVersion -nodeToNodeVersion = NodeToNodeV_14 - -prop_smoke :: Property -prop_smoke = - forAll genProtocolConstants $ \protocolConstants -> - forAll (genListWithUniqueIds genSmokeObject) $ \(ListWithUniqueIds objects) -> - let - runOutboundPeer outbound outboundChannel tracer = - runPeer - ((\x -> "Outbound (Server): " ++ show x) `contramap` tracer) - codecObjectDiffusionId - outboundChannel - (objectDiffusionOutboundPeer outbound) - >> pure () - - runInboundPeer inbound inboundChannel tracer = - runPipelinedPeer - ((\x -> "Inbound (Client): " ++ show x) `contramap` tracer) - codecObjectDiffusionId - inboundChannel - (objectDiffusionInboundPeerPipelined inbound) - >> pure () - in - prop_smoke_object_diffusion - protocolConstants - objects - runOutboundPeer - runInboundPeer - (mkMockPoolInterfaces objects) - ---- The core logic of the smoke test is shared between the generic smoke tests for ObjectDiffusion, and the ones specialised to PerasCert/PerasVote diffusion -prop_smoke_object_diffusion :: - ( Eq object - , Show object - , Ord objectId - , Typeable objectId - , Typeable object - , NoThunks objectId - , Show objectId - , NoThunks object - , Ord ticketNo - ) => - ProtocolConstants -> - [object] -> - ( forall m. - IOLike m => - ObjectDiffusionOutbound objectId object m () -> - Channel m (AnyMessage (ObjectDiffusion objectId object)) -> - (Tracer m String) -> - m () - ) -> - ( forall m. - IOLike m => - ObjectDiffusionInboundPipelined objectId object m () -> - (Channel m (AnyMessage (ObjectDiffusion objectId object))) -> - (Tracer m String) -> - m () - ) -> - ( forall m. - IOLike m => - m - ( ObjectPoolReader objectId object ticketNo m - , ObjectPoolWriter objectId object m - , m [object] - ) - ) -> - Property -prop_smoke_object_diffusion - (ProtocolConstants (maxFifoSize, maxIdsToReq, maxObjectsToReq)) - objects - runOutboundPeer - runInboundPeer - mkPoolInterfaces = - let - simulationResult = runSimStrictShutdown $ do - let tracer = nullTracer - - traceWith tracer "========== [ Starting ObjectDiffusion smoke test ] ==========" - traceWith tracer "objects: " - traceWith tracer (show objects) - - (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent) <- mkPoolInterfaces - - -- We wait until the inbound pool content stabilizes - -- Caveat: in the case where objects are continuously added to the - -- outbound pool, this may never terminate. - let waitUntilSettlement prevValue = do - -- TODO: should have a delay value equal to 4·Δ + Ɛ - -- were Δ is the delay in which any message is delivered on the - -- network and Ɛ is a small margin to encompass computation time; - -- as in the worst case, we need 4 echanged messages - -- (+ computation time, assumed negligible w.r.t. network delays) - -- to see a state update on the inbound side - threadDelay 10_000 - newValue <- getAllInboundPoolContent - if newValue == prevValue - then pure () - else waitUntilSettlement newValue - - controlMessage <- uncheckedNewTVarM Continue - - let - inbound = - objectDiffusionInbound - tracer - ( maxFifoSize - , maxIdsToReq - , maxObjectsToReq - ) - inboundPoolWriter - nodeToNodeVersion - (readTVar controlMessage) - - outbound = - objectDiffusionOutbound - tracer - maxFifoSize - outboundPoolReader - nodeToNodeVersion - - withRegistry $ \reg -> do - (outboundChannel, inboundChannel) <- createConnectedChannels - _outboundThread <- - forkLinkedThread reg "ObjectDiffusion Outbound peer thread" $ - runOutboundPeer outbound outboundChannel tracer - _inboundThread <- - forkLinkedThread reg "ObjectDiffusion Inbound peer thread" $ - runInboundPeer inbound inboundChannel tracer - - void $ waitUntilSettlement [] - atomically $ writeTVar controlMessage Terminate - - traceWith tracer "========== [ ObjectDiffusion smoke test finished ] ==========" - - poolContent <- getAllInboundPoolContent - traceWith tracer "inboundPoolContent: " - traceWith tracer (show poolContent) - traceWith tracer "========== ======================================= ==========" - pure poolContent - in - case simulationResult of - Right inboundPoolContent -> inboundPoolContent === objects - Left msg -> counterexample (show msg) $ property False diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs index 504949b9ed..dbab2d03c9 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs @@ -116,18 +116,7 @@ instance StateModel Model where precondition (Model model) = \case OpenDB -> not model.open - action -> - model.open && case action of - CloseDB -> True - -- Equivocating certificates are not possible by construction, - -- so we should ensure that adding a certificate with the same round - -- number but different content is rejected. - -- See https://tweag.github.io/cardano-peras/peras-design.pdf#subsection.2.2.2 - AddCert cert -> all p model.certs - where - p cert' = getPerasCertRound cert /= getPerasCertRound cert' || cert == cert' - GetWeightSnapshot -> True - GarbageCollect _slot -> True + _ -> model.open deriving stock instance Show (Action Model a) deriving stock instance Eq (Action Model a)