author     sheaf <sam.derbyshire@gmail.com>            2022-09-07 14:22:34 +0200
committer  Marge Bot <ben+marge-bot@smart-cactus.org>  2023-04-20 18:33:34 -0400
commit     5c8731244bc13a3d813d2a4d53b3188b28dc8355 (patch)
tree       49561b32034c2f4199479290975b2e624c8ffab1 /compiler
parent     7012ec2facc632fe4966916f797e4d1f612d7318 (diff)
download   haskell-5c8731244bc13a3d813d2a4d53b3188b28dc8355.tar.gz
Implement -jsem: parallelism controlled by semaphores
See https://github.com/ghc-proposals/ghc-proposals/pull/540/ for a
complete description of the motivation for this feature.
The `-jsem` option allows a build tool to pass a semaphore to GHC, which
GHC uses to decide how much parallelism to request.
GHC itself acts as a client in the GHC Jobserver Protocol.
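For example, assuming the build tool has already created a suitable system
semaphore named `my_sem` (an illustrative name, not one this patch
prescribes), GHC would be invoked along the lines of:
```
ghc --make -jsem my_sem Main.hs
```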
```
GHC Jobserver Protocol
~~~~~~~~~~~~~~~~~~~~~~
This proposal introduces the GHC Jobserver Protocol. This protocol allows
a server to dynamically invoke many instances of a client process,
while restricting all of those instances to use no more than <n> capabilities.
This is achieved by coordination over a system semaphore (either a POSIX
semaphore [6]_ in the case of Linux and Darwin, or a Win32 semaphore [7]_
in the case of Windows platforms).
There are two kinds of participants in the GHC Jobserver protocol:
- The *jobserver* creates a system semaphore with a certain number of
available tokens.
Each time the jobserver wants to spawn a new jobclient subprocess, it **must**
first acquire a single token from the semaphore, before spawning
the subprocess. This token **must** be released once the subprocess terminates.
Once work is finished, the jobserver **must** destroy the semaphore it created.
- A *jobclient* is a subprocess spawned by the jobserver or another jobclient.
Each jobclient starts with one available token (its *implicit token*,
which was acquired by the parent which spawned it), and can request more
tokens through the Jobserver Protocol by waiting on the semaphore.
Each time a jobclient wants to spawn a new jobclient subprocess, it **must**
pass on a single token to the child jobclient. This token can either be the
jobclient's implicit token, or another token which the jobclient acquired
from the semaphore.
Each jobclient **must** release exactly as many tokens as it has acquired from
the semaphore (this does not include its implicit token).
```
Build tools such as cabal act as jobservers in the protocol and are
responsible for correctly creating, managing, and cleaning up the
semaphore.
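To make that division of responsibilities concrete, here is a minimal sketch
of the jobserver side. Only `openSemaphore`, `releaseSemaphore`, `Semaphore`
and `SemaphoreName` are visible in this patch; `freshSemaphore`,
`destroySemaphore`, `waitOnSemaphore` and the `semaphoreName` field are
assumed here to be the corresponding creation/teardown/wait operations from
semaphore-compat, and the targets and semaphore name are made up for the
example:
```
module Main (main) where

import Control.Concurrent.Async (forConcurrently_) -- async package
import Control.Exception (bracket, bracket_)
import Control.Monad (void)
import System.Process (callProcess)                -- process package
import System.Semaphore                            -- semaphore-compat

main :: IO ()
main = do
  let n_tokens = 4                        -- parallelism the jobserver allows
      targets  = ["A.hs", "B.hs", "C.hs"] -- hypothetical build targets
  -- Create the semaphore with <n> available tokens; destroy it once
  -- all work is finished, as the protocol requires of a jobserver.
  bracket (freshSemaphore "ghc_sem" n_tokens) destroySemaphore $ \sem -> do
    let SemaphoreName sem_name = semaphoreName sem
    forConcurrently_ targets $ \target ->
      -- Acquire one token before spawning each jobclient, and release
      -- it again once that jobclient terminates.
      bracket_ (void $ waitOnSemaphore sem) (releaseSemaphore sem 1) $
        callProcess "ghc" ["--make", "-jsem", sem_name, target]
```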
Adds a new submodule (semaphore-compat) for managing and interacting
with semaphores in a cross-platform way.
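For the jobclient side, a sketch of how one extra token might be claimed and
returned through that API (again, only `openSemaphore` and `releaseSemaphore`
appear in this patch; `waitOnSemaphore` is assumed to be the module's
blocking wait):
```
import Control.Exception (bracket_)
import Control.Monad (void)
import System.Semaphore

-- Run an action while holding one token beyond the implicit one,
-- releasing exactly what was acquired, per the protocol above.
withExtraToken :: SemaphoreName -> IO a -> IO a
withExtraToken sem_name act = do
  sem <- openSemaphore sem_name
  bracket_ (void $ waitOnSemaphore sem) (releaseSemaphore sem 1) act
```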
Fixes #19349
Diffstat (limited to 'compiler')
-rw-r--r--  compiler/GHC/Driver/Make.hs               120
-rw-r--r--  compiler/GHC/Driver/MakeSem.hs            545
-rw-r--r--  compiler/GHC/Driver/Pipeline/LogQueue.hs    4
-rw-r--r--  compiler/GHC/Driver/Session.hs             25
-rw-r--r--  compiler/ghc.cabal.in                       2
5 files changed, 640 insertions, 56 deletions
diff --git a/compiler/GHC/Driver/Make.hs b/compiler/GHC/Driver/Make.hs
index d72b452d2e..c047056ea6 100644
--- a/compiler/GHC/Driver/Make.hs
+++ b/compiler/GHC/Driver/Make.hs
@@ -75,6 +75,7 @@ import GHC.Driver.Env
 import GHC.Driver.Errors
 import GHC.Driver.Errors.Types
 import GHC.Driver.Main
+import GHC.Driver.MakeSem
 import GHC.Parser.Header
@@ -151,10 +152,10 @@ import GHC.Runtime.Loader
 import GHC.Rename.Names
 import GHC.Utils.Constants
 import GHC.Types.Unique.DFM (udfmRestrictKeysSet)
-import qualified Data.IntSet as I
 import GHC.Types.Unique
 import GHC.Iface.Errors.Types
+import qualified Data.IntSet as I
 
 -- -----------------------------------------------------------------------------
 -- Loading the program
@@ -664,6 +665,30 @@ createBuildPlan mod_graph maybe_top_mod =
             (vcat [text "Build plan missing nodes:", (text "PLAN:" <+> ppr (sum (map countMods build_plan))), (text "GRAPH:" <+> ppr (length (mgModSummaries' mod_graph )))])
     build_plan
 
+mkWorkerLimit :: DynFlags -> IO WorkerLimit
+mkWorkerLimit dflags =
+  case parMakeCount dflags of
+    Nothing                   -> pure $ num_procs 1
+    Just (ParMakeSemaphore h) -> pure (JSemLimit (SemaphoreName h))
+    Just ParMakeNumProcessors -> num_procs <$> getNumProcessors
+    Just (ParMakeThisMany n)  -> pure $ num_procs n
+  where
+    num_procs x = NumProcessorsLimit (max 1 x)
+
+isWorkerLimitSequential :: WorkerLimit -> Bool
+isWorkerLimitSequential (NumProcessorsLimit x) = x <= 1
+isWorkerLimitSequential (JSemLimit {})         = False
+
+-- | This describes what we use to limit the number of jobs, either we limit it
+-- ourselves to a specific number or we have an external parallelism semaphore
+-- limit it for us.
+data WorkerLimit
+  = NumProcessorsLimit Int
+  | JSemLimit
+      SemaphoreName
+        -- ^ Semaphore name to use
+  deriving Eq
+
 -- | Generalized version of 'load' which also supports a custom
 -- 'Messager' (for reporting progress) and 'ModuleGraph' (generally
 -- produced by calling 'depanal'.
@@ -744,14 +769,12 @@ load' mhmi_cache how_much mHscMessage mod_graph = do
 
     liftIO $ debugTraceMsg logger 2 (hang (text "Ready for upsweep") 2 (ppr build_plan))
 
-    n_jobs <- case parMakeCount (hsc_dflags hsc_env) of
-                    Nothing -> liftIO getNumProcessors
-                    Just n  -> return n
+    worker_limit <- liftIO $ mkWorkerLimit dflags
 
     setSession $ hscUpdateHUG (unitEnv_map pruneHomeUnitEnv) hsc_env
     (upsweep_ok, hsc_env1) <- withDeferredDiagnostics $ do
       hsc_env <- getSession
-      liftIO $ upsweep n_jobs hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
+      liftIO $ upsweep worker_limit hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
     setSession hsc_env1
     case upsweep_ok of
       Failed -> loadFinish upsweep_ok
@@ -1036,13 +1059,7 @@ getDependencies direct_deps build_map =
 
 type BuildM a = StateT BuildLoopState IO a
 
--- | Abstraction over the operations of a semaphore which allows usage with the
--- -j1 case
-data AbstractSem = AbstractSem { acquireSem :: IO ()
-                               , releaseSem :: IO () }
-
-withAbstractSem :: AbstractSem -> IO b -> IO b
-withAbstractSem sem = MC.bracket_ (acquireSem sem) (releaseSem sem)
 
 -- | Environment used when compiling a module
 data MakeEnv = MakeEnv
   { hsc_env :: !HscEnv -- The basic HscEnv which will be augmented for each module
@@ -1227,7 +1244,7 @@ withCurrentUnit uid = do
   local (\env -> env { hsc_env = hscSetActiveUnitId uid (hsc_env env)})
 
 upsweep
-    :: Int -- ^ The number of workers we wish to run in parallel
+    :: WorkerLimit -- ^ The number of workers we wish to run in parallel
     -> HscEnv -- ^ The base HscEnv, which is augmented for each module
    -> Maybe ModIfaceCache -- ^ A cache to incrementally write final interface files to
    -> Maybe Messager
@@ -2832,7 +2849,7 @@ label_self thread_name = do
     CC.labelThread self_tid thread_name
 
-runPipelines :: Int -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
+runPipelines :: WorkerLimit -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
 -- Don't even initialise plugins if there are no pipelines
 runPipelines _ _ _ [] = return ()
 runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
@@ -2840,7 +2857,7 @@ runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
   plugins_hsc_env <- initializePlugins orig_hsc_env
   case n_job of
-    1  -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
+    NumProcessorsLimit n | n <= 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
    _n -> runParPipelines n_job plugins_hsc_env mHscMessager all_pipelines
 
 runSeqPipelines :: HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
@@ -2850,16 +2867,38 @@ runSeqPipelines plugin_hsc_env mHscMessager all_pipelines =
         , compile_sem = AbstractSem (return ()) (return ())
         , env_messager = mHscMessager
         }
-  in runAllPipelines 1 env all_pipelines
+  in runAllPipelines (NumProcessorsLimit 1) env all_pipelines
+
+runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a
+runNjobsAbstractSem n_jobs action = do
+  compile_sem <- newQSem n_jobs
+  n_capabilities <- getNumCapabilities
+  n_cpus <- getNumProcessors
+  let
+    asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+    set_num_caps n = unless (n_capabilities /= 1) $ setNumCapabilities n
+    updNumCapabilities = do
+      -- Setting number of capabilities more than
+      -- CPU count usually leads to high userspace
+      -- lock contention. #9221
+      set_num_caps $ min n_jobs n_cpus
+    resetNumCapabilities = set_num_caps n_capabilities
+  MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem
+
+runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
+runWorkerLimit worker_limit action = case worker_limit of
+    NumProcessorsLimit n_jobs ->
+      runNjobsAbstractSem n_jobs action
+    JSemLimit sem ->
+      runJSemAbstractSem sem action
 
 -- | Build and run a pipeline
-runParPipelines :: Int -- ^ How many capabilities to use
-             -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module
+runParPipelines :: WorkerLimit -- ^ How to limit work parallelism
+             -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module
              -> Maybe Messager -- ^ Optional custom messager to use to report progress
             -> [MakeAction] -- ^ The build plan for all the module nodes
            -> IO ()
-runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
+runParPipelines worker_limit plugin_hsc_env mHscMessager all_pipelines = do
 
   -- A variable which we write to when an error has happened and we have to tell the
@@ -2869,39 +2908,23 @@ runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
   -- will add it's LogQueue into this queue.
   log_queue_queue_var <- newTVarIO newLogQueueQueue
   -- Thread which coordinates the printing of logs
-  wait_log_thread <- logThread n_jobs (length all_pipelines) (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
+  wait_log_thread <- logThread (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
 
   -- Make the logger thread-safe, in case there is some output which isn't sent via the LogQueue.
   thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env)
   let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger }
 
-  let updNumCapabilities = liftIO $ do
-          n_capabilities <- getNumCapabilities
-          n_cpus <- getNumProcessors
-          -- Setting number of capabilities more than
-          -- CPU count usually leads to high userspace
-          -- lock contention. #9221
-          let n_caps = min n_jobs n_cpus
-          unless (n_capabilities /= 1) $ setNumCapabilities n_caps
-          return n_capabilities
-
-  let resetNumCapabilities orig_n = do
-          liftIO $ setNumCapabilities orig_n
-          atomically $ writeTVar stopped_var True
-          wait_log_thread
-
-  compile_sem <- newQSem n_jobs
-  let abstract_sem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+  runWorkerLimit worker_limit $ \abstract_sem -> do
+    let env = MakeEnv { hsc_env = thread_safe_hsc_env
+                      , withLogger = withParLog log_queue_queue_var
+                      , compile_sem = abstract_sem
+                      , env_messager = mHscMessager
+                      }
     -- Reset the number of capabilities once the upsweep ends.
-  let env = MakeEnv { hsc_env = thread_safe_hsc_env
-                    , withLogger = withParLog log_queue_queue_var
-                    , compile_sem = abstract_sem
-                    , env_messager = mHscMessager
-                    }
-
-  MC.bracket updNumCapabilities resetNumCapabilities $ \_ ->
-    runAllPipelines n_jobs env all_pipelines
+    runAllPipelines worker_limit env all_pipelines
+    atomically $ writeTVar stopped_var True
+    wait_log_thread
 
 withLocalTmpFS :: RunMakeM a -> RunMakeM a
 withLocalTmpFS act = do
@@ -2918,10 +2941,11 @@ withLocalTmpFS act = do
   MC.bracket initialiser finaliser $ \lcl_hsc_env ->
     local (\env -> env { hsc_env = lcl_hsc_env}) act
 
 -- | Run the given actions and then wait for them all to finish.
-runAllPipelines :: Int -> MakeEnv -> [MakeAction] -> IO ()
-runAllPipelines n_jobs env acts = do
-  let spawn_actions :: IO [ThreadId]
-      spawn_actions = if n_jobs == 1
+runAllPipelines :: WorkerLimit -> MakeEnv -> [MakeAction] -> IO ()
+runAllPipelines worker_limit env acts = do
+  let single_worker = isWorkerLimitSequential worker_limit
+      spawn_actions :: IO [ThreadId]
+      spawn_actions = if single_worker
        then (:[]) <$> (forkIOWithUnmask $ \unmask -> void $ runLoop (\io -> io unmask) env acts)
        else runLoop forkIOWithUnmask env acts
diff --git a/compiler/GHC/Driver/MakeSem.hs b/compiler/GHC/Driver/MakeSem.hs
new file mode 100644
index 0000000000..4e36a26c86
--- /dev/null
+++ b/compiler/GHC/Driver/MakeSem.hs
@@ -0,0 +1,545 @@
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE NumericUnderscores #-}
+
+-- | Implementation of a jobserver using system semaphores.
+--
+--
+module GHC.Driver.MakeSem
+  ( -- * JSem: parallelism semaphore backed
+    -- by a system semaphore (Posix/Windows)
+    runJSemAbstractSem
+
+  -- * System semaphores
+  , Semaphore, SemaphoreName(..)
+
+  -- * Abstract semaphores
+  , AbstractSem(..)
+  , withAbstractSem
+  )
+  where
+
+import GHC.Prelude
+import GHC.Conc
+import GHC.Data.OrdList
+import GHC.IO.Exception
+import GHC.Utils.Outputable
+import GHC.Utils.Panic
+import GHC.Utils.Json
+
+import System.Semaphore
+
+import Control.Monad
+import qualified Control.Monad.Catch as MC
+import Control.Concurrent.MVar
+import Control.Concurrent.STM
+import Data.Foldable
+import Data.Functor
+import GHC.Stack
+import Debug.Trace
+
+---------------------------------------
+-- Semaphore jobserver
+
+-- | A jobserver based off a system 'Semaphore'.
+--
+-- Keeps track of the pending jobs and resources
+-- available from the semaphore.
+data Jobserver
+  = Jobserver
+  { jSemaphore :: !Semaphore
+    -- ^ The semaphore which controls available resources
+  , jobs :: !(TVar JobResources)
+    -- ^ The currently pending jobs, and the resources
+    -- obtained from the semaphore
+  }
+
+data JobserverOptions
+  = JobserverOptions
+  { releaseDebounce :: !Int
+    -- ^ Minimum delay, in milliseconds, between acquiring a token
+    -- and releasing a token.
+  , setNumCapsDebounce :: !Int
+    -- ^ Minimum delay, in milliseconds, between two consecutive
+    -- calls of 'setNumCapabilities'.
+  }
+
+defaultJobserverOptions :: JobserverOptions
+defaultJobserverOptions =
+  JobserverOptions
+    { releaseDebounce    = 1000 -- 1 second
+    , setNumCapsDebounce = 1000 -- 1 second
+    }
+
+-- | Resources available for running jobs, i.e.
+-- tokens obtained from the parallelism semaphore.
+data JobResources
+  = Jobs
+  { tokensOwned :: !Int
+    -- ^ How many tokens have been claimed from the semaphore
+  , tokensFree :: !Int
+    -- ^ How many tokens are not currently being used
+  , jobsWaiting :: !(OrdList (TMVar ()))
+    -- ^ Pending jobs waiting on a token, the job will be blocked on the TMVar so putting into
+    -- the TMVar will allow the job to continue.
+  }
+
+instance Outputable JobResources where
+  ppr Jobs{..}
+    = text "JobResources" <+>
+        ( braces $ hsep
+          [ text "owned=" <> ppr tokensOwned
+          , text "free=" <> ppr tokensFree
+          , text "num_waiting=" <> ppr (length jobsWaiting)
+          ] )
+
+-- | Add one new token.
+addToken :: JobResources -> JobResources
+addToken jobs@( Jobs { tokensOwned = owned, tokensFree = free })
+  = jobs { tokensOwned = owned + 1, tokensFree = free + 1 }
+
+-- | Free one token.
+addFreeToken :: JobResources -> JobResources
+addFreeToken jobs@( Jobs { tokensFree = free })
+  = assertPpr (tokensOwned jobs > free)
+      (text "addFreeToken:" <+> ppr (tokensOwned jobs) <+> ppr free)
+  $ jobs { tokensFree = free + 1 }
+
+-- | Use up one token.
+removeFreeToken :: JobResources -> JobResources
+removeFreeToken jobs@( Jobs { tokensFree = free })
+  = assertPpr (free > 0)
+      (text "removeFreeToken:" <+> ppr free)
+  $ jobs { tokensFree = free - 1 }
+
+-- | Return one owned token.
+removeOwnedToken :: JobResources -> JobResources
+removeOwnedToken jobs@( Jobs { tokensOwned = owned })
+  = assertPpr (owned > 1)
+      (text "removeOwnedToken:" <+> ppr owned)
+  $ jobs { tokensOwned = owned - 1 }
+
+-- | Add one new job to the end of the list of pending jobs.
+addJob :: TMVar () -> JobResources -> JobResources
+addJob job jobs@( Jobs { jobsWaiting = wait })
+  = jobs { jobsWaiting = wait `SnocOL` job }
+
+-- | The state of the semaphore job server.
+data JobserverState
+  = JobserverState
+    { jobserverAction :: !JobserverAction
+      -- ^ The current action being performed by the
+      -- job server.
+    , canChangeNumCaps :: !(TVar Bool)
+      -- ^ A TVar that signals whether it has been long
+      -- enough since we last changed 'numCapabilities'.
+    , canReleaseToken :: !(TVar Bool)
+      -- ^ A TVar that signals whether we last acquired
+      -- a token long enough ago that we can now release
+      -- a token.
+    }
+data JobserverAction
+  -- | The jobserver is idle: no thread is currently
+  -- interacting with the semaphore.
+  = Idle
+  -- | A thread is waiting for a token on the semaphore.
+  | Acquiring
+    { activeWaitId :: WaitId
+    , threadFinished :: TMVar (Maybe MC.SomeException) }
+
+-- | Retrieve the 'TMVar' that signals if the current thread has finished,
+-- if any thread is currently active in the jobserver.
+activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
+activeThread_maybe Idle = Nothing
+activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar
+
+-- | Whether we should try to acquire a new token from the semaphore:
+-- there is a pending job and no free tokens.
+guardAcquire :: JobResources -> Bool
+guardAcquire ( Jobs { tokensFree, jobsWaiting } )
+  = tokensFree == 0 && not (null jobsWaiting)
+
+-- | Whether we should release a token from the semaphore:
+-- there are no pending jobs and we can release a token.
+guardRelease :: JobResources -> Bool
+guardRelease ( Jobs { tokensFree, tokensOwned, jobsWaiting } )
+  = null jobsWaiting && tokensFree > 0 && tokensOwned > 1
+
+---------------------------------------
+-- Semaphore jobserver implementation
+
+-- | Add one pending job to the jobserver.
+--
+-- Blocks, waiting on the jobserver to supply a free token.
+acquireJob :: TVar JobResources -> IO ()
+acquireJob jobs_tvar = do
+  (job_tmvar, _jobs0) <- tracedAtomically "acquire" $
+    modifyJobResources jobs_tvar \ jobs -> do
+      job_tmvar <- newEmptyTMVar
+      return ((job_tmvar, jobs), addJob job_tmvar jobs)
+  atomically $ takeTMVar job_tmvar
+
+-- | Signal to the job server that one job has completed,
+-- releasing its corresponding token.
+releaseJob :: TVar JobResources -> IO ()
+releaseJob jobs_tvar = do
+  tracedAtomically "release" do
+    modifyJobResources jobs_tvar \ jobs -> do
+      massertPpr (tokensFree jobs < tokensOwned jobs)
+        (text "releaseJob: more free jobs than owned jobs!")
+      return ((), addFreeToken jobs)
+
+-- | Release all tokens owned from the semaphore (to clean up
+-- the jobserver at the end).
+cleanupJobserver :: Jobserver -> IO ()
+cleanupJobserver (Jobserver { jSemaphore = sem
+                            , jobs = jobs_tvar })
+  = do
+      Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
+      let toks_to_release = owned - 1
+        -- Subtract off the implicit token: whoever spawned the ghc process
+        -- in the first place is responsible for that token.
+      releaseSemaphore sem toks_to_release
+
+-- | Dispatch the available tokens acquired from the semaphore
+-- to the pending jobs in the job server.
+dispatchTokens :: JobResources -> STM JobResources
+dispatchTokens jobs@( Jobs { tokensFree = toks_free, jobsWaiting = wait } )
+  | toks_free > 0
+  , next `ConsOL` rest <- wait
+  -- There's a pending job and a free token:
+  -- pass on the token to that job, and recur.
+  = do
+      putTMVar next ()
+      let jobs' = jobs { tokensFree = toks_free - 1, jobsWaiting = rest }
+      dispatchTokens jobs'
+  | otherwise
+  = return jobs
+
+-- | Update the available resources used from a semaphore, dispatching
+-- any newly acquired resources.
+--
+-- Invariant: if the number of available resources decreases, there
+-- must be no pending jobs.
+--
+-- All modifications should go through this function to ensure the contents
+-- of the 'TVar' remains in normal form.
+modifyJobResources :: HasCallStack => TVar JobResources
+                   -> (JobResources -> STM (a, JobResources))
+                   -> STM (a, Maybe JobResources)
+modifyJobResources jobs_tvar action = do
+  old_jobs  <- readTVar jobs_tvar
+  (a, jobs) <- action old_jobs
+
+  -- Check the invariant: if the number of free tokens has decreased,
+  -- there must be no pending jobs.
+  massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
+    vcat [ text "modiyJobResources: pending jobs but fewer free tokens" ]
+  dispatched_jobs <- dispatchTokens jobs
+  writeTVar jobs_tvar dispatched_jobs
+  return (a, Just dispatched_jobs)
+
+tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
+tracedAtomically_ s act = tracedAtomically s (((),) <$> act)
+
+tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
+tracedAtomically origin act = do
+  (a, mjr) <- atomically act
+  forM_ mjr $ \ jr -> do
+    -- Use the "jsem:" prefix to identify where the write traces are
+    traceEventIO ("jsem:" ++ renderJobResources origin jr)
+  return a
+
+renderJobResources :: String -> JobResources -> String
+renderJobResources origin (Jobs own free pending) = showSDocUnsafe $ renderJSON $
+  JSObject [ ("name", JSString origin)
+           , ("owned", JSInt own)
+           , ("free", JSInt free)
+           , ("pending", JSInt (length pending) )
+           ]
+
+-- | Spawn a new thread that waits on the semaphore in order to acquire
+-- an additional token.
+acquireThread :: Jobserver -> IO JobserverAction
+acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+    threadFinished_tmvar <- newEmptyTMVarIO
+    let
+      wait_result_action :: Either MC.SomeException Bool -> IO ()
+      wait_result_action wait_res =
+        tracedAtomically_ "acquire_thread" do
+          (r, jb) <- case wait_res of
+            Left (e :: MC.SomeException) -> do
+              return $ (Just e, Nothing)
+            Right success -> do
+              if success
+                then do
+                  modifyJobResources jobs_tvar \ jobs ->
+                    return (Nothing, addToken jobs)
+                else
+                  return (Nothing, Nothing)
+          putTMVar threadFinished_tmvar r
+          return jb
+    wait_id <- forkWaitOnSemaphoreInterruptible sem wait_result_action
+    labelThread (waitingThreadId wait_id) "acquire_thread"
+    return $ Acquiring { activeWaitId   = wait_id
+                       , threadFinished = threadFinished_tmvar }
+
+-- | Spawn a thread to release ownership of one resource from the semaphore,
+-- provided we have spare resources and no pending jobs.
+releaseThread :: Jobserver -> IO JobserverAction
+releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+  threadFinished_tmvar <- newEmptyTMVarIO
+  MC.mask_ do
+    -- Pre-release the resource so that another thread doesn't take control of it
+    -- just as we release the lock on the semaphore.
+    still_ok_to_release
+      <- tracedAtomically "pre_release" $
+         modifyJobResources jobs_tvar \ jobs ->
+           if guardRelease jobs
+               -- TODO: should this also debounce?
+           then return (True , removeOwnedToken $ removeFreeToken jobs)
+           else return (False, jobs)
+    if not still_ok_to_release
+    then return Idle
+    else do
+      tid <- forkIO $ do
+        x <- MC.try $ releaseSemaphore sem 1
+        tracedAtomically_ "post-release" $ do
+          (r, jobs) <- case x of
+            Left (e :: MC.SomeException) -> do
+              modifyJobResources jobs_tvar \ jobs ->
+                return (Just e, addToken jobs)
+            Right _ -> do
+              return (Nothing, Nothing)
+          putTMVar threadFinished_tmvar r
+          return jobs
+      labelThread tid "release_thread"
+      return Idle
+
+-- | When there are pending jobs but no free tokens,
+-- spawn a thread to acquire a new token from the semaphore.
+--
+-- See 'acquireThread'.
+tryAcquire :: JobserverOptions
+           -> Jobserver
+           -> JobserverState
+           -> STM (IO JobserverState)
+tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
+  st@( JobserverState { jobserverAction = Idle } )
+  = do
+      jobs <- readTVar jobs_tvar
+      guard $ guardAcquire jobs
+      return do
+        action <- acquireThread js
+        -- Set a debounce after acquiring a token.
+        can_release_tvar <- registerDelay $ (releaseDebounce opts * 1000)
+        return $ st { jobserverAction = action
+                    , canReleaseToken = can_release_tvar }
+tryAcquire _ _ _ = retry
+
+-- | When there are free tokens and no pending jobs,
+-- spawn a thread to release a token from the semamphore.
+--
+-- See 'releaseThread'.
+tryRelease :: Jobserver
+           -> JobserverState
+           -> STM (IO JobserverState)
+tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
+  st@( JobserverState
+      { jobserverAction = Idle
+      , canReleaseToken = can_release_tvar } )
+  = do
+      jobs <- readTVar jobs_tvar
+      guard $ guardRelease jobs
+      can_release <- readTVar can_release_tvar
+      guard can_release
+      return do
+        action <- releaseThread sjs
+        return $ st { jobserverAction = action }
+tryRelease _ _ = retry
+
+-- | Wait for an active thread to finish. Once it finishes:
+--
+-- - set the 'JobserverAction' to 'Idle',
+-- - update the number of capabilities to reflect the number
+--   of owned tokens from the semaphore.
+tryNoticeIdle :: JobserverOptions
+              -> TVar JobResources
+              -> JobserverState
+              -> STM (IO JobserverState)
+tryNoticeIdle opts jobs_tvar jobserver_state
+  | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
+  = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
+  | otherwise
+  = retry -- no active thread: wait until jobserver isn't idle
+  where
+    sync_num_caps :: TVar Bool
+                  -> TMVar (Maybe MC.SomeException)
+                  -> STM (IO JobserverState)
+    sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
+      mb_ex <- takeTMVar threadFinished_tmvar
+      for_ mb_ex MC.throwM
+      Jobs { tokensOwned } <- readTVar jobs_tvar
+      can_change_numcaps <- readTVar can_change_numcaps_tvar
+      guard can_change_numcaps
+      return do
+        x <- getNumCapabilities
+        can_change_numcaps_tvar_2 <-
+          if x == tokensOwned
+          then return can_change_numcaps_tvar
+          else do
+            setNumCapabilities tokensOwned
+            registerDelay $ (setNumCapsDebounce opts * 1000)
+        return $
+          jobserver_state
+            { jobserverAction  = Idle
+            , canChangeNumCaps = can_change_numcaps_tvar_2 }
+
+-- | Try to stop the current thread which is acquiring/releasing resources
+-- if that operation is no longer relevant.
+tryStopThread :: TVar JobResources
+              -> JobserverState
+              -> STM (IO JobserverState)
+tryStopThread jobs_tvar jsj = do
+  case jobserverAction jsj of
+    Acquiring { activeWaitId = wait_id } -> do
+      jobs <- readTVar jobs_tvar
+      guard $ null (jobsWaiting jobs)
+      return do
+        interruptWaitOnSemaphore wait_id
+        return $ jsj { jobserverAction = Idle }
+    _ -> retry
+
+-- | Main jobserver loop: acquire/release resources as
+-- needed for the pending jobs and available semaphore tokens.
+jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
+jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
+  = do
+      true_tvar <- newTVarIO True
+      let init_state :: JobserverState
+          init_state =
+            JobserverState
+              { jobserverAction  = Idle
+              , canChangeNumCaps = true_tvar
+              , canReleaseToken  = true_tvar }
+      loop init_state
+  where
+    loop s = do
+      action <- atomically $ asum $ (\x -> x s) <$>
+        [ tryRelease    sjs
+        , tryAcquire    opts sjs
+        , tryNoticeIdle opts jobs_tvar
+        , tryStopThread jobs_tvar
+        ]
+      s <- action
+      loop s
+
+-- | Create a new jobserver using the given semaphore handle.
+makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
+makeJobserver sem_name = do
+  semaphore <- openSemaphore sem_name
+  let
+    init_jobs =
+      Jobs { tokensOwned = 1
+           , tokensFree  = 1
+           , jobsWaiting = NilOL
+           }
+  jobs_tvar <- newTVarIO init_jobs
+  let
+    opts = defaultJobserverOptions -- TODO: allow this to be configured
+    sjs = Jobserver { jSemaphore = semaphore
+                    , jobs       = jobs_tvar }
+  loop_finished_mvar <- newEmptyMVar
+  loop_tid <- forkIOWithUnmask \ unmask -> do
+    r <- try $ unmask $ jobserverLoop opts sjs
+    putMVar loop_finished_mvar $
+      case r of
+        Left e
+          | Just ThreadKilled <- fromException e
+          -> Nothing
+          | otherwise
+          -> Just e
+        Right () -> Nothing
+  labelThread loop_tid "job_server"
+  let
+    acquireSem = acquireJob jobs_tvar
+    releaseSem = releaseJob jobs_tvar
+    cleanupSem = do
+      -- this is interruptible
+      cleanupJobserver sjs
+      killThread loop_tid
+      mb_ex <- takeMVar loop_finished_mvar
+      for_ mb_ex MC.throwM
+
+  return (AbstractSem{..}, cleanupSem)
+
+-- | Implement an abstract semaphore using a semaphore 'Jobserver'
+-- which queries the system semaphore of the given name for resources.
+runJSemAbstractSem :: SemaphoreName         -- ^ the system semaphore to use
+                   -> (AbstractSem -> IO a) -- ^ the operation to run
+                                            -- which requires a semaphore
+                   -> IO a
+runJSemAbstractSem sem action = MC.mask \ unmask -> do
+  (abs, cleanup) <- makeJobserver sem
+  r <- try $ unmask $ action abs
+  case r of
+    Left (e1 :: MC.SomeException) -> do
+      (_ :: Either MC.SomeException ()) <- MC.try cleanup
+      MC.throwM e1
+    Right x -> cleanup $> x
+
+{- Note [Architecture of the Job Server]
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+In `-jsem` mode, the amount of parallelism that GHC can use is controlled by a
+system semaphore. We take resources from the semaphore when we need them, and
+give them back if we don't have enough to do.
+
+A naive implementation would just take and release the semaphore around performing
+the action, but this leads to two issues:
+
+* When taking a token in the semaphore, we must call `setNumCapabilities` in order
+  to adjust how many capabilities are available for parallel garbage collection.
+  This causes unnecessary synchronisations.
+* We want to implement a debounce, so that whilst there is pending work in the
+  current process we prefer to keep hold of resources from the semaphore.
+  This reduces overall memory usage, as there are fewer live GHC processes at once.
+
+Therefore, the obtention of semaphore resources is separated away from the
+request for the resource in the driver.
+
+A token from the semaphore is requested using `acquireJob`. This creates a pending
+job, which is a MVar that can be filled in to signal that the requested token is ready.
+
+When the job is finished, the token is released by calling `releaseJob`, which just
+increases the number of `free` jobs. If there are more pending jobs when the free count
+is increased, the token is immediately reused (see `modifyJobResources`).
+
+The `jobServerLoop` interacts with the system semaphore: when there are pending
+jobs, `acquireThread` blocks, waiting for a token from the semaphore. Once a
+token is obtained, it increases the owned count.
+
+When GHC has free tokens (tokens from the semaphore that it is not using),
+no pending jobs, and the debounce has expired, then `releaseThread` will
+release tokens back to the global semaphore.
+
+`tryStopThread` attempts to kill threads which are waiting to acquire a resource
+when we no longer need it. For example, consider that we attempt to acquire two
+tokens, but the first job finishes before we acquire the second token.
+This second token is no longer needed, so we should cancel the wait
+(as it would not be used to do any work, and not be returned until the debounce).
+We only need to kill `acquireJob`, because `releaseJob` never blocks.
+
+Note [Eventlog Messages for jsem]
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+It can be tricky to verify that the work is shared adequately across different
+processes. To help debug this, we output the values of `JobResource` to the
+eventlog whenever the global state changes. There are some scripts which can be used
+to analyse this output and report statistics about core saturation in the
+GitHub repo (https://github.com/mpickering/ghc-jsem-analyse).
+
+-}
diff --git a/compiler/GHC/Driver/Pipeline/LogQueue.hs b/compiler/GHC/Driver/Pipeline/LogQueue.hs
index 454cc8c870..499dff6dfa 100644
--- a/compiler/GHC/Driver/Pipeline/LogQueue.hs
+++ b/compiler/GHC/Driver/Pipeline/LogQueue.hs
@@ -100,10 +100,10 @@ dequeueLogQueueQueue (LogQueueQueue n lqq) = case IM.minViewWithKey lqq of
   Just ((k, v), lqq') | k == n -> Just (v, LogQueueQueue (n + 1) lqq')
   _ -> Nothing
 
-logThread :: Int -> Int -> Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
+logThread :: Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
           -> TVar LogQueueQueue -- Queue for logs
          -> IO (IO ())
-logThread _ _ logger stopped lqq_var = do
+logThread logger stopped lqq_var = do
   finished_var <- newEmptyMVar
   _ <- forkIO $ print_logs *> putMVar finished_var ()
   return (takeMVar finished_var)
diff --git a/compiler/GHC/Driver/Session.hs b/compiler/GHC/Driver/Session.hs
index 46290d4ade..d6dd214c75 100644
--- a/compiler/GHC/Driver/Session.hs
+++ b/compiler/GHC/Driver/Session.hs
@@ -48,6 +48,7 @@ module GHC.Driver.Session (
         needSourceNotes,
         OnOff(..),
         DynFlags(..),
+        ParMakeCount(..),
         outputFile, objectSuf, ways,
         FlagSpec(..),
         HasDynFlags(..), ContainsDynFlags(..),
@@ -467,9 +468,9 @@ data DynFlags = DynFlags {
   ruleCheck             :: Maybe String,
   strictnessBefore      :: [Int],       -- ^ Additional demand analysis
 
-  parMakeCount          :: Maybe Int,   -- ^ The number of modules to compile in parallel
-                                        --   in --make mode, where Nothing ==> compile as
-                                        --   many in parallel as there are CPUs.
+  parMakeCount          :: Maybe ParMakeCount,
+    -- ^ The number of modules to compile in parallel
+    --   If unspecified, compile with a single job.
 
   enableTimeStats       :: Bool,        -- ^ Enable RTS timing statistics?
   ghcHeapSize           :: Maybe Int,   -- ^ The heap size to set.
@@ -791,6 +792,16 @@ instance (Monad m, HasDynFlags m) => HasDynFlags (ExceptT e m) where
 class ContainsDynFlags t where
     extractDynFlags :: t -> DynFlags
 
+-- | The type for the -jN argument, specifying that -j on its own represents
+-- using the number of machine processors.
+data ParMakeCount
+  -- | Use this many processors (@-j<n>@ flag).
+  = ParMakeThisMany Int
+  -- | Use parallelism with as many processors as possible (@-j@ flag without an argument).
+  | ParMakeNumProcessors
+  -- | Use the specific semaphore @<sem>@ to control parallelism (@-jsem <sem>@ flag).
+  | ParMakeSemaphore FilePath
+
 -----------------------------------------------------------------------------
 -- Accessors from 'DynFlags'
 
@@ -1154,7 +1165,7 @@ defaultDynFlags mySettings =
         historySize             = 20,
         strictnessBefore        = [],
 
-        parMakeCount            = Just 1,
+        parMakeCount            = Nothing,
 
         enableTimeStats         = False,
         ghcHeapSize             = Nothing,
@@ -2120,14 +2131,16 @@ dynamic_flags_deps = [
   , make_ord_flag defGhcFlag "j"     (OptIntSuffix
         (\n -> case n of
                  Just n
-                     | n > 0     -> upd (\d -> d { parMakeCount = Just n })
+                     | n > 0     -> upd (\d -> d { parMakeCount = Just (ParMakeThisMany n) })
                      | otherwise -> addErr "Syntax: -j[n] where n > 0"
-                 Nothing -> upd (\d -> d { parMakeCount = Nothing })))
+                 Nothing -> upd (\d -> d { parMakeCount = Just ParMakeNumProcessors })))
                  -- When the number of parallel builds
                  -- is omitted, it is the same
                  -- as specifying that the number of
                  -- parallel builds is equal to the
                  -- result of getNumProcessors
+  , make_ord_flag defGhcFlag "jsem" $ hasArg $ \f d -> d { parMakeCount = Just (ParMakeSemaphore f) }
+
   , make_ord_flag defFlag "instantiated-with"   (sepArg setUnitInstantiations)
   , make_ord_flag defFlag "this-component-id"   (sepArg setUnitInstanceOf)
diff --git a/compiler/ghc.cabal.in b/compiler/ghc.cabal.in
index 59a033d568..4194dd7b05 100644
--- a/compiler/ghc.cabal.in
+++ b/compiler/ghc.cabal.in
@@ -85,6 +85,7 @@ Library
                    hpc        == 0.6.*,
                    transformers >= 0.5 && < 0.7,
                    exceptions == 0.10.*,
+                   semaphore-compat,
                    stm,
                    ghc-boot   == @ProjectVersionMunged@,
                    ghc-heap   == @ProjectVersionMunged@,
@@ -436,6 +437,7 @@ Library
         GHC.Driver.GenerateCgIPEStub
         GHC.Driver.Hooks
         GHC.Driver.LlvmConfigCache
+        GHC.Driver.MakeSem
         GHC.Driver.Main
         GHC.Driver.Make
         GHC.Driver.MakeFile