author     Ben Caimano <ben.caimano@mongodb.com>  2019-11-01 17:24:53 +0000
committer  evergreen <evergreen@mongodb.com>      2019-11-01 17:24:53 +0000
commit     bf5bef47a8e6937b4e0d2c9df3fde3470bdc72c9 (patch)
tree       8f71a9f272082dd9ee0e471ef5fcb9f19519600d /src/mongo/transport
parent     f210bc645453c05979067c556bf6f2bd43e64134 (diff)
download   mongo-bf5bef47a8e6937b4e0d2c9df3fde3470bdc72c9.tar.gz
SERVER-42165 Replace uses of stdx::mutex with mongo::Mutex
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--  src/mongo/transport/baton_asio_linux.h               | 24
-rw-r--r--  src/mongo/transport/service_entry_point_impl.h       |  4
-rw-r--r--  src/mongo/transport/service_executor_adaptive.cpp    | 22
-rw-r--r--  src/mongo/transport/service_executor_adaptive.h      | 15
-rw-r--r--  src/mongo/transport/service_executor_adaptive_test.cpp | 36
-rw-r--r--  src/mongo/transport/service_executor_reserved.cpp    | 10
-rw-r--r--  src/mongo/transport/service_executor_reserved.h      |  4
-rw-r--r--  src/mongo/transport/service_executor_synchronous.cpp |  2
-rw-r--r--  src/mongo/transport/service_executor_synchronous.h   |  4
-rw-r--r--  src/mongo/transport/service_executor_test.cpp        |  6
-rw-r--r--  src/mongo/transport/service_state_machine.h          |  2
-rw-r--r--  src/mongo/transport/service_state_machine_test.cpp   |  6
-rw-r--r--  src/mongo/transport/session_asio.h                   |  6
-rw-r--r--  src/mongo/transport/transport_layer_asio.cpp         | 12
-rw-r--r--  src/mongo/transport/transport_layer_asio.h           |  4
-rw-r--r--  src/mongo/transport/transport_layer_asio_test.cpp    | 22
-rw-r--r--  src/mongo/transport/transport_layer_manager.cpp      |  4
-rw-r--r--  src/mongo/transport/transport_layer_manager.h        |  6
18 files changed, 95 insertions, 94 deletions
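
The pattern applied throughout this diff: drop the "mongo/stdx/mutex.h" include in favor of "mongo/platform/mutex.h", replace each stdx::mutex with a mongo::Mutex constructed via MONGO_MAKE_LATCH, and re-template the lock guards on Latch. A minimal sketch of the before/after, assuming the mongo tree's headers; the Example class and its latch name are illustrative placeholders, not code from this commit:

    #include "mongo/platform/mutex.h"             // was: #include "mongo/stdx/mutex.h"
    #include "mongo/stdx/condition_variable.h"

    namespace mongo {

    class Example {
    public:
        void signal() {
            stdx::lock_guard<Latch> lk(_mutex);   // was: stdx::lock_guard<stdx::mutex>
            _ready = true;
            _cv.notify_one();
        }

        void wait() {
            stdx::unique_lock<Latch> lk(_mutex);  // was: stdx::unique_lock<stdx::mutex>
            _cv.wait(lk, [this] { return _ready; });
        }

    private:
        Mutex _mutex = MONGO_MAKE_LATCH("Example::_mutex");  // was: stdx::mutex _mutex;
        stdx::condition_variable _cv;
        bool _ready = false;
    };

    }  // namespace mongo

The string passed to MONGO_MAKE_LATCH names the latch (typically "Class::_member", as in the hunks below); stdx::condition_variable continues to work with the re-templated unique_lock.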
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 0db7fda5230..930cf52c67d 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -38,7 +38,7 @@
#include "mongo/base/checked_cast.h"
#include "mongo/db/operation_context.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/baton.h"
#include "mongo/transport/session_asio.h"
@@ -158,7 +158,7 @@ public:
auto pf = makePromiseFuture<void>();
auto id = timer.id();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (!_opCtx) {
return kDetached;
@@ -178,7 +178,7 @@ public:
bool cancelSession(Session& session) noexcept override {
const auto id = session.id();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_sessions.find(id) == _sessions.end()) {
return false;
@@ -192,7 +192,7 @@ public:
bool cancelTimer(const ReactorTimer& timer) noexcept override {
const auto id = timer.id();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_timersById.find(id) == _timersById.end()) {
return false;
@@ -211,7 +211,7 @@ public:
}
void schedule(Task func) noexcept override {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (!_opCtx) {
func(kDetached);
@@ -261,7 +261,7 @@ public:
promise.emplaceValue();
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
while (_scheduled.size()) {
auto toRun = std::exchange(_scheduled, {});
@@ -273,7 +273,7 @@ public:
}
});
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
// If anything was scheduled, run it now. No need to poll
if (_scheduled.size()) {
@@ -375,7 +375,7 @@ private:
auto id = session.id();
auto pf = makePromiseFuture<void>();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (!_opCtx) {
return kDetached;
@@ -395,7 +395,7 @@ private:
decltype(_timers) timers;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_opCtx->getBaton().get() == this);
_opCtx->setBaton(nullptr);
@@ -439,10 +439,10 @@ private:
* the eventfd. If not, we run inline.
*/
template <typename Callback>
- void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+ void _safeExecute(stdx::unique_lock<Latch> lk, Callback&& cb) {
if (_inPoll) {
_scheduled.push_back([cb = std::forward<Callback>(cb), this](Status) mutable {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
cb();
});
@@ -456,7 +456,7 @@ private:
return EventFDHolder::getForClient(_opCtx->getClient());
}
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("BatonASIO::_mutex");
OperationContext* _opCtx;
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 87a8d815c91..2adc90be390 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -31,9 +31,9 @@
#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/stdx/variant.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_reserved.h"
@@ -81,7 +81,7 @@ private:
ServiceContext* const _svcCtx;
AtomicWord<std::size_t> _nWorkers;
- mutable stdx::mutex _sessionsMutex;
+ mutable Mutex _sessionsMutex = MONGO_MAKE_LATCH("ServiceEntryPointImpl::_sessionsMutex");
stdx::condition_variable _shutdownCondition;
SSMList _sessions;
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index 9e34f16f60d..2f193bd16af 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -160,7 +160,7 @@ Status ServiceExecutorAdaptive::shutdown(Milliseconds timeout) {
_scheduleCondition.notify_one();
_controllerThread.join();
- stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+ stdx::unique_lock<Latch> lk(_threadsMutex);
_reactorHandle->stop();
bool result =
_deathCondition.wait_for(lk, timeout.toSystemDuration(), [&] { return _threads.empty(); });
@@ -285,7 +285,7 @@ bool ServiceExecutorAdaptive::_isStarved() const {
* by schedule().
*/
void ServiceExecutorAdaptive::_controllerThreadRoutine() {
- stdx::mutex noopLock;
+ auto noopLock = MONGO_MAKE_LATCH();
setThreadName("worker-controller"_sd);
// Setup the timers/timeout values for stuck thread detection.
@@ -294,7 +294,7 @@ void ServiceExecutorAdaptive::_controllerThreadRoutine() {
// Get the initial values for our utilization percentage calculations
auto getTimerTotals = [this]() {
- stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+ stdx::unique_lock<Latch> lk(_threadsMutex);
auto first = _getThreadTimerTotal(ThreadTimer::kExecuting, lk);
auto second = _getThreadTimerTotal(ThreadTimer::kRunning, lk);
return std::make_pair(first, second);
@@ -428,7 +428,7 @@ void ServiceExecutorAdaptive::_controllerThreadRoutine() {
}
void ServiceExecutorAdaptive::_startWorkerThread(ThreadCreationReason reason) {
- stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+ stdx::unique_lock<Latch> lk(_threadsMutex);
auto it = _threads.emplace(_threads.begin(), _tickSource);
auto num = _threads.size();
@@ -452,7 +452,7 @@ void ServiceExecutorAdaptive::_startWorkerThread(ThreadCreationReason reason) {
}
Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const {
- static stdx::mutex jitterMutex;
+ static auto jitterMutex = MONGO_MAKE_LATCH();
static std::default_random_engine randomEngine = [] {
std::random_device seed;
return std::default_random_engine(seed());
@@ -464,7 +464,7 @@ Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const {
std::uniform_int_distribution<> jitterDist(-jitterParam, jitterParam);
- stdx::lock_guard<stdx::mutex> lk(jitterMutex);
+ stdx::lock_guard<Latch> lk(jitterMutex);
auto jitter = jitterDist(randomEngine);
if (jitter > _config->workerThreadRunTime().count())
jitter = 0;
@@ -485,8 +485,8 @@ void ServiceExecutorAdaptive::_accumulateTaskMetrics(MetricsArray* outArray,
}
}
-void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(
- MetricsArray* outputMetricsArray, const stdx::unique_lock<stdx::mutex>& lk) const {
+void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(MetricsArray* outputMetricsArray,
+ const stdx::unique_lock<Latch>& lk) const {
_accumulateTaskMetrics(outputMetricsArray, _accumulatedMetrics);
for (auto& thread : _threads) {
_accumulateTaskMetrics(outputMetricsArray, thread.threadMetrics);
@@ -494,7 +494,7 @@ void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(
}
TickSource::Tick ServiceExecutorAdaptive::_getThreadTimerTotal(
- ThreadTimer which, const stdx::unique_lock<stdx::mutex>& lk) const {
+ ThreadTimer which, const stdx::unique_lock<Latch>& lk) const {
TickSource::Tick accumulator;
switch (which) {
case ThreadTimer::kRunning:
@@ -539,7 +539,7 @@ void ServiceExecutorAdaptive::_workerThreadRoutine(
_accumulateTaskMetrics(&_accumulatedMetrics, state->threadMetrics);
{
- stdx::lock_guard<stdx::mutex> lk(_threadsMutex);
+ stdx::lock_guard<Latch> lk(_threadsMutex);
_threads.erase(state);
}
_deathCondition.notify_one();
@@ -631,7 +631,7 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString(
}
void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
- stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
+ stdx::unique_lock<Latch> lk(_threadsMutex);
*bob << kExecutorLabel << kExecutorName //
<< kTotalQueued << _totalQueued.load() //
<< kTotalExecuted << _totalExecuted.load() //
diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h
index 7beaca91026..27b38f5b665 100644
--- a/src/mongo/transport/service_executor_adaptive.h
+++ b/src/mongo/transport/service_executor_adaptive.h
@@ -34,6 +34,7 @@
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/thread.h"
@@ -138,7 +139,7 @@ private:
CumulativeTickTimer(TickSource* ts) : _timer(ts) {}
TickSource::Tick markStopped() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_running);
_running = false;
auto curTime = _timer.sinceStartTicks();
@@ -147,14 +148,14 @@ private:
}
void markRunning() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(!_running);
_timer.reset();
_running = true;
}
TickSource::Tick totalTime() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (!_running)
return _accumulator;
return _timer.sinceStartTicks() + _accumulator;
@@ -162,7 +163,7 @@ private:
private:
TickTimer _timer;
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("CumulativeTickTimer::_mutex");
TickSource::Tick _accumulator = 0;
bool _running = false;
};
@@ -202,15 +203,15 @@ private:
void _accumulateTaskMetrics(MetricsArray* outArray, const MetricsArray& inputArray) const;
void _accumulateAllTaskMetrics(MetricsArray* outputMetricsArray,
- const stdx::unique_lock<stdx::mutex>& lk) const;
+ const stdx::unique_lock<Latch>& lk) const;
TickSource::Tick _getThreadTimerTotal(ThreadTimer which,
- const stdx::unique_lock<stdx::mutex>& lk) const;
+ const stdx::unique_lock<Latch>& lk) const;
ReactorHandle _reactorHandle;
std::unique_ptr<Options> _config;
- mutable stdx::mutex _threadsMutex;
+ mutable Mutex _threadsMutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptive::_threadsMutex");
ThreadList _threads;
std::array<int64_t, static_cast<size_t>(ThreadCreationReason::kMax)> _threadStartCounters;
diff --git a/src/mongo/transport/service_executor_adaptive_test.cpp b/src/mongo/transport/service_executor_adaptive_test.cpp
index 5f8a0192a8e..4c9fd26951f 100644
--- a/src/mongo/transport/service_executor_adaptive_test.cpp
+++ b/src/mongo/transport/service_executor_adaptive_test.cpp
@@ -114,11 +114,11 @@ protected:
std::shared_ptr<asio::io_context> asioIoCtx;
- stdx::mutex mutex;
+ Mutex mutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptiveFixture::mutex");
AtomicWord<int> waitFor{-1};
stdx::condition_variable cond;
stdx::function<void()> notifyCallback = [this] {
- stdx::unique_lock<stdx::mutex> lk(mutex);
+ stdx::unique_lock<Latch> lk(mutex);
invariant(waitFor.load() != -1);
waitFor.fetchAndSubtract(1);
cond.notify_one();
@@ -126,7 +126,7 @@ protected:
};
void waitForCallback(int expected, boost::optional<Milliseconds> timeout = boost::none) {
- stdx::unique_lock<stdx::mutex> lk(mutex);
+ stdx::unique_lock<Latch> lk(mutex);
invariant(waitFor.load() != -1);
if (timeout) {
ASSERT_TRUE(cond.wait_for(
@@ -163,8 +163,8 @@ protected:
* that those threads retire when they become idle.
*/
TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) {
- stdx::mutex blockedMutex;
- stdx::unique_lock<stdx::mutex> blockedLock(blockedMutex);
+ auto blockedMutex = MONGO_MAKE_LATCH();
+ stdx::unique_lock<Latch> blockedLock(blockedMutex);
auto exec = makeAndStartExecutor<TestOptions>();
auto guard = makeGuard([&] {
@@ -178,7 +178,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) {
ASSERT_OK(exec->schedule(
[this, &blockedMutex] {
notifyCallback();
- stdx::unique_lock<stdx::mutex> lk(blockedMutex);
+ stdx::unique_lock<Latch> lk(blockedMutex);
notifyCallback();
},
ServiceExecutor::kEmptyFlags,
@@ -208,8 +208,8 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) {
* threads are running a task for longer than the stuckThreadTimeout.
*/
TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) {
- stdx::mutex blockedMutex;
- stdx::unique_lock<stdx::mutex> blockedLock(blockedMutex);
+ auto blockedMutex = MONGO_MAKE_LATCH();
+ stdx::unique_lock<Latch> blockedLock(blockedMutex);
auto exec = makeAndStartExecutor<TestOptions>();
auto guard = makeGuard([&] {
@@ -221,7 +221,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) {
auto blockedTask = [this, &blockedMutex] {
log() << "waiting on blocked mutex";
notifyCallback();
- stdx::unique_lock<stdx::mutex> lk(blockedMutex);
+ stdx::unique_lock<Latch> lk(blockedMutex);
notifyCallback();
};
@@ -260,8 +260,8 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) {
TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) {
auto exec = makeAndStartExecutor<TestOptions>();
- // Mutex so we don't attempt to call schedule and shutdown concurrently
- stdx::mutex scheduleMutex;
+ // Use a Mutex so we don't attempt to call schedule and shutdown concurrently
+ auto scheduleMutex = MONGO_MAKE_LATCH();
auto guard = makeGuard([&] { ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); });
@@ -274,7 +274,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) {
stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 5);
{
- stdx::unique_lock<stdx::mutex> lock(scheduleMutex);
+ stdx::unique_lock<Latch> lock(scheduleMutex);
if (scheduleNew) {
ASSERT_OK(exec->schedule(task,
@@ -298,7 +298,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) {
stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 2);
ASSERT_EQ(exec->threadsRunning(), 2);
- stdx::unique_lock<stdx::mutex> lock(scheduleMutex);
+ stdx::unique_lock<Latch> lock(scheduleMutex);
scheduleNew = false;
}
@@ -310,7 +310,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) {
auto exec = makeAndStartExecutor<RecursionOptions>();
AtomicWord<int> remainingTasks{config->recursionLimit() - 1};
- stdx::mutex mutex;
+ auto mutex = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::function<void()> task;
@@ -334,7 +334,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) {
log() << "Completing task recursively";
};
- stdx::unique_lock<stdx::mutex> lock(mutex);
+ stdx::unique_lock<Latch> lock(mutex);
ASSERT_OK(exec->schedule(
task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage));
@@ -352,8 +352,8 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) {
* with new normal tasks
*/
TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) {
- stdx::mutex blockedMutex;
- stdx::unique_lock<stdx::mutex> blockedLock(blockedMutex);
+ auto blockedMutex = MONGO_MAKE_LATCH();
+ stdx::unique_lock<Latch> blockedLock(blockedMutex);
auto exec = makeAndStartExecutor<TestOptions>();
auto guard = makeGuard([&] {
@@ -366,7 +366,7 @@ TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) {
log() << "Scheduling a blocking task";
ASSERT_OK(exec->schedule(
[this, &blockedMutex] {
- stdx::unique_lock<stdx::mutex> lk(blockedMutex);
+ stdx::unique_lock<Latch> lk(blockedMutex);
notifyCallback();
},
ServiceExecutor::kEmptyFlags,
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index 24820ab1d91..902bf98d7c3 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -62,7 +62,7 @@ ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx,
Status ServiceExecutorReserved::start() {
{
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_stillRunning.store(true);
_numStartingThreads = _reservedThreads;
}
@@ -80,7 +80,7 @@ Status ServiceExecutorReserved::start() {
Status ServiceExecutorReserved::_startWorker() {
log() << "Starting new worker thread for " << _name << " service executor";
return launchServiceWorkerThread([this] {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_numRunningWorkerThreads.addAndFetch(1);
auto numRunningGuard = makeGuard([&] {
_numRunningWorkerThreads.subtractAndFetch(1);
@@ -142,7 +142,7 @@ Status ServiceExecutorReserved::_startWorker() {
Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
LOG(3) << "Shutting down reserved executor";
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
_stillRunning.store(false);
_threadWakeup.notify_all();
@@ -178,7 +178,7 @@ Status ServiceExecutorReserved::schedule(Task task,
return Status::OK();
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_readyTasks.push_back(std::move(task));
_threadWakeup.notify_one();
@@ -186,7 +186,7 @@ Status ServiceExecutorReserved::schedule(Task task,
}
void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
*bob << kExecutorLabel << kExecutorName << kThreadsRunning
<< static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads
<< static_cast<int>(_numReadyThreads) << kStartingThreads
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index d83a07566f5..8a71090bf63 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -33,8 +33,8 @@
#include "mongo/base/status.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_executor_task_names.h"
@@ -74,7 +74,7 @@ private:
AtomicWord<bool> _stillRunning{false};
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ServiceExecutorReserved::_mutex");
stdx::condition_variable _threadWakeup;
stdx::condition_variable _shutdownCondition;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index 79fc88e0033..25104fd46dd 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -67,7 +67,7 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
_stillRunning.store(false);
- stdx::unique_lock<stdx::mutex> lock(_shutdownMutex);
+ stdx::unique_lock<Latch> lock(_shutdownMutex);
bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() {
return _numRunningWorkerThreads.load() == 0;
});
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index ebe381d9fe2..1f0e2f6dd33 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -33,8 +33,8 @@
#include "mongo/base/status.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_executor_task_names.h"
@@ -66,7 +66,7 @@ private:
AtomicWord<bool> _stillRunning{false};
- mutable stdx::mutex _shutdownMutex;
+ mutable Mutex _shutdownMutex = MONGO_MAKE_LATCH("ServiceExecutorSynchronous::_shutdownMutex");
stdx::condition_variable _shutdownCondition;
AtomicWord<size_t> _numRunningWorkerThreads{0};
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index f3c05a72a2d..817703218f9 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -178,13 +178,13 @@ protected:
void scheduleBasicTask(ServiceExecutor* exec, bool expectSuccess) {
stdx::condition_variable cond;
- stdx::mutex mutex;
+ auto mutex = MONGO_MAKE_LATCH();
auto task = [&cond, &mutex] {
- stdx::unique_lock<stdx::mutex> lk(mutex);
+ stdx::unique_lock<Latch> lk(mutex);
cond.notify_all();
};
- stdx::unique_lock<stdx::mutex> lk(mutex);
+ stdx::unique_lock<Latch> lk(mutex);
auto status = exec->schedule(
std::move(task), ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMStartSession);
if (expectSuccess) {
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index a48f4db321d..5d065d8cb39 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,9 +35,9 @@
#include "mongo/config.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/message_compressor_base.h"
#include "mongo/transport/service_entry_point.h"
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index f10fff2d0ef..6c61236769f 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -281,19 +281,19 @@ private:
class SimpleEvent {
public:
void signal() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_signaled = true;
_cond.notify_one();
}
void wait() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_cond.wait(lk, [this] { return _signaled; });
_signaled = false;
}
private:
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("SimpleEvent::_mutex");
stdx::condition_variable _cond;
bool _signaled = false;
};
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index eca9811ffb1..7e7a9809236 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -223,7 +223,7 @@ protected:
#ifdef MONGO_CONFIG_SSL
// The unique_lock here is held by TransportLayerASIO to synchronize with the asyncConnect
// timeout callback. It will be unlocked before the SSL handshake actually begins.
- Future<void> handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex> lk,
+ Future<void> handshakeSSLForEgressWithLock(stdx::unique_lock<Latch> lk,
const HostAndPort& target) {
if (!_tl->_egressSSLContext) {
return Future<void>::makeReady(Status(ErrorCodes::SSLHandshakeFailed,
@@ -255,8 +255,8 @@ protected:
// For synchronous connections where we don't have an async timer, just take a dummy lock and
// pass it to the WithLock version of handshakeSSLForEgress
Future<void> handshakeSSLForEgress(const HostAndPort& target) {
- stdx::mutex mutex;
- return handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex>(mutex), target);
+ auto mutex = MONGO_MAKE_LATCH();
+ return handshakeSSLForEgressWithLock(stdx::unique_lock<Latch>(mutex), target);
}
#endif
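
In isolation, the dummy-lock idiom above (a function-local latch whose unique_lock is handed to the *WithLock overload) looks like the following sketch; doWork and doWorkWithLock are illustrative names, not functions from this commit:

    #include "mongo/platform/mutex.h"

    namespace mongo {

    // The caller holds lk on entry; the callee drops it once synchronization
    // with any timeout or cancellation path is no longer needed.
    void doWorkWithLock(stdx::unique_lock<Latch> lk) {
        lk.unlock();
        // ... actual work ...
    }

    // Synchronous callers with nothing to synchronize against take a throwaway
    // local latch just to satisfy the WithLock signature.
    void doWork() {
        auto mutex = MONGO_MAKE_LATCH();
        doWorkWithLock(stdx::unique_lock<Latch>(mutex));
    }

    }  // namespace mongo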
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 2f91a6d52c5..25c5f9d906e 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -530,7 +530,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
AtomicWord<bool> done{false};
Promise<SessionHandle> promise;
- stdx::mutex mutex;
+ Mutex mutex = MONGO_MAKE_LATCH("AsyncConnectState::mutex");
GenericSocket socket;
ASIOReactorTimer timeoutTimer;
WrappedResolver resolver;
@@ -562,7 +562,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
connector->resolvedEndpoint));
std::error_code ec;
- stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+ stdx::lock_guard<Latch> lk(connector->mutex);
connector->resolver.cancel();
if (connector->session) {
connector->session->end();
@@ -583,7 +583,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
<< " took " << timeAfter - timeBefore;
}
- stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+ stdx::lock_guard<Latch> lk(connector->mutex);
connector->resolvedEndpoint = results.front();
connector->socket.open(connector->resolvedEndpoint->protocol());
@@ -595,7 +595,7 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
return connector->socket.async_connect(*connector->resolvedEndpoint, UseFuture{});
})
.then([this, connector, sslMode]() -> Future<void> {
- stdx::unique_lock<stdx::mutex> lk(connector->mutex);
+ stdx::unique_lock<Latch> lk(connector->mutex);
connector->session =
std::make_shared<ASIOSession>(this, std::move(connector->socket), false);
connector->session->ensureAsync();
@@ -780,7 +780,7 @@ Status TransportLayerASIO::setup() {
}
Status TransportLayerASIO::start() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_running.store(true);
if (_listenerOptions.isIngress()) {
@@ -819,7 +819,7 @@ Status TransportLayerASIO::start() {
}
void TransportLayerASIO::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_running.store(false);
// Loop through the acceptors and cancel their calls to async_accept. This will prevent new
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index e37f72c40aa..c6c5f46c18c 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -35,9 +35,9 @@
#include "mongo/base/status_with.h"
#include "mongo/config.h"
#include "mongo/db/server_options.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/memory.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/transport_mode.h"
@@ -160,7 +160,7 @@ private:
SSLParams::SSLModes _sslMode() const;
#endif
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("TransportLayerASIO::_mutex");
// There are three reactors that are used by TransportLayerASIO. The _ingressReactor contains
// all the accepted sockets and all ingress networking activity. The _acceptorReactor contains
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 08dcd99dcae..53f979d9cd8 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -48,7 +48,7 @@ namespace {
class ServiceEntryPointUtil : public ServiceEntryPoint {
public:
void startSession(transport::SessionHandle session) override {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_sessions.push_back(std::move(session));
log() << "started session";
_cv.notify_one();
@@ -58,7 +58,7 @@ public:
log() << "end all sessions";
std::vector<transport::SessionHandle> old_sessions;
{
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
old_sessions.swap(_sessions);
}
old_sessions.clear();
@@ -75,7 +75,7 @@ public:
void appendStats(BSONObjBuilder*) const override {}
size_t numOpenSessions() const override {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
return _sessions.size();
}
@@ -88,12 +88,12 @@ public:
}
void waitForConnect() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
_cv.wait(lock, [&] { return !_sessions.empty(); });
}
private:
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ServiceEntryPointUtil::_mutex");
stdx::condition_variable _cv;
std::vector<transport::SessionHandle> _sessions;
transport::TransportLayer* _transport = nullptr;
@@ -107,7 +107,7 @@ public:
SockAddr sa{"localhost", _port, AF_INET};
s.connect(sa);
log() << "connection: port " << _port;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_cv.wait(lk, [&] { return _stop; });
log() << "connection: Rx stop request";
}};
@@ -115,7 +115,7 @@ public:
void stop() {
{
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_stop = true;
}
log() << "connection: Tx stop request";
@@ -125,7 +125,7 @@ public:
}
private:
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("SimpleConnectionThread::_mutex");
stdx::condition_variable _cv;
stdx::thread _thr;
bool _stop = false;
@@ -196,7 +196,7 @@ public:
}
bool waitForTimeout(boost::optional<Milliseconds> timeout = boost::none) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
bool ret = true;
if (timeout) {
ret = _cond.wait_for(lk, timeout->toSystemDuration(), [this] { return _finished; });
@@ -210,7 +210,7 @@ public:
protected:
void notifyComplete() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_finished = true;
_cond.notify_one();
}
@@ -221,7 +221,7 @@ protected:
}
private:
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("TimeoutSEP::_mutex");
stdx::condition_variable _cond;
bool _finished = false;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 97a44e104f4..3db9e9f17a5 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -53,7 +53,7 @@ TransportLayerManager::TransportLayerManager() = default;
template <typename Callable>
void TransportLayerManager::_foreach(Callable&& cb) const {
{
- stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+ stdx::lock_guard<Latch> lk(_tlsMutex);
for (auto&& tl : _tls) {
cb(tl.get());
}
@@ -111,7 +111,7 @@ Status TransportLayerManager::setup() {
Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl) {
auto ptr = tl.get();
{
- stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+ stdx::lock_guard<Latch> lk(_tlsMutex);
_tls.emplace_back(std::move(tl));
}
return ptr->start();
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 1dd5ef38527..3bc0e6ba5c6 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -32,7 +32,7 @@
#include <vector>
#include "mongo/base/status.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/time_support.h"
@@ -91,7 +91,7 @@ public:
static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer();
BatonHandle makeBaton(OperationContext* opCtx) const override {
- stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+ stdx::lock_guard<Latch> lk(_tlsMutex);
// TODO: figure out what to do about managers with more than one transport layer.
invariant(_tls.size() == 1);
return _tls[0]->makeBaton(opCtx);
@@ -101,7 +101,7 @@ private:
template <typename Callable>
void _foreach(Callable&& cb) const;
- mutable stdx::mutex _tlsMutex;
+ mutable Mutex _tlsMutex = MONGO_MAKE_LATCH("TransportLayerManager::_tlsMutex");
std::vector<std::unique_ptr<TransportLayer>> _tls;
};
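
For block-scoped and static mutexes, the diff uses the unnamed form of the macro. A minimal sketch of that variant, mirroring the _getThreadJitter and test changes above; the function and counter are illustrative, not code from this commit:

    #include "mongo/platform/mutex.h"

    namespace mongo {

    int nextSequenceValue() {
        // was: static stdx::mutex sequenceMutex;
        static auto sequenceMutex = MONGO_MAKE_LATCH();
        static int counter = 0;

        stdx::lock_guard<Latch> lk(sequenceMutex);  // was: stdx::lock_guard<stdx::mutex>
        return ++counter;
    }

    }  // namespace mongo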