Skip to content

Commit 4d15509

Browse files
authored
Merge pull request #5173 from IntersectMBO/mwojtowicz/coot/dmq-node-to-node
dmq: node-to-client applications
2 parents 31f5046 + 2805796 commit 4d15509

File tree

22 files changed

+1393
-82
lines changed

22 files changed

+1393
-82
lines changed

decentralized-message-queue/app/Main.hs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions)
1717
import DMQ.Configuration.Topology (readTopologyFileOrError)
1818
import DMQ.Diffusion.Applications (diffusionApplications)
1919
import DMQ.Diffusion.Arguments
20-
import DMQ.Diffusion.NodeKernel (withNodeKernel)
20+
import DMQ.Diffusion.NodeKernel (mempool, withNodeKernel)
21+
import DMQ.NodeToClient qualified as NtC
2122
import DMQ.NodeToNode (dmqCodecs, dmqLimitsAndTimeouts, ntnApps)
23+
import DMQ.Protocol.LocalMsgSubmission.Codec
24+
import DMQ.Protocol.SigSubmission.Codec
25+
import DMQ.Protocol.SigSubmission.Type (Sig (..))
2226
import DMQ.Tracer
2327

2428
import DMQ.Diffusion.PeerSelection (policy)
2529
import Ouroboros.Network.Diffusion qualified as Diffusion
2630
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
2731
encodeRemoteAddress)
32+
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
2833

2934
main :: IO ()
3035
main = void . runDMQ =<< execParser opts
@@ -78,6 +83,13 @@ runDMQ commandLineConfig = do
7883
(decodeRemoteAddress maxBound))
7984
dmqLimitsAndTimeouts
8085
defaultSigDecisionPolicy
86+
dmqNtCApps =
87+
let sigSize _ = 0 -- TODO
88+
maxMsgs = 1000 -- TODO: make this dynamic?
89+
mempoolReader = Mempool.getReader sigId sigSize (mempool nodeKernel)
90+
mempoolWriter = Mempool.getWriter sigId (const True) (mempool nodeKernel)
91+
in NtC.ntcApps mempoolReader mempoolWriter maxMsgs
92+
(NtC.dmqCodecs encodeSig decodeSig encodeReject decodeReject)
8193
dmqDiffusionArguments =
8294
diffusionArguments (if handshakeTracer
8395
then WithEventType "Handshake" >$< tracer
@@ -91,6 +103,7 @@ runDMQ commandLineConfig = do
91103
dmqDiffusionConfiguration
92104
dmqLimitsAndTimeouts
93105
dmqNtNApps
106+
dmqNtCApps
94107
(policy policyRng)
95108

96109
Diffusion.run dmqDiffusionArguments

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ category: Network
1818
build-type: Simple
1919
extra-doc-files: CHANGELOG.md
2020

21+
common extensions
22+
default-extensions:
23+
BangPatterns
24+
BlockArguments
25+
GADTs
26+
ImportQualifiedPost
27+
LambdaCase
28+
NamedFieldPuns
29+
ScopedTypeVariables
30+
2131
common warnings
2232
ghc-options:
2333
-Wall
@@ -31,7 +41,10 @@ common warnings
3141
-Wunused-packages
3242

3343
library
34-
import: warnings
44+
import:
45+
warnings,
46+
extensions
47+
3548
exposed-modules:
3649
DMQ.Configuration
3750
DMQ.Configuration.CLIOptions
@@ -41,9 +54,20 @@ library
4154
DMQ.Diffusion.NodeKernel
4255
DMQ.Diffusion.PeerSelection
4356
DMQ.NodeToClient
57+
DMQ.NodeToClient.LocalMsgNotification
58+
DMQ.NodeToClient.LocalMsgSubmission
4459
DMQ.NodeToClient.Version
4560
DMQ.NodeToNode
4661
DMQ.NodeToNode.Version
62+
DMQ.Protocol.LocalMsgNotification.Client
63+
DMQ.Protocol.LocalMsgNotification.Codec
64+
DMQ.Protocol.LocalMsgNotification.Examples
65+
DMQ.Protocol.LocalMsgNotification.Server
66+
DMQ.Protocol.LocalMsgNotification.Type
67+
DMQ.Protocol.LocalMsgSubmission.Client
68+
DMQ.Protocol.LocalMsgSubmission.Codec
69+
DMQ.Protocol.LocalMsgSubmission.Server
70+
DMQ.Protocol.LocalMsgSubmission.Type
4771
DMQ.Protocol.SigSubmission.Codec
4872
DMQ.Protocol.SigSubmission.Type
4973
DMQ.Tracer
@@ -74,16 +98,19 @@ library
7498
ouroboros-network-framework ^>=0.19,
7599
ouroboros-network-protocols ^>=0.15,
76100
random ^>=1.2,
101+
singletons,
77102
text >=1.2.4 && <2.2,
78103
time ^>=1.12,
79104
typed-protocols:{typed-protocols, cborg} ^>=1.1,
80105

81106
hs-source-dirs: src
82107
default-language: Haskell2010
83-
default-extensions: ImportQualifiedPost
84108

85109
executable dmq-exe
86-
import: warnings
110+
import:
111+
warnings,
112+
extensions
113+
87114
main-is: Main.hs
88115
ghc-options:
89116
-threaded
@@ -102,15 +129,18 @@ executable dmq-exe
102129

103130
hs-source-dirs: app
104131
default-language: Haskell2010
105-
default-extensions: ImportQualifiedPost
106132

107133
test-suite dmq-test
108-
import: warnings
134+
import:
135+
warnings,
136+
extensions
137+
109138
default-language: Haskell2010
110-
default-extensions: ImportQualifiedPost
111139
other-modules:
112140
Test.DMQ.NodeToClient
113141
Test.DMQ.NodeToNode
142+
Test.DMQ.Protocol.LocalMsgNotification
143+
Test.DMQ.Protocol.LocalMsgSubmission
114144
Test.DMQ.Protocol.SigSubmission
115145

116146
type: exitcode-stdio-1.0
@@ -119,10 +149,17 @@ test-suite dmq-test
119149
build-depends:
120150
QuickCheck,
121151
base >=4.14 && <4.22,
152+
bytestring,
153+
contra-tracer,
122154
decentralized-message-queue,
155+
io-classes,
156+
io-sim,
123157
ouroboros-network-api,
158+
ouroboros-network-framework,
124159
ouroboros-network-protocols:testlib,
160+
ouroboros-network-testing,
125161
quickcheck-instances,
162+
serialise,
126163
tasty,
127164
tasty-quickcheck,
128165
time,

decentralized-message-queue/src/DMQ/Diffusion/Applications.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import DMQ.Configuration
88
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
99
import DMQ.NodeToClient (NodeToClientVersion, NodeToClientVersionData,
1010
stdVersionDataNTC)
11+
import DMQ.NodeToClient qualified as NTC
1112
import DMQ.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
1213
stdVersionDataNTN)
1314
import DMQ.NodeToNode qualified as NTN
1415

