@@ -722,12 +722,7 @@ newForkerAtTarget ::
722722 ResourceRegistry m ->
723723 Target (Point blk ) ->
724724 m (Either GetForkerError (Forker m l blk ))
725- newForkerAtTarget h rr pt = getEnv h $ \ ldbEnv ->
726- withReadLock
727- (ldbLock ldbEnv)
728- ( acquireAtTarget ldbEnv (Right pt)
729- >>= traverse (newForker h ldbEnv rr)
730- )
725+ newForkerAtTarget h rr pt = withTransferrableReadAccess h rr (Right pt)
731726
732727newForkerByRollback ::
733728 ( HeaderHash l ~ HeaderHash blk
@@ -742,8 +737,41 @@ newForkerByRollback ::
742737 -- | How many blocks to rollback from the tip
743738 Word64 ->
744739 m (Either GetForkerError (Forker m l blk ))
745- newForkerByRollback h rr n = getEnv h $ \ ldbEnv -> do
746- withReadLock (ldbLock ldbEnv) (acquireAtTarget ldbEnv (Left n) >>= traverse (newForker h ldbEnv rr))
740+ newForkerByRollback h rr n = withTransferrableReadAccess h rr (Left n)
741+
742+ -- | Acquire read access and then allocate a forker, acquiring it at the given
743+ -- point or rollback.
744+ withTransferrableReadAccess ::
745+ ( HeaderHash l ~ HeaderHash blk
746+ , IOLike m
747+ , IsLedger l
748+ , StandardHash l
749+ , HasLedgerTables l
750+ , LedgerSupportsProtocol blk
751+ ) =>
752+ LedgerDBHandle m l blk ->
753+ ResourceRegistry m ->
754+ Either Word64 (Target (Point blk )) ->
755+ m (Either GetForkerError (Forker m l blk ))
756+ withTransferrableReadAccess h rr f = getEnv h $ \ ldbEnv -> do
757+ -- This TVar will be used to maybe release the read lock by the resource
758+ -- registry. Once the forker was opened it will be emptied.
759+ tv <- newTVarIO (pure () )
760+ (rk, _) <-
761+ allocate
762+ rr
763+ ( \ _ -> atomically $ do
764+ -- Populate the tvar with the releasing action. Creating the forker will empty this
765+ writeTVar tv (atomically $ unsafeReleaseReadAccess (ldbLock ldbEnv))
766+ -- Acquire the read access
767+ unsafeAcquireReadAccess (ldbLock ldbEnv)
768+ )
769+ ( \ _ ->
770+ -- Run the contents of the releasing TVar which will be `pure ()` if
771+ -- the forker was opened.
772+ join $ readTVarIO tv
773+ )
774+ unsafeRunReadLocked (acquireAtTarget ldbEnv f >>= traverse (newForker h ldbEnv (rk, tv) rr))
747775
748776-- | Acquire both a value handle and a db changelog at the tip. Holds a read lock
749777-- while doing so.
@@ -801,6 +829,7 @@ acquireAtTarget ldbEnv target = readLocked $ runExceptT $ do
801829-------------------------------------------------------------------------------}
802830
803831newForker ::
832+ forall m l blk .
804833 ( IOLike m
805834 , HasLedgerTables l
806835 , LedgerSupportsProtocol blk
@@ -810,33 +839,42 @@ newForker ::
810839 ) =>
811840 LedgerDBHandle m l blk ->
812841 LedgerDBEnv m l blk ->
842+ (ResourceKey m , StrictTVar m (m () )) ->
813843 ResourceRegistry m ->
814844 DbChangelog l ->
815845 ReadLocked m (Forker m l blk )
816- newForker h ldbEnv rr dblog = readLocked $ do
817- dblogVar <- newTVarIO dblog
818- forkerKey <- atomically $ stateTVar (ldbNextForkerKey ldbEnv) $ \ r -> (r, r + 1 )
819- forkerMVar <- newMVar $ Left (ldbLock ldbEnv, ldbBackingStore ldbEnv, rr)
820- let forkerEnv =
821- ForkerEnv
822- { foeBackingStoreValueHandle = forkerMVar
823- , foeChangelog = dblogVar
824- , foeSwitchVar = ldbChangelog ldbEnv
825- , foeTracer =
826- LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv
827- }
828- atomically $ do
829- -- We need to make sure to release this read access when we drop the value
830- -- handle, so in 'closeForkerEnv' (if it wasn't promoted) or in
831- -- 'getValueHandle' (if it was promoted).
832- unsafeAcquireReadAccess (ldbLock ldbEnv)
833-
834- -- Note that we add the forkerEnv to the 'ldbForkers' so that an exception
835- -- which will close all the forkers, also closes this one, releasing the
836- -- read access we acquired above.
837- modifyTVar (ldbForkers ldbEnv) $ Map. insert forkerKey forkerEnv
838- traceWith (foeTracer forkerEnv) ForkerOpen
839- pure $ mkForker h (ldbQueryBatchSize ldbEnv) forkerKey forkerEnv
846+ newForker h ldbEnv (rk, releaseVar) rr dblog =
847+ readLocked $ do
848+ (rk', frk) <-
849+ allocate
850+ rr
851+ ( \ _ -> do
852+ dblogVar <- newTVarIO dblog
853+ forkerKey <- atomically $ stateTVar (ldbNextForkerKey ldbEnv) $ \ r -> (r, r + 1 )
854+ forkerMVar <- newMVar $ Left (ldbLock ldbEnv, ldbBackingStore ldbEnv, rr)
855+ let forkerEnv =
856+ ForkerEnv
857+ { foeBackingStoreValueHandle = forkerMVar
858+ , foeChangelog = dblogVar
859+ , foeSwitchVar = ldbChangelog ldbEnv
860+ , foeTracer =
861+ LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv
862+ }
863+ atomically $ do
864+ -- Note that we add the forkerEnv to the 'ldbForkers' so that an exception
865+ -- which will close all the forkers, also closes this one, releasing the
866+ -- read access we acquired above.
867+ modifyTVar (ldbForkers ldbEnv) $ Map. insert forkerKey forkerEnv
868+ -- Empty the tvar created for allocating the unsafe read access,
869+ -- so that it is the forker the one that takes care of releasing
870+ -- it.
871+ writeTVar releaseVar (pure () )
872+ void $ release rk
873+ traceWith (foeTracer forkerEnv) ForkerOpen
874+ pure $ (mkForker h (ldbQueryBatchSize ldbEnv) forkerKey forkerEnv)
875+ )
876+ forkerClose
877+ pure $ frk{forkerClose = void $ release rk'}
840878
841879mkForker ::
842880 ( IOLike m
0 commit comments