diff options
Diffstat (limited to 'src/mongo/transport/service_executor_fixed.cpp')
-rw-r--r-- | src/mongo/transport/service_executor_fixed.cpp | 364 |
1 files changed, 294 insertions, 70 deletions
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index b9b5424872e..fb34101f213 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -35,8 +35,10 @@ #include "mongo/logv2/log.h" #include "mongo/transport/service_executor_gen.h" #include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" +#include "mongo/util/testing_proctor.h" #include "mongo/util/thread_safety_context.h" namespace mongo { @@ -52,47 +54,166 @@ constexpr auto kExecutorLabel = "executor"_sd; constexpr auto kExecutorName = "fixed"_sd; const auto getServiceExecutorFixed = - ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>(); + ServiceContext::declareDecoration<std::shared_ptr<ServiceExecutorFixed>>(); const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{ "ServiceExecutorFixed", [](ServiceContext* ctx) { + auto limits = ThreadPool::Limits{}; + limits.minThreads = 0; + limits.maxThreads = fixedServiceExecutorThreadLimit; getServiceExecutorFixed(ctx) = - std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{}); + std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits)); }}; } // namespace -ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options) - : _options(std::move(options)) { - _options.onCreateThread = - [this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable { - _executorContext = std::make_unique<ExecutorThreadContext>(this->weak_from_this()); - if (onCreate) { - onCreate(name); - } - }; - _threadPool = std::make_unique<ThreadPool>(_options); +class ServiceExecutorFixed::ExecutorThreadContext { +public: + ExecutorThreadContext(ServiceExecutorFixed* serviceExecutor); + ~ExecutorThreadContext(); + + ExecutorThreadContext(ExecutorThreadContext&&) = delete; + ExecutorThreadContext(const ExecutorThreadContext&) = delete; + + template <typename Task> + void run(Task&& task) { + // Yield here to improve concurrency, especially when there are more executor threads + // than CPU cores. + stdx::this_thread::yield(); + _executor->_stats.tasksStarted.fetchAndAdd(1); + _recursionDepth++; + + ON_BLOCK_EXIT([&] { + _recursionDepth--; + _executor->_stats.tasksEnded.fetchAndAdd(1); + + auto lk = stdx::lock_guard(_executor->_mutex); + _executor->_checkForShutdown(lk); + }); + + std::forward<Task>(task)(); + } + + int getRecursionDepth() const { + return _recursionDepth; + } + +private: + ServiceExecutorFixed* const _executor; + int _recursionDepth = 0; +}; + +ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext( + ServiceExecutorFixed* serviceExecutor) + : _executor(serviceExecutor) { + _executor->_stats.threadsStarted.fetchAndAdd(1); + hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet(); +} + +ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() { + auto ended = _executor->_stats.threadsEnded.addAndFetch(1); + auto started = _executor->_stats.threadsStarted.loadRelaxed(); + if (ended == started) { + hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet(); + } +} + +thread_local std::unique_ptr<ServiceExecutorFixed::ExecutorThreadContext> + ServiceExecutorFixed::_executorContext; + +ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limits limits) + : _svcCtx{ctx}, _options(std::move(limits)) { + _options.poolName = "ServiceExecutorFixed"; + _options.onCreateThread = [this](const auto&) { + _executorContext = std::make_unique<ExecutorThreadContext>(this); + }; + + _threadPool = std::make_shared<ThreadPool>(_options); } ServiceExecutorFixed::~ServiceExecutorFixed() { - invariant(!_canScheduleWork.load()); - if (_state == State::kNotStarted) - return; + switch (_state) { + case State::kNotStarted: + return; + case State::kRunning: { + // We should not be running while in this destructor. + MONGO_UNREACHABLE; + } + case State::kStopping: + case State::kStopped: { + // We can go ahead and attempt to join our thread pool. + } break; + default: { MONGO_UNREACHABLE; } + } + + LOGV2_DEBUG(4910502, + kDiagnosticLogLevel, + "Shutting down pool for fixed thread-pool service executor", + "name"_attr = _options.poolName); - // Ensures we always call "shutdown" after staring the service executor - invariant(_state == State::kStopped); + // We only can desturct when we have joined all of our tasks and canceled all of our sessions. + // This thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are + // stiil blocking. If so, we block until they finish here. _threadPool->shutdown(); _threadPool->join(); - invariant(_numRunningExecutorThreads.load() == 0); + + invariant(_threadsRunning() == 0); + invariant(_tasksRunning() == 0); + invariant(_waiters.empty()); } Status ServiceExecutorFixed::start() { - stdx::lock_guard<Latch> lk(_mutex); - auto oldState = std::exchange(_state, State::kRunning); - invariant(oldState == State::kNotStarted); + { + stdx::lock_guard<Latch> lk(_mutex); + switch (_state) { + case State::kNotStarted: { + // Time to start + _state = State::kRunning; + } break; + case State::kRunning: { + return Status::OK(); + } + case State::kStopping: + case State::kStopped: { + return {ErrorCodes::ServiceExecutorInShutdown, + "ServiceExecutorFixed is already stopping or stopped"}; + } + default: { MONGO_UNREACHABLE; } + }; + } + + LOGV2_DEBUG(4910501, + kDiagnosticLogLevel, + "Starting fixed thread-pool service executor", + "name"_attr = _options.poolName); + _threadPool->startup(); - _canScheduleWork.store(true); - LOGV2_DEBUG( - 4910501, 3, "Started fixed thread-pool service executor", "name"_attr = _options.poolName); + + if (!_svcCtx) { + // For some tests, we do not have a ServiceContext. + invariant(TestingProctor::instance().isEnabled()); + return Status::OK(); + } + + auto tl = _svcCtx->getTransportLayer(); + invariant(tl); + + auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress); + invariant(reactor); + _threadPool->schedule([this, reactor](Status) { + { + // Check to make sure we haven't been shutdown already. Note that there is still a brief + // race that immediately follows this check. ASIOReactor::stop() is not permanent, thus + // our run() could "restart" the reactor. + stdx::lock_guard<Latch> lk(_mutex); + if (_state != kRunning) { + return; + } + } + + // Start running on the reactor immediately. + reactor->run(); + }); + return Status::OK(); } @@ -103,37 +224,115 @@ ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) { } Status ServiceExecutorFixed::shutdown(Milliseconds timeout) { - auto waitForShutdown = [&]() mutable -> Status { - stdx::unique_lock<Latch> lk(_mutex); - bool success = _shutdownCondition.wait_for(lk, timeout.toSystemDuration(), [this] { - return _numRunningExecutorThreads.load() == 0; - }); - return success ? Status::OK() - : Status(ErrorCodes::ExceededTimeLimit, - "Failed to shutdown all executor threads within the time limit"); - }; - - LOGV2_DEBUG(4910502, - 3, + LOGV2_DEBUG(4910503, + kDiagnosticLogLevel, "Shutting down fixed thread-pool service executor", "name"_attr = _options.poolName); { - stdx::lock_guard<Latch> lk(_mutex); - _canScheduleWork.store(false); + auto lk = stdx::unique_lock(_mutex); + + switch (_state) { + case State::kNotStarted: + case State::kRunning: { + _state = State::kStopping; + + for (auto& waiter : _waiters) { + // Cancel any session we own. + waiter.session->cancelAsyncOperations(); + } + + // There may not be outstanding threads, check for shutdown now. + _checkForShutdown(lk); - auto oldState = std::exchange(_state, State::kStopped); - if (oldState != State::kStopped) { - _threadPool->shutdown(); + if (_state == State::kStopped) { + // We were able to become stopped immediately. + return Status::OK(); + } + } break; + case State::kStopping: { + // Just nead to wait it out. + } break; + case State::kStopped: { + // Totally done. + return Status::OK(); + } break; + default: { MONGO_UNREACHABLE; } } } - return waitForShutdown(); + LOGV2_DEBUG(4910504, + kDiagnosticLogLevel, + "Waiting for shutdown of fixed thread-pool service executor", + "name"_attr = _options.poolName); + + // There is a world where we are able to simply do a timed wait upon a future chain. However, + // that world likely requires an OperationContext available through shutdown. + auto lk = stdx::unique_lock(_mutex); + if (!_shutdownCondition.wait_for( + lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) { + return Status(ErrorCodes::ExceededTimeLimit, + "Failed to shutdown all executor threads within the time limit"); + } + + return Status::OK(); +} + +void ServiceExecutorFixed::_checkForShutdown(WithLock) { + if (_state == State::kRunning) { + // We're actively running. + return; + } + invariant(_state != State::kNotStarted); + + if (!_waiters.empty()) { + // We still have some in wait. + return; + } + + auto tasksLeft = _tasksLeft(); + if (tasksLeft > 0) { + // We have tasks remaining. + return; + } + invariant(tasksLeft == 0); + + // We have achieved a soft form of shutdown: + // - _state != kRunning means that there will be no new external tasks or waiters. + // - _waiters.empty() means that all network waits have finished and there will be no new + // internal tasks. + // - _tasksLeft() == 0 means that all tasks, both internal and external have finished. + // + // From this point on, all of our threads will be idle. When the dtor runs, the thread pool will + // experience a trivial shutdown() and join(). + _state = State::kStopped; + + LOGV2_DEBUG( + 4910505, kDiagnosticLogLevel, "Finishing shutdown", "name"_attr = _options.poolName); + _shutdownCondition.notify_one(); + + if (!_svcCtx) { + // For some tests, we do not have a ServiceContext. + invariant(TestingProctor::instance().isEnabled()); + return; + } + + auto tl = _svcCtx->getTransportLayer(); + invariant(tl); + + auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress); + invariant(reactor); + reactor->stop(); } Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) { - if (!_canScheduleWork.load()) { - return Status(ErrorCodes::ShutdownInProgress, "Executor is not running"); + { + auto lk = stdx::unique_lock(_mutex); + if (_state != State::kRunning) { + return kInShutdown; + } + + _stats.tasksScheduled.fetchAndAdd(1); } auto mayExecuteTaskInline = [&] { @@ -157,31 +356,70 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) { hangBeforeSchedulingServiceExecutorFixedTask.pauseWhileSet(); - // May throw if an attempt is made to schedule after the thread pool is shutdown. - try { - _threadPool->schedule([task = std::move(task)](Status status) mutable { - internalAssert(status); - invariant(_executorContext); - _executorContext->run(std::move(task)); - }); - } catch (DBException& e) { - return e.toStatus(); - } + _threadPool->schedule([this, task = std::move(task)](Status status) mutable { + invariant(status); + + _executorContext->run([&] { task(); }); + }); return Status::OK(); } +void ServiceExecutorFixed::_schedule(OutOfLineExecutor::Task task) noexcept { + { + auto lk = stdx::unique_lock(_mutex); + if (_state != State::kRunning) { + lk.unlock(); + + task(kInShutdown); + return; + } + + _stats.tasksScheduled.fetchAndAdd(1); + } + + _threadPool->schedule([this, task = std::move(task)](Status status) mutable { + _executorContext->run([&] { task(std::move(status)); }); + }); +} + void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) { invariant(session); + + auto waiter = Waiter{session, std::move(onCompletionCallback)}; + + WaiterList::iterator it; + { + // Make sure we're still allowed to schedule and track the session + auto lk = stdx::unique_lock(_mutex); + if (_state != State::kRunning) { + lk.unlock(); + waiter.onCompletionCallback(kInShutdown); + return; + } + + it = _waiters.emplace(_waiters.end(), std::move(waiter)); + } + session->asyncWaitForData() .thenRunOn(shared_from_this()) - .getAsync(std::move(onCompletionCallback)); + .getAsync([this, anchor = shared_from_this(), it](Status status) mutable { + Waiter waiter; + { + // Remove our waiter from the list. + auto lk = stdx::unique_lock(_mutex); + waiter = std::exchange(*it, {}); + _waiters.erase(it); + } + + waiter.onCompletionCallback(std::move(status)); + }); } void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const { *bob << kExecutorLabel << kExecutorName << kThreadsRunning - << static_cast<int>(_numRunningExecutorThreads.load()); + << static_cast<int>(_threadsRunning()); } int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const { @@ -189,19 +427,5 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const { return _executorContext->getRecursionDepth(); } -ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext( - std::weak_ptr<ServiceExecutorFixed> serviceExecutor) - : _executor(std::move(serviceExecutor)) { - _adjustRunningExecutorThreads(1); - hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet(); -} - -ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() { - if (auto threadsRunning = _adjustRunningExecutorThreads(-1); - threadsRunning.has_value() && threadsRunning.value() == 0) { - hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet(); - } -} - } // namespace transport } // namespace mongo |