summaryrefslogtreecommitdiff
path: root/Source/cmWorkerPool.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'Source/cmWorkerPool.cxx')
-rw-r--r--Source/cmWorkerPool.cxx226
1 files changed, 106 insertions, 120 deletions
diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx
index 75ca47a035..cbf070e91c 100644
--- a/Source/cmWorkerPool.cxx
+++ b/Source/cmWorkerPool.cxx
@@ -371,138 +371,62 @@ void cmUVReadOnlyProcess::UVTryFinish()
}
/**
- * @brief Private worker pool internals
+ * @brief Worker pool worker thread
*/
-class cmWorkerPoolInternal
+class cmWorkerPoolWorker
{
public:
- // -- Types
-
- /**
- * @brief Worker thread
- */
- class WorkerT
- {
- public:
- WorkerT(unsigned int index);
- ~WorkerT();
-
- WorkerT(WorkerT const&) = delete;
- WorkerT& operator=(WorkerT const&) = delete;
-
- /**
- * Start the thread
- */
- void Start(cmWorkerPoolInternal* internal);
-
- /**
- * @brief Run an external process
- */
- bool RunProcess(cmWorkerPool::ProcessResultT& result,
- std::vector<std::string> const& command,
- std::string const& workingDirectory);
-
- // -- Accessors
- unsigned int Index() const { return Index_; }
- cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; }
-
- private:
- // -- Libuv callbacks
- static void UVProcessStart(uv_async_t* handle);
- void UVProcessFinished();
-
- private:
- //! @brief Job handle
- cmWorkerPool::JobHandleT JobHandle_;
- //! @brief Worker index
- unsigned int Index_;
- // -- Process management
- struct
- {
- std::mutex Mutex;
- cm::uv_async_ptr Request;
- std::condition_variable Condition;
- std::unique_ptr<cmUVReadOnlyProcess> ROP;
- } Proc_;
- // -- System thread
- std::thread Thread_;
- };
-
-public:
- // -- Constructors
- cmWorkerPoolInternal(cmWorkerPool* pool);
- ~cmWorkerPoolInternal();
+ cmWorkerPoolWorker(uv_loop_t& uvLoop);
+ ~cmWorkerPoolWorker();
- /**
- * @brief Runs the libuv loop
- */
- bool Process();
-
- /**
- * @brief Clear queue and abort threads
- */
- void Abort();
+ cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
+ cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
/**
- * @brief Push a job to the queue and notify a worker
+ * Set the internal thread
*/
- bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
+ void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); }
/**
- * @brief Worker thread main loop method
+ * Run an external process
*/
- void Work(WorkerT* worker);
+ bool RunProcess(cmWorkerPool::ProcessResultT& result,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory);
- // -- Request slots
- static void UVSlotBegin(uv_async_t* handle);
- static void UVSlotEnd(uv_async_t* handle);
-
-public:
- // -- UV loop
-#ifdef CMAKE_UV_SIGNAL_HACK
- std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
-#endif
- std::unique_ptr<uv_loop_t> UVLoop;
- cm::uv_async_ptr UVRequestBegin;
- cm::uv_async_ptr UVRequestEnd;
-
- // -- Thread pool and job queue
- std::mutex Mutex;
- bool Processing = false;
- bool Aborting = false;
- bool FenceProcessing = false;
- unsigned int WorkersRunning = 0;
- unsigned int WorkersIdle = 0;
- unsigned int JobsProcessing = 0;
- std::deque<cmWorkerPool::JobHandleT> Queue;
- std::condition_variable Condition;
- std::vector<std::unique_ptr<WorkerT>> Workers;
+private:
+ // -- Libuv callbacks
+ static void UVProcessStart(uv_async_t* handle);
+ void UVProcessFinished();
- // -- References
- cmWorkerPool* Pool = nullptr;
+private:
+ // -- Process management
+ struct
+ {
+ std::mutex Mutex;
+ cm::uv_async_ptr Request;
+ std::condition_variable Condition;
+ std::unique_ptr<cmUVReadOnlyProcess> ROP;
+ } Proc_;
+ // -- System thread
+ std::thread Thread_;
};
-cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index)
- : Index_(index)
+cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
{
+ Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
}
-cmWorkerPoolInternal::WorkerT::~WorkerT()
+cmWorkerPoolWorker::~cmWorkerPoolWorker()
{
if (Thread_.joinable()) {
Thread_.join();
}
}
-void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal)
-{
- Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this);
- Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this);
-}
-
-bool cmWorkerPoolInternal::WorkerT::RunProcess(
- cmWorkerPool::ProcessResultT& result,
- std::vector<std::string> const& command, std::string const& workingDirectory)
+bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory)
{
if (command.empty()) {
return false;
@@ -525,9 +449,9 @@ bool cmWorkerPoolInternal::WorkerT::RunProcess(
return !result.error();
}
-void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle)
+void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
{
- auto* wrk = reinterpret_cast<WorkerT*>(handle->data);
+ auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
bool startFailed = false;
{
auto& Proc = wrk->Proc_;
@@ -543,7 +467,7 @@ void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle)
}
}
-void cmWorkerPoolInternal::WorkerT::UVProcessFinished()
+void cmWorkerPoolWorker::UVProcessFinished()
{
{
std::lock_guard<std::mutex> lock(Proc_.Mutex);
@@ -555,6 +479,65 @@ void cmWorkerPoolInternal::WorkerT::UVProcessFinished()
Proc_.Condition.notify_one();
}
+/**
+ * @brief Private worker pool internals
+ */
+class cmWorkerPoolInternal
+{
+public:
+ // -- Constructors
+ cmWorkerPoolInternal(cmWorkerPool* pool);
+ ~cmWorkerPoolInternal();
+
+ /**
+ * Runs the libuv loop.
+ */
+ bool Process();
+
+ /**
+ * Clear queue and abort threads.
+ */
+ void Abort();
+
+ /**
+ * Push a job to the queue and notify a worker.
+ */
+ bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
+
+ /**
+ * Worker thread main loop method.
+ */
+ void Work(unsigned int workerIndex);
+
+ // -- Request slots
+ static void UVSlotBegin(uv_async_t* handle);
+ static void UVSlotEnd(uv_async_t* handle);
+
+public:
+ // -- UV loop
+#ifdef CMAKE_UV_SIGNAL_HACK
+ std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
+#endif
+ std::unique_ptr<uv_loop_t> UVLoop;
+ cm::uv_async_ptr UVRequestBegin;
+ cm::uv_async_ptr UVRequestEnd;
+
+ // -- Thread pool and job queue
+ std::mutex Mutex;
+ bool Processing = false;
+ bool Aborting = false;
+ bool FenceProcessing = false;
+ unsigned int WorkersRunning = 0;
+ unsigned int WorkersIdle = 0;
+ unsigned int JobsProcessing = 0;
+ std::deque<cmWorkerPool::JobHandleT> Queue;
+ std::condition_variable Condition;
+ std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
+
+ // -- References
+ cmWorkerPool* Pool = nullptr;
+};
+
void cmWorkerPool::ProcessResultT::reset()
{
ExitStatus = 0;
@@ -652,11 +635,13 @@ void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
// Create workers
gint.Workers.reserve(num);
for (unsigned int ii = 0; ii != num; ++ii) {
- gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii));
+ gint.Workers.emplace_back(
+ cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
}
- // Start workers
- for (auto& wrk : gint.Workers) {
- wrk->Start(&gint);
+ // Start worker threads
+ for (unsigned int ii = 0; ii != num; ++ii) {
+ gint.Workers[ii]->SetThread(
+ std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
}
}
// Destroy begin request
@@ -672,8 +657,9 @@ void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
gint.UVRequestEnd.reset();
}
-void cmWorkerPoolInternal::Work(WorkerT* worker)
+void cmWorkerPoolInternal::Work(unsigned int workerIndex)
{
+ cmWorkerPool::JobHandleT jobHandle;
std::unique_lock<std::mutex> uLock(Mutex);
// Increment running workers count
++WorkersRunning;
@@ -702,15 +688,15 @@ void cmWorkerPoolInternal::Work(WorkerT* worker)
}
// Pop next job from queue
- worker->JobHandle() = std::move(Queue.front());
+ jobHandle = std::move(Queue.front());
Queue.pop_front();
// Unlocked scope for job processing
++JobsProcessing;
{
uLock.unlock();
- worker->JobHandle()->Work(Pool, worker->Index()); // Process job
- worker->JobHandle().reset(); // Destroy job
+ jobHandle->Work(Pool, workerIndex); // Process job
+ jobHandle.reset(); // Destroy job
uLock.lock();
}
--JobsProcessing;