diff options
author | Ben Caimano <ben.caimano@mongodb.com> | 2019-09-17 23:22:19 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-09-17 23:22:19 +0000 |
commit | bc11369435ca51e2ff6897433d00f6b909f6a25f (patch) | |
tree | 251653ec8285d798b41846e343e7e414e80ff277 /src/mongo/executor | |
parent | 45aea2495306dd61fab46bd398735bb6aaf7b53a (diff) | |
download | mongo-bc11369435ca51e2ff6897433d00f6b909f6a25f.tar.gz |
SERVER-42165 Replace uses of stdx::mutex with mongo::Mutex
Diffstat (limited to 'src/mongo/executor')
25 files changed, 152 insertions, 152 deletions
diff --git a/src/mongo/executor/async_multicaster.cpp b/src/mongo/executor/async_multicaster.cpp index 24e72527d51..3962f8f551d 100644 --- a/src/mongo/executor/async_multicaster.cpp +++ b/src/mongo/executor/async_multicaster.cpp @@ -37,8 +37,8 @@ #include "mongo/base/status.h" #include "mongo/db/operation_context.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/condition_variable.h" +#include "mongo/platform/mutex.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -60,7 +60,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast( struct State { State(size_t leftToDo) : leftToDo(leftToDo) {} - stdx::mutex mutex; + Mutex mutex = MONGO_MAKE_LATCH("State::mutex"); stdx::condition_variable cv; size_t leftToDo; size_t running = 0; @@ -71,7 +71,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast( auto state = std::make_shared<State>(servers.size()); for (const auto& server : servers) { - stdx::unique_lock<stdx::mutex> lk(state->mutex); + stdx::unique_lock<Latch> lk(state->mutex); // spin up no more than maxConcurrency tasks at once opCtx->waitForConditionOrInterrupt( state->cv, lk, [&] { return state->running < _options.maxConcurrency; }); @@ -80,7 +80,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast( uassertStatusOK(_executor->scheduleRemoteCommand( RemoteCommandRequest{server, theDbName, theCmdObj, opCtx, timeoutMillis}, [state](const TaskExecutor::RemoteCommandCallbackArgs& cbData) { - stdx::lock_guard<stdx::mutex> lk(state->mutex); + stdx::lock_guard<Latch> lk(state->mutex); state->out.emplace_back( std::forward_as_tuple(cbData.request.target, cbData.response)); @@ -96,7 +96,7 @@ std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast( })); } - stdx::unique_lock<stdx::mutex> lk(state->mutex); + stdx::unique_lock<Latch> lk(state->mutex); opCtx->waitForConditionOrInterrupt(state->cv, lk, [&] { return state->leftToDo == 0; }); return std::move(state->out); diff --git a/src/mongo/executor/async_multicaster.h b/src/mongo/executor/async_multicaster.h index c2bc9e0be93..63eaaa4993d 100644 --- a/src/mongo/executor/async_multicaster.h +++ b/src/mongo/executor/async_multicaster.h @@ -34,7 +34,7 @@ #include "mongo/executor/remote_command_response.h" #include "mongo/executor/task_executor.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/util/net/hostandport.h" namespace mongo { diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp index 6a796b684e7..9153cba5f0b 100644 --- a/src/mongo/executor/async_timer_mock.cpp +++ b/src/mongo/executor/async_timer_mock.cpp @@ -48,7 +48,7 @@ void AsyncTimerMockImpl::cancel() { void AsyncTimerMockImpl::asyncWait(AsyncTimerInterface::Handler handler) { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_timeLeft != kZeroMilliseconds) { _handlers.push_back(handler); return; @@ -66,7 +66,7 @@ void AsyncTimerMockImpl::fastForward(Milliseconds time) { // While holding the lock, change the time and remove // handlers that have expired { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (time >= _timeLeft) { _timeLeft = kZeroMilliseconds; tmp.swap(_handlers); @@ -82,7 +82,7 @@ void AsyncTimerMockImpl::fastForward(Milliseconds time) { } Milliseconds AsyncTimerMockImpl::timeLeft() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _timeLeft; } @@ -91,7 +91,7 @@ void AsyncTimerMockImpl::expireAfter(Milliseconds expiration) { // While holding the lock, reset the time and remove all handlers { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _timeLeft = expiration; tmp.swap(_handlers); } @@ -103,14 +103,14 @@ void AsyncTimerMockImpl::expireAfter(Milliseconds expiration) { } int AsyncTimerMockImpl::jobs() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _handlers.size(); } void AsyncTimerMockImpl::_callAllHandlers(std::error_code ec) { std::vector<AsyncTimerInterface::Handler> tmp; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); tmp.swap(_handlers); } diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h index 13463b679d7..5e3a83e3275 100644 --- a/src/mongo/executor/async_timer_mock.h +++ b/src/mongo/executor/async_timer_mock.h @@ -32,7 +32,7 @@ #include <vector> #include "mongo/executor/async_timer_interface.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/unordered_set.h" namespace mongo { @@ -84,7 +84,7 @@ public: private: void _callAllHandlers(std::error_code ec); - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("AsyncTimerMockImpl::_mutex"); Milliseconds _timeLeft; std::vector<AsyncTimerInterface::Handler> _handlers; }; diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 77a83ec4ff4..a0a4c9f0a7c 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -189,7 +189,7 @@ protected: size_t target = 0; }; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("LimitController::_mutex"); stdx::unordered_map<PoolId, PoolData> _poolData; }; diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 196e99014b7..f46c3c23dfa 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -35,7 +35,7 @@ #include "mongo/executor/egress_tag_closer.h" #include "mongo/executor/egress_tag_closer_manager.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer.h" @@ -255,7 +255,7 @@ private: std::shared_ptr<ControllerInterface> _controller; // The global mutex for specific pool access and the generation counter - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ConnectionPool::_mutex"); PoolId _nextPoolId = 0; stdx::unordered_map<HostAndPort, std::shared_ptr<SpecificPool>> _pools; diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp index e2f7711cca7..c3816eab43c 100644 --- a/src/mongo/executor/connection_pool_tl.cpp +++ b/src/mongo/executor/connection_pool_tl.cpp @@ -56,7 +56,7 @@ void TLTypeFactory::shutdown() { // Stop any attempt to schedule timers in the future _inShutdown.store(true); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); log() << "Killing all outstanding egress activity."; for (auto collar : _collars) { @@ -65,12 +65,12 @@ void TLTypeFactory::shutdown() { } void TLTypeFactory::fasten(Type* type) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _collars.insert(type); } void TLTypeFactory::release(Type* type) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _collars.erase(type); type->_wasReleased = true; diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h index 7297713b92b..f5bf54ff081 100644 --- a/src/mongo/executor/connection_pool_tl.h +++ b/src/mongo/executor/connection_pool_tl.h @@ -79,7 +79,7 @@ private: std::unique_ptr<NetworkConnectionHook> _onConnectHook; const ConnectionPool::Options _connPoolOptions; - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("TLTypeFactory::_mutex"); AtomicWord<bool> _inShutdown{false}; stdx::unordered_set<Type*> _collars; }; diff --git a/src/mongo/executor/egress_tag_closer_manager.cpp b/src/mongo/executor/egress_tag_closer_manager.cpp index fa78a74e183..64c0f0493b8 100644 --- a/src/mongo/executor/egress_tag_closer_manager.cpp +++ b/src/mongo/executor/egress_tag_closer_manager.cpp @@ -48,19 +48,19 @@ EgressTagCloserManager& EgressTagCloserManager::get(ServiceContext* svc) { } void EgressTagCloserManager::add(EgressTagCloser* etc) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _egressTagClosers.insert(etc); } void EgressTagCloserManager::remove(EgressTagCloser* etc) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _egressTagClosers.erase(etc); } void EgressTagCloserManager::dropConnections(transport::Session::TagMask tags) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); for (auto etc : _egressTagClosers) { etc->dropConnections(tags); @@ -68,7 +68,7 @@ void EgressTagCloserManager::dropConnections(transport::Session::TagMask tags) { } void EgressTagCloserManager::dropConnections(const HostAndPort& hostAndPort) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); for (auto etc : _egressTagClosers) { etc->dropConnections(hostAndPort); @@ -78,7 +78,7 @@ void EgressTagCloserManager::dropConnections(const HostAndPort& hostAndPort) { void EgressTagCloserManager::mutateTags( const HostAndPort& hostAndPort, const std::function<transport::Session::TagMask(transport::Session::TagMask)>& mutateFunc) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); for (auto etc : _egressTagClosers) { etc->mutateTags(hostAndPort, mutateFunc); diff --git a/src/mongo/executor/egress_tag_closer_manager.h b/src/mongo/executor/egress_tag_closer_manager.h index 418658dc430..91d996ee3dc 100644 --- a/src/mongo/executor/egress_tag_closer_manager.h +++ b/src/mongo/executor/egress_tag_closer_manager.h @@ -33,7 +33,7 @@ #include "mongo/db/service_context.h" #include "mongo/executor/egress_tag_closer.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/unordered_set.h" #include "mongo/transport/session.h" #include "mongo/util/net/hostandport.h" @@ -65,7 +65,7 @@ public: const std::function<transport::Session::TagMask(transport::Session::TagMask)>& mutateFunc); private: - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("EgressTagCloserManager::_mutex"); stdx::unordered_set<EgressTagCloser*> _egressTagClosers; }; diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index f4f05bbc538..03cf7d2a1e9 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -167,14 +167,14 @@ public: RemoteCommandResponse response; }; IsMasterData waitForIsMaster() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _isMasterCond.wait(lk, [this] { return _isMasterResult != boost::none; }); return std::move(*_isMasterResult); } bool hasIsMaster() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _isMasterResult != boost::none; } @@ -186,7 +186,7 @@ private: Status validateHost(const HostAndPort& host, const BSONObj& request, const RemoteCommandResponse& isMasterReply) override { - stdx::lock_guard<stdx::mutex> lk(_parent->_mutex); + stdx::lock_guard<Latch> lk(_parent->_mutex); _parent->_isMasterResult = IsMasterData{request, isMasterReply}; _parent->_isMasterCond.notify_all(); return Status::OK(); @@ -204,7 +204,7 @@ private: NetworkInterfaceTest* _parent; }; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceTest::_mutex"); stdx::condition_variable _isMasterCond; boost::optional<IsMasterData> _isMasterResult; }; diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 86e1144b81e..478d72c4b39 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -58,14 +58,14 @@ NetworkInterfaceMock::NetworkInterfaceMock() _executorNextWakeupDate(Date_t::max()) {} NetworkInterfaceMock::~NetworkInterfaceMock() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(!_hasStarted || inShutdown()); invariant(_scheduled.empty()); invariant(_blackHoled.empty()); } void NetworkInterfaceMock::logQueues() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); const std::vector<std::pair<std::string, const NetworkOperationList*>> queues{ {"unscheduled", &_unscheduled}, {"scheduled", &_scheduled}, @@ -85,7 +85,7 @@ void NetworkInterfaceMock::logQueues() { } std::string NetworkInterfaceMock::getDiagnosticString() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask << ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted << ", inShutdown: " << _inShutdown.load() @@ -96,7 +96,7 @@ std::string NetworkInterfaceMock::getDiagnosticString() { } Date_t NetworkInterfaceMock::now() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _now_inlock(); } @@ -112,7 +112,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); const Date_t now = _now_inlock(); auto op = NetworkOperation(cbHandle, request, now, std::move(onFinish)); @@ -132,7 +132,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, void NetworkInterfaceMock::setHandshakeReplyForHost( const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto it = _handshakeReplies.find(host); if (it == std::end(_handshakeReplies)) { auto res = _handshakeReplies.emplace(host, std::move(reply)); @@ -145,7 +145,7 @@ void NetworkInterfaceMock::setHandshakeReplyForHost( void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const BatonHandle& baton) { invariant(!inShutdown()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); ResponseStatus rs(ErrorCodes::CallbackCanceled, "Network operation canceled", Milliseconds(0)); // We mimic the real NetworkInterface by only delivering the CallbackCanceled status if the @@ -179,7 +179,7 @@ Status NetworkInterfaceMock::setAlarm(const TaskExecutor::CallbackHandle& cbHand return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (when <= _now_inlock()) { lk.unlock(); @@ -210,7 +210,7 @@ bool NetworkInterfaceMock::onNetworkThread() { } void NetworkInterfaceMock::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _startup_inlock(); } @@ -225,7 +225,7 @@ void NetworkInterfaceMock::_startup_inlock() { void NetworkInterfaceMock::shutdown() { invariant(!inShutdown()); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (!_hasStarted) { _startup_inlock(); } @@ -258,7 +258,7 @@ bool NetworkInterfaceMock::inShutdown() const { } void NetworkInterfaceMock::enterNetwork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); while (!_isNetworkThreadRunnable_inlock()) { _shouldWakeNetworkCondition.wait(lk); } @@ -267,7 +267,7 @@ void NetworkInterfaceMock::enterNetwork() { } void NetworkInterfaceMock::exitNetwork() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_currentlyRunning != kNetworkThread) { return; } @@ -279,7 +279,7 @@ void NetworkInterfaceMock::exitNetwork() { } bool NetworkInterfaceMock::hasReadyRequests() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); return _hasReadyRequests_inlock(); } @@ -294,7 +294,7 @@ bool NetworkInterfaceMock::_hasReadyRequests_inlock() { } NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); while (!_hasReadyRequests_inlock()) { _waitingToRunMask |= kExecutorThread; @@ -311,7 +311,7 @@ NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfU NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNthUnscheduledRequest( size_t n) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); invariant(_hasReadyRequests_inlock()); @@ -325,7 +325,7 @@ NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNthUnsch void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi, Date_t when, const ResponseStatus& response) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); NetworkOperationIterator insertBefore = _scheduled.begin(); while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) { @@ -388,13 +388,13 @@ RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperatio } void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); _blackHoled.splice(_blackHoled.end(), _processing, noi); } void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); invariant(noi->getNextConsiderationDate() < dontAskUntil); invariant(_now_inlock() < dontAskUntil); @@ -409,7 +409,7 @@ void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAs } Date_t NetworkInterfaceMock::runUntil(Date_t until) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); invariant(until > _now_inlock()); while (until > _now_inlock()) { @@ -436,7 +436,7 @@ Date_t NetworkInterfaceMock::runUntil(Date_t until) { } void NetworkInterfaceMock::advanceTime(Date_t newTime) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); invariant(newTime > _now_inlock()); _now = newTime; @@ -446,19 +446,19 @@ void NetworkInterfaceMock::advanceTime(Date_t newTime) { } void NetworkInterfaceMock::runReadyNetworkOperations() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); _runReadyNetworkOperations_inlock(&lk); } void NetworkInterfaceMock::waitForWork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_currentlyRunning == kExecutorThread); _waitForWork_inlock(&lk); } void NetworkInterfaceMock::waitForWorkUntil(Date_t when) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_currentlyRunning == kExecutorThread); _executorNextWakeupDate = when; if (_executorNextWakeupDate <= _now_inlock()) { @@ -538,7 +538,7 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort // The completion handler for the postconnect command schedules the original command. auto postconnectCompletionHandler = [this, op = std::move(op)](TaskExecutor::ResponseOnAnyStatus rs) mutable { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!rs.isOK()) { op.setResponse(_now_inlock(), rs); op.finishResponse(); @@ -566,7 +566,7 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort } void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_hasStarted); invariant(!_hook); _hook = std::move(hook); @@ -574,21 +574,21 @@ void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHo void NetworkInterfaceMock::setEgressMetadataHook( std::unique_ptr<rpc::EgressMetadataHook> metadataHook) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_hasStarted); invariant(!_metadataHook); _metadataHook = std::move(metadataHook); } void NetworkInterfaceMock::signalWorkAvailable() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _waitingToRunMask |= kExecutorThread; if (_currentlyRunning == kNoThread) { _shouldWakeExecutorCondition.notify_one(); } } -void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) { +void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<Latch>* lk) { while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) { auto& alarm = _alarms.top(); @@ -626,7 +626,7 @@ void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<s _waitingToRunMask &= ~kNetworkThread; } -void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) { +void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<Latch>* lk) { if (_waitingToRunMask & kExecutorThread) { _waitingToRunMask &= ~kExecutorThread; return; diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 8fe6cdb3414..8a5d69b5ad9 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -36,9 +36,9 @@ #include <vector> #include "mongo/executor/network_interface.h" +#include "mongo/platform/condition_variable.h" +#include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/clock_source.h" @@ -324,7 +324,7 @@ private: /** * Implementation of waitForWork*. */ - void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk); + void _waitForWork_inlock(stdx::unique_lock<Latch>* lk); /** * Returns true if there are ready requests for the network thread to service. @@ -356,12 +356,12 @@ private: * reaquire "lk" several times, but will not return until the executor has blocked * in waitFor*. */ - void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk); + void _runReadyNetworkOperations_inlock(stdx::unique_lock<Latch>* lk); // Mutex that synchronizes access to mutable data in this class and its subclasses. // Fields guarded by the mutex are labled (M), below, and those that are read-only // in multi-threaded execution, and so unsynchronized, are labeled (R). - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceMock::_mutex"); // Condition signaled to indicate that the network processing thread should wake up. stdx::condition_variable _shouldWakeNetworkCondition; // (M) diff --git a/src/mongo/executor/network_interface_perf_test.cpp b/src/mongo/executor/network_interface_perf_test.cpp index 205caa22a2a..2f625301dfe 100644 --- a/src/mongo/executor/network_interface_perf_test.cpp +++ b/src/mongo/executor/network_interface_perf_test.cpp @@ -66,7 +66,7 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) { auto server = fixture.getServers()[0]; std::atomic<int> remainingOps(operations); // NOLINT - stdx::mutex mtx; + auto mtx = MONGO_MAKE_LATCH(); stdx::condition_variable cv; Timer t; @@ -81,7 +81,7 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) { if (--remainingOps) { return func(); } - stdx::unique_lock<stdx::mutex> lk(mtx); + stdx::unique_lock<Latch> lk(mtx); cv.notify_one(); }; @@ -93,7 +93,7 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) { func(); - stdx::unique_lock<stdx::mutex> lk(mtx); + stdx::unique_lock<Latch> lk(mtx); cv.wait(lk, [&] { return remainingOps.load() == 0; }); return t.millis(); diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp index 787bd0a6dac..f40a298aea1 100644 --- a/src/mongo/executor/network_interface_thread_pool.cpp +++ b/src/mongo/executor/network_interface_thread_pool.cpp @@ -49,7 +49,7 @@ NetworkInterfaceThreadPool::~NetworkInterfaceThreadPool() { void NetworkInterfaceThreadPool::_dtorImpl() { { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_tasks.empty()) return; @@ -63,7 +63,7 @@ void NetworkInterfaceThreadPool::_dtorImpl() { } void NetworkInterfaceThreadPool::startup() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_started) { severe() << "Attempting to start pool, but it has already started"; fassertFailed(34358); @@ -75,7 +75,7 @@ void NetworkInterfaceThreadPool::startup() { void NetworkInterfaceThreadPool::shutdown() { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _inShutdown = true; } @@ -84,7 +84,7 @@ void NetworkInterfaceThreadPool::shutdown() { void NetworkInterfaceThreadPool::join() { { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_joining) { severe() << "Attempted to join pool more than once"; @@ -100,13 +100,13 @@ void NetworkInterfaceThreadPool::join() { _net->signalWorkAvailable(); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _joiningCondition.wait( lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); }); } void NetworkInterfaceThreadPool::schedule(Task task) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_inShutdown) { lk.unlock(); task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); @@ -127,7 +127,7 @@ void NetworkInterfaceThreadPool::schedule(Task task) { * allows us to use the network interface's threads as our own pool, which should reduce context * switches if our tasks are getting scheduled by network interface tasks. */ -void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<stdx::mutex> lk) { +void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<Latch> lk) { if ((_consumeState != ConsumeState::kNeutral) || _tasks.empty()) return; @@ -140,7 +140,7 @@ void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<stdx::mutex> lk _consumeState = ConsumeState::kScheduled; lk.unlock(); auto ret = _net->schedule([this](Status status) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_consumeState != ConsumeState::kScheduled) return; @@ -149,7 +149,7 @@ void NetworkInterfaceThreadPool::_consumeTasks(stdx::unique_lock<stdx::mutex> lk invariant(ret.isOK() || ErrorCodes::isShutdownError(ret.code())); } -void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mutex> lk) noexcept { +void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<Latch> lk) noexcept { _consumeState = ConsumeState::kConsuming; const auto consumingTasksGuard = makeGuard([&] { _consumeState = ConsumeState::kNeutral; }); diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h index 51771393032..946519b56f1 100644 --- a/src/mongo/executor/network_interface_thread_pool.h +++ b/src/mongo/executor/network_interface_thread_pool.h @@ -32,8 +32,8 @@ #include <cstdint> #include <vector> -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/condition_variable.h" +#include "mongo/platform/mutex.h" #include "mongo/util/concurrency/thread_pool_interface.h" namespace mongo { @@ -60,14 +60,14 @@ public: void schedule(Task task) override; private: - void _consumeTasks(stdx::unique_lock<stdx::mutex> lk); - void _consumeTasksInline(stdx::unique_lock<stdx::mutex> lk) noexcept; + void _consumeTasks(stdx::unique_lock<Latch> lk); + void _consumeTasksInline(stdx::unique_lock<Latch> lk) noexcept; void _dtorImpl(); NetworkInterface* const _net; // Protects all of the pool state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceThreadPool::_mutex"); stdx::condition_variable _joiningCondition; std::vector<Task> _tasks; bool _started = false; diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index bcd1672e50e..e932130b005 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -65,7 +65,7 @@ std::string NetworkInterfaceTL::getDiagnosticString() { void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const { auto pool = [&] { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _pool.get(); }(); if (pool) @@ -74,7 +74,7 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const NetworkInterface::Counters NetworkInterfaceTL::getCounters() const { invariant(getTestCommandsEnabled()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _counters; } @@ -83,7 +83,7 @@ std::string NetworkInterfaceTL::getHostName() { } void NetworkInterfaceTL::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_svcCtx) { _tl = _svcCtx->getTransportLayer(); } @@ -144,19 +144,19 @@ bool NetworkInterfaceTL::inShutdown() const { } void NetworkInterfaceTL::waitForWork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); MONGO_IDLE_THREAD_BLOCK; _workReadyCond.wait(lk, [this] { return _isExecutorRunnable; }); } void NetworkInterfaceTL::waitForWorkUntil(Date_t when) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); MONGO_IDLE_THREAD_BLOCK; _workReadyCond.wait_until(lk, when.toSystemTimePoint(), [this] { return _isExecutorRunnable; }); } void NetworkInterfaceTL::signalWorkAvailable() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (!_isExecutorRunnable) { _isExecutorRunnable = true; _workReadyCond.notify_one(); @@ -401,7 +401,7 @@ void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state, } if (getTestCommandsEnabled()) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _counters.timedOut++; } @@ -449,7 +449,7 @@ void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state, } if (getTestCommandsEnabled()) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (swr.isOK() && swr.getValue().status.isOK()) { _counters.succeeded++; } else { @@ -467,7 +467,7 @@ void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state, void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, const BatonHandle& baton) { - stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); + stdx::unique_lock<Latch> lk(_inProgressMutex); auto it = _inProgress.find(cbHandle); if (it == _inProgress.end()) { return; @@ -485,7 +485,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan } if (getTestCommandsEnabled()) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _counters.canceled++; } @@ -528,7 +528,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle std::make_shared<AlarmState>(when, cbHandle, _reactor->makeTimer(), std::move(pf.promise)); { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + stdx::lock_guard<Latch> lk(_inProgressMutex); // If a user has already scheduled an alarm with a handle, make sure they intentionally // override it by canceling and setting a new one. @@ -546,7 +546,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle } void NetworkInterfaceTL::cancelAlarm(const TaskExecutor::CallbackHandle& cbHandle) { - stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); + stdx::unique_lock<Latch> lk(_inProgressMutex); auto iter = _inProgressAlarms.find(cbHandle); @@ -566,7 +566,7 @@ void NetworkInterfaceTL::cancelAlarm(const TaskExecutor::CallbackHandle& cbHandl void NetworkInterfaceTL::_cancelAllAlarms() { auto alarms = [&] { - stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); + stdx::unique_lock<Latch> lk(_inProgressMutex); return std::exchange(_inProgressAlarms, {}); }(); @@ -599,7 +599,7 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState> // Erase the AlarmState from the map. { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + stdx::lock_guard<Latch> lk(_inProgressMutex); auto iter = _inProgressAlarms.find(state->cbHandle); if (iter == _inProgressAlarms.end()) { diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 15fdf391876..ee27fdd410f 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -147,7 +147,7 @@ private: std::unique_ptr<transport::TransportLayer> _ownedTransportLayer; transport::ReactorHandle _reactor; - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_mutex"); ConnectionPool::Options _connPoolOpts; std::unique_ptr<NetworkConnectionHook> _onConnectHook; std::shared_ptr<ConnectionPool> _pool; @@ -165,7 +165,7 @@ private: AtomicWord<State> _state; stdx::thread _ioThread; - stdx::mutex _inProgressMutex; + Mutex _inProgressMutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_inProgressMutex"); stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandState>> _inProgress; stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<AlarmState>> _inProgressAlarms; diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp index e7e2f1bd5ae..057ff4aa971 100644 --- a/src/mongo/executor/scoped_task_executor.cpp +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -226,7 +226,7 @@ private: [id, work = std::forward<Work>(work), self = shared_from_this()](const auto& cargs) { using ArgsT = std::decay_t<decltype(cargs)>; - stdx::unique_lock<stdx::mutex> lk(self->_mutex); + stdx::unique_lock<Latch> lk(self->_mutex); auto doWorkAndNotify = [&](const ArgsT& x) noexcept { lk.unlock(); @@ -301,7 +301,7 @@ private: } } - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ScopedTaskExecutor::_mutex"); bool _inShutdown = false; std::shared_ptr<TaskExecutor> _executor; size_t _id = 0; diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h index dc166606115..bcdd49e4151 100644 --- a/src/mongo/executor/scoped_task_executor.h +++ b/src/mongo/executor/scoped_task_executor.h @@ -34,8 +34,8 @@ #include "mongo/base/status.h" #include "mongo/executor/task_executor.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/condition_variable.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/fail_point_service.h" diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index d36f5c9bac6..f84321a46b7 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -38,7 +38,7 @@ #include "mongo/base/string_data.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" -#include "mongo/stdx/condition_variable.h" +#include "mongo/platform/condition_variable.h" #include "mongo/transport/baton.h" #include "mongo/util/future.h" #include "mongo/util/out_of_line_executor.h" diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index 191537cebff..fb809990e49 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -43,7 +43,7 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti : _options(std::move(options)), _prng(prngSeed), _net(net) {} ThreadPoolMock::~ThreadPoolMock() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_joining) return; @@ -53,13 +53,13 @@ ThreadPoolMock::~ThreadPoolMock() { void ThreadPoolMock::startup() { LOG(1) << "Starting pool"; - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_started); invariant(!_worker.joinable()); _started = true; _worker = stdx::thread([this] { _options.onCreateThread(); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); LOG(1) << "Starting to consume tasks"; while (!_joining) { @@ -77,17 +77,17 @@ void ThreadPoolMock::startup() { } void ThreadPoolMock::shutdown() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _shutdown(lk); } void ThreadPoolMock::join() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _join(lk); } void ThreadPoolMock::schedule(Task task) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_inShutdown) { lk.unlock(); @@ -98,7 +98,7 @@ void ThreadPoolMock::schedule(Task task) { _tasks.emplace_back(std::move(task)); } -void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) { +void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<Latch>& lk) { auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); if (next + 1 != _tasks.size()) { std::swap(_tasks[next], _tasks.back()); @@ -114,14 +114,14 @@ void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) { lk.lock(); } -void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) { +void ThreadPoolMock::_shutdown(stdx::unique_lock<Latch>& lk) { LOG(1) << "Shutting down pool"; _inShutdown = true; _net->signalWorkAvailable(); } -void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) { +void ThreadPoolMock::_join(stdx::unique_lock<Latch>& lk) { LOG(1) << "Joining pool"; _joining = true; diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h index e1f8e30a80f..d81f83dfb4c 100644 --- a/src/mongo/executor/thread_pool_mock.h +++ b/src/mongo/executor/thread_pool_mock.h @@ -33,8 +33,8 @@ #include <functional> #include <vector> +#include "mongo/platform/mutex.h" #include "mongo/platform/random.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/thread_pool_interface.h" @@ -73,14 +73,14 @@ public: void schedule(Task task) override; private: - void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk); - void _shutdown(stdx::unique_lock<stdx::mutex>& lk); - void _join(stdx::unique_lock<stdx::mutex>& lk); + void _consumeOneTask(stdx::unique_lock<Latch>& lk); + void _shutdown(stdx::unique_lock<Latch>& lk); + void _join(stdx::unique_lock<Latch>& lk); // These are the options with which the pool was configured at construction time. const Options _options; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolMock::_mutex"); stdx::thread _worker; std::vector<Task> _tasks; PseudoRandom _prng; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 55adfc29984..84153871128 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -140,20 +140,20 @@ ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterfa ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() { shutdown(); - auto lk = _join(stdx::unique_lock<stdx::mutex>(_mutex)); + auto lk = _join(stdx::unique_lock<Latch>(_mutex)); invariant(_state == shutdownComplete); } void ThreadPoolTaskExecutor::startup() { _net->startup(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_state == preStart); _setState_inlock(running); _pool->startup(); } void ThreadPoolTaskExecutor::shutdown() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_inShutdown_inlock()) { invariant(_networkInProgressQueue.empty()); invariant(_sleepersQueue.empty()); @@ -176,10 +176,10 @@ void ThreadPoolTaskExecutor::shutdown() { } void ThreadPoolTaskExecutor::join() { - _join(stdx::unique_lock<stdx::mutex>(_mutex)); + _join(stdx::unique_lock<Latch>(_mutex)); } -stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) { +stdx::unique_lock<Latch> ThreadPoolTaskExecutor::_join(stdx::unique_lock<Latch> lk) { _stateChange.wait(lk, [this] { // All tasks are spliced into the _poolInProgressQueue immediately after we accept them. // This occurs in scheduleIntoPool_inlock. @@ -223,7 +223,7 @@ stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<s EventHandle event; setEventForHandle(&event, std::move(eventState)); signalEvent_inlock(event, std::move(lk)); - lk = stdx::unique_lock<stdx::mutex>(_mutex); + lk = stdx::unique_lock<Latch>(_mutex); } lk.unlock(); _net->shutdown(); @@ -237,7 +237,7 @@ stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<s } void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // ThreadPool details // TODO: fill in @@ -264,7 +264,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() { auto el = makeSingletonEventList(); EventHandle event; setEventForHandle(&event, el.front()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_inShutdown_inlock()) { return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } @@ -273,7 +273,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() { } void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); signalEvent_inlock(event, std::move(lk)); } @@ -284,7 +284,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E } // Unsure if we'll succeed yet, so pass an empty CallbackFn. auto wq = makeSingletonWorkQueue({}, nullptr); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq); if (!cbHandle.isOK()) { @@ -304,7 +304,7 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex invariant(opCtx); invariant(event.isValid()); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // std::condition_variable::wait() can wake up spuriously, so we have to loop until the event // is signalled or we time out. @@ -323,7 +323,7 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { invariant(event.isValid()); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); while (!eventState->isSignaledFlag) { eventState->isSignaledCondition.wait(lk); @@ -334,7 +334,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(Ca // Unsure if we'll succeed yet, so pass an empty CallbackFn. auto wq = makeSingletonWorkQueue({}, nullptr); WorkQueue temp; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&temp, &wq); if (!cbHandle.isOK()) { return cbHandle; @@ -352,7 +352,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( } auto wq = makeSingletonWorkQueue(std::move(work), nullptr, when); wq.front()->isTimerOperation = true; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq); if (!cbHandle.isOK()) { return cbHandle; @@ -366,7 +366,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( } auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle)); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (cbState->canceled.load()) { return; } @@ -455,7 +455,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC }, baton); wq.front()->isNetworkOperation = true; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto swCbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq); if (!swCbHandle.isOK()) return swCbHandle; @@ -471,7 +471,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) { remoteCommandFinished(cbData, cb, scheduledRequest, response); }; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_inShutdown_inlock()) { return; } @@ -491,7 +491,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) { invariant(cbHandle.isValid()); auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle)); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_inShutdown_inlock()) { return; } @@ -527,7 +527,7 @@ void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible* if (cbState->isFinished.load()) { return; } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (!cbState->finishedCondition) { cbState->finishedCondition.emplace(); } @@ -569,7 +569,7 @@ ThreadPoolTaskExecutor::EventList ThreadPoolTaskExecutor::makeSingletonEventList } void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event, - stdx::unique_lock<stdx::mutex> lk) { + stdx::unique_lock<Latch> lk) { invariant(event.isValid()); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); invariant(!eventState->isSignaledFlag); @@ -580,20 +580,20 @@ void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event, } void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, - stdx::unique_lock<stdx::mutex> lk) { + stdx::unique_lock<Latch> lk) { scheduleIntoPool_inlock(fromQueue, fromQueue->begin(), fromQueue->end(), std::move(lk)); } void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, const WorkQueue::iterator& iter, - stdx::unique_lock<stdx::mutex> lk) { + stdx::unique_lock<Latch> lk) { scheduleIntoPool_inlock(fromQueue, iter, std::next(iter), std::move(lk)); } void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, const WorkQueue::iterator& begin, const WorkQueue::iterator& end, - stdx::unique_lock<stdx::mutex> lk) { + stdx::unique_lock<Latch> lk) { dassert(fromQueue != &_poolInProgressQueue); std::vector<std::shared_ptr<CallbackState>> todo(begin, end); _poolInProgressQueue.splice(_poolInProgressQueue.end(), *fromQueue, begin, end); @@ -626,7 +626,7 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, } else { _pool->schedule([this, cbState](auto status) { if (ErrorCodes::isCancelationError(status.code())) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); cbState->canceled.store(1); } else { @@ -659,7 +659,7 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA callback(std::move(args)); } cbStateArg->isFinished.store(true); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _poolInProgressQueue.erase(cbStateArg->iter); if (cbStateArg->finishedCondition) { cbStateArg->finishedCondition->notify_all(); diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 9106f596069..35dca3ce6b7 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -33,8 +33,8 @@ #include <memory> #include "mongo/executor/task_executor.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/condition_variable.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/transport/baton.h" #include "mongo/util/fail_point_service.h" @@ -149,13 +149,13 @@ private: /** * Signals the given event. */ - void signalEvent_inlock(const EventHandle& event, stdx::unique_lock<stdx::mutex> lk); + void signalEvent_inlock(const EventHandle& event, stdx::unique_lock<Latch> lk); /** * Schedules all items from "fromQueue" into the thread pool and moves them into * _poolInProgressQueue. */ - void scheduleIntoPool_inlock(WorkQueue* fromQueue, stdx::unique_lock<stdx::mutex> lk); + void scheduleIntoPool_inlock(WorkQueue* fromQueue, stdx::unique_lock<Latch> lk); /** * Schedules the given item from "fromQueue" into the thread pool and moves it into @@ -163,7 +163,7 @@ private: */ void scheduleIntoPool_inlock(WorkQueue* fromQueue, const WorkQueue::iterator& iter, - stdx::unique_lock<stdx::mutex> lk); + stdx::unique_lock<Latch> lk); /** * Schedules entries from "begin" through "end" in "fromQueue" into the thread pool @@ -172,7 +172,7 @@ private: void scheduleIntoPool_inlock(WorkQueue* fromQueue, const WorkQueue::iterator& begin, const WorkQueue::iterator& end, - stdx::unique_lock<stdx::mutex> lk); + stdx::unique_lock<Latch> lk); /** * Executes the callback specified by "cbState". @@ -181,7 +181,7 @@ private: bool _inShutdown_inlock() const; void _setState_inlock(State newState); - stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk); + stdx::unique_lock<Latch> _join(stdx::unique_lock<Latch> lk); // The network interface used for remote command execution and waiting. std::shared_ptr<NetworkInterface> _net; @@ -190,7 +190,7 @@ private: std::shared_ptr<ThreadPoolInterface> _pool; // Mutex guarding all remaining fields. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolTaskExecutor::_mutex"); // Queue containing all items currently scheduled into the thread pool but not yet completed. WorkQueue _poolInProgressQueue; |