summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@mongodb.com>2019-09-17 23:22:19 +0000
committerevergreen <evergreen@mongodb.com>2019-09-17 23:22:19 +0000
commitbc11369435ca51e2ff6897433d00f6b909f6a25f (patch)
tree251653ec8285d798b41846e343e7e414e80ff277 /src/mongo/executor
parent45aea2495306dd61fab46bd398735bb6aaf7b53a (diff)
downloadmongo-bc11369435ca51e2ff6897433d00f6b909f6a25f.tar.gz
SERVER-42165 Replace uses of stdx::mutex with mongo::Mutex
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/async_multicaster.cpp12
-rw-r--r--src/mongo/executor/async_multicaster.h2
-rw-r--r--src/mongo/executor/async_timer_mock.cpp12
-rw-r--r--src/mongo/executor/async_timer_mock.h4
-rw-r--r--src/mongo/executor/connection_pool.cpp2
-rw-r--r--src/mongo/executor/connection_pool.h4
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp6
-rw-r--r--src/mongo/executor/connection_pool_tl.h2
-rw-r--r--src/mongo/executor/egress_tag_closer_manager.cpp10
-rw-r--r--src/mongo/executor/egress_tag_closer_manager.h4
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp8
-rw-r--r--src/mongo/executor/network_interface_mock.cpp58
-rw-r--r--src/mongo/executor/network_interface_mock.h10
-rw-r--r--src/mongo/executor/network_interface_perf_test.cpp6
-rw-r--r--src/mongo/executor/network_interface_thread_pool.cpp18
-rw-r--r--src/mongo/executor/network_interface_thread_pool.h10
-rw-r--r--src/mongo/executor/network_interface_tl.cpp28
-rw-r--r--src/mongo/executor/network_interface_tl.h4
-rw-r--r--src/mongo/executor/scoped_task_executor.cpp4
-rw-r--r--src/mongo/executor/scoped_task_executor.h4
-rw-r--r--src/mongo/executor/task_executor.h2
-rw-r--r--src/mongo/executor/thread_pool_mock.cpp18
-rw-r--r--src/mongo/executor/thread_pool_mock.h10
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp50
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h16
25 files changed, 152 insertions, 152 deletions
diff --git a/src/mongo/executor/async_multicaster.cpp b/src/mongo/executor/async_multicaster.cpp
index 24e72527d51..3962f8f551d 100644
--- a/src/mongo/executor/async_multicaster.cpp
+++ b/src/mongo/executor/async_multicaster.cpp
@@ -37,8 +37,8 @@
#include "mongo/base/status.h"
#include "mongo/db/operation_context.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -60,7 +60,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast(
struct State {
State(size_t leftToDo) : leftToDo(leftToDo) {}
- stdx::mutex mutex;
+ Mutex mutex = MONGO_MAKE_LATCH("State::mutex");
stdx::condition_variable cv;
size_t leftToDo;
size_t running = 0;
@@ -71,7 +71,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast(
auto state = std::make_shared<State>(servers.size());
for (const auto& server : servers) {
- stdx::unique_lock<stdx::mutex> lk(state->mutex);
+ stdx::unique_lock<Latch> lk(state->mutex);
// spin up no more than maxConcurrency tasks at once
opCtx->waitForConditionOrInterrupt(
state->cv, lk, [&] { return state->running < _options.maxConcurrency; });
@@ -80,7 +80,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast(
uassertStatusOK(_executor->scheduleRemoteCommand(
RemoteCommandRequest{server, theDbName, theCmdObj, opCtx, timeoutMillis},
[state](const TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- stdx::lock_guard<stdx::mutex> lk(state->mutex);
+ stdx::lock_guard<Latch> lk(state->mutex);
state->out.emplace_back(
std::forward_as_tuple(cbData.request.target, cbData.response));
@@ -96,7 +96,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast(
}));
}
- stdx::unique_lock<stdx::mutex> lk(state->mutex);
+ stdx::unique_lock<Latch> lk(state->mutex);
opCtx->waitForConditionOrInterrupt(state->cv, lk, [&] { return state->leftToDo == 0; });
return std::move(state->out);
diff --git a/src/mongo/executor/async_multicaster.h b/src/mongo/executor/async_multicaster.h
index c2bc9e0be93..63eaaa4993d 100644
--- a/src/mongo/executor/async_multicaster.h
+++ b/src/mongo/executor/async_multicaster.h
@@ -34,7 +34,7 @@
#include "mongo/executor/remote_command_response.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp
index 6a796b684e7..9153cba5f0b 100644
--- a/src/mongo/executor/async_timer_mock.cpp
+++ b/src/mongo/executor/async_timer_mock.cpp
@@ -48,7 +48,7 @@ void AsyncTimerMockImpl::cancel() {
void AsyncTimerMockImpl::asyncWait(AsyncTimerInterface::Handler handler) {
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (_timeLeft != kZeroMilliseconds) {
_handlers.push_back(handler);
return;
@@ -66,7 +66,7 @@ void AsyncTimerMockImpl::fastForward(Milliseconds time) {
// While holding the lock, change the time and remove
// handlers that have expired
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (time >= _timeLeft) {
_timeLeft = kZeroMilliseconds;
tmp.swap(_handlers);
@@ -82,7 +82,7 @@ void AsyncTimerMockImpl::fastForward(Milliseconds time) {
}
Milliseconds AsyncTimerMockImpl::timeLeft() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _timeLeft;
}
@@ -91,7 +91,7 @@ void AsyncTimerMockImpl::expireAfter(Milliseconds expiration) {
// While holding the lock, reset the time and remove all handlers
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_timeLeft = expiration;
tmp.swap(_handlers);
}
@@ -103,14 +103,14 @@ void AsyncTimerMockImpl::expireAfter(Milliseconds expiration) {
}
int AsyncTimerMockImpl::jobs() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _handlers.size();
}
void AsyncTimerMockImpl::_callAllHandlers(std::error_code ec) {
std::vector<AsyncTimerInterface::Handler> tmp;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
tmp.swap(_handlers);
}
diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h
index 13463b679d7..5e3a83e3275 100644
--- a/src/mongo/executor/async_timer_mock.h
+++ b/src/mongo/executor/async_timer_mock.h
@@ -32,7 +32,7 @@
#include <vector>
#include "mongo/executor/async_timer_interface.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/unordered_set.h"
namespace mongo {
@@ -84,7 +84,7 @@ public:
private:
void _callAllHandlers(std::error_code ec);
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("AsyncTimerMockImpl::_mutex");
Milliseconds _timeLeft;
std::vector<AsyncTimerInterface::Handler> _handlers;
};
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 77a83ec4ff4..a0a4c9f0a7c 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -189,7 +189,7 @@ protected:
size_t target = 0;
};
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("LimitController::_mutex");
stdx::unordered_map<PoolId, PoolData> _poolData;
};
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 196e99014b7..f46c3c23dfa 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -35,7 +35,7 @@
#include "mongo/executor/egress_tag_closer.h"
#include "mongo/executor/egress_tag_closer_manager.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
@@ -255,7 +255,7 @@ private:
std::shared_ptr<ControllerInterface> _controller;
// The global mutex for specific pool access and the generation counter
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ConnectionPool::_mutex");
PoolId _nextPoolId = 0;
stdx::unordered_map<HostAndPort, std::shared_ptr<SpecificPool>> _pools;
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index e2f7711cca7..c3816eab43c 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -56,7 +56,7 @@ void TLTypeFactory::shutdown() {
// Stop any attempt to schedule timers in the future
_inShutdown.store(true);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
log() << "Killing all outstanding egress activity.";
for (auto collar : _collars) {
@@ -65,12 +65,12 @@ void TLTypeFactory::shutdown() {
}
void TLTypeFactory::fasten(Type* type) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_collars.insert(type);
}
void TLTypeFactory::release(Type* type) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_collars.erase(type);
type->_wasReleased = true;
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index 7297713b92b..f5bf54ff081 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -79,7 +79,7 @@ private:
std::unique_ptr<NetworkConnectionHook> _onConnectHook;
const ConnectionPool::Options _connPoolOptions;
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("TLTypeFactory::_mutex");
AtomicWord<bool> _inShutdown{false};
stdx::unordered_set<Type*> _collars;
};
diff --git a/src/mongo/executor/egress_tag_closer_manager.cpp b/src/mongo/executor/egress_tag_closer_manager.cpp
index fa78a74e183..64c0f0493b8 100644
--- a/src/mongo/executor/egress_tag_closer_manager.cpp
+++ b/src/mongo/executor/egress_tag_closer_manager.cpp
@@ -48,19 +48,19 @@ EgressTagCloserManager& EgressTagCloserManager::get(ServiceContext* svc) {
}
void EgressTagCloserManager::add(EgressTagCloser* etc) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_egressTagClosers.insert(etc);
}
void EgressTagCloserManager::remove(EgressTagCloser* etc) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_egressTagClosers.erase(etc);
}
void EgressTagCloserManager::dropConnections(transport::Session::TagMask tags) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
for (auto etc : _egressTagClosers) {
etc->dropConnections(tags);
@@ -68,7 +68,7 @@ void EgressTagCloserManager::dropConnections(transport::Session::TagMask tags) {
}
void EgressTagCloserManager::dropConnections(const HostAndPort& hostAndPort) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
for (auto etc : _egressTagClosers) {
etc->dropConnections(hostAndPort);
@@ -78,7 +78,7 @@ void EgressTagCloserManager::dropConnections(const HostAndPort& hostAndPort) {
void EgressTagCloserManager::mutateTags(
const HostAndPort& hostAndPort,
const std::function<transport::Session::TagMask(transport::Session::TagMask)>& mutateFunc) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
for (auto etc : _egressTagClosers) {
etc->mutateTags(hostAndPort, mutateFunc);
diff --git a/src/mongo/executor/egress_tag_closer_manager.h b/src/mongo/executor/egress_tag_closer_manager.h
index 418658dc430..91d996ee3dc 100644
--- a/src/mongo/executor/egress_tag_closer_manager.h
+++ b/src/mongo/executor/egress_tag_closer_manager.h
@@ -33,7 +33,7 @@
#include "mongo/db/service_context.h"
#include "mongo/executor/egress_tag_closer.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/transport/session.h"
#include "mongo/util/net/hostandport.h"
@@ -65,7 +65,7 @@ public:
const std::function<transport::Session::TagMask(transport::Session::TagMask)>& mutateFunc);
private:
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("EgressTagCloserManager::_mutex");
stdx::unordered_set<EgressTagCloser*> _egressTagClosers;
};
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index f4f05bbc538..03cf7d2a1e9 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -167,14 +167,14 @@ public:
RemoteCommandResponse response;
};
IsMasterData waitForIsMaster() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_isMasterCond.wait(lk, [this] { return _isMasterResult != boost::none; });
return std::move(*_isMasterResult);
}
bool hasIsMaster() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _isMasterResult != boost::none;
}
@@ -186,7 +186,7 @@ private:
Status validateHost(const HostAndPort& host,
const BSONObj& request,
const RemoteCommandResponse& isMasterReply) override {
- stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
+ stdx::lock_guard<Latch> lk(_parent->_mutex);
_parent->_isMasterResult = IsMasterData{request, isMasterReply};
_parent->_isMasterCond.notify_all();
return Status::OK();
@@ -204,7 +204,7 @@ private:
NetworkInterfaceTest* _parent;
};
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceTest::_mutex");
stdx::condition_variable _isMasterCond;
boost::optional<IsMasterData> _isMasterResult;
};
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 86e1144b81e..478d72c4b39 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -58,14 +58,14 @@ NetworkInterfaceMock::NetworkInterfaceMock()
_executorNextWakeupDate(Date_t::max()) {}
NetworkInterfaceMock::~NetworkInterfaceMock() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(!_hasStarted || inShutdown());
invariant(_scheduled.empty());
invariant(_blackHoled.empty());
}
void NetworkInterfaceMock::logQueues() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
const std::vector<std::pair<std::string, const NetworkOperationList*>> queues{
{"unscheduled", &_unscheduled},
{"scheduled", &_scheduled},
@@ -85,7 +85,7 @@ void NetworkInterfaceMock::logQueues() {
}
std::string NetworkInterfaceMock::getDiagnosticString() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask
<< ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted
<< ", inShutdown: " << _inShutdown.load()
@@ -96,7 +96,7 @@ std::string NetworkInterfaceMock::getDiagnosticString() {
}
Date_t NetworkInterfaceMock::now() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _now_inlock();
}
@@ -112,7 +112,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
const Date_t now = _now_inlock();
auto op = NetworkOperation(cbHandle, request, now, std::move(onFinish));
@@ -132,7 +132,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
void NetworkInterfaceMock::setHandshakeReplyForHost(
const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
auto it = _handshakeReplies.find(host);
if (it == std::end(_handshakeReplies)) {
auto res = _handshakeReplies.emplace(host, std::move(reply));
@@ -145,7 +145,7 @@ void NetworkInterfaceMock::setHandshakeReplyForHost(
void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const BatonHandle& baton) {
invariant(!inShutdown());
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
ResponseStatus rs(ErrorCodes::CallbackCanceled, "Network operation canceled", Milliseconds(0));
// We mimic the real NetworkInterface by only delivering the CallbackCanceled status if the
@@ -179,7 +179,7 @@ Status NetworkInterfaceMock::setAlarm(const TaskExecutor::CallbackHandle& cbHand
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (when <= _now_inlock()) {
lk.unlock();
@@ -210,7 +210,7 @@ bool NetworkInterfaceMock::onNetworkThread() {
}
void NetworkInterfaceMock::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_startup_inlock();
}
@@ -225,7 +225,7 @@ void NetworkInterfaceMock::_startup_inlock() {
void NetworkInterfaceMock::shutdown() {
invariant(!inShutdown());
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (!_hasStarted) {
_startup_inlock();
}
@@ -258,7 +258,7 @@ bool NetworkInterfaceMock::inShutdown() const {
}
void NetworkInterfaceMock::enterNetwork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
while (!_isNetworkThreadRunnable_inlock()) {
_shouldWakeNetworkCondition.wait(lk);
}
@@ -267,7 +267,7 @@ void NetworkInterfaceMock::enterNetwork() {
}
void NetworkInterfaceMock::exitNetwork() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (_currentlyRunning != kNetworkThread) {
return;
}
@@ -279,7 +279,7 @@ void NetworkInterfaceMock::exitNetwork() {
}
bool NetworkInterfaceMock::hasReadyRequests() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
return _hasReadyRequests_inlock();
}
@@ -294,7 +294,7 @@ bool NetworkInterfaceMock::_hasReadyRequests_inlock() {
}
NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
while (!_hasReadyRequests_inlock()) {
_waitingToRunMask |= kExecutorThread;
@@ -311,7 +311,7 @@ NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfU
NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNthUnscheduledRequest(
size_t n) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
invariant(_hasReadyRequests_inlock());
@@ -325,7 +325,7 @@ NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNthUnsch
void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
Date_t when,
const ResponseStatus& response) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
NetworkOperationIterator insertBefore = _scheduled.begin();
while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
@@ -388,13 +388,13 @@ RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperatio
}
void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
_blackHoled.splice(_blackHoled.end(), _processing, noi);
}
void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
invariant(noi->getNextConsiderationDate() < dontAskUntil);
invariant(_now_inlock() < dontAskUntil);
@@ -409,7 +409,7 @@ void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAs
}
Date_t NetworkInterfaceMock::runUntil(Date_t until) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
invariant(until > _now_inlock());
while (until > _now_inlock()) {
@@ -436,7 +436,7 @@ Date_t NetworkInterfaceMock::runUntil(Date_t until) {
}
void NetworkInterfaceMock::advanceTime(Date_t newTime) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
invariant(newTime > _now_inlock());
_now = newTime;
@@ -446,19 +446,19 @@ void NetworkInterfaceMock::advanceTime(Date_t newTime) {
}
void NetworkInterfaceMock::runReadyNetworkOperations() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
_runReadyNetworkOperations_inlock(&lk);
}
void NetworkInterfaceMock::waitForWork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(_currentlyRunning == kExecutorThread);
_waitForWork_inlock(&lk);
}
void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(_currentlyRunning == kExecutorThread);
_executorNextWakeupDate = when;
if (_executorNextWakeupDate <= _now_inlock()) {
@@ -538,7 +538,7 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
// The completion handler for the postconnect command schedules the original command.
auto postconnectCompletionHandler =
[this, op = std::move(op)](TaskExecutor::ResponseOnAnyStatus rs) mutable {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (!rs.isOK()) {
op.setResponse(_now_inlock(), rs);
op.finishResponse();
@@ -566,7 +566,7 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
}
void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(!_hasStarted);
invariant(!_hook);
_hook = std::move(hook);
@@ -574,21 +574,21 @@ void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHo
void NetworkInterfaceMock::setEgressMetadataHook(
std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(!_hasStarted);
invariant(!_metadataHook);
_metadataHook = std::move(metadataHook);
}
void NetworkInterfaceMock::signalWorkAvailable() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_waitingToRunMask |= kExecutorThread;
if (_currentlyRunning == kNoThread) {
_shouldWakeExecutorCondition.notify_one();
}
}
-void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) {
+void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<Latch>* lk) {
while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) {
auto& alarm = _alarms.top();
@@ -626,7 +626,7 @@ void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<s
_waitingToRunMask &= ~kNetworkThread;
}
-void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) {
+void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<Latch>* lk) {
if (_waitingToRunMask & kExecutorThread) {
_waitingToRunMask &= ~kExecutorThread;
return;
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 8fe6cdb3414..8a5d69b5ad9 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -36,9 +36,9 @@
#include <vector>
#include "mongo/executor/network_interface.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
#include "mongo/rpc/metadata/metadata_hook.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/clock_source.h"
@@ -324,7 +324,7 @@ private:
/**
* Implementation of waitForWork*.
*/
- void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk);
+ void _waitForWork_inlock(stdx::unique_lock<Latch>* lk);
/**
* Returns true if there are ready requests for the network thread to service.
@@ -356,12 +356,12 @@ private:
* reaquire "lk" several times, but will not return until the executor has blocked
* in waitFor*.
*/
- void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk);
+ void _runReadyNetworkOperations_inlock(stdx::unique_lock<Latch>* lk);
// Mutex that synchronizes access to mutable data in this class and its subclasses.
// Fields guarded by the mutex are labled (M), below, and those that are read-only
// in multi-threaded execution, and so unsynchronized, are labeled (R).
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceMock::_mutex");
// Condition signaled to indicate that the network processing thread should wake up.
stdx::condition_variable _shouldWakeNetworkCondition; // (M)
diff --git a/src/mongo/executor/network_interface_perf_test.cpp b/src/mongo/executor/network_interface_perf_test.cpp
index 205caa22a2a..2f625301dfe 100644
--- a/src/mongo/executor/network_interface_perf_test.cpp
+++ b/src/mongo/executor/network_interface_perf_test.cpp
@@ -66,7 +66,7 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) {
auto server = fixture.getServers()[0];
std::atomic<int> remainingOps(operations); // NOLINT
- stdx::mutex mtx;
+ auto mtx = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
Timer t;
@@ -81,7 +81,7 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) {
if (--remainingOps) {
return func();
}
- stdx::unique_lock<stdx::mutex> lk(mtx);
+ stdx::unique_lock<Latch> lk(mtx);
cv.notify_one();
};
@@ -93,7 +93,7 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) {
func();
- stdx::unique_lock<stdx::mutex> lk(mtx);
+ stdx::unique_lock<Latch> lk(mtx);
cv.wait(lk, [&] { return remainingOps.load() == 0; });
return t.millis();
diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp
index 787bd0a6dac..f40a298aea1 100644
--- a/src/mongo/executor/network_interface_thread_pool.cpp
+++ b/src/mongo/executor/network_interface_thread_pool.cpp
@@ -49,7 +49,7 @@ NetworkInterfaceThreadPool::~NetworkInterfaceThreadPool() {
void NetworkInterfaceThreadPool::_dtorImpl() {
{
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_tasks.empty())
return;
@@ -63,7 +63,7 @@ void NetworkInterfaceThreadPool::_dtorImpl() {
}
void NetworkInterfaceThreadPool::startup() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_started) {
severe() << "Attempting to start pool, but it has already started";
fassertFailed(34358);
@@ -75,7 +75,7 @@ void NetworkInterfaceThreadPool::startup() {
void NetworkInterfaceThreadPool::shutdown() {
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_inShutdown = true;
}
@@ -84,7 +84,7 @@ void NetworkInterfaceThreadPool::shutdown() {
void NetworkInterfaceThreadPool::join() {
{
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_joining) {
severe() << "Attempted to join pool more than once";
@@ -100,13 +100,13 @@ void NetworkInterfaceThreadPool::join() {
_net->signalWorkAvailable();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_joiningCondition.wait(
lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); });
}
void NetworkInterfaceThreadPool::schedule(Task task) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_inShutdown) {
lk.unlock();
task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
@@ -127,7 +127,7 @@ void NetworkInterfaceThreadPool::schedule(Task task) {
* allows us to use the network interface's threads as our own pool, which should reduce context
* switches if our tasks are getting scheduled by network interface tasks.
*/
-void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<stdx::mutex> lk) {
+void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<Latch> lk) {
if ((_consumeState != ConsumeState::kNeutral) || _tasks.empty())
return;
@@ -140,7 +140,7 @@ void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<stdx::mutex> lk
_consumeState = ConsumeState::kScheduled;
lk.unlock();
auto ret = _net->schedule([this](Status status) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_consumeState != ConsumeState::kScheduled)
return;
@@ -149,7 +149,7 @@ void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<stdx::mutex> lk
invariant(ret.isOK() || ErrorCodes::isShutdownError(ret.code()));
}
-void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mutex> lk) noexcept {
+void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<Latch> lk) noexcept {
_consumeState = ConsumeState::kConsuming;
const auto consumingTasksGuard = makeGuard([&] { _consumeState = ConsumeState::kNeutral; });
diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h
index 51771393032..946519b56f1 100644
--- a/src/mongo/executor/network_interface_thread_pool.h
+++ b/src/mongo/executor/network_interface_thread_pool.h
@@ -32,8 +32,8 @@
#include <cstdint>
#include <vector>
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
namespace mongo {
@@ -60,14 +60,14 @@ public:
void schedule(Task task) override;
private:
- void _consumeTasks(stdx::unique_lock<stdx::mutex> lk);
- void _consumeTasksInline(stdx::unique_lock<stdx::mutex> lk) noexcept;
+ void _consumeTasks(stdx::unique_lock<Latch> lk);
+ void _consumeTasksInline(stdx::unique_lock<Latch> lk) noexcept;
void _dtorImpl();
NetworkInterface* const _net;
// Protects all of the pool state below
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceThreadPool::_mutex");
stdx::condition_variable _joiningCondition;
std::vector<Task> _tasks;
bool _started = false;
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index bcd1672e50e..e932130b005 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -65,7 +65,7 @@ std::string NetworkInterfaceTL::getDiagnosticString() {
void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const {
auto pool = [&] {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _pool.get();
}();
if (pool)
@@ -74,7 +74,7 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const
NetworkInterface::Counters NetworkInterfaceTL::getCounters() const {
invariant(getTestCommandsEnabled());
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _counters;
}
@@ -83,7 +83,7 @@ std::string NetworkInterfaceTL::getHostName() {
}
void NetworkInterfaceTL::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (_svcCtx) {
_tl = _svcCtx->getTransportLayer();
}
@@ -144,19 +144,19 @@ bool NetworkInterfaceTL::inShutdown() const {
}
void NetworkInterfaceTL::waitForWork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
MONGO_IDLE_THREAD_BLOCK;
_workReadyCond.wait(lk, [this] { return _isExecutorRunnable; });
}
void NetworkInterfaceTL::waitForWorkUntil(Date_t when) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
MONGO_IDLE_THREAD_BLOCK;
_workReadyCond.wait_until(lk, when.toSystemTimePoint(), [this] { return _isExecutorRunnable; });
}
void NetworkInterfaceTL::signalWorkAvailable() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (!_isExecutorRunnable) {
_isExecutorRunnable = true;
_workReadyCond.notify_one();
@@ -401,7 +401,7 @@ void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state,
}
if (getTestCommandsEnabled()) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_counters.timedOut++;
}
@@ -449,7 +449,7 @@ void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state,
}
if (getTestCommandsEnabled()) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (swr.isOK() && swr.getValue().status.isOK()) {
_counters.succeeded++;
} else {
@@ -467,7 +467,7 @@ void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state,
void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const BatonHandle& baton) {
- stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
+ stdx::unique_lock<Latch> lk(_inProgressMutex);
auto it = _inProgress.find(cbHandle);
if (it == _inProgress.end()) {
return;
@@ -485,7 +485,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
}
if (getTestCommandsEnabled()) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_counters.canceled++;
}
@@ -528,7 +528,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
std::make_shared<AlarmState>(when, cbHandle, _reactor->makeTimer(), std::move(pf.promise));
{
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ stdx::lock_guard<Latch> lk(_inProgressMutex);
// If a user has already scheduled an alarm with a handle, make sure they intentionally
// override it by canceling and setting a new one.
@@ -546,7 +546,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
}
void NetworkInterfaceTL::cancelAlarm(const TaskExecutor::CallbackHandle& cbHandle) {
- stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
+ stdx::unique_lock<Latch> lk(_inProgressMutex);
auto iter = _inProgressAlarms.find(cbHandle);
@@ -566,7 +566,7 @@ void NetworkInterfaceTL::cancelAlarm(const TaskExecutor::CallbackHandle& cbHandl
void NetworkInterfaceTL::_cancelAllAlarms() {
auto alarms = [&] {
- stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
+ stdx::unique_lock<Latch> lk(_inProgressMutex);
return std::exchange(_inProgressAlarms, {});
}();
@@ -599,7 +599,7 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
// Erase the AlarmState from the map.
{
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ stdx::lock_guard<Latch> lk(_inProgressMutex);
auto iter = _inProgressAlarms.find(state->cbHandle);
if (iter == _inProgressAlarms.end()) {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 15fdf391876..ee27fdd410f 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -147,7 +147,7 @@ private:
std::unique_ptr<transport::TransportLayer> _ownedTransportLayer;
transport::ReactorHandle _reactor;
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_mutex");
ConnectionPool::Options _connPoolOpts;
std::unique_ptr<NetworkConnectionHook> _onConnectHook;
std::shared_ptr<ConnectionPool> _pool;
@@ -165,7 +165,7 @@ private:
AtomicWord<State> _state;
stdx::thread _ioThread;
- stdx::mutex _inProgressMutex;
+ Mutex _inProgressMutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_inProgressMutex");
stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandState>> _inProgress;
stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<AlarmState>>
_inProgressAlarms;
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp
index e7e2f1bd5ae..057ff4aa971 100644
--- a/src/mongo/executor/scoped_task_executor.cpp
+++ b/src/mongo/executor/scoped_task_executor.cpp
@@ -226,7 +226,7 @@ private:
[id, work = std::forward<Work>(work), self = shared_from_this()](const auto& cargs) {
using ArgsT = std::decay_t<decltype(cargs)>;
- stdx::unique_lock<stdx::mutex> lk(self->_mutex);
+ stdx::unique_lock<Latch> lk(self->_mutex);
auto doWorkAndNotify = [&](const ArgsT& x) noexcept {
lk.unlock();
@@ -301,7 +301,7 @@ private:
}
}
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("ScopedTaskExecutor::_mutex");
bool _inShutdown = false;
std::shared_ptr<TaskExecutor> _executor;
size_t _id = 0;
diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h
index dc166606115..bcdd49e4151 100644
--- a/src/mongo/executor/scoped_task_executor.h
+++ b/src/mongo/executor/scoped_task_executor.h
@@ -34,8 +34,8 @@
#include "mongo/base/status.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/util/fail_point_service.h"
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index d36f5c9bac6..f84321a46b7 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -38,7 +38,7 @@
#include "mongo/base/string_data.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
-#include "mongo/stdx/condition_variable.h"
+#include "mongo/platform/condition_variable.h"
#include "mongo/transport/baton.h"
#include "mongo/util/future.h"
#include "mongo/util/out_of_line_executor.h"
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index 191537cebff..fb809990e49 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -43,7 +43,7 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti
: _options(std::move(options)), _prng(prngSeed), _net(net) {}
ThreadPoolMock::~ThreadPoolMock() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_joining)
return;
@@ -53,13 +53,13 @@ ThreadPoolMock::~ThreadPoolMock() {
void ThreadPoolMock::startup() {
LOG(1) << "Starting pool";
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(!_started);
invariant(!_worker.joinable());
_started = true;
_worker = stdx::thread([this] {
_options.onCreateThread();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
LOG(1) << "Starting to consume tasks";
while (!_joining) {
@@ -77,17 +77,17 @@ void ThreadPoolMock::startup() {
}
void ThreadPoolMock::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_shutdown(lk);
}
void ThreadPoolMock::join() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_join(lk);
}
void ThreadPoolMock::schedule(Task task) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_inShutdown) {
lk.unlock();
@@ -98,7 +98,7 @@ void ThreadPoolMock::schedule(Task task) {
_tasks.emplace_back(std::move(task));
}
-void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
+void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<Latch>& lk) {
auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
if (next + 1 != _tasks.size()) {
std::swap(_tasks[next], _tasks.back());
@@ -114,14 +114,14 @@ void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
lk.lock();
}
-void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) {
+void ThreadPoolMock::_shutdown(stdx::unique_lock<Latch>& lk) {
LOG(1) << "Shutting down pool";
_inShutdown = true;
_net->signalWorkAvailable();
}
-void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) {
+void ThreadPoolMock::_join(stdx::unique_lock<Latch>& lk) {
LOG(1) << "Joining pool";
_joining = true;
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index e1f8e30a80f..d81f83dfb4c 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -33,8 +33,8 @@
#include <functional>
#include <vector>
+#include "mongo/platform/mutex.h"
#include "mongo/platform/random.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
@@ -73,14 +73,14 @@ public:
void schedule(Task task) override;
private:
- void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk);
- void _shutdown(stdx::unique_lock<stdx::mutex>& lk);
- void _join(stdx::unique_lock<stdx::mutex>& lk);
+ void _consumeOneTask(stdx::unique_lock<Latch>& lk);
+ void _shutdown(stdx::unique_lock<Latch>& lk);
+ void _join(stdx::unique_lock<Latch>& lk);
// These are the options with which the pool was configured at construction time.
const Options _options;
- stdx::mutex _mutex;
+ Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolMock::_mutex");
stdx::thread _worker;
std::vector<Task> _tasks;
PseudoRandom _prng;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 55adfc29984..84153871128 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -140,20 +140,20 @@ ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterfa
ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
shutdown();
- auto lk = _join(stdx::unique_lock<stdx::mutex>(_mutex));
+ auto lk = _join(stdx::unique_lock<Latch>(_mutex));
invariant(_state == shutdownComplete);
}
void ThreadPoolTaskExecutor::startup() {
_net->startup();
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_state == preStart);
_setState_inlock(running);
_pool->startup();
}
void ThreadPoolTaskExecutor::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_inShutdown_inlock()) {
invariant(_networkInProgressQueue.empty());
invariant(_sleepersQueue.empty());
@@ -176,10 +176,10 @@ void ThreadPoolTaskExecutor::shutdown() {
}
void ThreadPoolTaskExecutor::join() {
- _join(stdx::unique_lock<stdx::mutex>(_mutex));
+ _join(stdx::unique_lock<Latch>(_mutex));
}
-stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) {
+stdx::unique_lock<Latch> ThreadPoolTaskExecutor::_join(stdx::unique_lock<Latch> lk) {
_stateChange.wait(lk, [this] {
// All tasks are spliced into the _poolInProgressQueue immediately after we accept them.
// This occurs in scheduleIntoPool_inlock.
@@ -223,7 +223,7 @@ stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<s
EventHandle event;
setEventForHandle(&event, std::move(eventState));
signalEvent_inlock(event, std::move(lk));
- lk = stdx::unique_lock<stdx::mutex>(_mutex);
+ lk = stdx::unique_lock<Latch>(_mutex);
}
lk.unlock();
_net->shutdown();
@@ -237,7 +237,7 @@ stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<s
}
void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
// ThreadPool details
// TODO: fill in
@@ -264,7 +264,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() {
auto el = makeSingletonEventList();
EventHandle event;
setEventForHandle(&event, el.front());
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (_inShutdown_inlock()) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
@@ -273,7 +273,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() {
}
void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
signalEvent_inlock(event, std::move(lk));
}
@@ -284,7 +284,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
}
// Unsure if we'll succeed yet, so pass an empty CallbackFn.
auto wq = makeSingletonWorkQueue({}, nullptr);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
if (!cbHandle.isOK()) {
@@ -304,7 +304,7 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex
invariant(opCtx);
invariant(event.isValid());
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
// std::condition_variable::wait() can wake up spuriously, so we have to loop until the event
// is signalled or we time out.
@@ -323,7 +323,7 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex
void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
invariant(event.isValid());
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
while (!eventState->isSignaledFlag) {
eventState->isSignaledCondition.wait(lk);
@@ -334,7 +334,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(Ca
// Unsure if we'll succeed yet, so pass an empty CallbackFn.
auto wq = makeSingletonWorkQueue({}, nullptr);
WorkQueue temp;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
@@ -352,7 +352,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
}
auto wq = makeSingletonWorkQueue(std::move(work), nullptr, when);
wq.front()->isTimerOperation = true;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
@@ -366,7 +366,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
}
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (cbState->canceled.load()) {
return;
}
@@ -455,7 +455,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
},
baton);
wq.front()->isNetworkOperation = true;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
auto swCbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
if (!swCbHandle.isOK())
return swCbHandle;
@@ -471,7 +471,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) {
remoteCommandFinished(cbData, cb, scheduledRequest, response);
};
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_inShutdown_inlock()) {
return;
}
@@ -491,7 +491,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
invariant(cbHandle.isValid());
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_inShutdown_inlock()) {
return;
}
@@ -527,7 +527,7 @@ void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible*
if (cbState->isFinished.load()) {
return;
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (!cbState->finishedCondition) {
cbState->finishedCondition.emplace();
}
@@ -569,7 +569,7 @@ ThreadPoolTaskExecutor::EventList ThreadPoolTaskExecutor::makeSingletonEventList
}
void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event,
- stdx::unique_lock<stdx::mutex> lk) {
+ stdx::unique_lock<Latch> lk) {
invariant(event.isValid());
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
invariant(!eventState->isSignaledFlag);
@@ -580,20 +580,20 @@ void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event,
}
void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
- stdx::unique_lock<stdx::mutex> lk) {
+ stdx::unique_lock<Latch> lk) {
scheduleIntoPool_inlock(fromQueue, fromQueue->begin(), fromQueue->end(), std::move(lk));
}
void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
const WorkQueue::iterator& iter,
- stdx::unique_lock<stdx::mutex> lk) {
+ stdx::unique_lock<Latch> lk) {
scheduleIntoPool_inlock(fromQueue, iter, std::next(iter), std::move(lk));
}
void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
const WorkQueue::iterator& begin,
const WorkQueue::iterator& end,
- stdx::unique_lock<stdx::mutex> lk) {
+ stdx::unique_lock<Latch> lk) {
dassert(fromQueue != &_poolInProgressQueue);
std::vector<std::shared_ptr<CallbackState>> todo(begin, end);
_poolInProgressQueue.splice(_poolInProgressQueue.end(), *fromQueue, begin, end);
@@ -626,7 +626,7 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
} else {
_pool->schedule([this, cbState](auto status) {
if (ErrorCodes::isCancelationError(status.code())) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
cbState->canceled.store(1);
} else {
@@ -659,7 +659,7 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA
callback(std::move(args));
}
cbStateArg->isFinished.store(true);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_poolInProgressQueue.erase(cbStateArg->iter);
if (cbStateArg->finishedCondition) {
cbStateArg->finishedCondition->notify_all();
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 9106f596069..35dca3ce6b7 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -33,8 +33,8 @@
#include <memory>
#include "mongo/executor/task_executor.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/baton.h"
#include "mongo/util/fail_point_service.h"
@@ -149,13 +149,13 @@ private:
/**
* Signals the given event.
*/
- void signalEvent_inlock(const EventHandle& event, stdx::unique_lock<stdx::mutex> lk);
+ void signalEvent_inlock(const EventHandle& event, stdx::unique_lock<Latch> lk);
/**
* Schedules all items from "fromQueue" into the thread pool and moves them into
* _poolInProgressQueue.
*/
- void scheduleIntoPool_inlock(WorkQueue* fromQueue, stdx::unique_lock<stdx::mutex> lk);
+ void scheduleIntoPool_inlock(WorkQueue* fromQueue, stdx::unique_lock<Latch> lk);
/**
* Schedules the given item from "fromQueue" into the thread pool and moves it into
@@ -163,7 +163,7 @@ private:
*/
void scheduleIntoPool_inlock(WorkQueue* fromQueue,
const WorkQueue::iterator& iter,
- stdx::unique_lock<stdx::mutex> lk);
+ stdx::unique_lock<Latch> lk);
/**
* Schedules entries from "begin" through "end" in "fromQueue" into the thread pool
@@ -172,7 +172,7 @@ private:
void scheduleIntoPool_inlock(WorkQueue* fromQueue,
const WorkQueue::iterator& begin,
const WorkQueue::iterator& end,
- stdx::unique_lock<stdx::mutex> lk);
+ stdx::unique_lock<Latch> lk);
/**
* Executes the callback specified by "cbState".
@@ -181,7 +181,7 @@ private:
bool _inShutdown_inlock() const;
void _setState_inlock(State newState);
- stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk);
+ stdx::unique_lock<Latch> _join(stdx::unique_lock<Latch> lk);
// The network interface used for remote command execution and waiting.
std::shared_ptr<NetworkInterface> _net;
@@ -190,7 +190,7 @@ private:
std::shared_ptr<ThreadPoolInterface> _pool;
// Mutex guarding all remaining fields.
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolTaskExecutor::_mutex");
// Queue containing all items currently scheduled into the thread pool but not yet completed.
WorkQueue _poolInProgressQueue;