diff options
Diffstat (limited to 'Source/cmWorkerPool.cxx')
-rw-r--r-- | Source/cmWorkerPool.cxx | 226 |
1 files changed, 106 insertions, 120 deletions
diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index 75ca47a035..cbf070e91c 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -371,138 +371,62 @@ void cmUVReadOnlyProcess::UVTryFinish() } /** - * @brief Private worker pool internals + * @brief Worker pool worker thread */ -class cmWorkerPoolInternal +class cmWorkerPoolWorker { public: - // -- Types - - /** - * @brief Worker thread - */ - class WorkerT - { - public: - WorkerT(unsigned int index); - ~WorkerT(); - - WorkerT(WorkerT const&) = delete; - WorkerT& operator=(WorkerT const&) = delete; - - /** - * Start the thread - */ - void Start(cmWorkerPoolInternal* internal); - - /** - * @brief Run an external process - */ - bool RunProcess(cmWorkerPool::ProcessResultT& result, - std::vector<std::string> const& command, - std::string const& workingDirectory); - - // -- Accessors - unsigned int Index() const { return Index_; } - cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; } - - private: - // -- Libuv callbacks - static void UVProcessStart(uv_async_t* handle); - void UVProcessFinished(); - - private: - //! @brief Job handle - cmWorkerPool::JobHandleT JobHandle_; - //! @brief Worker index - unsigned int Index_; - // -- Process management - struct - { - std::mutex Mutex; - cm::uv_async_ptr Request; - std::condition_variable Condition; - std::unique_ptr<cmUVReadOnlyProcess> ROP; - } Proc_; - // -- System thread - std::thread Thread_; - }; - -public: - // -- Constructors - cmWorkerPoolInternal(cmWorkerPool* pool); - ~cmWorkerPoolInternal(); + cmWorkerPoolWorker(uv_loop_t& uvLoop); + ~cmWorkerPoolWorker(); - /** - * @brief Runs the libuv loop - */ - bool Process(); - - /** - * @brief Clear queue and abort threads - */ - void Abort(); + cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete; + cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete; /** - * @brief Push a job to the queue and notify a worker + * Set the internal thread */ - bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); + void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); } /** - * @brief Worker thread main loop method + * Run an external process */ - void Work(WorkerT* worker); + bool RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector<std::string> const& command, + std::string const& workingDirectory); - // -- Request slots - static void UVSlotBegin(uv_async_t* handle); - static void UVSlotEnd(uv_async_t* handle); - -public: - // -- UV loop -#ifdef CMAKE_UV_SIGNAL_HACK - std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; -#endif - std::unique_ptr<uv_loop_t> UVLoop; - cm::uv_async_ptr UVRequestBegin; - cm::uv_async_ptr UVRequestEnd; - - // -- Thread pool and job queue - std::mutex Mutex; - bool Processing = false; - bool Aborting = false; - bool FenceProcessing = false; - unsigned int WorkersRunning = 0; - unsigned int WorkersIdle = 0; - unsigned int JobsProcessing = 0; - std::deque<cmWorkerPool::JobHandleT> Queue; - std::condition_variable Condition; - std::vector<std::unique_ptr<WorkerT>> Workers; +private: + // -- Libuv callbacks + static void UVProcessStart(uv_async_t* handle); + void UVProcessFinished(); - // -- References - cmWorkerPool* Pool = nullptr; +private: + // -- Process management + struct + { + std::mutex Mutex; + cm::uv_async_ptr Request; + std::condition_variable Condition; + std::unique_ptr<cmUVReadOnlyProcess> ROP; + } Proc_; + // -- System thread + std::thread Thread_; }; -cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index) - : Index_(index) +cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop) { + Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this); } -cmWorkerPoolInternal::WorkerT::~WorkerT() +cmWorkerPoolWorker::~cmWorkerPoolWorker() { if (Thread_.joinable()) { Thread_.join(); } } -void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal) -{ - Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this); - Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this); -} - -bool cmWorkerPoolInternal::WorkerT::RunProcess( - cmWorkerPool::ProcessResultT& result, - std::vector<std::string> const& command, std::string const& workingDirectory) +bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector<std::string> const& command, + std::string const& workingDirectory) { if (command.empty()) { return false; @@ -525,9 +449,9 @@ bool cmWorkerPoolInternal::WorkerT::RunProcess( return !result.error(); } -void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle) +void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) { - auto* wrk = reinterpret_cast<WorkerT*>(handle->data); + auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data); bool startFailed = false; { auto& Proc = wrk->Proc_; @@ -543,7 +467,7 @@ void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle) } } -void cmWorkerPoolInternal::WorkerT::UVProcessFinished() +void cmWorkerPoolWorker::UVProcessFinished() { { std::lock_guard<std::mutex> lock(Proc_.Mutex); @@ -555,6 +479,65 @@ void cmWorkerPoolInternal::WorkerT::UVProcessFinished() Proc_.Condition.notify_one(); } +/** + * @brief Private worker pool internals + */ +class cmWorkerPoolInternal +{ +public: + // -- Constructors + cmWorkerPoolInternal(cmWorkerPool* pool); + ~cmWorkerPoolInternal(); + + /** + * Runs the libuv loop. + */ + bool Process(); + + /** + * Clear queue and abort threads. + */ + void Abort(); + + /** + * Push a job to the queue and notify a worker. + */ + bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); + + /** + * Worker thread main loop method. + */ + void Work(unsigned int workerIndex); + + // -- Request slots + static void UVSlotBegin(uv_async_t* handle); + static void UVSlotEnd(uv_async_t* handle); + +public: + // -- UV loop +#ifdef CMAKE_UV_SIGNAL_HACK + std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; +#endif + std::unique_ptr<uv_loop_t> UVLoop; + cm::uv_async_ptr UVRequestBegin; + cm::uv_async_ptr UVRequestEnd; + + // -- Thread pool and job queue + std::mutex Mutex; + bool Processing = false; + bool Aborting = false; + bool FenceProcessing = false; + unsigned int WorkersRunning = 0; + unsigned int WorkersIdle = 0; + unsigned int JobsProcessing = 0; + std::deque<cmWorkerPool::JobHandleT> Queue; + std::condition_variable Condition; + std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers; + + // -- References + cmWorkerPool* Pool = nullptr; +}; + void cmWorkerPool::ProcessResultT::reset() { ExitStatus = 0; @@ -652,11 +635,13 @@ void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle) // Create workers gint.Workers.reserve(num); for (unsigned int ii = 0; ii != num; ++ii) { - gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii)); + gint.Workers.emplace_back( + cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop)); } - // Start workers - for (auto& wrk : gint.Workers) { - wrk->Start(&gint); + // Start worker threads + for (unsigned int ii = 0; ii != num; ++ii) { + gint.Workers[ii]->SetThread( + std::thread(&cmWorkerPoolInternal::Work, &gint, ii)); } } // Destroy begin request @@ -672,8 +657,9 @@ void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle) gint.UVRequestEnd.reset(); } -void cmWorkerPoolInternal::Work(WorkerT* worker) +void cmWorkerPoolInternal::Work(unsigned int workerIndex) { + cmWorkerPool::JobHandleT jobHandle; std::unique_lock<std::mutex> uLock(Mutex); // Increment running workers count ++WorkersRunning; @@ -702,15 +688,15 @@ void cmWorkerPoolInternal::Work(WorkerT* worker) } // Pop next job from queue - worker->JobHandle() = std::move(Queue.front()); + jobHandle = std::move(Queue.front()); Queue.pop_front(); // Unlocked scope for job processing ++JobsProcessing; { uLock.unlock(); - worker->JobHandle()->Work(Pool, worker->Index()); // Process job - worker->JobHandle().reset(); // Destroy job + jobHandle->Work(Pool, workerIndex); // Process job + jobHandle.reset(); // Destroy job uLock.lock(); } --JobsProcessing; |