diff options
author | Kyle Edwards <kyle.edwards@kitware.com> | 2019-04-25 19:33:50 +0000 |
---|---|---|
committer | Kitware Robot <kwrobot@kitware.com> | 2019-04-25 15:34:12 -0400 |
commit | 1ddce8fd6de1a88af6e81f6cfa36b48b7948a53d (patch) | |
tree | 2a5657e7581cb24d2962e1e1b3f131940633b262 | |
parent | 4ccf40e61e02cffb28b549a3de6f4794ea3e3d92 (diff) | |
parent | 56890ede2a6eed4db074e3fe6c56e5d03dc42b6e (diff) | |
download | cmake-1ddce8fd6de1a88af6e81f6cfa36b48b7948a53d.tar.gz |
Merge topic 'cmWorkerPool_Tweaks'
56890ede2a cmWorkerPool: Factor our worker thread class (internals)
9794b72d38 cmWorkerPool: Set worker thread count separately to Process()
Acked-by: Kitware Robot <kwrobot@kitware.com>
Merge-request: !3260
-rw-r--r-- | Source/cmQtAutoMocUic.cxx | 6 | ||||
-rw-r--r-- | Source/cmWorkerPool.cxx | 261 | ||||
-rw-r--r-- | Source/cmWorkerPool.h | 47 |
3 files changed, 156 insertions, 158 deletions
diff --git a/Source/cmQtAutoMocUic.cxx b/Source/cmQtAutoMocUic.cxx index 75c5d8aee1..005c27d6ee 100644 --- a/Source/cmQtAutoMocUic.cxx +++ b/Source/cmQtAutoMocUic.cxx @@ -1186,6 +1186,7 @@ bool cmQtAutoMocUic::Init(cmMakefile* makefile) num = std::min<unsigned long>(num, ParallelMax); Base_.NumThreads = static_cast<unsigned int>(num); } + WorkerPool_.SetThreadCount(Base_.NumThreads); } // - Files and directories @@ -1482,15 +1483,12 @@ bool cmQtAutoMocUic::Process() if (!CreateDirectories()) { return false; } - - if (!WorkerPool_.Process(Base().NumThreads, this)) { + if (!WorkerPool_.Process(this)) { return false; } - if (JobError_) { return false; } - return SettingsFileWrite(); } diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index 464182c558..cbf070e91c 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -371,137 +371,62 @@ void cmUVReadOnlyProcess::UVTryFinish() } /** - * @brief Private worker pool internals + * @brief Worker pool worker thread */ -class cmWorkerPoolInternal +class cmWorkerPoolWorker { public: - // -- Types - - /** - * @brief Worker thread - */ - class WorkerT - { - public: - WorkerT(unsigned int index); - ~WorkerT(); - - WorkerT(WorkerT const&) = delete; - WorkerT& operator=(WorkerT const&) = delete; - - /** - * Start the thread - */ - void Start(cmWorkerPoolInternal* internal); - - /** - * @brief Run an external process - */ - bool RunProcess(cmWorkerPool::ProcessResultT& result, - std::vector<std::string> const& command, - std::string const& workingDirectory); - - // -- Accessors - unsigned int Index() const { return Index_; } - cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; } - - private: - // -- Libuv callbacks - static void UVProcessStart(uv_async_t* handle); - void UVProcessFinished(); - - private: - //! @brief Job handle - cmWorkerPool::JobHandleT JobHandle_; - //! @brief Worker index - unsigned int Index_; - // -- Process management - struct - { - std::mutex Mutex; - cm::uv_async_ptr Request; - std::condition_variable Condition; - std::unique_ptr<cmUVReadOnlyProcess> ROP; - } Proc_; - // -- System thread - std::thread Thread_; - }; - -public: - // -- Constructors - cmWorkerPoolInternal(cmWorkerPool* pool); - ~cmWorkerPoolInternal(); - - /** - * @brief Runs the libuv loop - */ - bool Process(); + cmWorkerPoolWorker(uv_loop_t& uvLoop); + ~cmWorkerPoolWorker(); - /** - * @brief Clear queue and abort threads - */ - void Abort(); + cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete; + cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete; /** - * @brief Push a job to the queue and notify a worker + * Set the internal thread */ - bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); + void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); } /** - * @brief Worker thread main loop method + * Run an external process */ - void Work(WorkerT* worker); - - // -- Request slots - static void UVSlotBegin(uv_async_t* handle); - static void UVSlotEnd(uv_async_t* handle); - -public: - // -- UV loop -#ifdef CMAKE_UV_SIGNAL_HACK - std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; -#endif - std::unique_ptr<uv_loop_t> UVLoop; - cm::uv_async_ptr UVRequestBegin; - cm::uv_async_ptr UVRequestEnd; + bool RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector<std::string> const& command, + std::string const& workingDirectory); - // -- Thread pool and job queue - std::mutex Mutex; - bool Aborting = false; - bool FenceProcessing = false; - unsigned int WorkersRunning = 0; - unsigned int WorkersIdle = 0; - unsigned int JobsProcessing = 0; - std::deque<cmWorkerPool::JobHandleT> Queue; - std::condition_variable Condition; - std::vector<std::unique_ptr<WorkerT>> Workers; +private: + // -- Libuv callbacks + static void UVProcessStart(uv_async_t* handle); + void UVProcessFinished(); - // -- References - cmWorkerPool* Pool = nullptr; +private: + // -- Process management + struct + { + std::mutex Mutex; + cm::uv_async_ptr Request; + std::condition_variable Condition; + std::unique_ptr<cmUVReadOnlyProcess> ROP; + } Proc_; + // -- System thread + std::thread Thread_; }; -cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index) - : Index_(index) +cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop) { + Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this); } -cmWorkerPoolInternal::WorkerT::~WorkerT() +cmWorkerPoolWorker::~cmWorkerPoolWorker() { if (Thread_.joinable()) { Thread_.join(); } } -void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal) -{ - Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this); - Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this); -} - -bool cmWorkerPoolInternal::WorkerT::RunProcess( - cmWorkerPool::ProcessResultT& result, - std::vector<std::string> const& command, std::string const& workingDirectory) +bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector<std::string> const& command, + std::string const& workingDirectory) { if (command.empty()) { return false; @@ -524,9 +449,9 @@ bool cmWorkerPoolInternal::WorkerT::RunProcess( return !result.error(); } -void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle) +void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) { - auto* wrk = reinterpret_cast<WorkerT*>(handle->data); + auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data); bool startFailed = false; { auto& Proc = wrk->Proc_; @@ -542,7 +467,7 @@ void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle) } } -void cmWorkerPoolInternal::WorkerT::UVProcessFinished() +void cmWorkerPoolWorker::UVProcessFinished() { { std::lock_guard<std::mutex> lock(Proc_.Mutex); @@ -554,6 +479,65 @@ void cmWorkerPoolInternal::WorkerT::UVProcessFinished() Proc_.Condition.notify_one(); } +/** + * @brief Private worker pool internals + */ +class cmWorkerPoolInternal +{ +public: + // -- Constructors + cmWorkerPoolInternal(cmWorkerPool* pool); + ~cmWorkerPoolInternal(); + + /** + * Runs the libuv loop. + */ + bool Process(); + + /** + * Clear queue and abort threads. + */ + void Abort(); + + /** + * Push a job to the queue and notify a worker. + */ + bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); + + /** + * Worker thread main loop method. + */ + void Work(unsigned int workerIndex); + + // -- Request slots + static void UVSlotBegin(uv_async_t* handle); + static void UVSlotEnd(uv_async_t* handle); + +public: + // -- UV loop +#ifdef CMAKE_UV_SIGNAL_HACK + std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; +#endif + std::unique_ptr<uv_loop_t> UVLoop; + cm::uv_async_ptr UVRequestBegin; + cm::uv_async_ptr UVRequestEnd; + + // -- Thread pool and job queue + std::mutex Mutex; + bool Processing = false; + bool Aborting = false; + bool FenceProcessing = false; + unsigned int WorkersRunning = 0; + unsigned int WorkersIdle = 0; + unsigned int JobsProcessing = 0; + std::deque<cmWorkerPool::JobHandleT> Queue; + std::condition_variable Condition; + std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers; + + // -- References + cmWorkerPool* Pool = nullptr; +}; + void cmWorkerPool::ProcessResultT::reset() { ExitStatus = 0; @@ -591,7 +575,8 @@ cmWorkerPoolInternal::~cmWorkerPoolInternal() bool cmWorkerPoolInternal::Process() { - // Reset state + // Reset state flags + Processing = true; Aborting = false; // Initialize libuv asynchronous request UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this); @@ -599,23 +584,27 @@ bool cmWorkerPoolInternal::Process() // Send begin request UVRequestBegin.send(); // Run libuv loop - return (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0); + bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0); + // Update state flags + Processing = false; + Aborting = false; + return success; } void cmWorkerPoolInternal::Abort() { - bool firstCall = false; + bool notifyThreads = false; // Clear all jobs and set abort flag { std::lock_guard<std::mutex> guard(Mutex); - if (!Aborting) { + if (Processing && !Aborting) { // Register abort and clear queue Aborting = true; Queue.clear(); - firstCall = true; + notifyThreads = true; } } - if (firstCall) { + if (notifyThreads) { // Wake threads Condition.notify_all(); } @@ -627,15 +616,13 @@ inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle) if (Aborting) { return false; } - // Append the job to the queue Queue.emplace_back(std::move(jobHandle)); - // Notify an idle worker if there's one if (WorkersIdle != 0) { Condition.notify_one(); } - + // Return success return true; } @@ -648,11 +635,13 @@ void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle) // Create workers gint.Workers.reserve(num); for (unsigned int ii = 0; ii != num; ++ii) { - gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii)); + gint.Workers.emplace_back( + cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop)); } - // Start workers - for (auto& wrk : gint.Workers) { - wrk->Start(&gint); + // Start worker threads + for (unsigned int ii = 0; ii != num; ++ii) { + gint.Workers[ii]->SetThread( + std::thread(&cmWorkerPoolInternal::Work, &gint, ii)); } } // Destroy begin request @@ -668,8 +657,9 @@ void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle) gint.UVRequestEnd.reset(); } -void cmWorkerPoolInternal::Work(WorkerT* worker) +void cmWorkerPoolInternal::Work(unsigned int workerIndex) { + cmWorkerPool::JobHandleT jobHandle; std::unique_lock<std::mutex> uLock(Mutex); // Increment running workers count ++WorkersRunning; @@ -698,15 +688,15 @@ void cmWorkerPoolInternal::Work(WorkerT* worker) } // Pop next job from queue - worker->JobHandle() = std::move(Queue.front()); + jobHandle = std::move(Queue.front()); Queue.pop_front(); // Unlocked scope for job processing ++JobsProcessing; { uLock.unlock(); - worker->JobHandle()->Work(Pool, worker->Index()); // Process job - worker->JobHandle().reset(); // Destroy job + jobHandle->Work(Pool, workerIndex); // Process job + jobHandle.reset(); // Destroy job uLock.lock(); } --JobsProcessing; @@ -743,19 +733,22 @@ cmWorkerPool::cmWorkerPool() cmWorkerPool::~cmWorkerPool() = default; -bool cmWorkerPool::Process(unsigned int threadCount, void* userData) +void cmWorkerPool::SetThreadCount(unsigned int threadCount) +{ + if (!Int_->Processing) { + ThreadCount_ = (threadCount > 0) ? threadCount : 1u; + } +} + +bool cmWorkerPool::Process(void* userData) { // Setup user data UserData_ = userData; - ThreadCount_ = (threadCount > 0) ? threadCount : 1u; - // Run libuv loop bool success = Int_->Process(); - // Clear user data UserData_ = nullptr; - ThreadCount_ = 0; - + // Return return success; } diff --git a/Source/cmWorkerPool.h b/Source/cmWorkerPool.h index 71c7d84f0e..f08bb4f8d0 100644 --- a/Source/cmWorkerPool.h +++ b/Source/cmWorkerPool.h @@ -50,12 +50,12 @@ public: JobT& operator=(JobT const&) = delete; /** - * @brief Virtual destructor. + * Virtual destructor. */ virtual ~JobT(); /** - * @brief Fence job flag + * Fence job flag * * Fence jobs require that: * - all jobs before in the queue have been processed @@ -66,7 +66,7 @@ public: protected: /** - * @brief Protected default constructor + * Protected default constructor */ JobT(bool fence = false) : Fence_(fence) @@ -125,12 +125,12 @@ public: }; /** - * @brief Job handle type + * Job handle type */ typedef std::unique_ptr<JobT> JobHandleT; /** - * @brief Fence job base class + * Fence job base class */ class JobFenceT : public JobT { @@ -144,8 +144,9 @@ public: }; /** - * @brief Fence job that aborts the worker pool. - * This class is useful as the last job in the job queue. + * Fence job that aborts the worker pool. + * + * Useful as the last job in the job queue. */ class JobEndT : JobFenceT { @@ -160,23 +161,29 @@ public: ~cmWorkerPool(); /** - * @brief Blocking function that starts threads to process all Jobs in - * the queue. + * Number of worker threads. + */ + unsigned int ThreadCount() const { return ThreadCount_; } + + /** + * Set the number of worker threads. * - * This method blocks until a job calls the Abort() method. - * @arg threadCount Number of threads to process jobs. - * @arg userData Common user data pointer available in all Jobs. + * Calling this method during Process() has no effect. */ - bool Process(unsigned int threadCount, void* userData = nullptr); + void SetThreadCount(unsigned int threadCount); /** - * Number of worker threads passed to Process(). - * Only valid during Process(). + * Blocking function that starts threads to process all Jobs in the queue. + * + * This method blocks until a job calls the Abort() method. + * @arg threadCount Number of threads to process jobs. + * @arg userData Common user data pointer available in all Jobs. */ - unsigned int ThreadCount() const { return ThreadCount_; } + bool Process(void* userData = nullptr); /** * User data reference passed to Process(). + * * Only valid during Process(). */ void* UserData() const { return UserData_; } @@ -184,14 +191,14 @@ public: // -- Job processing interface /** - * @brief Clears the job queue and aborts all worker threads. + * Clears the job queue and aborts all worker threads. * * This method is thread safe and can be called from inside a job. */ void Abort(); /** - * @brief Push job to the queue. + * Push job to the queue. * * This method is thread safe and can be called from inside a job or before * Process(). @@ -199,7 +206,7 @@ public: bool PushJob(JobHandleT&& jobHandle); /** - * @brief Push job to the queue + * Push job to the queue * * This method is thread safe and can be called from inside a job or before * Process(). @@ -212,7 +219,7 @@ public: private: void* UserData_ = nullptr; - unsigned int ThreadCount_ = 0; + unsigned int ThreadCount_ = 1; std::unique_ptr<cmWorkerPoolInternal> Int_; }; |