Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions ouroboros-consensus-diffusion/app/conformance-test-runner/Main.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeSynonymInstances #-}

module Main (main) where

import qualified Data.Map.Merge.Lazy as M
import Test.Consensus.PeerSimulator.Resources (PeerSimulatorResources(..), makePeerSimulatorResources)
import Control.Tracer (nullTracer)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Aeson (encode, throwDecode)
import qualified Data.ByteString.Lazy.Char8 as BSL8
import Data.Coerce
Expand All @@ -26,6 +34,9 @@ import System.Environment (getArgs)
import Test.Consensus.OrphanInstances ()
import Test.Consensus.PointSchedule (PointSchedule (..))
import Test.Consensus.PointSchedule.Peers (PeerId (..), Peers (Peers), getPeerIds)
import Test.Util.TestBlock
import Ouroboros.Consensus.Node.Serialisation
import Ouroboros.Consensus.Block.Abstract

testPointSchedule :: PointSchedule blk
testPointSchedule =
Expand Down Expand Up @@ -78,29 +89,42 @@ main = do
let simPeerMap = buildPeerMap (optPort opts) pointSchedule
BSL8.writeFile (optOutputTopologyFile opts) (encode $ makeTopology simPeerMap)

zipMaps :: Ord k => Map k a -> Map k b -> Map k (a, b)
zipMaps = M.merge M.dropMissing M.dropMissing $ M.zipWithMatched $ const (,)

instance SerialiseNodeToNode TestBlock (Header TestBlock)

runServer :: IO ()
runServer = do
let peerMap = buildPeerMap 6001 testPointSchedule

peerSim <- makePeerSimulatorResources nullTracer undefined $ NonEmpty.fromList $ M.keys peerMap

incomingTMV <- newEmptyTMVarIO

peerServers <-
for peerMap $ \port -> do
for (zipMaps peerMap $ psrPeers peerSim) $ \(port, res) -> do
-- Make a TMVar for the chainsync and blockfetch channels exposed through
-- the miniprotocols. These get threaded into the server, which will fill
-- them once the NUT has connected.
csChannelTMV <- newEmptyTMVarIO
bfChannelTMV <- newEmptyTMVarIO
csChannelTMV <- newTVarIO False
bfChannelTMV <- newTVarIO False

putStrLn $ "starting server on " <> show port
let sockAddr = Socket.SockAddrInet port $ Socket.tupleToHostAddress (127, 0, 0, 1)
thread <- async $ run csChannelTMV bfChannelTMV sockAddr
thread <- async $ run res incomingTMV csChannelTMV bfChannelTMV sockAddr
pure ((csChannelTMV, bfChannelTMV), thread)

-- Now, take each of the resulting TMVars. This effectively blocks until the
-- NUT has connected.
_peerChannels <- atomically $ do
for peerServers $ \((csChanTMV, bfChanTMV), _thread) -> do
csChan <- takeTMVar csChanTMV
bfChan <- takeTMVar bfChanTMV
csChan <- readTVar csChanTMV
bfChan <- readTVar bfChanTMV
pure (csChan, bfChan)

for_ peerServers $ uninterruptibleCancel . snd

putStrLn "took everything"

pure ()
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
-- picked up by the peer simulator.
module MiniProtocols (peerSimServer) where

