diff --git a/CHANGELOG.md b/CHANGELOG.md index c229a70..ac776e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and we follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# v3.0.0 + +## Changed + +- `KupmiosConfig` and `KupmiosEnv` types. Ogmios cannot handle a high number of +parallel HTTP requests, so we now limit concurrency using a semaphore managed +within `KupmiosEnv`. The semaphore can be initialized with +`initOgmiosRequestSemaphore`. Additionally, users can configure +`requestSemaphoreCooldown` in `KupmiosConfig` to further throttle requests that +exceed the semaphore limit. Note that this solution is intended as a temporary +workaround, so contributions are more than welcome. + # v2.1.0 ## Changed diff --git a/spago-packages.nix b/spago-packages.nix index dff13e4..8acd10e 100644 --- a/spago-packages.nix +++ b/spago-packages.nix @@ -185,18 +185,6 @@ let installPhase = "ln -s $src $out"; }; - "cardano-collateral-select" = pkgs.stdenv.mkDerivation { - name = "cardano-collateral-select"; - version = "v1.0.0"; - src = pkgs.fetchgit { - url = "https://github.com/mlabs-haskell/purescript-cardano-collateral-select"; - rev = "193bf49be979b42aa1f0f9cb3d7582d6bc98e3b9"; - sha256 = "1jbl6k779brbqzf7jf80is63b23k3mqzf2mzr222qswd3wg8s5b0"; - }; - phases = "installPhase"; - installPhase = "ln -s $src $out"; - }; - "cardano-data-lite" = pkgs.stdenv.mkDerivation { name = "cardano-data-lite"; version = "070a1a502472211853099c2566a7e9100a7b1a61"; @@ -209,30 +197,6 @@ let installPhase = "ln -s $src $out"; }; - "cardano-key-wallet" = pkgs.stdenv.mkDerivation { - name = "cardano-key-wallet"; - version = "v2.0.0"; - src = pkgs.fetchgit { - url = "https://github.com/mlabs-haskell/purescript-cardano-key-wallet"; - rev = "99d9bb7c8b291ad0bc9709d493ff7e02d14a89c0"; - sha256 = "11jw05s7vpgg6bdyi3zy4z1fcj53a8kaaja5717b7yjgflmhfn8s"; - }; - phases = "installPhase"; - installPhase = "ln -s $src $out"; - }; - - "cardano-message-signing" = pkgs.stdenv.mkDerivation { - name = "cardano-message-signing"; - version = "v1.0.0"; - src = pkgs.fetchgit { - url = "https://github.com/mlabs-haskell/purescript-cardano-message-signing"; - rev = "97f6f97a258ae3490df0be6b39fa6769677aa04f"; - sha256 = "1ns7m9awn4w5amvf9ffldxk7acm73fg8clw4hja4nnl61mskqr5w"; - }; - phases = "installPhase"; - installPhase = "ln -s $src $out"; - }; - "cardano-plutus-data-schema" = pkgs.stdenv.mkDerivation { name = "cardano-plutus-data-schema"; version = "110ba00261b1480895f8412016c639ea655412f6"; @@ -281,6 +245,18 @@ let installPhase = "ln -s $src $out"; }; + "concurrent-queues" = pkgs.stdenv.mkDerivation { + name = "concurrent-queues"; + version = "v3.0.0"; + src = pkgs.fetchgit { + url = "https://github.com/purescript-contrib/purescript-concurrent-queues.git"; + rev = "905a0cb902dec070fa621819455363660de289c4"; + sha256 = "0wr9fkdf23mam4ip7nwdm1i7qy0l1lysyfkffbqj07dvrgpkzd0f"; + }; + phases = "installPhase"; + installPhase = "ln -s $src $out"; + }; + "console" = pkgs.stdenv.mkDerivation { name = "console"; version = "v6.0.0"; diff --git a/spago.dhall b/spago.dhall index a940441..b6e370e 100644 --- a/spago.dhall +++ b/spago.dhall @@ -9,6 +9,7 @@ , "cardano-provider" , "cardano-data-lite" , "cardano-types" + , "concurrent-queues" , "console" , "control" , "datetime" diff --git a/src/Kupmios/KupmiosM.purs b/src/Kupmios/KupmiosM.purs index 1ab6a6a..622abf2 100644 --- a/src/Kupmios/KupmiosM.purs +++ b/src/Kupmios/KupmiosM.purs @@ -1,22 +1,29 @@ -- | Kupo+Ogmios query layer monad. -- | This module defines an Aff interface for backend queries. module Cardano.Kupmios.KupmiosM - ( KupmiosM - , KupmiosEnv + ( KupmiosEnv , KupmiosConfig - , ParKupmiosM + , KupmiosM , KupmiosMT(KupmiosMT) + , ParKupmiosM + , Semaphore , handleAffjaxResponse + , initOgmiosRequestSemaphore ) where import Prelude import Aeson (class DecodeAeson, decodeAeson, parseJsonStringToAeson) import Affjax (Error, Response) as Affjax +import Cardano.Kupmios.Helpers (logWithLevel) +import Cardano.Kupmios.KupmiosM.HttpUtils (handleAffjaxResponseGeneric) import Cardano.Provider.Error ( ClientError(ClientHttpError, ClientHttpResponseError, ClientDecodeJsonError) , ServiceError(ServiceOtherError) ) +import Cardano.Provider.ServerConfig (ServerConfig) +import Concurrent.BoundedQueue (BoundedQueue) +import Concurrent.BoundedQueue (new, write) as BoundedQueue import Control.Alt (class Alt) import Control.Alternative (class Alternative) import Control.Monad.Error.Class (class MonadError, class MonadThrow) @@ -26,15 +33,14 @@ import Control.Monad.Reader.Trans (ReaderT(ReaderT), asks) import Control.Monad.Rec.Class (class MonadRec) import Control.Parallel (class Parallel, parallel, sequential) import Control.Plus (class Plus) -import Cardano.Kupmios.Helpers (logWithLevel) -import Cardano.Kupmios.KupmiosM.HttpUtils (handleAffjaxResponseGeneric) - -import Cardano.Provider.ServerConfig (ServerConfig) +import Data.Array ((..)) import Data.Either (Either) import Data.Log.Level (LogLevel) import Data.Log.Message (Message) import Data.Maybe (Maybe, fromMaybe) import Data.Newtype (class Newtype, unwrap, wrap) +import Data.Time.Duration (Milliseconds) +import Data.Traversable (traverse_) import Effect.Aff (Aff, ParAff) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (class MonadEffect) @@ -48,8 +54,13 @@ import Effect.Exception (Error) -- | - logging level -- | - optional custom logger type KupmiosConfig = - { ogmiosConfig :: ServerConfig - , kupoConfig :: ServerConfig + { ogmios :: + { serverConfig :: ServerConfig + , requestSemaphoreCooldown :: Maybe Milliseconds + } + , kupo :: + { serverConfig :: ServerConfig + } , logLevel :: LogLevel , customLogger :: Maybe (LogLevel -> Message -> Aff Unit) , suppressLogs :: Boolean @@ -58,8 +69,17 @@ type KupmiosConfig = -- | `KupmiosEnv` contains everything needed for `KupmiosM` to run. type KupmiosEnv = { config :: KupmiosConfig + , ogmiosRequestSemaphore :: Maybe Semaphore } +type Semaphore = BoundedQueue Unit + +initOgmiosRequestSemaphore :: { maxParallelRequests :: Int } -> Aff Semaphore +initOgmiosRequestSemaphore { maxParallelRequests } = do + sem <- BoundedQueue.new maxParallelRequests + traverse_ (const (BoundedQueue.write sem unit)) $ 1 .. maxParallelRequests + pure sem + type KupmiosM = KupmiosMT Aff type ParKupmiosM = KupmiosMT ParAff diff --git a/src/Kupmios/KupmiosM/Kupo.purs b/src/Kupmios/KupmiosM/Kupo.purs index 76fb09d..3d5049d 100644 --- a/src/Kupmios/KupmiosM/Kupo.purs +++ b/src/Kupmios/KupmiosM/Kupo.purs @@ -156,7 +156,7 @@ getScriptByHash scriptHash = do -- https://github.com/Plutonomicon/cardano-transaction-lib/issues/1293 isTxConfirmed :: TransactionHash -> KupmiosM (Either ClientError (Maybe Slot)) isTxConfirmed txHash = do - config <- asks (_.kupoConfig <<< _.config) + config <- asks (_.serverConfig <<< _.kupo <<< _.config) do -- we don't add `?unspent`, because we only care about existence of UTxOs, -- possibly they can be consumed @@ -493,7 +493,7 @@ unwrapKupoAuxData (KupoAuxiliaryData mAuxData) = mAuxData kupoGetRequest :: String -> KupmiosM (Either Affjax.Error (Affjax.Response String)) kupoGetRequest endpoint = do - config <- asks (_.kupoConfig <<< _.config) + config <- asks (_.serverConfig <<< _.kupo <<< _.config) logTrace' $ "sending kupo request: " <> endpoint liftAff $ kupoGetRequestAff config endpoint diff --git a/src/Kupmios/KupmiosM/Ogmios.purs b/src/Kupmios/KupmiosM/Ogmios.purs index f6e2585..90591f5 100644 --- a/src/Kupmios/KupmiosM/Ogmios.purs +++ b/src/Kupmios/KupmiosM/Ogmios.purs @@ -46,8 +46,9 @@ import Cardano.Provider.ServerConfig (ServerConfig, mkHttpUrl) import Cardano.Types.CborBytes (CborBytes) import Cardano.Types.Chain as Chain import Cardano.Types.TransactionHash (TransactionHash) +import Concurrent.BoundedQueue (isEmpty, read, write) as BoundedQueue import Control.Monad.Error.Class (class MonadThrow, throwError) -import Control.Monad.Reader.Class (asks) +import Control.Monad.Reader.Class (ask) import Data.ByteArray (byteArrayToHex) import Data.Either (Either(Left), either, hush) import Data.HTTP.Method (Method(POST)) @@ -56,7 +57,7 @@ import Data.Maybe (Maybe(Just, Nothing)) import Data.Newtype (unwrap, wrap) import Data.Time.Duration (Milliseconds(Milliseconds)) import Data.Tuple.Nested (type (/\), (/\)) -import Effect.Aff (Aff, delay) +import Effect.Aff (Aff, bracket, delay) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Exception (Error, error) @@ -159,11 +160,30 @@ ogmiosPostRequest :: Aeson -- ^ JSON-RPC request body -> KupmiosM (Either Affjax.Error (Affjax.Response String)) ogmiosPostRequest body = do - config <- asks (_.ogmiosConfig <<< _.config) + { config: { ogmios }, ogmiosRequestSemaphore } <- ask logTrace' $ "sending ogmios HTTP request: " <> show body - s <- liftAff $ ogmiosPostRequestAff config body - logTrace' $ "response: " <> (show $ hush $ s) - pure s + let request = ogmiosPostRequestAff ogmios.serverConfig body + resp <- + liftAff case ogmiosRequestSemaphore of + Just sem -> + let + acquireSem = + case ogmios.requestSemaphoreCooldown of + Nothing -> + BoundedQueue.read sem + Just cd -> + BoundedQueue.isEmpty sem >>= + case _ of + false -> + BoundedQueue.read sem + true -> + delay cd *> BoundedQueue.read sem + in + bracket acquireSem (const (BoundedQueue.write sem unit)) $ const request + Nothing -> + request + logTrace' $ "response: " <> (show $ hush resp) + pure resp ogmiosPostRequestAff :: ServerConfig diff --git a/src/Kupmios/Provider.purs b/src/Kupmios/Provider.purs index 95d0321..0a1da1a 100644 --- a/src/Kupmios/Provider.purs +++ b/src/Kupmios/Provider.purs @@ -34,8 +34,7 @@ import Data.Maybe (isJust) import Data.Newtype (unwrap, wrap) import Effect.Aff (Aff) -providerForKupmiosBackend - :: (forall (a :: Type). KupmiosM a -> Aff a) -> Provider +providerForKupmiosBackend :: (forall (a :: Type). KupmiosM a -> Aff a) -> Provider providerForKupmiosBackend runKupmiosM = { getDatumByHash: runKupmiosM <<< Kupo.getDatumByHash , getScriptByHash: runKupmiosM <<< Kupo.getScriptByHash diff --git a/src/KupmiosProvider.purs b/src/KupmiosProvider.purs index 93423f5..510195b 100644 --- a/src/KupmiosProvider.purs +++ b/src/KupmiosProvider.purs @@ -7,6 +7,7 @@ import Cardano.Kupmios.KupmiosM , KupmiosM , KupmiosMT(KupmiosMT) , handleAffjaxResponse + , initOgmiosRequestSemaphore ) as X import Cardano.Kupmios.Kupo ( getDatumByHash