66{-# LANGUAGE QuantifiedConstraints #-}
77{-# LANGUAGE RankNTypes #-}
88{-# LANGUAGE ScopedTypeVariables #-}
9- {-# LANGUAGE StandaloneDeriving #-}
109{-# LANGUAGE TypeFamilies #-}
1110-- @UndecidableInstances@ extensions is required for defining @Show@ instance
1211-- of @'TraceSendRecv'@.
@@ -19,10 +18,12 @@ module Ouroboros.Network.Driver.Simple
1918 -- $intro
2019 -- * Normal peers
2120 runPeer
21+ , runAnnotatedPeer
2222 , TraceSendRecv (.. )
2323 , DecoderFailure (.. )
2424 -- * Pipelined peers
2525 , runPipelinedPeer
26+ , runPipelinedAnnotatedPeer
2627 -- * Connected peers
2728 -- TODO: move these to a test lib
2829 , Role (.. )
@@ -43,6 +44,9 @@ import Ouroboros.Network.Channel
4344import Control.Monad.Class.MonadAsync
4445import Control.Monad.Class.MonadThrow
4546import Control.Tracer (Tracer (.. ), contramap , traceWith )
47+ import Data.Maybe (fromMaybe )
48+ import Data.Functor.Identity (Identity )
49+ import Control.Monad.Identity (Identity (.. ))
4650
4751
4852-- $intro
@@ -107,18 +111,31 @@ instance Show DecoderFailure where
107111instance Exception DecoderFailure where
108112
109113
110- driverSimple :: forall ps failure bytes m .
111- ( MonadThrow m
112- , Show failure
113- , forall (st :: ps ). Show (ClientHasAgency st )
114- , forall (st :: ps ). Show (ServerHasAgency st )
115- , ShowProxy ps
116- )
117- => Tracer m (TraceSendRecv ps )
118- -> Codec ps failure m bytes
119- -> Channel m bytes
120- -> Driver ps (Maybe bytes ) m
121- driverSimple tracer Codec {encode, decode} channel@ Channel {send} =
114+ mkSimpleDriver :: forall ps failure bytes m f annotator .
115+ ( MonadThrow m
116+ , Show failure
117+ , forall (st :: ps ). Show (ClientHasAgency st )
118+ , forall (st :: ps ). Show (ServerHasAgency st )
119+ , ShowProxy ps
120+ )
121+ => (forall a .
122+ Channel m bytes
123+ -> Maybe bytes
124+ -> DecodeStep bytes failure m (f a )
125+ -> m (Either failure (a , Maybe bytes ))
126+ )
127+ -- ^ run incremental decoder against a channel
128+
129+ -> (forall st . annotator st -> f (SomeMessage st ))
130+ -- ^ transform annotator to a container holding the decoded
131+ -- message
132+
133+ -> Tracer m (TraceSendRecv ps )
134+ -> Codec' ps failure m annotator bytes
135+ -> Channel m bytes
136+ -> Driver ps (Maybe bytes ) m
137+
138+ mkSimpleDriver runDecodeSteps nat tracer Codec {encode, decode} channel@ Channel {send} =
122139 Driver { sendMessage, recvMessage, startDState = Nothing }
123140 where
124141 sendMessage :: forall (pr :: PeerRole ) (st :: ps ) (st' :: ps ).
@@ -135,7 +152,7 @@ driverSimple tracer Codec{encode, decode} channel@Channel{send} =
135152 -> m (SomeMessage st , Maybe bytes )
136153 recvMessage stok trailing = do
137154 decoder <- decode stok
138- result <- runDecoderWithChannel channel trailing decoder
155+ result <- runDecodeSteps channel trailing (nat <$> decoder)
139156 case result of
140157 Right x@ (SomeMessage msg, _trailing') -> do
141158 traceWith tracer (TraceRecvMsg (AnyMessageAndAgency stok msg))
@@ -144,6 +161,36 @@ driverSimple tracer Codec{encode, decode} channel@Channel{send} =
144161 throwIO (DecoderFailure stok failure)
145162
146163
164+ simpleDriver :: forall ps failure bytes m .
165+ ( MonadThrow m
166+ , Show failure
167+ , forall (st :: ps ). Show (ClientHasAgency st )
168+ , forall (st :: ps ). Show (ServerHasAgency st )
169+ , ShowProxy ps
170+ )
171+ => Tracer m (TraceSendRecv ps )
172+ -> Codec ps failure m bytes
173+ -> Channel m bytes
174+ -> Driver ps (Maybe bytes ) m
175+ simpleDriver = mkSimpleDriver runDecoderWithChannel Identity
176+
177+
178+ annotatedSimpleDriver
179+ :: forall ps failure bytes m .
180+ ( MonadThrow m
181+ , Monoid bytes
182+ , Show failure
183+ , forall (st :: ps ). Show (ClientHasAgency st )
184+ , forall (st :: ps ). Show (ServerHasAgency st )
185+ , ShowProxy ps
186+ )
187+ => Tracer m (TraceSendRecv ps )
188+ -> AnnotatedCodec ps failure m bytes
189+ -> Channel m bytes
190+ -> Driver ps (Maybe bytes ) m
191+ annotatedSimpleDriver = mkSimpleDriver runAnnotatedDecoderWithChannel runAnnotator
192+
193+
147194-- | Run a peer with the given channel via the given codec.
148195--
149196-- This runs the peer to completion (if the protocol allows for termination).
@@ -164,7 +211,31 @@ runPeer
164211runPeer tracer codec channel peer =
165212 runPeerWithDriver driver peer (startDState driver)
166213 where
167- driver = driverSimple tracer codec channel
214+ driver = simpleDriver tracer codec channel
215+
216+
217+ -- | Run a peer with the given channel via the given annotated codec.
218+ --
219+ -- This runs the peer to completion (if the protocol allows for termination).
220+ --
221+ runAnnotatedPeer
222+ :: forall ps (st :: ps ) pr failure bytes m a .
223+ ( MonadThrow m
224+ , Monoid bytes
225+ , Show failure
226+ , forall (st' :: ps ). Show (ClientHasAgency st' )
227+ , forall (st' :: ps ). Show (ServerHasAgency st' )
228+ , ShowProxy ps
229+ )
230+ => Tracer m (TraceSendRecv ps )
231+ -> AnnotatedCodec ps failure m bytes
232+ -> Channel m bytes
233+ -> Peer ps pr st m a
234+ -> m (a , Maybe bytes )
235+ runAnnotatedPeer tracer codec channel peer =
236+ runPeerWithDriver driver peer (startDState driver)
237+ where
238+ driver = annotatedSimpleDriver tracer codec channel
168239
169240
170241-- | Run a pipelined peer with the given channel via the given codec.
@@ -191,7 +262,35 @@ runPipelinedPeer
191262runPipelinedPeer tracer codec channel peer =
192263 runPipelinedPeerWithDriver driver peer (startDState driver)
193264 where
194- driver = driverSimple tracer codec channel
265+ driver = simpleDriver tracer codec channel
266+
267+
268+ -- | Run a pipelined peer with the given channel via the given annotated codec.
269+ --
270+ -- This runs the peer to completion (if the protocol allows for termination).
271+ --
272+ -- Unlike normal peers, running pipelined peers rely on concurrency, hence the
273+ -- 'MonadAsync' constraint.
274+ --
275+ runPipelinedAnnotatedPeer
276+ :: forall ps (st :: ps ) pr failure bytes m a .
277+ ( MonadAsync m
278+ , MonadThrow m
279+ , Monoid bytes
280+ , Show failure
281+ , forall (st' :: ps ). Show (ClientHasAgency st' )
282+ , forall (st' :: ps ). Show (ServerHasAgency st' )
283+ , ShowProxy ps
284+ )
285+ => Tracer m (TraceSendRecv ps )
286+ -> AnnotatedCodec ps failure m bytes
287+ -> Channel m bytes
288+ -> PeerPipelined ps pr st m a
289+ -> m (a , Maybe bytes )
290+ runPipelinedAnnotatedPeer tracer codec channel peer =
291+ runPipelinedPeerWithDriver driver peer (startDState driver)
292+ where
293+ driver = annotatedSimpleDriver tracer codec channel
195294
196295
197296--
@@ -204,17 +303,36 @@ runPipelinedPeer tracer codec channel peer =
204303runDecoderWithChannel :: Monad m
205304 => Channel m bytes
206305 -> Maybe bytes
207- -> DecodeStep bytes failure m a
306+ -> DecodeStep bytes failure m ( Identity a )
208307 -> m (Either failure (a , Maybe bytes ))
209308
210309runDecoderWithChannel Channel {recv} = go
211310 where
212- go _ (DecodeDone x trailing) = return (Right (x, trailing))
311+ go _ (DecodeDone ( Identity x) trailing) = return (Right (x, trailing))
213312 go _ (DecodeFail failure) = return (Left failure)
214313 go Nothing (DecodePartial k) = recv >>= k >>= go Nothing
215314 go (Just trailing) (DecodePartial k) = k (Just trailing) >>= go Nothing
216315
217316
317+ runAnnotatedDecoderWithChannel
318+ :: forall m bytes failure a .
319+ ( Monad m
320+ , Monoid bytes
321+ )
322+ => Channel m bytes
323+ -> Maybe bytes
324+ -> DecodeStep bytes failure m (bytes -> a )
325+ -> m (Either failure (a , Maybe bytes ))
326+
327+ runAnnotatedDecoderWithChannel Channel {recv} bs0 = go (fromMaybe mempty bs0) bs0
328+ where
329+ go :: bytes -> Maybe bytes -> DecodeStep bytes failure m (bytes -> a ) -> m (Either failure (a , Maybe bytes ))
330+ go bytes _ (DecodeDone f trailing) = return $ Right (f bytes, trailing)
331+ go _bytes _ (DecodeFail failure) = return (Left failure)
332+ go bytes Nothing (DecodePartial k) = recv >>= \ bs -> k bs >>= go (bytes <> fromMaybe mempty bs) Nothing
333+ go bytes (Just trailing) (DecodePartial k) = k (Just trailing) >>= go (bytes <> trailing) Nothing
334+
335+
218336data Role = Client | Server
219337
220338-- | Run two 'Peer's via a pair of connected 'Channel's and a common 'Codec'.
0 commit comments