summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/thread_pool.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/concurrency/thread_pool.h')
-rw-r--r--src/mongo/util/concurrency/thread_pool.h181
1 files changed, 33 insertions, 148 deletions
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index a6e56f8c9bf..29acd9e09c0 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -29,47 +29,52 @@
#pragma once
-#include <deque>
#include <functional>
+#include <memory>
#include <string>
-#include <vector>
-#include "mongo/platform/mutex.h"
-#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
-#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
namespace mongo {
-class Status;
-
/**
* A configurable thread pool, for general use.
*
* See the Options struct for information about how to configure an instance.
*/
class ThreadPool final : public ThreadPoolInterface {
- ThreadPool(const ThreadPool&) = delete;
- ThreadPool& operator=(const ThreadPool&) = delete;
-
public:
- struct Limits;
+ /**
+ * Contains a subset of the fields from Options related to limiting the number of concurrent
+ * threads in the pool. Used in places where we want a way to specify limits to the size of a
+ * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate
+ * behaviors. Each field of Limits maps directly to the same-named field in Options.
+ */
+ struct Limits {
+ size_t minThreads = 1;
+ size_t maxThreads = 8;
+ Milliseconds maxIdleThreadAge = Seconds{30};
+ };
/**
* Structure used to configure an instance of ThreadPool.
*/
struct Options {
-
- Options() = default;
- explicit Options(const Limits& limits);
-
// Set maxThreads to this if you don't want to limit the number of threads in the pool.
// Note: the value used here is high enough that it will never be reached, but low enough
// that it won't cause overflows if mixed with signed ints or math.
static constexpr size_t kUnlimited = 1'000'000'000;
+ Options() = default;
+
+ explicit Options(const Limits& limits)
+ : minThreads(limits.minThreads),
+ maxThreads(limits.maxThreads),
+ maxIdleThreadAge(limits.maxIdleThreadAge) {}
+
// Name of the thread pool. If this string is empty, the pool will be assigned a
// name unique to the current process.
std::string poolName;
@@ -95,29 +100,15 @@ public:
// a thread.
Milliseconds maxIdleThreadAge = Seconds{30};
- // This function is run before each worker thread begins consuming tasks.
- using OnCreateThreadFn = std::function<void(const std::string& threadName)>;
- OnCreateThreadFn onCreateThread = [](const std::string&) {};
+ /** If callable, called before each worker thread begins consuming tasks. */
+ std::function<void(const std::string&)> onCreateThread;
/**
- * This function is called after joining each retired thread.
+ * If callable, called after joining each retired thread.
* Since there could be multiple calls to this function in a single critical section,
* avoid complex logic in the callback.
*/
- using OnJoinRetiredThreadFn = std::function<void(const stdx::thread&)>;
- OnJoinRetiredThreadFn onJoinRetiredThread = [](const stdx::thread&) {};
- };
-
- /**
- * Contains a subset of the fields from Options related to limiting the number of concurrent
- * threads in the pool. Used in places where we want a way to specify limits to the size of a
- * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate
- * behaviors. Each field of Limits maps directly to the same-named field in Options.
- */
- struct Limits {
- size_t minThreads = 1;
- size_t maxThreads = 8;
- Milliseconds maxIdleThreadAge = Seconds{30};
+ std::function<void(const stdx::thread&)> onJoinRetiredThread;
};
/**
@@ -145,12 +136,18 @@ public:
*/
explicit ThreadPool(Options options);
+ ThreadPool(const ThreadPool&) = delete;
+ ThreadPool& operator=(const ThreadPool&) = delete;
+
~ThreadPool() override;
+ // from OutOfLineExecutor (base of ThreadPoolInterface)
+ void schedule(Task task) override;
+
+ // from ThreadPoolInterface
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
/**
* Blocks the caller until there are no pending tasks on this pool.
@@ -170,120 +167,8 @@ public:
Stats getStats() const;
private:
- using TaskList = std::deque<Task>;
- using ThreadList = std::vector<stdx::thread>;
- using RetiredThreadList = std::list<stdx::thread>;
-
- /**
- * Representation of the stage of life of a thread pool.
- *
- * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work
- * may only be scheduled in the preStart and running states. Threads may only be started in the
- * running state. In shutdownComplete, there are no remaining threads or pending tasks to
- * execute.
- *
- * Diagram of legal transitions:
- *
- * preStart -> running -> joinRequired -> joining -> shutdownComplete
- * \ ^
- * \_____________/
- */
- enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete };
-
- /**
- * This is the thread body for worker threads. It is a static member function,
- * because late in its execution it is possible for the pool to have been destroyed.
- * As such, it is advisable to pass the pool pointer as an explicit argument, rather
- * than as the implicit "this" argument.
- */
- static void _workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept;
-
- /**
- * Starts a worker thread, unless _options.maxThreads threads are already running or
- * _state is not running.
- */
- void _startWorkerThread_inlock();
-
- /**
- * This is the run loop of a worker thread, invoked by _workerThreadBody.
- */
- void _consumeTasks();
-
- /**
- * Implementation of shutdown once _mutex is locked.
- */
- void _shutdown_inlock();
-
- /**
- * Implementation of join once _mutex is owned by "lk".
- */
- void _join_inlock(stdx::unique_lock<Latch>* lk);
-
- /**
- * Runs the remaining tasks on a new thread as part of the join process, blocking until
- * complete. Caller must not hold the mutex!
- */
- void _drainPendingTasks();
-
- /**
- * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at
- * least one entry.
- */
- void _doOneTask(stdx::unique_lock<Latch>* lk) noexcept;
-
- /**
- * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state
- * change. Has no effect if _state == newState.
- */
- void _setState_inlock(LifecycleState newState);
-
- /**
- * Waits for all remaining retired threads to join.
- * If a thread's _workerThreadBody() were ever to attempt to reacquire
- * ThreadPool::_mutex after that thread had been added to _retiredThreads,
- * it could cause a deadlock.
- */
- void _joinRetired_inlock();
-
- // These are the options with which the pool was configured at construction time.
- const Options _options;
-
- // Mutex guarding all non-const member variables.
- mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ThreadPool::_mutex");
-
- // This variable represents the lifecycle state of the pool.
- //
- // Work may only be scheduled in states preStart and running, and only executes in states
- // running and shuttingDown.
- LifecycleState _state = preStart;
-
- // Condition signaled to indicate that there is work in the _pendingTasks queue, or
- // that the system is shutting down.
- stdx::condition_variable _workAvailable;
-
- // Condition signaled to indicate that there is no work in the _pendingTasks queue.
- stdx::condition_variable _poolIsIdle;
-
- // Condition variable signaled whenever _state changes.
- stdx::condition_variable _stateChange;
-
- // Queue of yet-to-be-executed tasks.
- TaskList _pendingTasks;
-
- // List of threads serving as the worker pool.
- ThreadList _threads;
-
- // List of threads that are retired and pending join
- RetiredThreadList _retiredThreads;
-
- // Count of idle threads.
- size_t _numIdleThreads = 0;
-
- // Id counter for assigning thread names
- size_t _nextThreadId = 0;
-
- // The last time that _pendingTasks.size() grew to be at least _threads.size().
- Date_t _lastFullUtilizationDate;
+ class Impl;
+ std::unique_ptr<Impl> _impl;
};
} // namespace mongo