diff options
Diffstat (limited to 'compiler/GHC/Driver/Make.hs')
-rw-r--r-- | compiler/GHC/Driver/Make.hs | 120 |
1 files changed, 72 insertions, 48 deletions
diff --git a/compiler/GHC/Driver/Make.hs b/compiler/GHC/Driver/Make.hs index d72b452d2e..c047056ea6 100644 --- a/compiler/GHC/Driver/Make.hs +++ b/compiler/GHC/Driver/Make.hs @@ -75,6 +75,7 @@ import GHC.Driver.Env import GHC.Driver.Errors import GHC.Driver.Errors.Types import GHC.Driver.Main +import GHC.Driver.MakeSem import GHC.Parser.Header @@ -151,10 +152,10 @@ import GHC.Runtime.Loader import GHC.Rename.Names import GHC.Utils.Constants import GHC.Types.Unique.DFM (udfmRestrictKeysSet) -import qualified Data.IntSet as I import GHC.Types.Unique import GHC.Iface.Errors.Types +import qualified Data.IntSet as I -- ----------------------------------------------------------------------------- -- Loading the program @@ -664,6 +665,30 @@ createBuildPlan mod_graph maybe_top_mod = (vcat [text "Build plan missing nodes:", (text "PLAN:" <+> ppr (sum (map countMods build_plan))), (text "GRAPH:" <+> ppr (length (mgModSummaries' mod_graph )))]) build_plan +mkWorkerLimit :: DynFlags -> IO WorkerLimit +mkWorkerLimit dflags = + case parMakeCount dflags of + Nothing -> pure $ num_procs 1 + Just (ParMakeSemaphore h) -> pure (JSemLimit (SemaphoreName h)) + Just ParMakeNumProcessors -> num_procs <$> getNumProcessors + Just (ParMakeThisMany n) -> pure $ num_procs n + where + num_procs x = NumProcessorsLimit (max 1 x) + +isWorkerLimitSequential :: WorkerLimit -> Bool +isWorkerLimitSequential (NumProcessorsLimit x) = x <= 1 +isWorkerLimitSequential (JSemLimit {}) = False + +-- | This describes what we use to limit the number of jobs, either we limit it +-- ourselves to a specific number or we have an external parallelism semaphore +-- limit it for us. +data WorkerLimit + = NumProcessorsLimit Int + | JSemLimit + SemaphoreName + -- ^ Semaphore name to use + deriving Eq + -- | Generalized version of 'load' which also supports a custom -- 'Messager' (for reporting progress) and 'ModuleGraph' (generally -- produced by calling 'depanal'. @@ -744,14 +769,12 @@ load' mhmi_cache how_much mHscMessage mod_graph = do liftIO $ debugTraceMsg logger 2 (hang (text "Ready for upsweep") 2 (ppr build_plan)) - n_jobs <- case parMakeCount (hsc_dflags hsc_env) of - Nothing -> liftIO getNumProcessors - Just n -> return n + worker_limit <- liftIO $ mkWorkerLimit dflags setSession $ hscUpdateHUG (unitEnv_map pruneHomeUnitEnv) hsc_env (upsweep_ok, hsc_env1) <- withDeferredDiagnostics $ do hsc_env <- getSession - liftIO $ upsweep n_jobs hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan + liftIO $ upsweep worker_limit hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan setSession hsc_env1 case upsweep_ok of Failed -> loadFinish upsweep_ok @@ -1036,13 +1059,7 @@ getDependencies direct_deps build_map = type BuildM a = StateT BuildLoopState IO a --- | Abstraction over the operations of a semaphore which allows usage with the --- -j1 case -data AbstractSem = AbstractSem { acquireSem :: IO () - , releaseSem :: IO () } -withAbstractSem :: AbstractSem -> IO b -> IO b -withAbstractSem sem = MC.bracket_ (acquireSem sem) (releaseSem sem) -- | Environment used when compiling a module data MakeEnv = MakeEnv { hsc_env :: !HscEnv -- The basic HscEnv which will be augmented for each module @@ -1227,7 +1244,7 @@ withCurrentUnit uid = do local (\env -> env { hsc_env = hscSetActiveUnitId uid (hsc_env env)}) upsweep - :: Int -- ^ The number of workers we wish to run in parallel + :: WorkerLimit -- ^ The number of workers we wish to run in parallel -> HscEnv -- ^ The base HscEnv, which is augmented for each module -> Maybe ModIfaceCache -- ^ A cache to incrementally write final interface files to -> Maybe Messager @@ -2832,7 +2849,7 @@ label_self thread_name = do CC.labelThread self_tid thread_name -runPipelines :: Int -> HscEnv -> Maybe Messager -> [MakeAction] -> IO () +runPipelines :: WorkerLimit -> HscEnv -> Maybe Messager -> [MakeAction] -> IO () -- Don't even initialise plugins if there are no pipelines runPipelines _ _ _ [] = return () runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do @@ -2840,7 +2857,7 @@ runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do plugins_hsc_env <- initializePlugins orig_hsc_env case n_job of - 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines + NumProcessorsLimit n | n <= 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines _n -> runParPipelines n_job plugins_hsc_env mHscMessager all_pipelines runSeqPipelines :: HscEnv -> Maybe Messager -> [MakeAction] -> IO () @@ -2850,16 +2867,38 @@ runSeqPipelines plugin_hsc_env mHscMessager all_pipelines = , compile_sem = AbstractSem (return ()) (return ()) , env_messager = mHscMessager } - in runAllPipelines 1 env all_pipelines + in runAllPipelines (NumProcessorsLimit 1) env all_pipelines +runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a +runNjobsAbstractSem n_jobs action = do + compile_sem <- newQSem n_jobs + n_capabilities <- getNumCapabilities + n_cpus <- getNumProcessors + let + asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem) + set_num_caps n = unless (n_capabilities /= 1) $ setNumCapabilities n + updNumCapabilities = do + -- Setting number of capabilities more than + -- CPU count usually leads to high userspace + -- lock contention. #9221 + set_num_caps $ min n_jobs n_cpus + resetNumCapabilities = set_num_caps n_capabilities + MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem + +runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a +runWorkerLimit worker_limit action = case worker_limit of + NumProcessorsLimit n_jobs -> + runNjobsAbstractSem n_jobs action + JSemLimit sem -> + runJSemAbstractSem sem action -- | Build and run a pipeline -runParPipelines :: Int -- ^ How many capabilities to use - -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module +runParPipelines :: WorkerLimit -- ^ How to limit work parallelism + -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module -> Maybe Messager -- ^ Optional custom messager to use to report progress -> [MakeAction] -- ^ The build plan for all the module nodes -> IO () -runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do +runParPipelines worker_limit plugin_hsc_env mHscMessager all_pipelines = do -- A variable which we write to when an error has happened and we have to tell the @@ -2869,39 +2908,23 @@ runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do -- will add it's LogQueue into this queue. log_queue_queue_var <- newTVarIO newLogQueueQueue -- Thread which coordinates the printing of logs - wait_log_thread <- logThread n_jobs (length all_pipelines) (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var + wait_log_thread <- logThread (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var -- Make the logger thread-safe, in case there is some output which isn't sent via the LogQueue. thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env) let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger } - let updNumCapabilities = liftIO $ do - n_capabilities <- getNumCapabilities - n_cpus <- getNumProcessors - -- Setting number of capabilities more than - -- CPU count usually leads to high userspace - -- lock contention. #9221 - let n_caps = min n_jobs n_cpus - unless (n_capabilities /= 1) $ setNumCapabilities n_caps - return n_capabilities - - let resetNumCapabilities orig_n = do - liftIO $ setNumCapabilities orig_n - atomically $ writeTVar stopped_var True - wait_log_thread - - compile_sem <- newQSem n_jobs - let abstract_sem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem) + runWorkerLimit worker_limit $ \abstract_sem -> do + let env = MakeEnv { hsc_env = thread_safe_hsc_env + , withLogger = withParLog log_queue_queue_var + , compile_sem = abstract_sem + , env_messager = mHscMessager + } -- Reset the number of capabilities once the upsweep ends. - let env = MakeEnv { hsc_env = thread_safe_hsc_env - , withLogger = withParLog log_queue_queue_var - , compile_sem = abstract_sem - , env_messager = mHscMessager - } - - MC.bracket updNumCapabilities resetNumCapabilities $ \_ -> - runAllPipelines n_jobs env all_pipelines + runAllPipelines worker_limit env all_pipelines + atomically $ writeTVar stopped_var True + wait_log_thread withLocalTmpFS :: RunMakeM a -> RunMakeM a withLocalTmpFS act = do @@ -2918,10 +2941,11 @@ withLocalTmpFS act = do MC.bracket initialiser finaliser $ \lcl_hsc_env -> local (\env -> env { hsc_env = lcl_hsc_env}) act -- | Run the given actions and then wait for them all to finish. -runAllPipelines :: Int -> MakeEnv -> [MakeAction] -> IO () -runAllPipelines n_jobs env acts = do - let spawn_actions :: IO [ThreadId] - spawn_actions = if n_jobs == 1 +runAllPipelines :: WorkerLimit -> MakeEnv -> [MakeAction] -> IO () +runAllPipelines worker_limit env acts = do + let single_worker = isWorkerLimitSequential worker_limit + spawn_actions :: IO [ThreadId] + spawn_actions = if single_worker then (:[]) <$> (forkIOWithUnmask $ \unmask -> void $ runLoop (\io -> io unmask) env acts) else runLoop forkIOWithUnmask env acts |