diff options
author | Douglas Wilson <douglas.wilson@gmail.com> | 2022-05-26 17:42:00 +0100 |
---|---|---|
committer | Douglas Wilson <douglas.wilson@gmail.com> | 2022-05-26 17:42:00 +0100 |
commit | e6f353e55f86b5fcc730aa36a1f27c961778829f (patch) | |
tree | 568f6bc6765a504cf2f8ffd0e6691defb119a18f | |
parent | 9f6bd52c6e0c9daedfe051478017be350457b0f7 (diff) | |
download | haskell-wip/dougwilson/jsem2.tar.gz |
tidyingwip/dougwilson/jsem2
-rw-r--r-- | compiler/GHC/Driver/Make.hs | 66 | ||||
-rw-r--r-- | compiler/GHC/Driver/MakeSem.hs | 159 |
2 files changed, 116 insertions, 109 deletions
diff --git a/compiler/GHC/Driver/Make.hs b/compiler/GHC/Driver/Make.hs index e1355adbd3..d9a08dd15b 100644 --- a/compiler/GHC/Driver/Make.hs +++ b/compiler/GHC/Driver/Make.hs @@ -2631,18 +2631,35 @@ runSeqPipelines :: HscEnv -> Maybe Messager -> [MakeAction] -> IO () runSeqPipelines plugin_hsc_env mHscMessager all_pipelines = let env = MakeEnv { hsc_env = plugin_hsc_env , withLogger = \_ k -> k id - , compile_sem = AbstractSem (return ()) (return ()) (return ()) + , compile_sem = AbstractSem (return ()) (return ()) , env_messager = mHscMessager } in runAllPipelines (NumProcessors 1) env all_pipelines -mkAbstractSem :: WorkerLimit -> IO AbstractSem -mkAbstractSem worker_limit = case worker_limit of - NumProcessors n_jobs -> do - compile_sem <- newQSem n_jobs - pure $ AbstractSem (waitQSem compile_sem) (signalQSem compile_sem) (pure ()) - JobServer f -> makeSemaphoreJobserver f + -- TODO remove this capabilities management, it will be handled by the semaphore +runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a +runNjobsAbstractSem n_jobs action = do + compile_sem <- newQSem n_jobs + n_capabilities <- getNumCapabilities + n_cpus <- getNumProcessors + let + asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem) + set_num_caps n = unless (n_capabilities /= 1) $ setNumCapabilities n + updNumCapabilities = do + -- Setting number of capabilities more than + -- CPU count usually leads to high userspace + -- lock contention. #9221 + set_num_caps $ min n_jobs n_cpus + resetNumCapabilities = set_num_caps n_capabilities + MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem + +runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a +runWorkerLimit worker_limit action = case worker_limit of + NumProcessors n_jobs -> + runNjobsAbstractSem n_jobs action + JobServer s -> + runPosixSemaphoreAbstractSem s action -- | Build and run a pipeline runParPipelines :: WorkerLimit -- ^ How to limit work parallelism @@ -2667,34 +2684,17 @@ runParPipelines worker_limit plugin_hsc_env mHscMessager all_pipelines = do thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env) let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger } - -- TODO remove this capabilities management, it will be handled by the semaphore - let updNumCapabilities = liftIO $ do - n_capabilities <- getNumCapabilities - n_cpus <- getNumProcessors - -- Setting number of capabilities more than - -- CPU count usually leads to high userspace - -- lock contention. #9221 - let n_caps = case worker_limit of - NumProcessors n_jobs -> min n_jobs n_cpus - JobServer _ -> n_cpus - unless (n_capabilities /= 1) $ setNumCapabilities n_caps - return n_capabilities - - let resetNumCapabilities orig_n = do - liftIO $ setNumCapabilities orig_n - atomically $ writeTVar stopped_var True - wait_log_thread - - abstract_sem <- mkAbstractSem worker_limit - let env = MakeEnv { hsc_env = thread_safe_hsc_env - , withLogger = withParLog log_queue_queue_var - , compile_sem = abstract_sem - , env_messager = mHscMessager - } - -- Reset the number of capabilities once the upsweep ends. - MC.bracket updNumCapabilities resetNumCapabilities $ \_ -> + runWorkerLimit worker_limit $ \abstract_sem -> do + let env = MakeEnv { hsc_env = thread_safe_hsc_env + , withLogger = withParLog log_queue_queue_var + , compile_sem = abstract_sem + , env_messager = mHscMessager + } + -- Reset the number of capabilities once the upsweep ends. runAllPipelines worker_limit env all_pipelines + atomically $ writeTVar stopped_var True + wait_log_thread withLocalTmpFS :: RunMakeM a -> RunMakeM a withLocalTmpFS act = do diff --git a/compiler/GHC/Driver/MakeSem.hs b/compiler/GHC/Driver/MakeSem.hs index 7accde353b..199f2aa714 100644 --- a/compiler/GHC/Driver/MakeSem.hs +++ b/compiler/GHC/Driver/MakeSem.hs @@ -2,7 +2,7 @@ {-# language NamedFieldPuns #-} -- | -module GHC.Driver.MakeSem(AbstractSem(..), withAbstractSem, makeSemaphoreJobserver) where +module GHC.Driver.MakeSem(AbstractSem(..), withAbstractSem, runPosixSemaphoreAbstractSem) where import GHC.Prelude @@ -12,7 +12,7 @@ import qualified Control.Monad.Catch as MC import Data.Foldable import Control.Monad -import Control.Concurrent.QSem +import Control.Concurrent.MVar import Control.Concurrent.STM import Data.Time import GHC.Data.OrdList @@ -24,7 +24,6 @@ import GHC.IO.Exception -- -j1 case or a jobserver data AbstractSem = AbstractSem { acquireSem :: IO () , releaseSem :: IO () - , cleanupSem :: IO () } withAbstractSem :: AbstractSem -> IO b -> IO b @@ -40,7 +39,9 @@ data SemaphoreJobserver } data SemaphoreJobserverLoopState - = SJLSIdle | SJLSAcquiring ThreadId (TVar Bool)| SJLSReleasing ThreadId (TVar Bool) + = SJLSIdle + | SJLSAcquiring ThreadId (TMVar (Maybe MC.SomeException)) + | SJLSReleasing ThreadId (TMVar (Maybe MC.SomeException)) -- TODO pull out each operation that twiddles the SemaphoreJobserver into a named function @@ -87,61 +88,76 @@ modifySjs_ tv = fmap fst . modifySjs tv sjAcquireThread :: TVar SemaphoreJobserver -> IO SemaphoreJobserverLoopState sjAcquireThread tv = do sem <- atomically $ sjSem <$> readTVar tv - tv_b <- newTVarIO False + r_tmv <- newEmptyTMVarIO tid <- forkIOWithUnmask $ \unmask -> do - flip MC.finally (atomically $ writeTVar tv_b True) $ do - r <- MC.try $ unmask $ semWait sem - case r of - -- TODO if this is not ThreadKilled then we need to report this back - Left e | Just ThreadKilled <- MC.fromException e - -> pure () - Right () -> atomically $ modifySjs_ tv $ \sjs -> pure ((), sjs - { sjTokensOwned = sjTokensOwned sjs + 1 - , sjTokensFree = sjTokensFree sjs + 1}) - pure $ SJLSAcquiring tid tv_b + MC.try (unmask $ semPost sem) >>= \x -> atomically $ do + r <- case x of + Left (e :: MC.SomeException) -> pure $ case MC.fromException e of + Just ThreadKilled -> Nothing + _ -> Just e + Right () -> do + modifySjs_ tv $ \sjs -> pure ((), sjs + { sjTokensOwned = sjTokensOwned sjs + 1 + , sjTokensFree = sjTokensFree sjs + 1}) + pure Nothing + putTMVar r_tmv r + + pure $ SJLSAcquiring tid r_tmv sjReleaseThread :: TVar SemaphoreJobserver -> IO SemaphoreJobserverLoopState sjReleaseThread tv = do sem <- atomically $ sjSem <$> readTVar tv - tv_b <- newTVarIO False - tid <- forkIOWithUnmask $ \unmask -> do - flip MC.finally (atomically $ writeTVar tv_b True) $ do - r <- MC.try $ unmask $ semPost sem - case r of - -- TODO if this is not ThreadKilled then we need to report this back - Left (e :: MC.SomeException) - -- | Just ThreadKilled <- MC.fromException e - -> atomically $ modifySjs_ tv $ \sjs -> pure - ((), sjs { sjTokensFree = sjTokensFree sjs + 1 }) - Right () -> atomically $ modifySjs_ tv $ \sjs -> pure ((), sjs { sjTokensOwned = sjTokensOwned sjs - 1 }) - pure $ SJLSReleasing tid tv_b + r_tmv <- newEmptyTMVarIO + MC.mask_ $ do + still_good <- atomically $ modifySjs_ tv $ \sjs -> if sjGuardRelease sjs + then pure (True, sjs { sjTokensFree = sjTokensFree sjs - 1 }) + else pure (False, sjs) + if not still_good + then pure SJLSIdle + else do + tid <- forkIOWithUnmask $ \unmask -> do + x <- MC.try (unmask $ semPost sem) + atomically $ do + r <- case x of + Left (e :: MC.SomeException) -> do + modifySjs_ tv $ \sjs -> pure + ((), sjs { sjTokensFree = sjTokensFree sjs + 1 }) + pure $ case MC.fromException e of + Just ThreadKilled -> Nothing + _ -> Just e + Right () -> do + modifySjs_ tv $ \sjs -> pure ((), sjs { sjTokensOwned = sjTokensOwned sjs - 1 }) + pure Nothing + putTMVar r_tmv r + pure $ SJLSReleasing tid r_tmv sjTryAcquire :: TVar SemaphoreJobserver -> SemaphoreJobserverLoopState -> STM (IO SemaphoreJobserverLoopState) sjTryAcquire tv SJLSIdle = do SemaphoreJobserver - { sjTokensFree } <- readTVar tv - guard $ sjTokensFree == 0 + { sjTokensFree, sjWaiting } <- readTVar tv + guard $ sjTokensFree == 0 && length sjWaiting > 0 pure $ sjAcquireThread tv sjTryAcquire _ _ = retry +sjGuardRelease :: SemaphoreJobserver -> Bool +sjGuardRelease SemaphoreJobserver { sjTokensFree, sjTokensOwned, sjWaiting} = + length sjWaiting == 0 && sjTokensFree > 0 && sjTokensOwned > 1 + sjTryRelease :: TVar SemaphoreJobserver -> SemaphoreJobserverLoopState -> STM (IO SemaphoreJobserverLoopState) sjTryRelease tv SJLSIdle = do - SemaphoreJobserver - { sjTokensFree, sjTokensOwned, sjWaiting - } <- readTVar tv - guard $ length sjWaiting == 0 && sjTokensFree > 0 && sjTokensOwned > 1 + readTVar tv >>= guard . sjGuardRelease pure $ sjReleaseThread tv sjTryRelease _ _ = retry sjTryNoticeIdle :: TVar SemaphoreJobserver -> SemaphoreJobserverLoopState -> STM (IO SemaphoreJobserverLoopState) sjTryNoticeIdle tv ls = case ls of - SJLSAcquiring _ tv_b -> sync_num_caps tv_b - SJLSReleasing _ tv_b -> sync_num_caps tv_b + SJLSAcquiring _ tmv -> sync_num_caps tmv + SJLSReleasing _ tmv -> sync_num_caps tmv _ -> retry where - sync_num_caps tv_b = do - readTVar tv_b >>= guard + sync_num_caps tmv = do + takeTMVar tmv >>= maybe (pure ()) MC.throwM SemaphoreJobserver { sjTokensOwned } <- readTVar tv @@ -167,54 +183,45 @@ sjTryStopThread tv ls = case ls of semaphoreJobserverLoop :: TVar SemaphoreJobserver -> IO () semaphoreJobserverLoop tv = loop SJLSIdle where loop s = do - action <- atomically $ asum $ (\x -> x tv s) <$> [sjTryRelease, sjTryAcquire, sjTryNoticeIdle, sjTryStopThread ] - -- sjs <- readTVar tv >>= normaliseSemaphoreJobserver - -- let - -- kill_thread_and_return_idle tid = killThread tid $> SJLSIdle - -- -- TODO we can make this much nicer by 'asum'ing several STM ops together - -- -- TODO the returned action also needs to call setNumCapabilities - -- -- TODO rate limiting via registerDelay - -- case (sjTokensOwned sjs, sjTokensFree sjs, sjWaiting sjs, s) of - -- (_, 0, NilOL, SJLSIdle) -> retry - -- (_, 0, NilOL, SJLSAcquiring tid _) -> - -- pure $ kill_thread_and_return_idle tid - -- (_, 0, NilOL, SJLSReleasing tid _) -> - -- pure $ kill_thread_and_return_idle tid - -- (num_owned, x, NilOL, SJLSIdle) - -- | x > 0, num_owned > 1 -> do - -- modifySjs_ tv $ \sjs0 -> let - -- sjs = sjs0 { sjTokensFree = sjTokensFree sjs0 - 1 } - -- in pure (sjReleaseThread tv, sjs) - -- (_, x, NilOL, SJLSAcquiring tid _) - -- | x > 0 -> pure $ kill_thread_and_return_idle tid - -- (_, x, NilOL, SJLSReleasing _ tv_b) - -- | x > 0 -> do - -- readTVar tv_b >>= guard - -- pure $ pure SJLSIdle - -- (_, 0, _ `ConsOL` _, SJLSIdle) -> pure (sjAcquireThread tv) - -- (_, 0, _ `ConsOL` _, SJLSAcquiring _ tv_b) -> do - -- readTVar tv_b >>= guard - -- pure $ pure SJLSIdle - -- (_, 0, _ `ConsOL` _, SJLSReleasing tid _) -> - -- pure $ kill_thread_and_return_idle tid - -- _ -> panic "semaphoreJobserverLoop" + action <- atomically $ asum $ (\x -> x tv s) <$> + [ sjTryRelease + , sjTryAcquire + , sjTryNoticeIdle + , sjTryStopThread + ] action >>= loop -makeSemaphoreJobserver :: FilePath -> IO AbstractSem +makeSemaphoreJobserver :: FilePath -> IO (AbstractSem, IO ()) makeSemaphoreJobserver sem_path = do sjSem <- semOpen sem_path (OpenSemFlags { semCreate = False, semExclusive = False }) stdFileMode 0 let init_sjs = SemaphoreJobserver { sjSem, sjTokensOwned = 1, sjTokensFree = 1, sjWaiting = NilOL } sjs_tv <- newTVarIO init_sjs - loop_tid <- forkIO $ semaphoreJobserverLoop sjs_tv + loop_res_mv <- newEmptyMVar + loop_tid <- forkIOWithUnmask $ \unmask -> do + r <- try $ unmask $ semaphoreJobserverLoop sjs_tv + putMVar loop_res_mv $ case r of + Left e + | Just ThreadKilled <- fromException e -> Nothing + | otherwise -> Just e + Right () -> Nothing let acquireSem = acquireSemaphoreJobserver sjs_tv releaseSem = releaseSemaphoreJobserver sjs_tv - cleanupSem = killThread loop_tid - pure AbstractSem{..} - - - - + cleanupSem = do + -- this is interruptible + killThread loop_tid + takeMVar loop_res_mv >>= maybe (pure ()) MC.throwM + pure (AbstractSem{..}, cleanupSem) + +runPosixSemaphoreAbstractSem :: FilePath -> (AbstractSem -> IO a) -> IO a +runPosixSemaphoreAbstractSem s action = MC.mask $ \unmask -> do + (abs, cleanup) <- makeSemaphoreJobserver s + r <- try $ unmask $ action abs + case r of + Left (e1 :: MC.SomeException) -> do + (_ :: Either MC.SomeException ()) <- MC.try cleanup + MC.throwM e1 + Right x -> cleanup $> x |