88{-# LANGUAGE RecordWildCards #-}
99{-# LANGUAGE TypeFamilies #-}
1010
11- module Development.IDE.Graph.Internal.Database (compute , newDatabase , incDatabase , build , getDirtySet , getKeysAndVisitAge ) where
11+ module Development.IDE.Graph.Internal.Database (compute , newDatabase , incDatabase , build , getDirtySet , getKeysAndVisitAge , AsyncParentKill ( .. ) ) where
1212
1313import Prelude hiding (unzip )
1414
1515import Control.Concurrent.Async
1616import Control.Concurrent.Extra
17- import Control.Concurrent.STM.Stats (STM , atomically ,
17+ import Control.Concurrent.STM.Stats (STM , TVar , atomically ,
1818 atomicallyNamed ,
1919 modifyTVar' , newTVarIO ,
20- readTVarIO )
20+ readTVar , readTVarIO ,
21+ retry )
2122import Control.Exception
2223import Control.Monad
2324import Control.Monad.IO.Class (MonadIO (liftIO ))
2425import Control.Monad.Trans.Class (lift )
2526import Control.Monad.Trans.Reader
2627import qualified Control.Monad.Trans.State.Strict as State
2728import Data.Dynamic
28- import Data.Either
2929import Data.Foldable (for_ , traverse_ )
3030import Data.IORef.Extra
3131import Data.Maybe
@@ -39,11 +39,12 @@ import Development.IDE.Graph.Internal.Types
3939import qualified Focus
4040import qualified ListT
4141import qualified StmContainers.Map as SMap
42- import System.IO.Unsafe
4342import System.Time.Extra (duration , sleep )
4443
4544#if MIN_VERSION_base(4,19,0)
4645import Data.Functor (unzip )
46+ import UnliftIO (MonadUnliftIO (withRunInIO ))
47+ import qualified UnliftIO.Exception as UE
4748#else
4849import Data.List.NonEmpty (unzip )
4950#endif
@@ -67,18 +68,22 @@ incDatabase db (Just kk) = do
6768 -- since we assume that no build is mutating the db.
6869 -- Therefore run one transaction per key to minimise contention.
6970 atomicallyNamed " incDatabase" $ SMap. focus updateDirty k (databaseValues db)
71+ -- let list = SMap.listT (databaseValues db)
72+ -- atomicallyNamed "incDatabase - all " $ flip ListT.traverse_ list $ \(k,_) ->
73+ -- SMap.focus dirtyRunningKey k (databaseValues db)
7074
7175-- all keys are dirty
7276incDatabase db Nothing = do
7377 atomically $ modifyTVar' (databaseStep db) $ \ (Step i) -> Step $ i + 1
7478 let list = SMap. listT (databaseValues db)
79+ -- all running keys are also dirty
7580 atomicallyNamed " incDatabase - all " $ flip ListT. traverse_ list $ \ (k,_) ->
7681 SMap. focus updateDirty k (databaseValues db)
7782
7883updateDirty :: Monad m => Focus. Focus KeyDetails m ()
7984updateDirty = Focus. adjust $ \ (KeyDetails status rdeps) ->
8085 let status'
81- | Running _ _ _ x <- status = Dirty x
86+ | Running _ x <- status = Dirty x
8287 | Clean x <- status = Dirty (Just x)
8388 | otherwise = status
8489 in KeyDetails status' rdeps
@@ -88,58 +93,57 @@ build
8893 => Database -> Stack -> f key -> IO (f Key , f value )
8994-- build _ st k | traceShow ("build", st, k) False = undefined
9095build db stack keys = do
91- built <- runAIO $ do
92- built <- builder db stack (fmap newKey keys)
93- case built of
94- Left clean -> return clean
95- Right dirty -> liftIO dirty
96- let (ids, vs) = unzip built
97- pure (ids, fmap (asV . resultValue) vs)
96+ step <- readTVarIO $ databaseStep db
97+ go `catch` \ e@ (AsyncParentKill i s) -> do
98+ if s == step
99+ then throw e
100+ else throw $ AsyncParentKill i $ Step (- 1 )
98101 where
99- asV :: Value -> value
100- asV (Value x) = unwrapDynamic x
102+ go = do
103+ step <- readTVarIO $ databaseStep db
104+ ! built <- runAIO step $ builder db stack (fmap newKey keys)
105+ let (ids, vs) = unzip built
106+ pure (ids, fmap (asV . resultValue) vs)
107+ where
108+ asV :: Value -> value
109+ asV (Value x) = unwrapDynamic x
110+
101111
102112-- | Build a list of keys and return their results.
103113-- If none of the keys are dirty, we can return the results immediately.
104114-- Otherwise, a blocking computation is returned *which must be evaluated asynchronously* to avoid deadlock.
105- builder
106- :: Traversable f => Database -> Stack -> f Key -> AIO (Either (f (Key , Result )) (IO (f (Key , Result ))))
115+ builder :: (Traversable f ) => Database -> Stack -> f Key -> AIO (f (Key , Result ))
107116-- builder _ st kk | traceShow ("builder", st,kk) False = undefined
108- builder db@ Database {.. } stack keys = withRunInIO $ \ (RunInIO run) -> do
109- -- Things that I need to force before my results are ready
110- toForce <- liftIO $ newTVarIO []
111- current <- liftIO $ readTVarIO databaseStep
112- results <- liftIO $ for keys $ \ id ->
113- -- Updating the status of all the dependencies atomically is not necessary.
114- -- Therefore, run one transaction per dep. to avoid contention
115- atomicallyNamed " builder" $ do
116- -- Spawn the id if needed
117- status <- SMap. lookup id databaseValues
118- val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
119- Clean r -> pure r
120- Running _ force val _
121- | memberStack id stack -> throw $ StackException stack
122- | otherwise -> do
123- modifyTVar' toForce (Wait force : )
124- pure val
125- Dirty s -> do
126- let act = run (refresh db stack id s)
127- (force, val) = splitIO (join act)
128- SMap. focus (updateStatus $ Running current force val s) id databaseValues
129- modifyTVar' toForce (Spawn force: )
130- pure val
131-
132- pure (id , val)
133-
134- toForceList <- liftIO $ readTVarIO toForce
135- let waitAll = run $ waitConcurrently_ toForceList
136- case toForceList of
137- [] -> return $ Left results
138- _ -> return $ Right $ do
139- waitAll
140- pure results
141-
142-
117+ builder db stack keys = do
118+ keyWaits <- for keys $ \ k -> builderOne db stack k
119+ ! res <- for keyWaits $ \ (k, waitR) -> do
120+ ! v<- liftIO waitR
121+ return (k, v)
122+ return res
123+
124+ builderOne :: Database -> Stack -> Key -> AIO (Key , IO Result )
125+ builderOne db@ Database {.. } stack id = UE. mask_ $ do
126+ current <- liftIO $ readTVarIO databaseStep
127+ (k, registerWaitResult) <- liftIO $ atomicallyNamed " builder" $ do
128+ -- Spawn the id if needed
129+ status <- SMap. lookup id databaseValues
130+ val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
131+ Dirty s -> do
132+ let act =
133+ asyncWithCleanUp
134+ ( refresh db stack id s
135+ `UE.onException` liftIO (atomicallyNamed " builder - onException" (SMap. focus updateDirty id databaseValues))
136+ )
137+ SMap. focus (updateStatus $ Running current s) id databaseValues
138+ return act
139+ Clean r -> pure . pure . pure $ r
140+ -- force here might contains async exceptions from previous runs
141+ Running _step _s
142+ | memberStack id stack -> throw $ StackException stack
143+ | otherwise -> retry
144+ pure (id , val)
145+ waitR <- registerWaitResult
146+ return (k, waitR)
143147-- | isDirty
144148-- only dirty when it's build time is older than the changed time of one of its dependencies
145149isDirty :: Foldable t => Result -> t (a , Result ) -> Bool
@@ -155,41 +159,35 @@ isDirty me = any (\(_,dep) -> resultBuilt me < resultChanged dep)
155159refreshDeps :: KeySet -> Database -> Stack -> Key -> Result -> [KeySet ] -> AIO Result
156160refreshDeps visited db stack key result = \ case
157161 -- no more deps to refresh
158- [] -> liftIO $ compute db stack key RunDependenciesSame (Just result)
162+ [] -> compute db stack key RunDependenciesSame (Just result)
159163 (dep: deps) -> do
160164 let newVisited = dep <> visited
161165 res <- builder db stack (toListKeySet (dep `differenceKeySet` visited))
162- case res of
163- Left res -> if isDirty result res
166+ if isDirty result res
164167 -- restart the computation if any of the deps are dirty
165- then liftIO $ compute db stack key RunDependenciesChanged (Just result)
168+ then compute db stack key RunDependenciesChanged (Just result)
166169 -- else kick the rest of the deps
167170 else refreshDeps newVisited db stack key result deps
168- Right iores -> do
169- res <- liftIO iores
170- if isDirty result res
171- then liftIO $ compute db stack key RunDependenciesChanged (Just result)
172- else refreshDeps newVisited db stack key result deps
173-
174- -- | Refresh a key:
175- refresh :: Database -> Stack -> Key -> Maybe Result -> AIO (IO Result )
171+
172+
173+ -- refresh :: Database -> Stack -> Key -> Maybe Result -> IO Result
176174-- refresh _ st k _ | traceShow ("refresh", st, k) False = undefined
175+ refresh :: Database -> Stack -> Key -> Maybe Result -> AIO Result
177176refresh db stack key result = case (addStack key stack, result) of
178177 (Left e, _) -> throw e
179- (Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> asyncWithCleanUp $ refreshDeps mempty db stack key me (reverse deps)
180- (Right stack, _) ->
181- asyncWithCleanUp $ liftIO $ compute db stack key RunDependenciesChanged result
178+ (Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> refreshDeps mempty db stack key me (reverse deps)
179+ (Right stack, _) -> compute db stack key RunDependenciesChanged result
182180
183181-- | Compute a key.
184- compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> IO Result
182+ compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> AIO Result
185183-- compute _ st k _ _ | traceShow ("compute", st, k) False = undefined
186184compute db@ Database {.. } stack key mode result = do
187185 let act = runRule databaseRules key (fmap resultData result) mode
188- deps <- newIORef UnknownDeps
186+ deps <- liftIO $ newIORef UnknownDeps
189187 (execution, RunResult {.. }) <-
190- duration $ runReaderT (fromAction act) $ SAction db deps stack
191- curStep <- readTVarIO databaseStep
192- deps <- readIORef deps
188+ liftIO $ duration $ runReaderT (fromAction act) $ SAction db deps stack
189+ curStep <- liftIO $ readTVarIO databaseStep
190+ deps <- liftIO $ readIORef deps
193191 let lastChanged = maybe curStep resultChanged result
194192 let lastBuild = maybe curStep resultBuilt result
195193 -- changed time is always older than or equal to build time
@@ -212,12 +210,12 @@ compute db@Database{..} stack key mode result = do
212210 -- If an async exception strikes before the deps have been recorded,
213211 -- we won't be able to accurately propagate dirtiness for this key
214212 -- on the next build.
215- void $
213+ liftIO $ void $
216214 updateReverseDeps key db
217215 (getResultDepsDefault mempty previousDeps)
218216 deps
219217 _ -> pure ()
220- atomicallyNamed " compute and run hook" $ do
218+ liftIO $ atomicallyNamed " compute and run hook" $ do
221219 runHook
222220 SMap. focus (updateStatus $ Clean res) key databaseValues
223221 pure res
@@ -247,18 +245,6 @@ getKeysAndVisitAge db = do
247245 getAge Result {resultVisited = Step s} = curr - s
248246 return keysWithVisitAge
249247--------------------------------------------------------------------------------
250- -- Lazy IO trick
251-
252- data Box a = Box { fromBox :: a }
253-
254- -- | Split an IO computation into an unsafe lazy value and a forcing computation
255- splitIO :: IO a -> (IO () , a )
256- splitIO act = do
257- let act2 = Box <$> act
258- let res = unsafePerformIO act2
259- (void $ evaluate res, fromBox res)
260-
261- --------------------------------------------------------------------------------
262248-- Reverse dependencies
263249
264250-- | Update the reverse dependencies of an Id
@@ -301,14 +287,29 @@ transitiveDirtySet database = flip State.execStateT mempty . traverse_ loop
301287
302288-- | A simple monad to implement cancellation on top of 'Async',
303289-- generalizing 'withAsync' to monadic scopes.
304- newtype AIO a = AIO { unAIO :: ReaderT (IORef [Async () ]) IO a }
290+ newtype AIO a = AIO { unAIO :: ReaderT (TVar [Async () ]) IO a }
305291 deriving newtype (Applicative , Functor , Monad , MonadIO )
306292
293+ data AsyncParentKill = AsyncParentKill ThreadId Step
294+ deriving (Show , Eq )
295+
296+ instance Exception AsyncParentKill where
297+ toException = asyncExceptionToException
298+ fromException = asyncExceptionFromException
299+
307300-- | Run the monadic computation, cancelling all the spawned asyncs if an exception arises
308- runAIO :: AIO a -> IO a
309- runAIO (AIO act) = do
310- asyncs <- newIORef []
311- runReaderT act asyncs `onException` cleanupAsync asyncs
301+ runAIO :: Step -> AIO a -> IO a
302+ runAIO s (AIO act) = do
303+ asyncsRef <- newTVarIO []
304+ -- Log the exact exception (including async exceptions) before cleanup,
305+ -- then rethrow to preserve previous semantics.
306+ runReaderT act asyncsRef `onException` do
307+ asyncs <- atomically $ do
308+ r <- readTVar asyncsRef
309+ modifyTVar' asyncsRef $ const []
310+ return r
311+ tid <- myThreadId
312+ cleanupAsync asyncs tid s
312313
313314-- | Like 'async' but with built-in cancellation.
314315-- Returns an IO action to wait on the result.
@@ -319,27 +320,25 @@ asyncWithCleanUp act = do
319320 -- mask to make sure we keep track of the spawned async
320321 liftIO $ uninterruptibleMask $ \ restore -> do
321322 a <- async $ restore io
322- atomicModifyIORef'_ st (void a : )
323+ atomically $ modifyTVar' st (void a : )
323324 return $ wait a
324325
325326unliftAIO :: AIO a -> AIO (IO a )
326327unliftAIO act = do
327328 st <- AIO ask
328329 return $ runReaderT (unAIO act) st
329330
330- newtype RunInIO = RunInIO (forall a . AIO a -> IO a )
331+ instance MonadUnliftIO AIO where
332+ withRunInIO k = do
333+ st <- AIO ask
334+ liftIO $ k (\ aio -> runReaderT (unAIO aio) st)
331335
332- withRunInIO :: (RunInIO -> AIO b ) -> AIO b
333- withRunInIO k = do
334- st <- AIO ask
335- k $ RunInIO (\ aio -> runReaderT (unAIO aio) st)
336-
337- cleanupAsync :: IORef [Async a ] -> IO ()
336+ cleanupAsync :: [Async a ] -> ThreadId -> Step -> IO ()
338337-- mask to make sure we interrupt all the asyncs
339- cleanupAsync ref = uninterruptibleMask $ \ unmask -> do
340- asyncs <- atomicModifyIORef' ref ([] ,)
338+ cleanupAsync asyncs tid step = uninterruptibleMask $ \ unmask -> do
341339 -- interrupt all the asyncs without waiting
342- mapM_ (\ a -> throwTo (asyncThreadId a) AsyncCancelled ) asyncs
340+ -- mapM_ (\a -> throwTo (asyncThreadId a) AsyncCancelled) asyncs
341+ mapM_ (\ a -> throwTo (asyncThreadId a) $ AsyncParentKill tid step) asyncs
343342 -- Wait until all the asyncs are done
344343 -- But if it takes more than 10 seconds, log to stderr
345344 unless (null asyncs) $ do
@@ -348,32 +347,3 @@ cleanupAsync ref = uninterruptibleMask $ \unmask -> do
348347 traceM " cleanupAsync: waiting for asyncs to finish"
349348 withAsync warnIfTakingTooLong $ \ _ ->
350349 mapM_ waitCatch asyncs
351-
352- data Wait
353- = Wait { justWait :: ! (IO () )}
354- | Spawn { justWait :: ! (IO () )}
355-
356- fmapWait :: (IO () -> IO () ) -> Wait -> Wait
357- fmapWait f (Wait io) = Wait (f io)
358- fmapWait f (Spawn io) = Spawn (f io)
359-
360- waitOrSpawn :: Wait -> IO (Either (IO () ) (Async () ))
361- waitOrSpawn (Wait io) = pure $ Left io
362- waitOrSpawn (Spawn io) = Right <$> async io
363-
364- waitConcurrently_ :: [Wait ] -> AIO ()
365- waitConcurrently_ [] = pure ()
366- waitConcurrently_ [one] = liftIO $ justWait one
367- waitConcurrently_ many = do
368- ref <- AIO ask
369- -- spawn the async computations.
370- -- mask to make sure we keep track of all the asyncs.
371- (asyncs, syncs) <- liftIO $ uninterruptibleMask $ \ unmask -> do
372- waits <- liftIO $ traverse (waitOrSpawn . fmapWait unmask) many
373- let (syncs, asyncs) = partitionEithers waits
374- liftIO $ atomicModifyIORef'_ ref (asyncs ++ )
375- return (asyncs, syncs)
376- -- work on the sync computations
377- liftIO $ sequence_ syncs
378- -- wait for the async computations before returning
379- liftIO $ traverse_ wait asyncs
0 commit comments