22
33module Cardano.DbSync.Ledger.Async where
44
5+ import Cardano.DbSync.Types
6+ import Data.Set (Set )
7+ import Data.Map (Map )
58import Cardano.DbSync.Ledger.Types
69import Cardano.Ledger.BaseTypes (EpochNo )
710import Cardano.Ledger.Crypto (StandardCrypto )
811import qualified Cardano.Ledger.EpochBoundary as Ledger
912import Control.Concurrent.Class.MonadSTM.Strict
1013import qualified Control.Concurrent.STM.TBQueue as TBQ
14+ import qualified Cardano.Ledger.Rewards as Ledger
15+
16+ --------------------------------------------------------------------------------
17+ -- EpochStake
18+ --------------------------------------------------------------------------------
1119
1220newEpochStakeChannels :: IO EpochStakeChannels
1321newEpochStakeChannels =
@@ -18,9 +26,9 @@ newEpochStakeChannels =
1826 <*> newTVarIO Nothing
1927
2028-- To be used by the main thread
21- ensureEpochDone :: EpochStakeChannels -> EpochNo -> Ledger. SnapShot StandardCrypto -> IO ()
22- ensureEpochDone sQueue epoch snapshot = atomically $ do
23- mLastEpochDone <- waitFinished sQueue
29+ ensureStakeDone :: EpochStakeChannels -> EpochNo -> Ledger. SnapShot StandardCrypto -> IO ()
30+ ensureStakeDone sQueue epoch snapshot = atomically $ do
31+ mLastEpochDone <- waitStakeFinished sQueue
2432 case mLastEpochDone of
2533 Just lastEpochDone | lastEpochDone == epoch -> pure ()
2634 _ -> do
@@ -29,8 +37,8 @@ ensureEpochDone sQueue epoch snapshot = atomically $ do
2937 retry
3038
3139-- To be used by the main thread
32- waitFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo )
33- waitFinished sQueue = do
40+ waitStakeFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo )
41+ waitStakeFinished sQueue = do
3442 stakeThreadState <- readTVar (epochResult sQueue)
3543 case stakeThreadState of
3644 Just (lastEpoch, Done ) -> pure $ Just lastEpoch -- Normal case
@@ -42,3 +50,42 @@ writeEpochStakeAction :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot Standa
4250writeEpochStakeAction sQueue epoch snapShot checkFirst = do
4351 TBQ. writeTBQueue (estakeQueue sQueue) $ EpochStakeDBAction epoch snapShot checkFirst
4452 writeTVar (epochResult sQueue) $ Just (epoch, Running )
53+
54+
55+ --------------------------------------------------------------------------------
56+ -- Rewards
57+ --------------------------------------------------------------------------------
58+
59+ newRewardsChannels :: IO RewardsChannels
60+ newRewardsChannels =
61+ -- This may never be more than 1. But let's keep it a queue for extensibility shake.
62+ -- This may allow us to parallelize the events workload even further
63+ RewardsChannels
64+ <$> TBQ. newTBQueueIO 1
65+ <*> newTVarIO Nothing
66+
67+ -- To be used by the main thread
68+ ensureRewardsDone :: RewardsChannels -> EpochNo -> EpochNo -> Map StakeCred (Set (Ledger. Reward StandardCrypto )) -> IO ()
69+ ensureRewardsDone sQueue epoch epoch' mp = atomically $ do
70+ mLastEpochDone <- waitRewardsFinished sQueue
71+ case mLastEpochDone of
72+ Just lastEpochDone | lastEpochDone == epoch -> pure ()
73+ _ -> do
74+ -- If last is not already there, put it to list and wait again
75+ writeRewardsAction sQueue epoch epoch' mp True
76+ retry
77+
78+ -- To be used by the main thread
79+ waitRewardsFinished :: RewardsChannels -> STM IO (Maybe EpochNo )
80+ waitRewardsFinished sQueue = do
81+ rewardsThreadState <- readTVar (rewardsResult sQueue)
82+ case rewardsThreadState of
83+ Just (lastEpoch, Done ) -> pure $ Just lastEpoch -- Normal case
84+ Just (_, Running ) -> retry -- Wait to finish current work.
85+ Nothing -> pure Nothing -- This will happen after a restart
86+
87+ -- To be used by the main thread
88+ writeRewardsAction :: RewardsChannels -> EpochNo -> EpochNo -> Map StakeCred (Set (Ledger. Reward StandardCrypto )) -> Bool -> STM IO ()
89+ writeRewardsAction sQueue epoch epoch' mp checkFirst = do
90+ TBQ. writeTBQueue (rQueue sQueue) $ RewardsDBAction epoch epoch' mp checkFirst
91+ writeTVar (rewardsResult sQueue) $ Just (epoch, Running )
0 commit comments