Diffstat (limited to 'src/mongo/transport')
18 files changed, 99 insertions, 99 deletions
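Every hunk below applies the same mechanical change: an anonymous stdx::mutex becomes a mongo::Mutex built with MONGO_MAKE_LATCH, which attaches a diagnostic name (by convention "ClassName::_member"), and lock guards are re-declared over Latch so existing stdx::lock_guard/stdx::unique_lock call sites keep compiling. The following is a minimal stand-alone sketch of that shape in standard C++; NamedMutex and MAKE_NAMED_LATCH are hypothetical stand-ins for mongo::Mutex and MONGO_MAKE_LATCH (whose real definitions live in "mongo/platform/mutex.h"), not the actual implementation.

#include <mutex>
#include <string>

// Hypothetical stand-in for mongo::Mutex: a mutex carrying a diagnostic name.
class NamedMutex {
public:
    explicit NamedMutex(std::string name) : _name(std::move(name)) {}

    // BasicLockable interface, so std::lock_guard and std::unique_lock work unchanged.
    void lock() { _impl.lock(); }
    void unlock() { _impl.unlock(); }
    bool try_lock() { return _impl.try_lock(); }

    // The attached name is what lock diagnostics and stats can report.
    const std::string& name() const { return _name; }

private:
    std::string _name;
    std::mutex _impl;
};

// Hypothetical stand-in for MONGO_MAKE_LATCH.
#define MAKE_NAMED_LATCH(name) NamedMutex(name)

struct Example {
    void doWork() {
        std::unique_lock<NamedMutex> lk(_mutex);  // was: stdx::unique_lock<stdx::mutex>
        // ... critical section ...
    }

    // Was: stdx::mutex _mutex;
    // C++17 guaranteed copy elision makes this initialization legal even though
    // the wrapped std::mutex is immovable.
    NamedMutex _mutex = MAKE_NAMED_LATCH("Example::_mutex");
};

That macro-returns-a-temporary style is why the member initializers in the hunks below can write "Mutex _mutex = MONGO_MAKE_LATCH(...)" even though mutexes cannot be moved or copied.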
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 3536bc16ab4..dd5062cab72 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -38,7 +38,7 @@
 #include "mongo/base/checked_cast.h"
 #include "mongo/db/operation_context.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/stdx/unordered_map.h"
 #include "mongo/transport/baton.h"
 #include "mongo/transport/session_asio.h"
@@ -158,7 +158,7 @@ public:
         auto pf = makePromiseFuture<void>();
         auto id = timer.id();
 
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
 
         if (!_opCtx) {
             return kDetached;
@@ -178,7 +178,7 @@ public:
     bool cancelSession(Session& session) noexcept override {
         const auto id = session.id();
 
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
 
         if (_sessions.find(id) == _sessions.end()) {
             return false;
@@ -192,7 +192,7 @@ public:
     bool cancelTimer(const ReactorTimer& timer) noexcept override {
         const auto id = timer.id();
 
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
 
         if (_timersById.find(id) == _timersById.end()) {
             return false;
@@ -211,7 +211,7 @@ public:
     }
 
     void schedule(Task func) noexcept override {
-        stdx::lock_guard<stdx::mutex> lk(_mutex);
+        stdx::lock_guard<Latch> lk(_mutex);
 
         if (!_opCtx) {
             func(kDetached);
@@ -261,7 +261,7 @@ public:
             promise.emplaceValue();
         }
 
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
 
         while (_scheduled.size()) {
             auto toRun = std::exchange(_scheduled, {});
@@ -273,7 +273,7 @@ public:
             }
         });
 
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
 
         // If anything was scheduled, run it now. No need to poll
         if (_scheduled.size()) {
@@ -374,7 +374,7 @@ private:
         auto id = session.id();
         auto pf = makePromiseFuture<void>();
 
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
 
         if (!_opCtx) {
             return kDetached;
@@ -394,7 +394,7 @@ private:
         decltype(_timers) timers;
         {
-            stdx::lock_guard<stdx::mutex> lk(_mutex);
+            stdx::lock_guard<Latch> lk(_mutex);
 
             invariant(_opCtx->getBaton().get() == this);
             _opCtx->setBaton(nullptr);
@@ -438,10 +438,10 @@ private:
      * the eventfd. If not, we run inline.
      */
     template <typename Callback>
-    void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+    void _safeExecute(stdx::unique_lock<Latch> lk, Callback&& cb) {
         if (_inPoll) {
             _scheduled.push_back([cb = std::forward<Callback>(cb), this](Status) mutable {
-                stdx::lock_guard<stdx::mutex> lk(_mutex);
+                stdx::lock_guard<Latch> lk(_mutex);
                 cb();
             });
@@ -455,7 +455,7 @@ private:
         return EventFDHolder::getForClient(_opCtx->getClient());
     }
 
-    stdx::mutex _mutex;
+    Mutex _mutex = MONGO_MAKE_LATCH("BatonASIO::_mutex");
 
     OperationContext* _opCtx;
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 2e3f5219e21..1788ef8a146 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -32,8 +32,8 @@
 #include <list>
 
 #include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/stdx/variant.h"
 #include "mongo/transport/service_entry_point.h"
 #include "mongo/transport/service_executor_reserved.h"
@@ -81,7 +81,7 @@ private:
     ServiceContext* const _svcCtx;
     AtomicWord<std::size_t> _nWorkers;
 
-    mutable stdx::mutex _sessionsMutex;
+    mutable Mutex _sessionsMutex = MONGO_MAKE_LATCH("ServiceEntryPointImpl::_sessionsMutex");
     stdx::condition_variable _shutdownCondition;
     SSMList _sessions;
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index 3f35fe07c78..848ca8eb531 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -160,7 +160,7 @@ Status ServiceExecutorAdaptive::shutdown(Milliseconds timeout) {
     _scheduleCondition.notify_one();
     _controllerThread.join();
 
-    stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+    stdx::unique_lock<Latch> lk(_threadsMutex);
     _reactorHandle->stop();
     bool result =
         _deathCondition.wait_for(lk, timeout.toSystemDuration(), [&] { return _threads.empty(); });
@@ -285,7 +285,7 @@ bool ServiceExecutorAdaptive::_isStarved() const {
  * by schedule().
  */
 void ServiceExecutorAdaptive::_controllerThreadRoutine() {
-    stdx::mutex noopLock;
+    auto noopLock = MONGO_MAKE_LATCH();
     setThreadName("worker-controller"_sd);
 
     // Setup the timers/timeout values for stuck thread detection.
@@ -294,7 +294,7 @@ void ServiceExecutorAdaptive::_controllerThreadRoutine() {
 
     // Get the initial values for our utilization percentage calculations
     auto getTimerTotals = [this]() {
-        stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+        stdx::unique_lock<Latch> lk(_threadsMutex);
         auto first = _getThreadTimerTotal(ThreadTimer::kExecuting, lk);
         auto second = _getThreadTimerTotal(ThreadTimer::kRunning, lk);
         return std::make_pair(first, second);
@@ -428,7 +428,7 @@ void ServiceExecutorAdaptive::_controllerThreadRoutine() {
 }
 
 void ServiceExecutorAdaptive::_startWorkerThread(ThreadCreationReason reason) {
-    stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+    stdx::unique_lock<Latch> lk(_threadsMutex);
     auto it = _threads.emplace(_threads.begin(), _tickSource);
     auto num = _threads.size();
@@ -452,7 +452,7 @@ void ServiceExecutorAdaptive::_startWorkerThread(ThreadCreationReason reason) {
 }
 
 Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const {
-    static stdx::mutex jitterMutex;
+    static auto jitterMutex = MONGO_MAKE_LATCH();
     static std::default_random_engine randomEngine = [] {
         std::random_device seed;
         return std::default_random_engine(seed());
@@ -464,7 +464,7 @@ Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const {
     std::uniform_int_distribution<> jitterDist(-jitterParam, jitterParam);
 
-    stdx::lock_guard<stdx::mutex> lk(jitterMutex);
+    stdx::lock_guard<Latch> lk(jitterMutex);
     auto jitter = jitterDist(randomEngine);
     if (jitter > _config->workerThreadRunTime().count())
         jitter = 0;
@@ -485,8 +485,8 @@ void ServiceExecutorAdaptive::_accumulateTaskMetrics(MetricsArray* outArray,
     }
 }
 
-void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(
-    MetricsArray* outputMetricsArray, const stdx::unique_lock<stdx::mutex>& lk) const {
+void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(MetricsArray* outputMetricsArray,
+                                                        const stdx::unique_lock<Latch>& lk) const {
     _accumulateTaskMetrics(outputMetricsArray, _accumulatedMetrics);
     for (auto& thread : _threads) {
         _accumulateTaskMetrics(outputMetricsArray, thread.threadMetrics);
@@ -494,7 +494,7 @@ void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(
 }
 
 TickSource::Tick ServiceExecutorAdaptive::_getThreadTimerTotal(
-    ThreadTimer which, const stdx::unique_lock<stdx::mutex>& lk) const {
+    ThreadTimer which, const stdx::unique_lock<Latch>& lk) const {
     TickSource::Tick accumulator;
     switch (which) {
         case ThreadTimer::kRunning:
@@ -539,7 +539,7 @@ void ServiceExecutorAdaptive::_workerThreadRoutine(
     _accumulateTaskMetrics(&_accumulatedMetrics, state->threadMetrics);
 
     {
-        stdx::lock_guard<stdx::mutex> lk(_threadsMutex);
+        stdx::lock_guard<Latch> lk(_threadsMutex);
         _threads.erase(state);
     }
     _deathCondition.notify_one();
@@ -631,7 +631,7 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString(
 }
 
 void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
-    stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+    stdx::unique_lock<Latch> lk(_threadsMutex);
     *bob << kExecutorLabel << kExecutorName            //
          << kTotalQueued << _totalQueued.load()        //
          << kTotalExecuted << _totalExecuted.load()    //
diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h
index 4c3b670549b..a0def9da063 100644
--- a/src/mongo/transport/service_executor_adaptive.h
+++ b/src/mongo/transport/service_executor_adaptive.h
@@ -35,7 +35,7 @@
 #include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/condition_variable.h"
+#include "mongo/platform/condition_variable.h"
 #include "mongo/stdx/thread.h"
"mongo/stdx/thread.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/service_executor_task_names.h" @@ -138,7 +138,7 @@ private: CumulativeTickTimer(TickSource* ts) : _timer(ts) {} TickSource::Tick markStopped() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_running); _running = false; auto curTime = _timer.sinceStartTicks(); @@ -147,14 +147,14 @@ private: } void markRunning() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_running); _timer.reset(); _running = true; } TickSource::Tick totalTime() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_running) return _accumulator; return _timer.sinceStartTicks() + _accumulator; @@ -162,7 +162,7 @@ private: private: TickTimer _timer; - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex"); TickSource::Tick _accumulator = 0; bool _running = false; }; @@ -202,15 +202,15 @@ private: void _accumulateTaskMetrics(MetricsArray* outArray, const MetricsArray& inputArray) const; void _accumulateAllTaskMetrics(MetricsArray* outputMetricsArray, - const stdx::unique_lock<stdx::mutex>& lk) const; + const stdx::unique_lock<Latch>& lk) const; TickSource::Tick _getThreadTimerTotal(ThreadTimer which, - const stdx::unique_lock<stdx::mutex>& lk) const; + const stdx::unique_lock<Latch>& lk) const; ReactorHandle _reactorHandle; std::unique_ptr<Options> _config; - mutable stdx::mutex _threadsMutex; + mutable Mutex _threadsMutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptive::_threadsMutex"); ThreadList _threads; std::array<int64_t, static_cast<size_t>(ThreadCreationReason::kMax)> _threadStartCounters; diff --git a/src/mongo/transport/service_executor_adaptive_test.cpp b/src/mongo/transport/service_executor_adaptive_test.cpp index 8e27d91549d..e4abf9f276a 100644 --- a/src/mongo/transport/service_executor_adaptive_test.cpp +++ b/src/mongo/transport/service_executor_adaptive_test.cpp @@ -114,11 +114,11 @@ protected: std::shared_ptr<asio::io_context> asioIoCtx; - stdx::mutex mutex; + mutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptiveFixture::mutex"); AtomicWord<int> waitFor{-1}; stdx::condition_variable cond; std::function<void()> notifyCallback = [this] { - stdx::unique_lock<stdx::mutex> lk(mutex); + stdx::unique_lock<Latch> lk(mutex); invariant(waitFor.load() != -1); waitFor.fetchAndSubtract(1); cond.notify_one(); @@ -126,7 +126,7 @@ protected: }; void waitForCallback(int expected, boost::optional<Milliseconds> timeout = boost::none) { - stdx::unique_lock<stdx::mutex> lk(mutex); + stdx::unique_lock<Latch> lk(mutex); invariant(waitFor.load() != -1); if (timeout) { ASSERT_TRUE(cond.wait_for( @@ -163,8 +163,8 @@ protected: * that those threads retire when they become idle. 
  */
 TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) {
-    stdx::mutex blockedMutex;
-    stdx::unique_lock<stdx::mutex> blockedLock(blockedMutex);
+    auto blockedMutex = MONGO_MAKE_LATCH();
+    stdx::unique_lock<Latch> blockedLock(blockedMutex);
 
     auto exec = makeAndStartExecutor<TestOptions>();
     auto guard = makeGuard([&] {
@@ -178,7 +178,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) {
     ASSERT_OK(exec->schedule(
         [this, &blockedMutex] {
             notifyCallback();
-            stdx::unique_lock<stdx::mutex> lk(blockedMutex);
+            stdx::unique_lock<Latch> lk(blockedMutex);
             notifyCallback();
         },
         ServiceExecutor::kEmptyFlags,
@@ -208,8 +208,8 @@
  * threads are running a task for longer than the stuckThreadTimeout.
  */
 TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) {
-    stdx::mutex blockedMutex;
-    stdx::unique_lock<stdx::mutex> blockedLock(blockedMutex);
+    auto blockedMutex = MONGO_MAKE_LATCH();
+    stdx::unique_lock<Latch> blockedLock(blockedMutex);
 
     auto exec = makeAndStartExecutor<TestOptions>();
     auto guard = makeGuard([&] {
@@ -221,7 +221,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) {
     auto blockedTask = [this, &blockedMutex] {
         log() << "waiting on blocked mutex";
         notifyCallback();
-        stdx::unique_lock<stdx::mutex> lk(blockedMutex);
+        stdx::unique_lock<Latch> lk(blockedMutex);
         notifyCallback();
     };
 
@@ -260,7 +260,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) {
     auto exec = makeAndStartExecutor<TestOptions>();
 
     // Mutex so we don't attempt to call schedule and shutdown concurrently
-    stdx::mutex scheduleMutex;
+    auto scheduleMutex = MONGO_MAKE_LATCH();
 
     auto guard = makeGuard([&] {
         ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2));
     });
@@ -274,7 +274,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) {
             stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 5);
 
             {
-                stdx::unique_lock<stdx::mutex> lock(scheduleMutex);
+                stdx::unique_lock<Latch> lock(scheduleMutex);
 
                 if (scheduleNew) {
                     ASSERT_OK(exec->schedule(task,
@@ -298,7 +298,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) {
     stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 2);
     ASSERT_EQ(exec->threadsRunning(), 2);
 
-    stdx::unique_lock<stdx::mutex> lock(scheduleMutex);
+    stdx::unique_lock<Latch> lock(scheduleMutex);
     scheduleNew = false;
 }
 
@@ -310,7 +310,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) {
     auto exec = makeAndStartExecutor<RecursionOptions>();
 
     AtomicWord<int> remainingTasks{config->recursionLimit() - 1};
-    stdx::mutex mutex;
+    auto mutex = MONGO_MAKE_LATCH();
     stdx::condition_variable cv;
     std::function<void()> task;
@@ -334,7 +334,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) {
         log() << "Completing task recursively";
     };
 
-    stdx::unique_lock<stdx::mutex> lock(mutex);
+    stdx::unique_lock<Latch> lock(mutex);
 
     ASSERT_OK(exec->schedule(
         task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage));
@@ -352,8 +352,8 @@
  * with new normal tasks
  */
 TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) {
-    stdx::mutex blockedMutex;
-    stdx::unique_lock<stdx::mutex> blockedLock(blockedMutex);
+    auto blockedMutex = MONGO_MAKE_LATCH();
+    stdx::unique_lock<Latch> blockedLock(blockedMutex);
 
     auto exec = makeAndStartExecutor<TestOptions>();
     auto guard = makeGuard([&] {
@@ -366,7 +366,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) {
     log() << "Scheduling a blocking task";
     ASSERT_OK(exec->schedule(
         [this, &blockedMutex] {
-            stdx::unique_lock<stdx::mutex> lk(blockedMutex);
+            stdx::unique_lock<Latch> lk(blockedMutex);
             notifyCallback();
         },
         ServiceExecutor::kEmptyFlags,
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index 24820ab1d91..902bf98d7c3 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -62,7 +62,7 @@ ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx,
 
 Status ServiceExecutorReserved::start() {
     {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
         _stillRunning.store(true);
         _numStartingThreads = _reservedThreads;
     }
@@ -80,7 +80,7 @@ Status ServiceExecutorReserved::start() {
 Status ServiceExecutorReserved::_startWorker() {
     log() << "Starting new worker thread for " << _name << " service executor";
     return launchServiceWorkerThread([this] {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
         _numRunningWorkerThreads.addAndFetch(1);
         auto numRunningGuard = makeGuard([&] {
             _numRunningWorkerThreads.subtractAndFetch(1);
@@ -142,7 +142,7 @@ Status ServiceExecutorReserved::_startWorker() {
 
 Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
     LOG(3) << "Shutting down reserved executor";
 
-    stdx::unique_lock<stdx::mutex> lock(_mutex);
+    stdx::unique_lock<Latch> lock(_mutex);
     _stillRunning.store(false);
     _threadWakeup.notify_all();
 
@@ -178,7 +178,7 @@ Status ServiceExecutorReserved::schedule(Task task,
         return Status::OK();
     }
 
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    stdx::lock_guard<Latch> lk(_mutex);
     _readyTasks.push_back(std::move(task));
     _threadWakeup.notify_one();
 
@@ -186,7 +186,7 @@ Status ServiceExecutorReserved::schedule(Task task,
 }
 
 void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    stdx::lock_guard<Latch> lk(_mutex);
     *bob << kExecutorLabel << kExecutorName << kThreadsRunning
          << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads
          << static_cast<int>(_numReadyThreads) << kStartingThreads
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index d83a07566f5..53bd3b00ade 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -33,8 +33,8 @@
 #include "mongo/base/status.h"
 #include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/transport/service_executor.h"
 #include "mongo/transport/service_executor_task_names.h"
@@ -74,7 +74,7 @@ private:
 
     AtomicWord<bool> _stillRunning{false};
 
-    mutable stdx::mutex _mutex;
+    mutable Mutex _mutex = MONGO_MAKE_LATCH("ServiceExecutorReserved::_mutex");
     stdx::condition_variable _threadWakeup;
     stdx::condition_variable _shutdownCondition;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index 79fc88e0033..25104fd46dd 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -67,7 +67,7 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
     _stillRunning.store(false);
 
-    stdx::unique_lock<stdx::mutex> lock(_shutdownMutex);
+    stdx::unique_lock<Latch> lock(_shutdownMutex);
     bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() {
         return _numRunningWorkerThreads.load() == 0;
     });
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index ebe381d9fe2..192583bded7 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -33,8 +33,8 @@
 #include "mongo/base/status.h"
 #include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/transport/service_executor.h"
 #include "mongo/transport/service_executor_task_names.h"
@@ -66,7 +66,7 @@ private:
 
     AtomicWord<bool> _stillRunning{false};
 
-    mutable stdx::mutex _shutdownMutex;
+    mutable Mutex _shutdownMutex = MONGO_MAKE_LATCH("ServiceExecutorSynchronous::_shutdownMutex");
     stdx::condition_variable _shutdownCondition;
 
     AtomicWord<size_t> _numRunningWorkerThreads{0};
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index a7482e09f17..d91cba4fbb7 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -178,13 +178,13 @@ protected:
 void scheduleBasicTask(ServiceExecutor* exec, bool expectSuccess) {
     stdx::condition_variable cond;
-    stdx::mutex mutex;
+    auto mutex = MONGO_MAKE_LATCH();
     auto task = [&cond, &mutex] {
-        stdx::unique_lock<stdx::mutex> lk(mutex);
+        stdx::unique_lock<Latch> lk(mutex);
         cond.notify_all();
     };
 
-    stdx::unique_lock<stdx::mutex> lk(mutex);
+    stdx::unique_lock<Latch> lk(mutex);
     auto status = exec->schedule(
         std::move(task), ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMStartSession);
 
     if (expectSuccess) {
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 840204e30f8..8125fd941b3 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -37,7 +37,7 @@
 #include "mongo/config.h"
 #include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/stdx/thread.h"
 #include "mongo/transport/message_compressor_base.h"
 #include "mongo/transport/service_entry_point.h"
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index 02447e5a289..2cc54156c6b 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -282,19 +282,19 @@ private:
 class SimpleEvent {
 public:
     void signal() {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
         _signaled = true;
         _cond.notify_one();
     }
 
     void wait() {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
         _cond.wait(lk, [this] { return _signaled; });
         _signaled = false;
     }
 
 private:
-    stdx::mutex _mutex;
+    Mutex _mutex = MONGO_MAKE_LATCH("SimpleEvent::_mutex");
     stdx::condition_variable _cond;
     bool _signaled = false;
 };
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 434371863e5..34547494aeb 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -222,7 +222,7 @@ protected:
 #ifdef MONGO_CONFIG_SSL
     // The unique_lock here is held by TransportLayerASIO to synchronize with the asyncConnect
     // timeout callback. It will be unlocked before the SSL handshake actually begins.
-    Future<void> handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex> lk,
+    Future<void> handshakeSSLForEgressWithLock(stdx::unique_lock<Latch> lk,
                                                const HostAndPort& target) {
         if (!_tl->_egressSSLContext) {
             return Future<void>::makeReady(Status(ErrorCodes::SSLHandshakeFailed,
@@ -254,8 +254,8 @@ protected:
     // For synchronous connections where we don't have an async timer, just take a dummy lock and
     // pass it to the WithLock version of handshakeSSLForEgress
     Future<void> handshakeSSLForEgress(const HostAndPort& target) {
-        stdx::mutex mutex;
-        return handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex>(mutex), target);
+        auto mutex = MONGO_MAKE_LATCH();
+        return handshakeSSLForEgressWithLock(stdx::unique_lock<Latch>(mutex), target);
     }
 #endif
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 58153a870e4..c2aef1e19e2 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -530,7 +530,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
 
     AtomicWord<bool> done{false};
     Promise<SessionHandle> promise;
 
-    stdx::mutex mutex;
+    Mutex mutex = MONGO_MAKE_LATCH("AsyncConnectState::mutex");
     GenericSocket socket;
     ASIOReactorTimer timeoutTimer;
     WrappedResolver resolver;
@@ -562,7 +562,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
                                        connector->resolvedEndpoint));
 
             std::error_code ec;
-            stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+            stdx::lock_guard<Latch> lk(connector->mutex);
             connector->resolver.cancel();
             if (connector->session) {
                 connector->session->end();
@@ -583,7 +583,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
                        << " took " << timeAfter - timeBefore;
             }
 
-            stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+            stdx::lock_guard<Latch> lk(connector->mutex);
             connector->resolvedEndpoint = results.front();
 
             connector->socket.open(connector->resolvedEndpoint->protocol());
@@ -595,7 +595,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
             return connector->socket.async_connect(*connector->resolvedEndpoint, UseFuture{});
         })
         .then([this, connector, sslMode]() -> Future<void> {
-            stdx::unique_lock<stdx::mutex> lk(connector->mutex);
+            stdx::unique_lock<Latch> lk(connector->mutex);
             connector->session =
                 std::make_shared<ASIOSession>(this, std::move(connector->socket), false);
             connector->session->ensureAsync();
@@ -780,7 +780,7 @@ Status TransportLayerASIO::setup() {
 }
 
 Status TransportLayerASIO::start() {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    stdx::lock_guard<Latch> lk(_mutex);
     _running.store(true);
 
     if (_listenerOptions.isIngress()) {
@@ -812,7 +812,7 @@ Status TransportLayerASIO::start() {
 }
 
 void TransportLayerASIO::shutdown() {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    stdx::lock_guard<Latch> lk(_mutex);
     _running.store(false);
 
     // Loop through the acceptors and cancel their calls to async_accept. This will prevent new
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 90008fe3c89..fef6aeecea9 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -36,8 +36,8 @@
 #include "mongo/base/status_with.h"
 #include "mongo/config.h"
 #include "mongo/db/server_options.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/stdx/thread.h"
 #include "mongo/transport/transport_layer.h"
 #include "mongo/transport/transport_mode.h"
@@ -160,7 +160,7 @@ private:
     SSLParams::SSLModes _sslMode() const;
 #endif
 
-    stdx::mutex _mutex;
+    Mutex _mutex = MONGO_MAKE_LATCH("TransportLayerASIO::_mutex");
 
     // There are three reactors that are used by TransportLayerASIO. The _ingressReactor contains
     // all the accepted sockets and all ingress networking activity. The _acceptorReactor contains
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 08dcd99dcae..53f979d9cd8 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -48,7 +48,7 @@ namespace {
 class ServiceEntryPointUtil : public ServiceEntryPoint {
 public:
     void startSession(transport::SessionHandle session) override {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
         _sessions.push_back(std::move(session));
         log() << "started session";
         _cv.notify_one();
@@ -58,7 +58,7 @@ public:
         log() << "end all sessions";
         std::vector<transport::SessionHandle> old_sessions;
         {
-            stdx::unique_lock<stdx::mutex> lock(_mutex);
+            stdx::unique_lock<Latch> lock(_mutex);
             old_sessions.swap(_sessions);
         }
         old_sessions.clear();
@@ -75,7 +75,7 @@ public:
     void appendStats(BSONObjBuilder*) const override {}
 
     size_t numOpenSessions() const override {
-        stdx::unique_lock<stdx::mutex> lock(_mutex);
+        stdx::unique_lock<Latch> lock(_mutex);
         return _sessions.size();
     }
@@ -88,12 +88,12 @@ public:
     }
 
     void waitForConnect() {
-        stdx::unique_lock<stdx::mutex> lock(_mutex);
+        stdx::unique_lock<Latch> lock(_mutex);
         _cv.wait(lock, [&] { return !_sessions.empty(); });
     }
 
 private:
-    mutable stdx::mutex _mutex;
+    mutable Mutex _mutex = MONGO_MAKE_LATCH("ServiceEntryPointUtil::_mutex");
     stdx::condition_variable _cv;
     std::vector<transport::SessionHandle> _sessions;
     transport::TransportLayer* _transport = nullptr;
@@ -107,7 +107,7 @@ public:
             SockAddr sa{"localhost", _port, AF_INET};
             s.connect(sa);
             log() << "connection: port " << _port;
-            stdx::unique_lock<stdx::mutex> lk(_mutex);
+            stdx::unique_lock<Latch> lk(_mutex);
             _cv.wait(lk, [&] { return _stop; });
             log() << "connection: Rx stop request";
         }};
@@ -115,7 +115,7 @@ public:
 
     void stop() {
         {
-            stdx::unique_lock<stdx::mutex> lk(_mutex);
+            stdx::unique_lock<Latch> lk(_mutex);
             _stop = true;
         }
         log() << "connection: Tx stop request";
@@ -125,7 +125,7 @@ public:
     }
 
 private:
-    stdx::mutex _mutex;
+    Mutex _mutex = MONGO_MAKE_LATCH("SimpleConnectionThread::_mutex");
     stdx::condition_variable _cv;
     stdx::thread _thr;
     bool _stop = false;
@@ -196,7 +196,7 @@ public:
     }
 
     bool waitForTimeout(boost::optional<Milliseconds> timeout = boost::none) {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
         bool ret = true;
         if (timeout) {
             ret = _cond.wait_for(lk, timeout->toSystemDuration(), [this] { return _finished; });
@@ -210,7 +210,7 @@ public:
 
 protected:
     void notifyComplete() {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        stdx::unique_lock<Latch> lk(_mutex);
         _finished = true;
         _cond.notify_one();
     }
@@ -221,7 +221,7 @@ protected:
     }
 
 private:
-    stdx::mutex _mutex;
+    Mutex _mutex = MONGO_MAKE_LATCH("TimeoutSEP::_mutex");
     stdx::condition_variable _cond;
 
     bool _finished = false;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index dcc91cf3e3d..57a49fb563f 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -53,7 +53,7 @@ TransportLayerManager::TransportLayerManager() = default;
 template <typename Callable>
 void TransportLayerManager::_foreach(Callable&& cb) const {
     {
-        stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+        stdx::lock_guard<Latch> lk(_tlsMutex);
         for (auto&& tl : _tls) {
             cb(tl.get());
         }
     }
@@ -111,7 +111,7 @@ Status TransportLayerManager::setup() {
 Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl) {
     auto ptr = tl.get();
     {
-        stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+        stdx::lock_guard<Latch> lk(_tlsMutex);
         _tls.emplace_back(std::move(tl));
     }
     return ptr->start();
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 1dd5ef38527..3bc0e6ba5c6 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -32,7 +32,7 @@
 #include <vector>
 
 #include "mongo/base/status.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/transport/session.h"
 #include "mongo/transport/transport_layer.h"
 #include "mongo/util/time_support.h"
@@ -91,7 +91,7 @@ public:
     static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer();
 
     BatonHandle makeBaton(OperationContext* opCtx) const override {
-        stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+        stdx::lock_guard<Latch> lk(_tlsMutex);
         // TODO: figure out what to do about managers with more than one transport layer.
         invariant(_tls.size() == 1);
         return _tls[0]->makeBaton(opCtx);
@@ -101,7 +101,7 @@ private:
     template <typename Callable>
     void _foreach(Callable&& cb) const;
 
-    mutable stdx::mutex _tlsMutex;
+    mutable Mutex _tlsMutex = MONGO_MAKE_LATCH("TransportLayerManager::_tlsMutex");
     std::vector<std::unique_ptr<TransportLayer>> _tls;
 };
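The header swaps from "mongo/stdx/condition_variable.h" to "mongo/platform/condition_variable.h" follow from the same change: condition-variable waits now take stdx::unique_lock<Latch> rather than stdx::unique_lock<stdx::mutex>. Continuing the hypothetical NamedMutex sketch above, the standard-library analogue is std::condition_variable_any, which can wait on any lockable type, whereas plain std::condition_variable is hard-wired to std::mutex. This mirrors the SimpleEvent helper from service_state_machine_test.cpp:

#include <condition_variable>

// Sketch under the same assumptions as above (hypothetical NamedMutex);
// mirrors SimpleEvent from service_state_machine_test.cpp.
class SimpleEvent {
public:
    void signal() {
        std::unique_lock<NamedMutex> lk(_mutex);
        _signaled = true;
        _cond.notify_one();
    }

    void wait() {
        std::unique_lock<NamedMutex> lk(_mutex);
        _cond.wait(lk, [this] { return _signaled; });
        _signaled = false;
    }

private:
    NamedMutex _mutex{"SimpleEvent::_mutex"};
    // condition_variable_any accepts unique_lock<NamedMutex>;
    // std::condition_variable would not.
    std::condition_variable_any _cond;
    bool _signaled = false;
};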