summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_executor_synchronous.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/service_executor_synchronous.cpp')
-rw-r--r--src/mongo/transport/service_executor_synchronous.cpp205
1 files changed, 125 insertions, 80 deletions
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index a13053dfb08..002d44cb213 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -27,129 +27,174 @@
* it in the license file.
*/
-
-#include "mongo/platform/basic.h"
-
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_executor_utils.h"
-#include "mongo/util/thread_safety_context.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
-
-namespace mongo {
-namespace transport {
+namespace mongo::transport {
namespace {
-constexpr auto kExecutorName = "passthrough"_sd;
-
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kClientsInTotal = "clientsInTotal"_sd;
-constexpr auto kClientsRunning = "clientsRunning"_sd;
-constexpr auto kClientsWaiting = "clientsWaitingForData"_sd;
-
const auto getServiceExecutorSynchronous =
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
-const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{
+const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{
"ServiceExecutorSynchronous", [](ServiceContext* ctx) {
getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
}};
} // namespace
-thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {};
-thread_local int64_t ServiceExecutorSynchronous::_localThreadIdleCounter = 0;
+class ServiceExecutorSynchronous::SharedState : public std::enable_shared_from_this<SharedState> {
+private:
+ class LockRef {
+ public:
+ explicit LockRef(SharedState* p) : _p{p} {}
-ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext* ctx)
- : _shutdownCondition(std::make_shared<stdx::condition_variable>()) {}
+ size_t threads() const {
+ return _p->_threads;
+ }
-Status ServiceExecutorSynchronous::start() {
- _stillRunning.store(true);
+ bool waitForDrain(Milliseconds dur) {
+ return _p->_cv.wait_for(_lk, dur.toSystemDuration(), [&] { return !_p->_threads; });
+ }
- return Status::OK();
-}
+ void onStartThread() {
+ ++_p->_threads;
+ }
-Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
- LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor");
+ void onEndThread() {
+ if (!--_p->_threads)
+ _p->_cv.notify_all();
+ }
- _stillRunning.store(false);
+ private:
+ SharedState* _p;
+ stdx::unique_lock<stdx::mutex> _lk{_p->_mutex}; // NOLINT
+ };
- stdx::unique_lock<Latch> lock(_shutdownMutex);
- bool result = _shutdownCondition->wait_for(lock, timeout.toSystemDuration(), [this]() {
- return _numRunningWorkerThreads.load() == 0;
- });
+public:
+ void schedule(Task task);
- return result
- ? Status::OK()
- : Status(ErrorCodes::Error::ExceededTimeLimit,
- "passthrough executor couldn't shutdown all worker threads within time limit.");
-}
+ bool isRunning() const {
+ return _isRunning.loadRelaxed();
+ }
-ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorSynchronous(ctx);
- invariant(ref);
- return ref.get();
-}
+ void setIsRunning(bool b) {
+ _isRunning.store(b);
+ }
-void ServiceExecutorSynchronous::schedule(Task task) {
- if (!_stillRunning.load()) {
+ LockRef lock() {
+ return LockRef{this};
+ }
+
+private:
+ class WorkerThreadInfo;
+
+ mutable stdx::mutex _mutex; // NOLINT
+ stdx::condition_variable _cv;
+ AtomicWord<bool> _isRunning;
+ size_t _threads = 0;
+};
+
+class ServiceExecutorSynchronous::SharedState::WorkerThreadInfo {
+public:
+ explicit WorkerThreadInfo(std::shared_ptr<SharedState> sharedState)
+ : sharedState{std::move(sharedState)} {}
+
+ void run() {
+ while (!queue.empty() && sharedState->isRunning()) {
+ queue.front()(Status::OK());
+ queue.pop_front();
+ }
+ }
+
+ std::shared_ptr<SharedState> sharedState;
+ std::deque<Task> queue;
+};
+
+void ServiceExecutorSynchronous::SharedState::schedule(Task task) {
+ if (!isRunning()) {
task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running"));
return;
}
- if (!_localWorkQueue.empty()) {
- _localWorkQueue.emplace_back(std::move(task));
+ thread_local WorkerThreadInfo* workerThreadInfoTls = nullptr;
+
+ if (workerThreadInfoTls) {
+ workerThreadInfoTls->queue.push_back(std::move(task));
return;
}
- // First call to scheduleTask() for this connection, spawn a worker thread that will push jobs
- // into the thread local job queue.
- LOGV2_DEBUG(22983, 3, "Starting new executor thread in passthrough mode");
-
- Status status = launchServiceWorkerThread(
- [this, condVarAnchor = _shutdownCondition, task = std::move(task)]() mutable {
- _numRunningWorkerThreads.addAndFetch(1);
-
- _localWorkQueue.emplace_back(std::move(task));
- while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) {
- _localWorkQueue.front()(Status::OK());
- _localWorkQueue.pop_front();
- }
-
- // We maintain an anchor to "_shutdownCondition" to ensure it remains alive even if the
- // service executor is freed. Any access to the service executor (through "this") is
- // prohibited (and unsafe) after the following line. For more context, see SERVER-49432.
- auto numWorkerThreadsStillRunning = _numRunningWorkerThreads.subtractAndFetch(1);
- if (numWorkerThreadsStillRunning == 0) {
- condVarAnchor->notify_all();
- }
- });
+ LOGV2_DEBUG(22983, 3, "Starting ServiceExecutorSynchronous worker thread");
+ auto workerInfo = std::make_unique<WorkerThreadInfo>(shared_from_this());
+ workerInfo->queue.push_back(std::move(task));
+
+ Status status = launchServiceWorkerThread([w = std::move(workerInfo)] {
+ w->sharedState->lock().onStartThread();
+ ScopeGuard onEndThreadGuard = [&] { w->sharedState->lock().onEndThread(); };
+
+ workerThreadInfoTls = &*w;
+ ScopeGuard resetTlsGuard = [&] { workerThreadInfoTls = nullptr; };
+
+ w->run();
+ });
// The usual way to fail to schedule is to invoke the task, but in this case
// we don't have the task anymore. We gave it away to the callback that the
// failed thread was supposed to run.
iassert(status);
}
+ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext*)
+ : _sharedState{std::make_shared<SharedState>()} {}
+
+ServiceExecutorSynchronous::~ServiceExecutorSynchronous() = default;
+
+Status ServiceExecutorSynchronous::start() {
+ _sharedState->setIsRunning(true);
+ return Status::OK();
+}
+
+Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
+ LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor");
+ auto stopLock = _sharedState->lock();
+ _sharedState->setIsRunning(false);
+ if (!stopLock.waitForDrain(timeout))
+ return Status(ErrorCodes::Error::ExceededTimeLimit,
+ "passthrough executor couldn't shutdown "
+ "all worker threads within time limit.");
+ return Status::OK();
+}
+
+ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
+ auto& ref = getServiceExecutorSynchronous(ctx);
+ invariant(ref);
+ return ref.get();
+}
+
+void ServiceExecutorSynchronous::schedule(Task task) {
+ _sharedState->schedule(std::move(task));
+}
+
+size_t ServiceExecutorSynchronous::getRunningThreads() const {
+ return _sharedState->lock().threads();
+}
+
void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
- // The ServiceExecutorSynchronous has one client per thread and waits synchronously on thread.
- auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
-
- BSONObjBuilder subbob = bob->subobjStart(kExecutorName);
- subbob.append(kThreadsRunning, threads);
- subbob.append(kClientsInTotal, threads);
- subbob.append(kClientsRunning, threads);
- subbob.append(kClientsWaiting, 0);
+ // Has one client per thread and waits synchronously on that thread.
+ int threads = getRunningThreads();
+ BSONObjBuilder{bob->subobjStart("passthrough")}
+ .append("threadsRunning", threads)
+ .append("clientsInTotal", threads)
+ .append("clientsRunning", threads)
+ .append("clientsWaitingForData", 0);
}
-void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task callback) {
+void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task task) {
invariant(session);
yieldIfAppropriate();
-
- schedule([callback = std::move(callback)](Status status) { callback(std::move(status)); });
+ schedule(std::move(task));
}
-
-} // namespace transport
-} // namespace mongo
+} // namespace mongo::transport