diff options
Diffstat (limited to 'src/mongo/transport/service_executor_synchronous.cpp')
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.cpp | 205 |
1 files changed, 125 insertions, 80 deletions
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index a13053dfb08..002d44cb213 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -27,129 +27,174 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/transport/service_executor_synchronous.h" #include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" #include "mongo/transport/service_executor_utils.h" -#include "mongo/util/thread_safety_context.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor - -namespace mongo { -namespace transport { +namespace mongo::transport { namespace { -constexpr auto kExecutorName = "passthrough"_sd; - -constexpr auto kThreadsRunning = "threadsRunning"_sd; -constexpr auto kClientsInTotal = "clientsInTotal"_sd; -constexpr auto kClientsRunning = "clientsRunning"_sd; -constexpr auto kClientsWaiting = "clientsWaitingForData"_sd; - const auto getServiceExecutorSynchronous = ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>(); -const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{ +const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{ "ServiceExecutorSynchronous", [](ServiceContext* ctx) { getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx); }}; } // namespace -thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {}; -thread_local int64_t ServiceExecutorSynchronous::_localThreadIdleCounter = 0; +class ServiceExecutorSynchronous::SharedState : public std::enable_shared_from_this<SharedState> { +private: + class LockRef { + public: + explicit LockRef(SharedState* p) : _p{p} {} -ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext* ctx) - : _shutdownCondition(std::make_shared<stdx::condition_variable>()) {} + size_t threads() const { + return _p->_threads; + } -Status ServiceExecutorSynchronous::start() { - _stillRunning.store(true); + bool waitForDrain(Milliseconds dur) { + return _p->_cv.wait_for(_lk, dur.toSystemDuration(), [&] { return !_p->_threads; }); + } - return Status::OK(); -} + void onStartThread() { + ++_p->_threads; + } -Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) { - LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor"); + void onEndThread() { + if (!--_p->_threads) + _p->_cv.notify_all(); + } - _stillRunning.store(false); + private: + SharedState* _p; + stdx::unique_lock<stdx::mutex> _lk{_p->_mutex}; // NOLINT + }; - stdx::unique_lock<Latch> lock(_shutdownMutex); - bool result = _shutdownCondition->wait_for(lock, timeout.toSystemDuration(), [this]() { - return _numRunningWorkerThreads.load() == 0; - }); +public: + void schedule(Task task); - return result - ? Status::OK() - : Status(ErrorCodes::Error::ExceededTimeLimit, - "passthrough executor couldn't shutdown all worker threads within time limit."); -} + bool isRunning() const { + return _isRunning.loadRelaxed(); + } -ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) { - auto& ref = getServiceExecutorSynchronous(ctx); - invariant(ref); - return ref.get(); -} + void setIsRunning(bool b) { + _isRunning.store(b); + } -void ServiceExecutorSynchronous::schedule(Task task) { - if (!_stillRunning.load()) { + LockRef lock() { + return LockRef{this}; + } + +private: + class WorkerThreadInfo; + + mutable stdx::mutex _mutex; // NOLINT + stdx::condition_variable _cv; + AtomicWord<bool> _isRunning; + size_t _threads = 0; +}; + +class ServiceExecutorSynchronous::SharedState::WorkerThreadInfo { +public: + explicit WorkerThreadInfo(std::shared_ptr<SharedState> sharedState) + : sharedState{std::move(sharedState)} {} + + void run() { + while (!queue.empty() && sharedState->isRunning()) { + queue.front()(Status::OK()); + queue.pop_front(); + } + } + + std::shared_ptr<SharedState> sharedState; + std::deque<Task> queue; +}; + +void ServiceExecutorSynchronous::SharedState::schedule(Task task) { + if (!isRunning()) { task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running")); return; } - if (!_localWorkQueue.empty()) { - _localWorkQueue.emplace_back(std::move(task)); + thread_local WorkerThreadInfo* workerThreadInfoTls = nullptr; + + if (workerThreadInfoTls) { + workerThreadInfoTls->queue.push_back(std::move(task)); return; } - // First call to scheduleTask() for this connection, spawn a worker thread that will push jobs - // into the thread local job queue. - LOGV2_DEBUG(22983, 3, "Starting new executor thread in passthrough mode"); - - Status status = launchServiceWorkerThread( - [this, condVarAnchor = _shutdownCondition, task = std::move(task)]() mutable { - _numRunningWorkerThreads.addAndFetch(1); - - _localWorkQueue.emplace_back(std::move(task)); - while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) { - _localWorkQueue.front()(Status::OK()); - _localWorkQueue.pop_front(); - } - - // We maintain an anchor to "_shutdownCondition" to ensure it remains alive even if the - // service executor is freed. Any access to the service executor (through "this") is - // prohibited (and unsafe) after the following line. For more context, see SERVER-49432. - auto numWorkerThreadsStillRunning = _numRunningWorkerThreads.subtractAndFetch(1); - if (numWorkerThreadsStillRunning == 0) { - condVarAnchor->notify_all(); - } - }); + LOGV2_DEBUG(22983, 3, "Starting ServiceExecutorSynchronous worker thread"); + auto workerInfo = std::make_unique<WorkerThreadInfo>(shared_from_this()); + workerInfo->queue.push_back(std::move(task)); + + Status status = launchServiceWorkerThread([w = std::move(workerInfo)] { + w->sharedState->lock().onStartThread(); + ScopeGuard onEndThreadGuard = [&] { w->sharedState->lock().onEndThread(); }; + + workerThreadInfoTls = &*w; + ScopeGuard resetTlsGuard = [&] { workerThreadInfoTls = nullptr; }; + + w->run(); + }); // The usual way to fail to schedule is to invoke the task, but in this case // we don't have the task anymore. We gave it away to the callback that the // failed thread was supposed to run. iassert(status); } +ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext*) + : _sharedState{std::make_shared<SharedState>()} {} + +ServiceExecutorSynchronous::~ServiceExecutorSynchronous() = default; + +Status ServiceExecutorSynchronous::start() { + _sharedState->setIsRunning(true); + return Status::OK(); +} + +Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) { + LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor"); + auto stopLock = _sharedState->lock(); + _sharedState->setIsRunning(false); + if (!stopLock.waitForDrain(timeout)) + return Status(ErrorCodes::Error::ExceededTimeLimit, + "passthrough executor couldn't shutdown " + "all worker threads within time limit."); + return Status::OK(); +} + +ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) { + auto& ref = getServiceExecutorSynchronous(ctx); + invariant(ref); + return ref.get(); +} + +void ServiceExecutorSynchronous::schedule(Task task) { + _sharedState->schedule(std::move(task)); +} + +size_t ServiceExecutorSynchronous::getRunningThreads() const { + return _sharedState->lock().threads(); +} + void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { - // The ServiceExecutorSynchronous has one client per thread and waits synchronously on thread. - auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); - - BSONObjBuilder subbob = bob->subobjStart(kExecutorName); - subbob.append(kThreadsRunning, threads); - subbob.append(kClientsInTotal, threads); - subbob.append(kClientsRunning, threads); - subbob.append(kClientsWaiting, 0); + // Has one client per thread and waits synchronously on that thread. + int threads = getRunningThreads(); + BSONObjBuilder{bob->subobjStart("passthrough")} + .append("threadsRunning", threads) + .append("clientsInTotal", threads) + .append("clientsRunning", threads) + .append("clientsWaitingForData", 0); } -void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task callback) { +void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task task) { invariant(session); yieldIfAppropriate(); - - schedule([callback = std::move(callback)](Status status) { callback(std::move(status)); }); + schedule(std::move(task)); } - -} // namespace transport -} // namespace mongo +} // namespace mongo::transport |