From a96b25fb05e52295e42578539939e170c8ab8530 Mon Sep 17 00:00:00 2001 From: Dzmitry Shuiski Date: Thu, 6 Nov 2025 16:31:26 +0100 Subject: [PATCH 1/5] Limit the number of parallel requests to Ogmios --- spago-packages.nix | 48 ++++++++------------------------ spago.dhall | 1 + src/Kupmios/KupmiosM.purs | 40 +++++++++++++++++++++----- src/Kupmios/KupmiosM/Kupo.purs | 4 +-- src/Kupmios/KupmiosM/Ogmios.purs | 19 +++++++++---- src/Kupmios/Provider.purs | 3 +- src/KupmiosProvider.purs | 1 + 7 files changed, 63 insertions(+), 53 deletions(-) diff --git a/spago-packages.nix b/spago-packages.nix index dff13e4..8acd10e 100644 --- a/spago-packages.nix +++ b/spago-packages.nix @@ -185,18 +185,6 @@ let installPhase = "ln -s $src $out"; }; - "cardano-collateral-select" = pkgs.stdenv.mkDerivation { - name = "cardano-collateral-select"; - version = "v1.0.0"; - src = pkgs.fetchgit { - url = "https://github.com/mlabs-haskell/purescript-cardano-collateral-select"; - rev = "193bf49be979b42aa1f0f9cb3d7582d6bc98e3b9"; - sha256 = "1jbl6k779brbqzf7jf80is63b23k3mqzf2mzr222qswd3wg8s5b0"; - }; - phases = "installPhase"; - installPhase = "ln -s $src $out"; - }; - "cardano-data-lite" = pkgs.stdenv.mkDerivation { name = "cardano-data-lite"; version = "070a1a502472211853099c2566a7e9100a7b1a61"; @@ -209,30 +197,6 @@ let installPhase = "ln -s $src $out"; }; - "cardano-key-wallet" = pkgs.stdenv.mkDerivation { - name = "cardano-key-wallet"; - version = "v2.0.0"; - src = pkgs.fetchgit { - url = "https://github.com/mlabs-haskell/purescript-cardano-key-wallet"; - rev = "99d9bb7c8b291ad0bc9709d493ff7e02d14a89c0"; - sha256 = "11jw05s7vpgg6bdyi3zy4z1fcj53a8kaaja5717b7yjgflmhfn8s"; - }; - phases = "installPhase"; - installPhase = "ln -s $src $out"; - }; - - "cardano-message-signing" = pkgs.stdenv.mkDerivation { - name = "cardano-message-signing"; - version = "v1.0.0"; - src = pkgs.fetchgit { - url = "https://github.com/mlabs-haskell/purescript-cardano-message-signing"; - rev = "97f6f97a258ae3490df0be6b39fa6769677aa04f"; - sha256 = "1ns7m9awn4w5amvf9ffldxk7acm73fg8clw4hja4nnl61mskqr5w"; - }; - phases = "installPhase"; - installPhase = "ln -s $src $out"; - }; - "cardano-plutus-data-schema" = pkgs.stdenv.mkDerivation { name = "cardano-plutus-data-schema"; version = "110ba00261b1480895f8412016c639ea655412f6"; @@ -281,6 +245,18 @@ let installPhase = "ln -s $src $out"; }; + "concurrent-queues" = pkgs.stdenv.mkDerivation { + name = "concurrent-queues"; + version = "v3.0.0"; + src = pkgs.fetchgit { + url = "https://github.com/purescript-contrib/purescript-concurrent-queues.git"; + rev = "905a0cb902dec070fa621819455363660de289c4"; + sha256 = "0wr9fkdf23mam4ip7nwdm1i7qy0l1lysyfkffbqj07dvrgpkzd0f"; + }; + phases = "installPhase"; + installPhase = "ln -s $src $out"; + }; + "console" = pkgs.stdenv.mkDerivation { name = "console"; version = "v6.0.0"; diff --git a/spago.dhall b/spago.dhall index a940441..b6e370e 100644 --- a/spago.dhall +++ b/spago.dhall @@ -9,6 +9,7 @@ , "cardano-provider" , "cardano-data-lite" , "cardano-types" + , "concurrent-queues" , "console" , "control" , "datetime" diff --git a/src/Kupmios/KupmiosM.purs b/src/Kupmios/KupmiosM.purs index 1ab6a6a..7c72d58 100644 --- a/src/Kupmios/KupmiosM.purs +++ b/src/Kupmios/KupmiosM.purs @@ -7,16 +7,22 @@ module Cardano.Kupmios.KupmiosM , ParKupmiosM , KupmiosMT(KupmiosMT) , handleAffjaxResponse + , mkKupmiosEnv ) where import Prelude import Aeson (class DecodeAeson, decodeAeson, parseJsonStringToAeson) import Affjax (Error, Response) as Affjax +import Cardano.Kupmios.Helpers (logWithLevel) +import Cardano.Kupmios.KupmiosM.HttpUtils (handleAffjaxResponseGeneric) import Cardano.Provider.Error ( ClientError(ClientHttpError, ClientHttpResponseError, ClientDecodeJsonError) , ServiceError(ServiceOtherError) ) +import Cardano.Provider.ServerConfig (ServerConfig) +import Concurrent.Queue (Queue) +import Concurrent.Queue (new, write) as Queue import Control.Alt (class Alt) import Control.Alternative (class Alternative) import Control.Monad.Error.Class (class MonadError, class MonadThrow) @@ -26,15 +32,13 @@ import Control.Monad.Reader.Trans (ReaderT(ReaderT), asks) import Control.Monad.Rec.Class (class MonadRec) import Control.Parallel (class Parallel, parallel, sequential) import Control.Plus (class Plus) -import Cardano.Kupmios.Helpers (logWithLevel) -import Cardano.Kupmios.KupmiosM.HttpUtils (handleAffjaxResponseGeneric) - -import Cardano.Provider.ServerConfig (ServerConfig) +import Data.Array ((..)) import Data.Either (Either) import Data.Log.Level (LogLevel) import Data.Log.Message (Message) -import Data.Maybe (Maybe, fromMaybe) +import Data.Maybe (Maybe(Just, Nothing), fromMaybe) import Data.Newtype (class Newtype, unwrap, wrap) +import Data.Traversable (traverse_) import Effect.Aff (Aff, ParAff) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (class MonadEffect) @@ -48,8 +52,13 @@ import Effect.Exception (Error) -- | - logging level -- | - optional custom logger type KupmiosConfig = - { ogmiosConfig :: ServerConfig - , kupoConfig :: ServerConfig + { ogmios :: + { serverConfig :: ServerConfig + , maxParallelRequests :: Maybe Int + } + , kupo :: + { serverConfig :: ServerConfig + } , logLevel :: LogLevel , customLogger :: Maybe (LogLevel -> Message -> Aff Unit) , suppressLogs :: Boolean @@ -58,8 +67,25 @@ type KupmiosConfig = -- | `KupmiosEnv` contains everything needed for `KupmiosM` to run. type KupmiosEnv = { config :: KupmiosConfig + , ogmiosRequestSemaphore :: Maybe (Queue Unit) } +mkKupmiosEnv :: KupmiosConfig -> Aff KupmiosEnv +mkKupmiosEnv config = + case config.ogmios.maxParallelRequests of + Nothing -> + pure + { config + , ogmiosRequestSemaphore: Nothing + } + Just n -> do + sem <- Queue.new + traverse_ (const (Queue.write sem unit)) $ 1 .. n + pure + { config + , ogmiosRequestSemaphore: Just sem + } + type KupmiosM = KupmiosMT Aff type ParKupmiosM = KupmiosMT ParAff diff --git a/src/Kupmios/KupmiosM/Kupo.purs b/src/Kupmios/KupmiosM/Kupo.purs index 76fb09d..3d5049d 100644 --- a/src/Kupmios/KupmiosM/Kupo.purs +++ b/src/Kupmios/KupmiosM/Kupo.purs @@ -156,7 +156,7 @@ getScriptByHash scriptHash = do -- https://github.com/Plutonomicon/cardano-transaction-lib/issues/1293 isTxConfirmed :: TransactionHash -> KupmiosM (Either ClientError (Maybe Slot)) isTxConfirmed txHash = do - config <- asks (_.kupoConfig <<< _.config) + config <- asks (_.serverConfig <<< _.kupo <<< _.config) do -- we don't add `?unspent`, because we only care about existence of UTxOs, -- possibly they can be consumed @@ -493,7 +493,7 @@ unwrapKupoAuxData (KupoAuxiliaryData mAuxData) = mAuxData kupoGetRequest :: String -> KupmiosM (Either Affjax.Error (Affjax.Response String)) kupoGetRequest endpoint = do - config <- asks (_.kupoConfig <<< _.config) + config <- asks (_.serverConfig <<< _.kupo <<< _.config) logTrace' $ "sending kupo request: " <> endpoint liftAff $ kupoGetRequestAff config endpoint diff --git a/src/Kupmios/KupmiosM/Ogmios.purs b/src/Kupmios/KupmiosM/Ogmios.purs index f6e2585..f8c8967 100644 --- a/src/Kupmios/KupmiosM/Ogmios.purs +++ b/src/Kupmios/KupmiosM/Ogmios.purs @@ -46,8 +46,9 @@ import Cardano.Provider.ServerConfig (ServerConfig, mkHttpUrl) import Cardano.Types.CborBytes (CborBytes) import Cardano.Types.Chain as Chain import Cardano.Types.TransactionHash (TransactionHash) +import Concurrent.Queue (read, write) as Queue import Control.Monad.Error.Class (class MonadThrow, throwError) -import Control.Monad.Reader.Class (asks) +import Control.Monad.Reader.Class (ask) import Data.ByteArray (byteArrayToHex) import Data.Either (Either(Left), either, hush) import Data.HTTP.Method (Method(POST)) @@ -56,7 +57,7 @@ import Data.Maybe (Maybe(Just, Nothing)) import Data.Newtype (unwrap, wrap) import Data.Time.Duration (Milliseconds(Milliseconds)) import Data.Tuple.Nested (type (/\), (/\)) -import Effect.Aff (Aff, delay) +import Effect.Aff (Aff, bracket, delay) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Exception (Error, error) @@ -159,11 +160,17 @@ ogmiosPostRequest :: Aeson -- ^ JSON-RPC request body -> KupmiosM (Either Affjax.Error (Affjax.Response String)) ogmiosPostRequest body = do - config <- asks (_.ogmiosConfig <<< _.config) + { config: { ogmios: { serverConfig } }, ogmiosRequestSemaphore } <- ask logTrace' $ "sending ogmios HTTP request: " <> show body - s <- liftAff $ ogmiosPostRequestAff config body - logTrace' $ "response: " <> (show $ hush $ s) - pure s + let request = ogmiosPostRequestAff serverConfig body + resp <- + liftAff case ogmiosRequestSemaphore of + Just sem -> + bracket (Queue.read sem) (const (Queue.write sem unit)) $ const request + Nothing -> + request + logTrace' $ "response: " <> (show $ hush resp) + pure resp ogmiosPostRequestAff :: ServerConfig diff --git a/src/Kupmios/Provider.purs b/src/Kupmios/Provider.purs index 95d0321..0a1da1a 100644 --- a/src/Kupmios/Provider.purs +++ b/src/Kupmios/Provider.purs @@ -34,8 +34,7 @@ import Data.Maybe (isJust) import Data.Newtype (unwrap, wrap) import Effect.Aff (Aff) -providerForKupmiosBackend - :: (forall (a :: Type). KupmiosM a -> Aff a) -> Provider +providerForKupmiosBackend :: (forall (a :: Type). KupmiosM a -> Aff a) -> Provider providerForKupmiosBackend runKupmiosM = { getDatumByHash: runKupmiosM <<< Kupo.getDatumByHash , getScriptByHash: runKupmiosM <<< Kupo.getScriptByHash diff --git a/src/KupmiosProvider.purs b/src/KupmiosProvider.purs index 93423f5..511ef4d 100644 --- a/src/KupmiosProvider.purs +++ b/src/KupmiosProvider.purs @@ -7,6 +7,7 @@ import Cardano.Kupmios.KupmiosM , KupmiosM , KupmiosMT(KupmiosMT) , handleAffjaxResponse + , mkKupmiosEnv ) as X import Cardano.Kupmios.Kupo ( getDatumByHash From 937f4091b721bc3bc27df460029495dcdafd1c0d Mon Sep 17 00:00:00 2001 From: Dzmitry Shuiski Date: Fri, 7 Nov 2025 15:17:58 +0100 Subject: [PATCH 2/5] Refactor KupmiosEnv initialization interface --- src/Kupmios/KupmiosM.purs | 33 +++++++++----------------------- src/Kupmios/KupmiosM/Kupo.purs | 4 ++-- src/Kupmios/KupmiosM/Ogmios.purs | 4 ++-- src/KupmiosProvider.purs | 2 +- 4 files changed, 14 insertions(+), 29 deletions(-) diff --git a/src/Kupmios/KupmiosM.purs b/src/Kupmios/KupmiosM.purs index 7c72d58..b6edc89 100644 --- a/src/Kupmios/KupmiosM.purs +++ b/src/Kupmios/KupmiosM.purs @@ -7,7 +7,7 @@ module Cardano.Kupmios.KupmiosM , ParKupmiosM , KupmiosMT(KupmiosMT) , handleAffjaxResponse - , mkKupmiosEnv + , initOgmiosRequestSemaphore ) where import Prelude @@ -36,7 +36,7 @@ import Data.Array ((..)) import Data.Either (Either) import Data.Log.Level (LogLevel) import Data.Log.Message (Message) -import Data.Maybe (Maybe(Just, Nothing), fromMaybe) +import Data.Maybe (Maybe, fromMaybe) import Data.Newtype (class Newtype, unwrap, wrap) import Data.Traversable (traverse_) import Effect.Aff (Aff, ParAff) @@ -52,13 +52,8 @@ import Effect.Exception (Error) -- | - logging level -- | - optional custom logger type KupmiosConfig = - { ogmios :: - { serverConfig :: ServerConfig - , maxParallelRequests :: Maybe Int - } - , kupo :: - { serverConfig :: ServerConfig - } + { ogmiosConfig :: ServerConfig + , kupoConfig :: ServerConfig , logLevel :: LogLevel , customLogger :: Maybe (LogLevel -> Message -> Aff Unit) , suppressLogs :: Boolean @@ -70,21 +65,11 @@ type KupmiosEnv = , ogmiosRequestSemaphore :: Maybe (Queue Unit) } -mkKupmiosEnv :: KupmiosConfig -> Aff KupmiosEnv -mkKupmiosEnv config = - case config.ogmios.maxParallelRequests of - Nothing -> - pure - { config - , ogmiosRequestSemaphore: Nothing - } - Just n -> do - sem <- Queue.new - traverse_ (const (Queue.write sem unit)) $ 1 .. n - pure - { config - , ogmiosRequestSemaphore: Just sem - } +initOgmiosRequestSemaphore :: Int -> Aff (Queue Unit) +initOgmiosRequestSemaphore maxParallelRequests = do + sem <- Queue.new + traverse_ (const (Queue.write sem unit)) $ 1 .. maxParallelRequests + pure sem type KupmiosM = KupmiosMT Aff diff --git a/src/Kupmios/KupmiosM/Kupo.purs b/src/Kupmios/KupmiosM/Kupo.purs index 3d5049d..76fb09d 100644 --- a/src/Kupmios/KupmiosM/Kupo.purs +++ b/src/Kupmios/KupmiosM/Kupo.purs @@ -156,7 +156,7 @@ getScriptByHash scriptHash = do -- https://github.com/Plutonomicon/cardano-transaction-lib/issues/1293 isTxConfirmed :: TransactionHash -> KupmiosM (Either ClientError (Maybe Slot)) isTxConfirmed txHash = do - config <- asks (_.serverConfig <<< _.kupo <<< _.config) + config <- asks (_.kupoConfig <<< _.config) do -- we don't add `?unspent`, because we only care about existence of UTxOs, -- possibly they can be consumed @@ -493,7 +493,7 @@ unwrapKupoAuxData (KupoAuxiliaryData mAuxData) = mAuxData kupoGetRequest :: String -> KupmiosM (Either Affjax.Error (Affjax.Response String)) kupoGetRequest endpoint = do - config <- asks (_.serverConfig <<< _.kupo <<< _.config) + config <- asks (_.kupoConfig <<< _.config) logTrace' $ "sending kupo request: " <> endpoint liftAff $ kupoGetRequestAff config endpoint diff --git a/src/Kupmios/KupmiosM/Ogmios.purs b/src/Kupmios/KupmiosM/Ogmios.purs index f8c8967..02a69fc 100644 --- a/src/Kupmios/KupmiosM/Ogmios.purs +++ b/src/Kupmios/KupmiosM/Ogmios.purs @@ -160,9 +160,9 @@ ogmiosPostRequest :: Aeson -- ^ JSON-RPC request body -> KupmiosM (Either Affjax.Error (Affjax.Response String)) ogmiosPostRequest body = do - { config: { ogmios: { serverConfig } }, ogmiosRequestSemaphore } <- ask + { config: { ogmiosConfig }, ogmiosRequestSemaphore } <- ask logTrace' $ "sending ogmios HTTP request: " <> show body - let request = ogmiosPostRequestAff serverConfig body + let request = ogmiosPostRequestAff ogmiosConfig body resp <- liftAff case ogmiosRequestSemaphore of Just sem -> diff --git a/src/KupmiosProvider.purs b/src/KupmiosProvider.purs index 511ef4d..510195b 100644 --- a/src/KupmiosProvider.purs +++ b/src/KupmiosProvider.purs @@ -7,7 +7,7 @@ import Cardano.Kupmios.KupmiosM , KupmiosM , KupmiosMT(KupmiosMT) , handleAffjaxResponse - , mkKupmiosEnv + , initOgmiosRequestSemaphore ) as X import Cardano.Kupmios.Kupo ( getDatumByHash From 51a0b8e8662f7f96b2ffd7c8866c5d38d691b421 Mon Sep 17 00:00:00 2001 From: Dzmitry Shuiski Date: Fri, 7 Nov 2025 15:22:54 +0100 Subject: [PATCH 3/5] Update signature of initOgmiosRequestSemaphore --- src/Kupmios/KupmiosM.purs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Kupmios/KupmiosM.purs b/src/Kupmios/KupmiosM.purs index b6edc89..d753ec3 100644 --- a/src/Kupmios/KupmiosM.purs +++ b/src/Kupmios/KupmiosM.purs @@ -65,8 +65,8 @@ type KupmiosEnv = , ogmiosRequestSemaphore :: Maybe (Queue Unit) } -initOgmiosRequestSemaphore :: Int -> Aff (Queue Unit) -initOgmiosRequestSemaphore maxParallelRequests = do +initOgmiosRequestSemaphore :: { maxParallelRequests :: Int } -> Aff (Queue Unit) +initOgmiosRequestSemaphore { maxParallelRequests } = do sem <- Queue.new traverse_ (const (Queue.write sem unit)) $ 1 .. maxParallelRequests pure sem From 22044b695b6d3e43724c01ed5f255ece33dc4744 Mon Sep 17 00:00:00 2001 From: Dzmitry Shuiski Date: Fri, 7 Nov 2025 17:19:03 +0100 Subject: [PATCH 4/5] Ogmios request semaphore: Switch to BoundedQueue; Add cooldown if empty --- src/Kupmios/KupmiosM.purs | 31 ++++++++++++++++++++----------- src/Kupmios/KupmiosM/Kupo.purs | 4 ++-- src/Kupmios/KupmiosM/Ogmios.purs | 21 +++++++++++++++++---- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/Kupmios/KupmiosM.purs b/src/Kupmios/KupmiosM.purs index d753ec3..622abf2 100644 --- a/src/Kupmios/KupmiosM.purs +++ b/src/Kupmios/KupmiosM.purs @@ -1,11 +1,12 @@ -- | Kupo+Ogmios query layer monad. -- | This module defines an Aff interface for backend queries. module Cardano.Kupmios.KupmiosM - ( KupmiosM - , KupmiosEnv + ( KupmiosEnv , KupmiosConfig - , ParKupmiosM + , KupmiosM , KupmiosMT(KupmiosMT) + , ParKupmiosM + , Semaphore , handleAffjaxResponse , initOgmiosRequestSemaphore ) where @@ -21,8 +22,8 @@ import Cardano.Provider.Error , ServiceError(ServiceOtherError) ) import Cardano.Provider.ServerConfig (ServerConfig) -import Concurrent.Queue (Queue) -import Concurrent.Queue (new, write) as Queue +import Concurrent.BoundedQueue (BoundedQueue) +import Concurrent.BoundedQueue (new, write) as BoundedQueue import Control.Alt (class Alt) import Control.Alternative (class Alternative) import Control.Monad.Error.Class (class MonadError, class MonadThrow) @@ -38,6 +39,7 @@ import Data.Log.Level (LogLevel) import Data.Log.Message (Message) import Data.Maybe (Maybe, fromMaybe) import Data.Newtype (class Newtype, unwrap, wrap) +import Data.Time.Duration (Milliseconds) import Data.Traversable (traverse_) import Effect.Aff (Aff, ParAff) import Effect.Aff.Class (class MonadAff, liftAff) @@ -52,8 +54,13 @@ import Effect.Exception (Error) -- | - logging level -- | - optional custom logger type KupmiosConfig = - { ogmiosConfig :: ServerConfig - , kupoConfig :: ServerConfig + { ogmios :: + { serverConfig :: ServerConfig + , requestSemaphoreCooldown :: Maybe Milliseconds + } + , kupo :: + { serverConfig :: ServerConfig + } , logLevel :: LogLevel , customLogger :: Maybe (LogLevel -> Message -> Aff Unit) , suppressLogs :: Boolean @@ -62,13 +69,15 @@ type KupmiosConfig = -- | `KupmiosEnv` contains everything needed for `KupmiosM` to run. type KupmiosEnv = { config :: KupmiosConfig - , ogmiosRequestSemaphore :: Maybe (Queue Unit) + , ogmiosRequestSemaphore :: Maybe Semaphore } -initOgmiosRequestSemaphore :: { maxParallelRequests :: Int } -> Aff (Queue Unit) +type Semaphore = BoundedQueue Unit + +initOgmiosRequestSemaphore :: { maxParallelRequests :: Int } -> Aff Semaphore initOgmiosRequestSemaphore { maxParallelRequests } = do - sem <- Queue.new - traverse_ (const (Queue.write sem unit)) $ 1 .. maxParallelRequests + sem <- BoundedQueue.new maxParallelRequests + traverse_ (const (BoundedQueue.write sem unit)) $ 1 .. maxParallelRequests pure sem type KupmiosM = KupmiosMT Aff diff --git a/src/Kupmios/KupmiosM/Kupo.purs b/src/Kupmios/KupmiosM/Kupo.purs index 76fb09d..3d5049d 100644 --- a/src/Kupmios/KupmiosM/Kupo.purs +++ b/src/Kupmios/KupmiosM/Kupo.purs @@ -156,7 +156,7 @@ getScriptByHash scriptHash = do -- https://github.com/Plutonomicon/cardano-transaction-lib/issues/1293 isTxConfirmed :: TransactionHash -> KupmiosM (Either ClientError (Maybe Slot)) isTxConfirmed txHash = do - config <- asks (_.kupoConfig <<< _.config) + config <- asks (_.serverConfig <<< _.kupo <<< _.config) do -- we don't add `?unspent`, because we only care about existence of UTxOs, -- possibly they can be consumed @@ -493,7 +493,7 @@ unwrapKupoAuxData (KupoAuxiliaryData mAuxData) = mAuxData kupoGetRequest :: String -> KupmiosM (Either Affjax.Error (Affjax.Response String)) kupoGetRequest endpoint = do - config <- asks (_.kupoConfig <<< _.config) + config <- asks (_.serverConfig <<< _.kupo <<< _.config) logTrace' $ "sending kupo request: " <> endpoint liftAff $ kupoGetRequestAff config endpoint diff --git a/src/Kupmios/KupmiosM/Ogmios.purs b/src/Kupmios/KupmiosM/Ogmios.purs index 02a69fc..90591f5 100644 --- a/src/Kupmios/KupmiosM/Ogmios.purs +++ b/src/Kupmios/KupmiosM/Ogmios.purs @@ -46,7 +46,7 @@ import Cardano.Provider.ServerConfig (ServerConfig, mkHttpUrl) import Cardano.Types.CborBytes (CborBytes) import Cardano.Types.Chain as Chain import Cardano.Types.TransactionHash (TransactionHash) -import Concurrent.Queue (read, write) as Queue +import Concurrent.BoundedQueue (isEmpty, read, write) as BoundedQueue import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Reader.Class (ask) import Data.ByteArray (byteArrayToHex) @@ -160,13 +160,26 @@ ogmiosPostRequest :: Aeson -- ^ JSON-RPC request body -> KupmiosM (Either Affjax.Error (Affjax.Response String)) ogmiosPostRequest body = do - { config: { ogmiosConfig }, ogmiosRequestSemaphore } <- ask + { config: { ogmios }, ogmiosRequestSemaphore } <- ask logTrace' $ "sending ogmios HTTP request: " <> show body - let request = ogmiosPostRequestAff ogmiosConfig body + let request = ogmiosPostRequestAff ogmios.serverConfig body resp <- liftAff case ogmiosRequestSemaphore of Just sem -> - bracket (Queue.read sem) (const (Queue.write sem unit)) $ const request + let + acquireSem = + case ogmios.requestSemaphoreCooldown of + Nothing -> + BoundedQueue.read sem + Just cd -> + BoundedQueue.isEmpty sem >>= + case _ of + false -> + BoundedQueue.read sem + true -> + delay cd *> BoundedQueue.read sem + in + bracket acquireSem (const (BoundedQueue.write sem unit)) $ const request Nothing -> request logTrace' $ "response: " <> (show $ hush resp) From 19e1f1067c91ebfcd3323c32e88676ebf1f49ceb Mon Sep 17 00:00:00 2001 From: Dzmitry Shuiski Date: Mon, 10 Nov 2025 17:11:37 +0100 Subject: [PATCH 5/5] Add a changelog entry --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c229a70..ac776e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and we follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# v3.0.0 + +## Changed + +- `KupmiosConfig` and `KupmiosEnv` types. Ogmios cannot handle a high number of +parallel HTTP requests, so we now limit concurrency using a semaphore managed +within `KupmiosEnv`. The semaphore can be initialized with +`initOgmiosRequestSemaphore`. Additionally, users can configure +`requestSemaphoreCooldown` in `KupmiosConfig` to further throttle requests that +exceed the semaphore limit. Note that this solution is intended as a temporary +workaround, so contributions are more than welcome. + # v2.1.0 ## Changed