import Ouroboros.Network.Protocol.BlockFetch.Server
import Ouroboros.Network.Util.ShowProxy (ShowProxy)
import Ouroboros.Network.Protocol.ChainSync.Server
import Test.Consensus.PeerSimulator.Resources (PeerResources (..), ChainSyncResources (..), BlockFetchResources (..))
import qualified Codec.CBOR.Decoding as CBOR
import qualified Codec.CBOR.Encoding as CBOR
import Control.Monad (forever)
Expand Down Expand Up @@ -59,10 +63,13 @@ peerSimServer ::
( IOLike m
, SerialiseNodeToNodeConstraints blk
, SupportedNetworkProtocolVersion blk
, ShowProxy blk
, ShowProxy (Header blk)
, MonadSay m
) =>
StrictTMVar m (Mux.Channel m BL.ByteString) ->
StrictTMVar m (Mux.Channel m BL.ByteString) ->
PeerResources m blk ->
StrictTVar m Bool ->
StrictTVar m Bool ->
CodecConfig blk ->
(NodeToNodeVersion -> addr -> CBOR.Encoding) ->
(NodeToNodeVersion -> forall s. CBOR.Decoder s addr) ->
Expand All @@ -71,7 +78,7 @@ peerSimServer ::
NodeToNodeVersion
NodeToNodeVersionData
(OuroborosApplicationWithMinimalCtx 'Mux.ResponderMode addr BL.ByteString m Void ())
peerSimServer csChanTMV bfChanTMV codecCfg encAddr decAddr networkMagic = do
peerSimServer res csChanTMV bfChanTMV codecCfg encAddr decAddr networkMagic = do
forAllVersions application
where
forAllVersions ::
Expand Down Expand Up @@ -115,19 +122,19 @@ peerSimServer csChanTMV bfChanTMV codecCfg encAddr decAddr networkMagic = do
$ MiniProtocolCb
$ \_ctx channel -> do
say "hello from cs"
atomically $
putTMVar csChanTMV channel
pure ((), Nothing)
atomically $ writeTVar csChanTMV True
runPeer nullTracer cChainSyncCodec channel
$ chainSyncServerPeer $ csrServer $ prChainSync res
, mkMiniProtocol
Mux.StartOnDemand
N2N.blockFetchMiniProtocolNum
N2N.blockFetchProtocolLimits
$ MiniProtocolCb
$ \_ctx channel -> do
say "hello from bf"
atomically $
putTMVar bfChanTMV channel
pure ((), Nothing)
atomically $ writeTVar bfChanTMV True
runPeer nullTracer cBlockFetchCodec channel
$ blockFetchServerPeer $ bfrServer $ prBlockFetch res
, mkMiniProtocol
Mux.StartOnDemand
N2N.txSubmissionMiniProtocolNum
Expand All @@ -138,8 +145,8 @@ peerSimServer csChanTMV bfChanTMV codecCfg encAddr decAddr networkMagic = do
where
Consensus.N2N.Codecs
{ cKeepAliveCodec
-- , cChainSyncCodecSerialised
-- , cBlockFetchCodecSerialised
, cChainSyncCodec
, cBlockFetchCodec
} =
Consensus.N2N.defaultCodecs codecCfg blockVersion encAddr decAddr version

Expand Down
30 changes: 20 additions & 10 deletions ouroboros-consensus-diffusion/app/conformance-test-runner/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

module Server (run) where

import Ouroboros.Consensus.Node.Serialisation
import Ouroboros.Consensus.Node.ProtocolInfo (NumCoreNodes (..))
import Test.Consensus.PeerSimulator.Resources (PeerResources)
import Control.ResourceRegistry
import Control.Tracer
import qualified Data.ByteString.Lazy as BL
Expand Down Expand Up @@ -34,17 +37,20 @@ import qualified Ouroboros.Network.Protocol.Handshake as Handshake
import qualified Ouroboros.Network.Server.Simple as Server
import qualified Ouroboros.Network.Snocket as Snocket
import Ouroboros.Network.Socket (SomeResponderApplication (..), configureSocket)
import Test.Util.TestBlock (TestBlock)
import qualified Test.Util.TestBlock as TB

-- | Glue code for using just the bits from the Diffusion Layer that we need in
-- this context.
serve ::
StrictTMVar IO SockAddr ->
SockAddr ->
N2N.Versions
N2N.NodeToNodeVersion
N2N.NodeToNodeVersionData
(OuroborosApplicationWithMinimalCtx 'Mux.ResponderMode SockAddr BL.ByteString IO Void ()) ->
IO Void
serve sockAddr application = withIOManager \iocp ->
serve incomingTV sockAddr application = withIOManager \iocp ->
Server.with
(Snocket.socketSnocket iocp)
Snocket.makeSocketBearer
Expand All @@ -60,27 +66,31 @@ serve sockAddr application = withIOManager \iocp ->
, haTimeLimits = Handshake.timeLimitsHandshake
}
(SomeResponderApplication <$> application)
(\_ serverAsync -> wait serverAsync)
(\incoming serverAsync -> atomically (tryPutTMVar incomingTV incoming) *> wait serverAsync)

run ::
forall blk.
( SupportedNetworkProtocolVersion blk
, SerialiseNodeToNodeConstraints blk
, ConfigSupportsNode blk
, blk ~ SimpleBlock SimpleMockCrypto SimplePraosRuleExt
, blk ~ TestBlock
) =>
PeerResources IO blk ->
-- | A TMVar for the connecting peer
StrictTMVar IO SockAddr ->
-- | A TMVar for the chainsync channel that we will fill in once the node connects.
StrictTMVar IO (Mux.Channel IO BL.ByteString) ->
StrictTVar IO Bool ->
-- | A TMVar for the blockfetch channel that we will fill in once the node connects.
StrictTMVar IO (Mux.Channel IO BL.ByteString) ->
StrictTVar IO Bool ->
SockAddr ->
IO Void
run csChanTMV bfChanTMV sockAddr = withRegistry \_registry ->
serve sockAddr
$ peerSimServer @_ @(SimpleBlock SimpleMockCrypto SimplePraosRuleExt)
run res incomingTV csChanTMV bfChanTMV sockAddr = withRegistry \_registry ->
serve incomingTV sockAddr
$ peerSimServer @_ @TestBlock
res
csChanTMV
bfChanTMV
SimpleCodecConfig
TB.TestBlockCodecConfig
encodeRemoteAddress
decodeRemoteAddress
$ getNetworkMagic @(SimpleBlock SimpleMockCrypto SimplePraosRuleExt) SimpleBlockConfig
$ getNetworkMagic @TestBlock $ TB.TestBlockConfig $ NumCoreNodes 0
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ data Validity = Valid | Invalid
deriving stock (Show, Eq, Ord, Enum, Bounded, Generic)
deriving anyclass (Serialise, NoThunks, ToExpr)


instance SupportedNetworkProtocolVersion (TestBlockWith ptype) where
supportedNodeToNodeVersions _ = foldMap (flip Map.singleton ()) [minBound .. maxBound]
supportedNodeToClientVersions _ = foldMap (flip Map.singleton ()) [minBound .. maxBound]

latestReleasedNodeVersion = latestReleasedNodeVersionDefault

-- | Test block parametrized on the payload type
--
-- For blocks without payload see the 'TestBlock' type alias.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import Ouroboros.Consensus.Util.RedundantConstraints
-------------------------------------------------------------------------------}

instance SupportedNetworkProtocolVersion (SimpleBlock SimpleMockCrypto ext) where
supportedNodeToNodeVersions _ = Map.singleton maxBound ()
supportedNodeToClientVersions _ = Map.singleton maxBound ()
supportedNodeToNodeVersions _ = foldMap (flip Map.singleton ()) [minBound .. maxBound]
supportedNodeToClientVersions _ = foldMap (flip Map.singleton ()) [minBound .. maxBound]

latestReleasedNodeVersion = latestReleasedNodeVersionDefault

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class
-- different IDs from each other and from regular protocols.
constructMockNetworkMagic :: HasCallStack => NetworkMagic
constructMockNetworkMagic =
NetworkMagic $ fromIntegral $ hash (prettyCallStack callStack)
NetworkMagic $ fromIntegral $ 764824073

instance
RunMockBlock c ext =>
Expand Down