1516
import Ouroboros.Network.Diffusion.Types qualified as Diffusion
1617
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
17-
import Ouroboros.Network.Mux (OuroborosApplication (..))
1818
import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionPolicy)
1919
import Ouroboros.Network.Protocol.Handshake.Version (combineVersions,
2020
simpleSingletonVersions)
@@ -27,6 +27,7 @@ diffusionApplications
2727
-> Diffusion.Configuration NoExtraFlags m ntnFd ntnAddr ntcFd ntcAddr
2828
-> NTN.LimitsAndTimeouts ntnAddr
2929
-> NTN.Apps ntnAddr m a ()
30+
-> NTC.Apps ntcAddr m ()
3031
-> PeerSelectionPolicy ntnAddr m
3132
-> Diffusion.Applications ntnAddr NodeToNodeVersion NodeToNodeVersionData
3233
ntcAddr NodeToClientVersion NodeToClientVersionData
@@ -44,6 +45,7 @@ diffusionApplications
4445
}
4546
ntnLimitsAndTimeouts
4647
ntnApps
48+
ntcApps
4749
peerSelectionPolicy =
4850
Diffusion.Applications {
4951
daApplicationInitiatorMode =
@@ -67,11 +69,7 @@ diffusionApplications
6769
[ simpleSingletonVersions
6870
version
6971
(stdVersionDataNTC networkMagic)
70-
(\_versionData ->
71-
OuroborosApplication
72-
[
73-
]
74-
)
72+
(NTC.responders ntcApps version)
7573
| version <- [minBound..maxBound]
7674
]
7775
, daRethrowPolicy = muxErrorRethrowPolicy

decentralized-message-queue/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId)
3939
data NodeKernel ntnAddr m =
4040
NodeKernel {
4141
-- | The fetch client registry, used for the keep alive clients.
42-
fetchClientRegistry :: FetchClientRegistry (ConnectionId ntnAddr) () () m
42+
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)
4343

4444
-- | Read the current peer sharing registry, used for interacting with
4545
-- the PeerSharing protocol
46-
, peerSharingRegistry :: PeerSharingRegistry ntnAddr m
47-
, peerSharingAPI :: PeerSharingAPI ntnAddr StdGen m
48-
, mempool :: Mempool m Sig
49-
, sigChannelVar :: TxChannelsVar m ntnAddr SigId Sig
50-
, sigMempoolSem :: TxMempoolSem m
51-
, sigSharedTxStateVar :: SharedTxStateVar m ntnAddr SigId Sig
46+
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
47+
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
48+
, mempool :: !(Mempool m Sig)
49+
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId Sig)
50+
, sigMempoolSem :: !(TxMempoolSem m)
51+
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId Sig)
5252
}
5353

5454
newNodeKernel :: ( MonadLabelledSTM m

0 commit comments

Comments
 (0)