summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDouglas Wilson <douglas.wilson@gmail.com>2022-05-26 17:42:00 +0100
committerDouglas Wilson <douglas.wilson@gmail.com>2022-05-26 17:42:00 +0100
commite6f353e55f86b5fcc730aa36a1f27c961778829f (patch)
tree568f6bc6765a504cf2f8ffd0e6691defb119a18f
parent9f6bd52c6e0c9daedfe051478017be350457b0f7 (diff)
downloadhaskell-wip/dougwilson/jsem2.tar.gz
-rw-r--r--compiler/GHC/Driver/Make.hs66
-rw-r--r--compiler/GHC/Driver/MakeSem.hs159
2 files changed, 116 insertions, 109 deletions
diff --git a/compiler/GHC/Driver/Make.hs b/compiler/GHC/Driver/Make.hs
index e1355adbd3..d9a08dd15b 100644
--- a/compiler/GHC/Driver/Make.hs
+++ b/compiler/GHC/Driver/Make.hs
@@ -2631,18 +2631,35 @@ runSeqPipelines :: HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
runSeqPipelines plugin_hsc_env mHscMessager all_pipelines =
let env = MakeEnv { hsc_env = plugin_hsc_env
, withLogger = \_ k -> k id
- , compile_sem = AbstractSem (return ()) (return ()) (return ())
+ , compile_sem = AbstractSem (return ()) (return ())
, env_messager = mHscMessager
}
in runAllPipelines (NumProcessors 1) env all_pipelines
-mkAbstractSem :: WorkerLimit -> IO AbstractSem
-mkAbstractSem worker_limit = case worker_limit of
- NumProcessors n_jobs -> do
- compile_sem <- newQSem n_jobs
- pure $ AbstractSem (waitQSem compile_sem) (signalQSem compile_sem) (pure ())
- JobServer f -> makeSemaphoreJobserver f
+ -- TODO remove this capabilities management, it will be handled by the semaphore
+runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a
+runNjobsAbstractSem n_jobs action = do
+ compile_sem <- newQSem n_jobs
+ n_capabilities <- getNumCapabilities
+ n_cpus <- getNumProcessors
+ let
+ asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+ set_num_caps n = unless (n_capabilities /= 1) $ setNumCapabilities n
+ updNumCapabilities = do
+ -- Setting number of capabilities more than
+ -- CPU count usually leads to high userspace
+ -- lock contention. #9221
+ set_num_caps $ min n_jobs n_cpus
+ resetNumCapabilities = set_num_caps n_capabilities
+ MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem
+
+runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
+runWorkerLimit worker_limit action = case worker_limit of
+ NumProcessors n_jobs ->
+ runNjobsAbstractSem n_jobs action
+ JobServer s ->
+ runPosixSemaphoreAbstractSem s action
-- | Build and run a pipeline
runParPipelines :: WorkerLimit -- ^ How to limit work parallelism
@@ -2667,34 +2684,17 @@ runParPipelines worker_limit plugin_hsc_env mHscMessager all_pipelines = do
thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env)
let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger }
- -- TODO remove this capabilities management, it will be handled by the semaphore
- let updNumCapabilities = liftIO $ do
- n_capabilities <- getNumCapabilities
- n_cpus <- getNumProcessors
- -- Setting number of capabilities more than
- -- CPU count usually leads to high userspace
- -- lock contention. #9221
- let n_caps = case worker_limit of
- NumProcessors n_jobs -> min n_jobs n_cpus
- JobServer _ -> n_cpus
- unless (n_capabilities /= 1) $ setNumCapabilities n_caps
- return n_capabilities
-
- let resetNumCapabilities orig_n = do
- liftIO $ setNumCapabilities orig_n
- atomically $ writeTVar stopped_var True
- wait_log_thread
-
- abstract_sem <- mkAbstractSem worker_limit
- let env = MakeEnv { hsc_env = thread_safe_hsc_env
- , withLogger = withParLog log_queue_queue_var
- , compile_sem = abstract_sem
- , env_messager = mHscMessager
- }
- -- Reset the number of capabilities once the upsweep ends.
- MC.bracket updNumCapabilities resetNumCapabilities $ \_ ->
+ runWorkerLimit worker_limit $ \abstract_sem -> do
+ let env = MakeEnv { hsc_env = thread_safe_hsc_env
+ , withLogger = withParLog log_queue_queue_var
+ , compile_sem = abstract_sem
+ , env_messager = mHscMessager
+ }
+ -- Reset the number of capabilities once the upsweep ends.
runAllPipelines worker_limit env all_pipelines
+ atomically $ writeTVar stopped_var True
+ wait_log_thread
withLocalTmpFS :: RunMakeM a -> RunMakeM a
withLocalTmpFS act = do
diff --git a/compiler/GHC/Driver/MakeSem.hs b/compiler/GHC/Driver/MakeSem.hs
index 7accde353b..199f2aa714 100644
--- a/compiler/GHC/Driver/MakeSem.hs
+++ b/compiler/GHC/Driver/MakeSem.hs
@@ -2,7 +2,7 @@
{-# language NamedFieldPuns #-}
-- |
-module GHC.Driver.MakeSem(AbstractSem(..), withAbstractSem, makeSemaphoreJobserver) where
+module GHC.Driver.MakeSem(AbstractSem(..), withAbstractSem, runPosixSemaphoreAbstractSem) where
import GHC.Prelude
@@ -12,7 +12,7 @@ import qualified Control.Monad.Catch as MC
import Data.Foldable
import Control.Monad
-import Control.Concurrent.QSem
+import Control.Concurrent.MVar
import Control.Concurrent.STM
import Data.Time
import GHC.Data.OrdList
@@ -24,7 +24,6 @@ import GHC.IO.Exception
-- -j1 case or a jobserver
data AbstractSem = AbstractSem { acquireSem :: IO ()
, releaseSem :: IO ()
- , cleanupSem :: IO ()
}
withAbstractSem :: AbstractSem -> IO b -> IO b
@@ -40,7 +39,9 @@ data SemaphoreJobserver
}
data SemaphoreJobserverLoopState
- = SJLSIdle | SJLSAcquiring ThreadId (TVar Bool)| SJLSReleasing ThreadId (TVar Bool)
+ = SJLSIdle
+ | SJLSAcquiring ThreadId (TMVar (Maybe MC.SomeException))
+ | SJLSReleasing ThreadId (TMVar (Maybe MC.SomeException))
-- TODO pull out each operation that twiddles the SemaphoreJobserver into a named function
@@ -87,61 +88,76 @@ modifySjs_ tv = fmap fst . modifySjs tv
sjAcquireThread :: TVar SemaphoreJobserver -> IO SemaphoreJobserverLoopState
sjAcquireThread tv = do
sem <- atomically $ sjSem <$> readTVar tv
- tv_b <- newTVarIO False
+ r_tmv <- newEmptyTMVarIO
tid <- forkIOWithUnmask $ \unmask -> do
- flip MC.finally (atomically $ writeTVar tv_b True) $ do
- r <- MC.try $ unmask $ semWait sem
- case r of
- -- TODO if this is not ThreadKilled then we need to report this back
- Left e | Just ThreadKilled <- MC.fromException e
- -> pure ()
- Right () -> atomically $ modifySjs_ tv $ \sjs -> pure ((), sjs
- { sjTokensOwned = sjTokensOwned sjs + 1
- , sjTokensFree = sjTokensFree sjs + 1})
- pure $ SJLSAcquiring tid tv_b
+ MC.try (unmask $ semPost sem) >>= \x -> atomically $ do
+ r <- case x of
+ Left (e :: MC.SomeException) -> pure $ case MC.fromException e of
+ Just ThreadKilled -> Nothing
+ _ -> Just e
+ Right () -> do
+ modifySjs_ tv $ \sjs -> pure ((), sjs
+ { sjTokensOwned = sjTokensOwned sjs + 1
+ , sjTokensFree = sjTokensFree sjs + 1})
+ pure Nothing
+ putTMVar r_tmv r
+
+ pure $ SJLSAcquiring tid r_tmv
sjReleaseThread :: TVar SemaphoreJobserver -> IO SemaphoreJobserverLoopState
sjReleaseThread tv = do
sem <- atomically $ sjSem <$> readTVar tv
- tv_b <- newTVarIO False
- tid <- forkIOWithUnmask $ \unmask -> do
- flip MC.finally (atomically $ writeTVar tv_b True) $ do
- r <- MC.try $ unmask $ semPost sem
- case r of
- -- TODO if this is not ThreadKilled then we need to report this back
- Left (e :: MC.SomeException)
- -- | Just ThreadKilled <- MC.fromException e
- -> atomically $ modifySjs_ tv $ \sjs -> pure
- ((), sjs { sjTokensFree = sjTokensFree sjs + 1 })
- Right () -> atomically $ modifySjs_ tv $ \sjs -> pure ((), sjs { sjTokensOwned = sjTokensOwned sjs - 1 })
- pure $ SJLSReleasing tid tv_b
+ r_tmv <- newEmptyTMVarIO
+ MC.mask_ $ do
+ still_good <- atomically $ modifySjs_ tv $ \sjs -> if sjGuardRelease sjs
+ then pure (True, sjs { sjTokensFree = sjTokensFree sjs - 1 })
+ else pure (False, sjs)
+ if not still_good
+ then pure SJLSIdle
+ else do
+ tid <- forkIOWithUnmask $ \unmask -> do
+ x <- MC.try (unmask $ semPost sem)
+ atomically $ do
+ r <- case x of
+ Left (e :: MC.SomeException) -> do
+ modifySjs_ tv $ \sjs -> pure
+ ((), sjs { sjTokensFree = sjTokensFree sjs + 1 })
+ pure $ case MC.fromException e of
+ Just ThreadKilled -> Nothing
+ _ -> Just e
+ Right () -> do
+ modifySjs_ tv $ \sjs -> pure ((), sjs { sjTokensOwned = sjTokensOwned sjs - 1 })
+ pure Nothing
+ putTMVar r_tmv r
+ pure $ SJLSReleasing tid r_tmv
sjTryAcquire :: TVar SemaphoreJobserver -> SemaphoreJobserverLoopState -> STM (IO SemaphoreJobserverLoopState)
sjTryAcquire tv SJLSIdle = do
SemaphoreJobserver
- { sjTokensFree } <- readTVar tv
- guard $ sjTokensFree == 0
+ { sjTokensFree, sjWaiting } <- readTVar tv
+ guard $ sjTokensFree == 0 && length sjWaiting > 0
pure $ sjAcquireThread tv
sjTryAcquire _ _ = retry
+sjGuardRelease :: SemaphoreJobserver -> Bool
+sjGuardRelease SemaphoreJobserver { sjTokensFree, sjTokensOwned, sjWaiting} =
+ length sjWaiting == 0 && sjTokensFree > 0 && sjTokensOwned > 1
+
sjTryRelease :: TVar SemaphoreJobserver -> SemaphoreJobserverLoopState -> STM (IO SemaphoreJobserverLoopState)
sjTryRelease tv SJLSIdle = do
- SemaphoreJobserver
- { sjTokensFree, sjTokensOwned, sjWaiting
- } <- readTVar tv
- guard $ length sjWaiting == 0 && sjTokensFree > 0 && sjTokensOwned > 1
+ readTVar tv >>= guard . sjGuardRelease
pure $ sjReleaseThread tv
sjTryRelease _ _ = retry
sjTryNoticeIdle :: TVar SemaphoreJobserver -> SemaphoreJobserverLoopState -> STM (IO SemaphoreJobserverLoopState)
sjTryNoticeIdle tv ls = case ls of
- SJLSAcquiring _ tv_b -> sync_num_caps tv_b
- SJLSReleasing _ tv_b -> sync_num_caps tv_b
+ SJLSAcquiring _ tmv -> sync_num_caps tmv
+ SJLSReleasing _ tmv -> sync_num_caps tmv
_ -> retry
where
- sync_num_caps tv_b = do
- readTVar tv_b >>= guard
+ sync_num_caps tmv = do
+ takeTMVar tmv >>= maybe (pure ()) MC.throwM
SemaphoreJobserver
{ sjTokensOwned
} <- readTVar tv
@@ -167,54 +183,45 @@ sjTryStopThread tv ls = case ls of
semaphoreJobserverLoop :: TVar SemaphoreJobserver -> IO ()
semaphoreJobserverLoop tv = loop SJLSIdle where
loop s = do
- action <- atomically $ asum $ (\x -> x tv s) <$> [sjTryRelease, sjTryAcquire, sjTryNoticeIdle, sjTryStopThread ]
- -- sjs <- readTVar tv >>= normaliseSemaphoreJobserver
- -- let
- -- kill_thread_and_return_idle tid = killThread tid $> SJLSIdle
- -- -- TODO we can make this much nicer by 'asum'ing several STM ops together
- -- -- TODO the returned action also needs to call setNumCapabilities
- -- -- TODO rate limiting via registerDelay
- -- case (sjTokensOwned sjs, sjTokensFree sjs, sjWaiting sjs, s) of
- -- (_, 0, NilOL, SJLSIdle) -> retry
- -- (_, 0, NilOL, SJLSAcquiring tid _) ->
- -- pure $ kill_thread_and_return_idle tid
- -- (_, 0, NilOL, SJLSReleasing tid _) ->
- -- pure $ kill_thread_and_return_idle tid
- -- (num_owned, x, NilOL, SJLSIdle)
- -- | x > 0, num_owned > 1 -> do
- -- modifySjs_ tv $ \sjs0 -> let
- -- sjs = sjs0 { sjTokensFree = sjTokensFree sjs0 - 1 }
- -- in pure (sjReleaseThread tv, sjs)
- -- (_, x, NilOL, SJLSAcquiring tid _)
- -- | x > 0 -> pure $ kill_thread_and_return_idle tid
- -- (_, x, NilOL, SJLSReleasing _ tv_b)
- -- | x > 0 -> do
- -- readTVar tv_b >>= guard
- -- pure $ pure SJLSIdle
- -- (_, 0, _ `ConsOL` _, SJLSIdle) -> pure (sjAcquireThread tv)
- -- (_, 0, _ `ConsOL` _, SJLSAcquiring _ tv_b) -> do
- -- readTVar tv_b >>= guard
- -- pure $ pure SJLSIdle
- -- (_, 0, _ `ConsOL` _, SJLSReleasing tid _) ->
- -- pure $ kill_thread_and_return_idle tid
- -- _ -> panic "semaphoreJobserverLoop"
+ action <- atomically $ asum $ (\x -> x tv s) <$>
+ [ sjTryRelease
+ , sjTryAcquire
+ , sjTryNoticeIdle
+ , sjTryStopThread
+ ]
action >>= loop
-makeSemaphoreJobserver :: FilePath -> IO AbstractSem
+makeSemaphoreJobserver :: FilePath -> IO (AbstractSem, IO ())
makeSemaphoreJobserver sem_path = do
sjSem <- semOpen sem_path (OpenSemFlags { semCreate = False, semExclusive = False }) stdFileMode 0
let
init_sjs = SemaphoreJobserver { sjSem, sjTokensOwned = 1, sjTokensFree = 1, sjWaiting = NilOL }
sjs_tv <- newTVarIO init_sjs
- loop_tid <- forkIO $ semaphoreJobserverLoop sjs_tv
+ loop_res_mv <- newEmptyMVar
+ loop_tid <- forkIOWithUnmask $ \unmask -> do
+ r <- try $ unmask $ semaphoreJobserverLoop sjs_tv
+ putMVar loop_res_mv $ case r of
+ Left e
+ | Just ThreadKilled <- fromException e -> Nothing
+ | otherwise -> Just e
+ Right () -> Nothing
let
acquireSem = acquireSemaphoreJobserver sjs_tv
releaseSem = releaseSemaphoreJobserver sjs_tv
- cleanupSem = killThread loop_tid
- pure AbstractSem{..}
-
-
-
-
+ cleanupSem = do
+ -- this is interruptible
+ killThread loop_tid
+ takeMVar loop_res_mv >>= maybe (pure ()) MC.throwM
+ pure (AbstractSem{..}, cleanupSem)
+
+runPosixSemaphoreAbstractSem :: FilePath -> (AbstractSem -> IO a) -> IO a
+runPosixSemaphoreAbstractSem s action = MC.mask $ \unmask -> do
+ (abs, cleanup) <- makeSemaphoreJobserver s
+ r <- try $ unmask $ action abs
+ case r of
+ Left (e1 :: MC.SomeException) -> do
+ (_ :: Either MC.SomeException ()) <- MC.try cleanup
+ MC.throwM e1
+ Right x -> cleanup $> x