Diffstat (limited to 'src/mongo/transport/service_executor_fixed.cpp')
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp | 364
1 file changed, 294 insertions(+), 70 deletions(-)
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index b9b5424872e..fb34101f213 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -35,8 +35,10 @@
#include "mongo/logv2/log.h"
#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/testing_proctor.h"
#include "mongo/util/thread_safety_context.h"
namespace mongo {
@@ -52,47 +54,166 @@ constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "fixed"_sd;
const auto getServiceExecutorFixed =
- ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
+ ServiceContext::declareDecoration<std::shared_ptr<ServiceExecutorFixed>>();
const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
"ServiceExecutorFixed", [](ServiceContext* ctx) {
+ auto limits = ThreadPool::Limits{};
+ limits.minThreads = 0;
+ limits.maxThreads = fixedServiceExecutorThreadLimit;
getServiceExecutorFixed(ctx) =
- std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{});
+ std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits));
}};
} // namespace
-ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
- : _options(std::move(options)) {
- _options.onCreateThread =
- [this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable {
- _executorContext = std::make_unique<ExecutorThreadContext>(this->weak_from_this());
- if (onCreate) {
- onCreate(name);
- }
- };
- _threadPool = std::make_unique<ThreadPool>(_options);
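+// Per-thread state installed by the thread pool's onCreateThread hook (see the constructor
+// below). It records thread start/end statistics and tracks how deeply tasks are nested on the
+// owning worker thread.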
+class ServiceExecutorFixed::ExecutorThreadContext {
+public:
+ ExecutorThreadContext(ServiceExecutorFixed* serviceExecutor);
+ ~ExecutorThreadContext();
+
+ ExecutorThreadContext(ExecutorThreadContext&&) = delete;
+ ExecutorThreadContext(const ExecutorThreadContext&) = delete;
+
+ template <typename Task>
+ void run(Task&& task) {
+ // Yield here to improve concurrency, especially when there are more executor threads
+ // than CPU cores.
+ stdx::this_thread::yield();
+ _executor->_stats.tasksStarted.fetchAndAdd(1);
+ _recursionDepth++;
+
+ ON_BLOCK_EXIT([&] {
+ _recursionDepth--;
+ _executor->_stats.tasksEnded.fetchAndAdd(1);
+
+ auto lk = stdx::lock_guard(_executor->_mutex);
+ _executor->_checkForShutdown(lk);
+ });
+
+ std::forward<Task>(task)();
+ }
+
+ int getRecursionDepth() const {
+ return _recursionDepth;
+ }
+
+private:
+ ServiceExecutorFixed* const _executor;
+ int _recursionDepth = 0;
+};
+
+ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext(
+ ServiceExecutorFixed* serviceExecutor)
+ : _executor(serviceExecutor) {
+ _executor->_stats.threadsStarted.fetchAndAdd(1);
+ hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet();
+}
+
+ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() {
+ auto ended = _executor->_stats.threadsEnded.addAndFetch(1);
+ auto started = _executor->_stats.threadsStarted.loadRelaxed();
+ if (ended == started) {
+ hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet();
+ }
+}
+
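+// One context per worker thread: created by onCreateThread when the pool spawns the thread and
+// destroyed when the thread exits, which is when the threadsEnded statistic is bumped.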
+thread_local std::unique_ptr<ServiceExecutorFixed::ExecutorThreadContext>
+ ServiceExecutorFixed::_executorContext;
+
+ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limits limits)
+ : _svcCtx{ctx}, _options(std::move(limits)) {
+ _options.poolName = "ServiceExecutorFixed";
+ _options.onCreateThread = [this](const auto&) {
+ _executorContext = std::make_unique<ExecutorThreadContext>(this);
+ };
+
+ _threadPool = std::make_shared<ThreadPool>(_options);
}
ServiceExecutorFixed::~ServiceExecutorFixed() {
- invariant(!_canScheduleWork.load());
- if (_state == State::kNotStarted)
- return;
+ switch (_state) {
+ case State::kNotStarted:
+ return;
+ case State::kRunning: {
+ // We should not be running while in this destructor.
+ MONGO_UNREACHABLE;
+ }
+ case State::kStopping:
+ case State::kStopped: {
+ // We can go ahead and attempt to join our thread pool.
+ } break;
+ default: { MONGO_UNREACHABLE; }
+ }
+
+ LOGV2_DEBUG(4910502,
+ kDiagnosticLogLevel,
+ "Shutting down pool for fixed thread-pool service executor",
+ "name"_attr = _options.poolName);
- // Ensures we always call "shutdown" after staring the service executor
- invariant(_state == State::kStopped);
+ // We can only destruct once we have joined all of our tasks and canceled all of our sessions.
+ // This thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are
+ // still blocking. If so, we block here until they finish.
_threadPool->shutdown();
_threadPool->join();
- invariant(_numRunningExecutorThreads.load() == 0);
+
+ invariant(_threadsRunning() == 0);
+ invariant(_tasksRunning() == 0);
+ invariant(_waiters.empty());
}
Status ServiceExecutorFixed::start() {
- stdx::lock_guard<Latch> lk(_mutex);
- auto oldState = std::exchange(_state, State::kRunning);
- invariant(oldState == State::kNotStarted);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ switch (_state) {
+ case State::kNotStarted: {
+ // Time to start
+ _state = State::kRunning;
+ } break;
+ case State::kRunning: {
+ return Status::OK();
+ }
+ case State::kStopping:
+ case State::kStopped: {
+ return {ErrorCodes::ServiceExecutorInShutdown,
+ "ServiceExecutorFixed is already stopping or stopped"};
+ }
+ default: { MONGO_UNREACHABLE; }
+ };
+ }
+
+ LOGV2_DEBUG(4910501,
+ kDiagnosticLogLevel,
+ "Starting fixed thread-pool service executor",
+ "name"_attr = _options.poolName);
+
_threadPool->startup();
- _canScheduleWork.store(true);
- LOGV2_DEBUG(
- 4910501, 3, "Started fixed thread-pool service executor", "name"_attr = _options.poolName);
+
+ if (!_svcCtx) {
+ // For some tests, we do not have a ServiceContext.
+ invariant(TestingProctor::instance().isEnabled());
+ return Status::OK();
+ }
+
+ auto tl = _svcCtx->getTransportLayer();
+ invariant(tl);
+
+ auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
+ invariant(reactor);
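+ // Donate a worker thread to drive the ingress reactor; it stays inside reactor->run() until
+ // the reactor is stopped by _checkForShutdown() during shutdown.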
+ _threadPool->schedule([this, reactor](Status) {
+ {
+ // Check to make sure we haven't been shut down already. Note that there is still a brief
+ // race that immediately follows this check. ASIOReactor::stop() is not permanent, thus
+ // our run() could "restart" the reactor.
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (_state != State::kRunning) {
+ return;
+ }
+ }
+
+ // Start running on the reactor immediately.
+ reactor->run();
+ });
+
return Status::OK();
}
@@ -103,37 +224,115 @@ ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
}
Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
- auto waitForShutdown = [&]() mutable -> Status {
- stdx::unique_lock<Latch> lk(_mutex);
- bool success = _shutdownCondition.wait_for(lk, timeout.toSystemDuration(), [this] {
- return _numRunningExecutorThreads.load() == 0;
- });
- return success ? Status::OK()
- : Status(ErrorCodes::ExceededTimeLimit,
- "Failed to shutdown all executor threads within the time limit");
- };
-
- LOGV2_DEBUG(4910502,
- 3,
+ LOGV2_DEBUG(4910503,
+ kDiagnosticLogLevel,
"Shutting down fixed thread-pool service executor",
"name"_attr = _options.poolName);
{
- stdx::lock_guard<Latch> lk(_mutex);
- _canScheduleWork.store(false);
+ auto lk = stdx::unique_lock(_mutex);
+
+ switch (_state) {
+ case State::kNotStarted:
+ case State::kRunning: {
+ _state = State::kStopping;
+
+ for (auto& waiter : _waiters) {
+ // Cancel any session we own.
+ waiter.session->cancelAsyncOperations();
+ }
+
+ // There may not be any outstanding threads; check for shutdown now.
+ _checkForShutdown(lk);
- auto oldState = std::exchange(_state, State::kStopped);
- if (oldState != State::kStopped) {
- _threadPool->shutdown();
+ if (_state == State::kStopped) {
+ // We were able to reach kStopped immediately.
+ return Status::OK();
+ }
+ } break;
+ case State::kStopping: {
+ // Just need to wait it out.
+ } break;
+ case State::kStopped: {
+ // Totally done.
+ return Status::OK();
+ } break;
+ default: { MONGO_UNREACHABLE; }
}
}
- return waitForShutdown();
+ LOGV2_DEBUG(4910504,
+ kDiagnosticLogLevel,
+ "Waiting for shutdown of fixed thread-pool service executor",
+ "name"_attr = _options.poolName);
+
+ // There is a world where we are able to simply do a timed wait upon a future chain. However,
+ // that world likely requires an OperationContext available through shutdown.
+ auto lk = stdx::unique_lock(_mutex);
+ if (!_shutdownCondition.wait_for(
+ lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) {
+ return Status(ErrorCodes::ExceededTimeLimit,
+ "Failed to shutdown all executor threads within the time limit");
+ }
+
+ return Status::OK();
+}
+
+void ServiceExecutorFixed::_checkForShutdown(WithLock) {
+ if (_state == State::kRunning) {
+ // We're actively running.
+ return;
+ }
+ invariant(_state != State::kNotStarted);
+
+ if (!_waiters.empty()) {
+ // We still have some in wait.
+ return;
+ }
+
+ auto tasksLeft = _tasksLeft();
+ if (tasksLeft > 0) {
+ // We have tasks remaining.
+ return;
+ }
+ invariant(tasksLeft == 0);
+
+ // We have achieved a soft form of shutdown:
+ // - _state != kRunning means that there will be no new external tasks or waiters.
+ // - _waiters.empty() means that all network waits have finished and there will be no new
+ // internal tasks.
+ // - _tasksLeft() == 0 means that all tasks, both internal and external have finished.
+ //
+ // From this point on, all of our threads will be idle. When the dtor runs, the thread pool will
+ // experience a trivial shutdown() and join().
+ _state = State::kStopped;
+
+ LOGV2_DEBUG(
+ 4910505, kDiagnosticLogLevel, "Finishing shutdown", "name"_attr = _options.poolName);
+ _shutdownCondition.notify_one();
+
+ if (!_svcCtx) {
+ // For some tests, we do not have a ServiceContext.
+ invariant(TestingProctor::instance().isEnabled());
+ return;
+ }
+
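+ // Stop the ingress reactor so the worker thread donated to reactor->run() in start() can
+ // return to the pool, allowing the destructor's join() to complete.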
+ auto tl = _svcCtx->getTransportLayer();
+ invariant(tl);
+
+ auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
+ invariant(reactor);
+ reactor->stop();
}
Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
- if (!_canScheduleWork.load()) {
- return Status(ErrorCodes::ShutdownInProgress, "Executor is not running");
+ {
+ auto lk = stdx::unique_lock(_mutex);
+ if (_state != State::kRunning) {
+ return kInShutdown;
+ }
+
+ _stats.tasksScheduled.fetchAndAdd(1);
}
auto mayExecuteTaskInline = [&] {
@@ -157,31 +356,70 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
hangBeforeSchedulingServiceExecutorFixedTask.pauseWhileSet();
- // May throw if an attempt is made to schedule after the thread pool is shutdown.
- try {
- _threadPool->schedule([task = std::move(task)](Status status) mutable {
- internalAssert(status);
- invariant(_executorContext);
- _executorContext->run(std::move(task));
- });
- } catch (DBException& e) {
- return e.toStatus();
- }
+ _threadPool->schedule([this, task = std::move(task)](Status status) mutable {
+ invariant(status);
+
+ _executorContext->run([&] { task(); });
+ });
return Status::OK();
}
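+// Like scheduleTask() above, but for OutOfLineExecutor tasks: refuse new work once we have left
+// kRunning, and wrap each task in ExecutorThreadContext::run() so task statistics stay accurate
+// and shutdown progress is re-checked as each task finishes.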
+void ServiceExecutorFixed::_schedule(OutOfLineExecutor::Task task) noexcept {
+ {
+ auto lk = stdx::unique_lock(_mutex);
+ if (_state != State::kRunning) {
+ lk.unlock();
+
+ task(kInShutdown);
+ return;
+ }
+
+ _stats.tasksScheduled.fetchAndAdd(1);
+ }
+
+ _threadPool->schedule([this, task = std::move(task)](Status status) mutable {
+ _executorContext->run([&] { task(std::move(status)); });
+ });
+}
+
void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) {
invariant(session);
+
+ auto waiter = Waiter{session, std::move(onCompletionCallback)};
+
+ WaiterList::iterator it;
+ {
+ // Make sure we're still allowed to schedule and track the session
+ auto lk = stdx::unique_lock(_mutex);
+ if (_state != State::kRunning) {
+ lk.unlock();
+ waiter.onCompletionCallback(kInShutdown);
+ return;
+ }
+
+ it = _waiters.emplace(_waiters.end(), std::move(waiter));
+ }
+
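+ // The tracked waiter holds a handle to the session, which is what lets shutdown() call
+ // cancelAsyncOperations() on it and unblock this wait.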
session->asyncWaitForData()
.thenRunOn(shared_from_this())
- .getAsync(std::move(onCompletionCallback));
+ .getAsync([this, anchor = shared_from_this(), it](Status status) mutable {
+ Waiter waiter;
+ {
+ // Remove our waiter from the list.
+ auto lk = stdx::unique_lock(_mutex);
+ waiter = std::exchange(*it, {});
+ _waiters.erase(it);
+ }
+
+ waiter.onCompletionCallback(std::move(status));
+ });
}
void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
*bob << kExecutorLabel << kExecutorName << kThreadsRunning
- << static_cast<int>(_numRunningExecutorThreads.load());
+ << static_cast<int>(_threadsRunning());
}
int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
@@ -189,19 +427,5 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
return _executorContext->getRecursionDepth();
}
-ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext(
- std::weak_ptr<ServiceExecutorFixed> serviceExecutor)
- : _executor(std::move(serviceExecutor)) {
- _adjustRunningExecutorThreads(1);
- hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet();
-}
-
-ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() {
- if (auto threadsRunning = _adjustRunningExecutorThreads(-1);
- threadsRunning.has_value() && threadsRunning.value() == 0) {
- hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet();
- }
-}
-
} // namespace transport
} // namespace mongo