diff options
author | Ben Caimano <ben.caimano@mongodb.com> | 2019-11-01 17:24:53 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-01 17:24:53 +0000 |
commit | bf5bef47a8e6937b4e0d2c9df3fde3470bdc72c9 (patch) | |
tree | 8f71a9f272082dd9ee0e471ef5fcb9f19519600d /src/mongo/db/repl | |
parent | f210bc645453c05979067c556bf6f2bd43e64134 (diff) | |
download | mongo-bf5bef47a8e6937b4e0d2c9df3fde3470bdc72c9.tar.gz |
SERVER-42165 Replace uses of stdx::mutex with mongo::Mutex
Diffstat (limited to 'src/mongo/db/repl')
76 files changed, 572 insertions, 573 deletions
diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp index 1b99507fc5c..77b086af97e 100644 --- a/src/mongo/db/repl/abstract_async_component.cpp +++ b/src/mongo/db/repl/abstract_async_component.cpp @@ -52,7 +52,7 @@ std::string AbstractAsyncComponent::_getComponentName() const { } bool AbstractAsyncComponent::isActive() noexcept { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); return _isActive_inlock(); } @@ -61,7 +61,7 @@ bool AbstractAsyncComponent::_isActive_inlock() noexcept { } bool AbstractAsyncComponent::_isShuttingDown() noexcept { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); return _isShuttingDown_inlock(); } @@ -70,7 +70,7 @@ bool AbstractAsyncComponent::_isShuttingDown_inlock() noexcept { } Status AbstractAsyncComponent::startup() noexcept { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); switch (_state) { case State::kPreStart: _state = State::kRunning; @@ -97,7 +97,7 @@ Status AbstractAsyncComponent::startup() noexcept { } void AbstractAsyncComponent::shutdown() noexcept { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. @@ -116,17 +116,17 @@ void AbstractAsyncComponent::shutdown() noexcept { } void AbstractAsyncComponent::join() noexcept { - stdx::unique_lock<stdx::mutex> lk(*_getMutex()); + stdx::unique_lock<Latch> lk(*_getMutex()); _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); }); } AbstractAsyncComponent::State AbstractAsyncComponent::getState_forTest() noexcept { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); return _state; } void AbstractAsyncComponent::_transitionToComplete() noexcept { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); _transitionToComplete_inlock(); } @@ -138,13 +138,13 @@ void AbstractAsyncComponent::_transitionToComplete_inlock() noexcept { Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus( const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) { - stdx::unique_lock<stdx::mutex> lk(*_getMutex()); + stdx::unique_lock<Latch> lk(*_getMutex()); return _checkForShutdownAndConvertStatus_inlock(callbackArgs, message); } Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus(const Status& status, const std::string& message) { - stdx::unique_lock<stdx::mutex> lk(*_getMutex()); + stdx::unique_lock<Latch> lk(*_getMutex()); return _checkForShutdownAndConvertStatus_inlock(status, message); } diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h index 64d88ad41e8..5b0e6426900 100644 --- a/src/mongo/db/repl/abstract_async_component.h +++ b/src/mongo/db/repl/abstract_async_component.h @@ -37,8 +37,8 @@ #include "mongo/base/static_assert.h" #include "mongo/base/status.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" namespace mongo { namespace repl { @@ -207,7 +207,7 @@ private: /** * Returns mutex to guard this component's state variable. */ - virtual stdx::mutex* _getMutex() noexcept = 0; + virtual Mutex* _getMutex() noexcept = 0; private: // All member variables are labeled with one of the following codes indicating the @@ -259,7 +259,7 @@ Status AbstractAsyncComponent::_startupComponent_inlock(std::unique_ptr<T>& comp template <typename T> Status AbstractAsyncComponent::_startupComponent(std::unique_ptr<T>& component) { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); return _startupComponent_inlock(component); } @@ -275,7 +275,7 @@ void AbstractAsyncComponent::_shutdownComponent_inlock(const std::unique_ptr<T>& template <typename T> void AbstractAsyncComponent::_shutdownComponent(const std::unique_ptr<T>& component) { - stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + stdx::lock_guard<Latch> lock(*_getMutex()); _shutdownComponent_inlock(component); } diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp index 4fdc4128cb7..ad74edf0f33 100644 --- a/src/mongo/db/repl/abstract_async_component_test.cpp +++ b/src/mongo/db/repl/abstract_async_component_test.cpp @@ -33,8 +33,8 @@ #include "mongo/db/repl/abstract_async_component.h" #include "mongo/db/repl/task_executor_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" #include "mongo/unittest/unittest.h" @@ -94,10 +94,10 @@ public: private: Status _doStartup_inlock() noexcept override; void _doShutdown_inlock() noexcept override; - stdx::mutex* _getMutex() noexcept override; + Mutex* _getMutex() noexcept override; // Used by AbstractAsyncComponent to guard start changes. - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("MockAsyncComponent::_mutex"); public: // Returned by _doStartup_inlock(). Override for testing. @@ -124,7 +124,7 @@ Status MockAsyncComponent::scheduleWorkAndSaveHandle_forTest( executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _scheduleWorkAndSaveHandle_inlock(std::move(work), handle, name); } @@ -133,12 +133,12 @@ Status MockAsyncComponent::scheduleWorkAtAndSaveHandle_forTest( executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _scheduleWorkAtAndSaveHandle_inlock(when, std::move(work), handle, name); } void MockAsyncComponent::cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _cancelHandle_inlock(handle); } @@ -159,7 +159,7 @@ Status MockAsyncComponent::_doStartup_inlock() noexcept { void MockAsyncComponent::_doShutdown_inlock() noexcept {} -stdx::mutex* MockAsyncComponent::_getMutex() noexcept { +Mutex* MockAsyncComponent::_getMutex() noexcept { return &_mutex; } diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.cpp b/src/mongo/db/repl/abstract_oplog_fetcher.cpp index a3f27e65e94..820d2417e9d 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher.cpp @@ -38,8 +38,8 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -92,7 +92,7 @@ Milliseconds AbstractOplogFetcher::_getGetMoreMaxTime() const { } std::string AbstractOplogFetcher::toString() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); str::stream msg; msg << _getComponentName() << " -" << " last optime fetched: " << _lastFetched.toString(); @@ -117,7 +117,7 @@ void AbstractOplogFetcher::_makeAndScheduleFetcherCallback( Status scheduleStatus = Status::OK(); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _fetcher = _makeFetcher(findCommandObj, metadataObj, _getInitialFindMaxTime()); scheduleStatus = _scheduleFetcher_inlock(); } @@ -143,7 +143,7 @@ void AbstractOplogFetcher::_doShutdown_inlock() noexcept { } } -stdx::mutex* AbstractOplogFetcher::_getMutex() noexcept { +Mutex* AbstractOplogFetcher::_getMutex() noexcept { return &_mutex; } @@ -157,12 +157,12 @@ OpTime AbstractOplogFetcher::getLastOpTimeFetched_forTest() const { } OpTime AbstractOplogFetcher::_getLastOpTimeFetched() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _lastFetched; } BSONObj AbstractOplogFetcher::getCommandObject_forTest() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _fetcher->getCommandObject(); } @@ -197,7 +197,7 @@ void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getRetriedFindMaxTime()); BSONObj metadataObj = _makeMetadataObject(); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_fetcherRestarts == _maxFetcherRestarts) { log() << "Error returned from oplog query (no more query restarts left): " << redact(responseStatus); @@ -229,7 +229,7 @@ void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, // Reset fetcher restart counter on successful response. { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_isActive_inlock()); _fetcherRestarts = 0; } @@ -274,7 +274,7 @@ void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, LOG(3) << _getComponentName() << " setting last fetched optime ahead after batch: " << lastDoc; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _lastFetched = lastDoc; } @@ -295,7 +295,7 @@ void AbstractOplogFetcher::_finishCallback(Status status) { _onShutdownCallbackFn(status); decltype(_onShutdownCallbackFn) onShutdownCallbackFn; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _transitionToComplete_inlock(); // Release any resources that might be held by the '_onShutdownCallbackFn' function object. diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.h b/src/mongo/db/repl/abstract_oplog_fetcher.h index 11d59fd82a8..45c5961b385 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher.h +++ b/src/mongo/db/repl/abstract_oplog_fetcher.h @@ -34,8 +34,8 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/abstract_async_component.h" #include "mongo/db/repl/optime_with.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" namespace mongo { namespace repl { @@ -147,7 +147,7 @@ protected: virtual void _doShutdown_inlock() noexcept override; private: - stdx::mutex* _getMutex() noexcept override; + Mutex* _getMutex() noexcept override; /** * This function must be overriden by subclass oplog fetchers to specify what `find` command @@ -213,7 +213,7 @@ private: const std::size_t _maxFetcherRestarts; // Protects member data of this AbstractOplogFetcher. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("AbstractOplogFetcher::_mutex"); // Function to call when the oplog fetcher shuts down. OnShutdownCallbackFn _onShutdownCallbackFn; diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 6d7918a7f5c..bf98dc7eec9 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -149,13 +149,13 @@ void BaseClonerTest::clear() { } void BaseClonerTest::setStatus(const Status& status) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _status = status; _setStatusCondition.notify_all(); } const Status& BaseClonerTest::getStatus() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _status; } diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index c4d56c00397..328bfdb27d2 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -41,8 +41,8 @@ #include "mongo/db/service_context_test_fixture.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" @@ -135,7 +135,7 @@ protected: private: // Protects member data of this base cloner fixture. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("BaseCloner::_mutex"); stdx::condition_variable _setStatusCondition; diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 53d3e6a3612..d8cfe485c67 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -133,7 +133,7 @@ void BackgroundSync::startup(OperationContext* opCtx) { } void BackgroundSync::shutdown(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _state = ProducerState::Stopped; @@ -157,7 +157,7 @@ void BackgroundSync::join(OperationContext* opCtx) { } bool BackgroundSync::inShutdown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _inShutdown_inlock(); } @@ -236,7 +236,7 @@ void BackgroundSync::_produce() { HostAndPort source; SyncSourceResolverResponse syncSourceResp; { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (_lastOpTimeFetched.isNull()) { // then we're initial syncing and we're still waiting for this to be set lock.unlock(); @@ -259,7 +259,7 @@ void BackgroundSync::_produce() { auto opCtx = cc().makeOperationContext(); minValidSaved = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get()); } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state != ProducerState::Running) { return; } @@ -289,7 +289,7 @@ void BackgroundSync::_produce() { fassert(40349, status); _syncSourceResolver->join(); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _syncSourceResolver.reset(); } @@ -338,7 +338,7 @@ void BackgroundSync::_produce() { return; } else if (syncSourceResp.isOK() && !syncSourceResp.getSyncSource().empty()) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _syncSourceHost = syncSourceResp.getSyncSource(); source = _syncSourceHost; } @@ -380,7 +380,7 @@ void BackgroundSync::_produce() { } { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state != ProducerState::Running) { return; } @@ -428,7 +428,7 @@ void BackgroundSync::_produce() { }, onOplogFetcherShutdownCallbackFn, bgSyncOplogFetcherBatchSize); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state != ProducerState::Running) { return; } @@ -504,7 +504,7 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi // are done to prevent going into shutdown. This avoids a race where shutdown() clears the // buffer between the time we check _inShutdown and the point where we finish writing to the // buffer. - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (_state != ProducerState::Running) { return Status::OK(); } @@ -556,7 +556,7 @@ void BackgroundSync::_runRollback(OperationContext* opCtx, OpTime lastOpTimeFetched; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); lastOpTimeFetched = _lastOpTimeFetched; } @@ -633,7 +633,7 @@ void BackgroundSync::_runRollbackViaRecoverToCheckpoint( rollbackRemoteOplogQueryBatchSize.load()); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state != ProducerState::Running) { return; } @@ -670,18 +670,18 @@ void BackgroundSync::_fallBackOnRollbackViaRefetch( } HostAndPort BackgroundSync::getSyncTarget() const { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); return _syncSourceHost; } void BackgroundSync::clearSyncTarget() { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); log() << "Resetting sync source to empty, which was " << _syncSourceHost; _syncSourceHost = HostAndPort(); } void BackgroundSync::stop(bool resetLastFetchedOptime) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _state = ProducerState::Stopped; log() << "Stopping replication producer"; @@ -711,7 +711,7 @@ void BackgroundSync::start(OperationContext* opCtx) { do { lastAppliedOpTime = _readLastAppliedOpTime(opCtx); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // Double check the state after acquiring the mutex. if (_state != ProducerState::Starting) { return; @@ -781,12 +781,12 @@ bool BackgroundSync::shouldStopFetching() const { } BackgroundSync::ProducerState BackgroundSync::getState() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _state; } void BackgroundSync::startProducerIfStopped() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); // Let producer run if it's already running. if (_state == ProducerState::Stopped) { _state = ProducerState::Starting; diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 194bf202b8f..de99f5191af 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -41,9 +41,9 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/rollback_impl.h" #include "mongo/db/repl/sync_source_resolver.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/net/hostandport.h" @@ -230,7 +230,7 @@ private: // Protects member data of BackgroundSync. // Never hold the BackgroundSync mutex when trying to acquire the ReplicationCoordinator mutex. - mutable stdx::mutex _mutex; // (S) + mutable Mutex _mutex = MONGO_MAKE_LATCH("BackgroundSync::_mutex"); // (S) OpTime _lastOpTimeFetched; // (M) diff --git a/src/mongo/db/repl/callback_completion_guard.h b/src/mongo/db/repl/callback_completion_guard.h index 9eb4020db7d..a83e27af979 100644 --- a/src/mongo/db/repl/callback_completion_guard.h +++ b/src/mongo/db/repl/callback_completion_guard.h @@ -32,8 +32,8 @@ #include <boost/optional.hpp> +#include "mongo/platform/mutex.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -83,9 +83,9 @@ public: * Requires either a unique_lock or lock_guard to be passed in to ensure that we call * _cancelRemainingWork_inlock()) while we have a lock on the callers's mutex. */ - void setResultAndCancelRemainingWork_inlock(const stdx::lock_guard<stdx::mutex>& lock, + void setResultAndCancelRemainingWork_inlock(const stdx::lock_guard<Latch>& lock, const Result& result); - void setResultAndCancelRemainingWork_inlock(const stdx::unique_lock<stdx::mutex>& lock, + void setResultAndCancelRemainingWork_inlock(const stdx::unique_lock<Latch>& lock, const Result& result); private: @@ -124,13 +124,13 @@ CallbackCompletionGuard<Result>::~CallbackCompletionGuard() { template <typename Result> void CallbackCompletionGuard<Result>::setResultAndCancelRemainingWork_inlock( - const stdx::lock_guard<stdx::mutex>& lock, const Result& result) { + const stdx::lock_guard<Latch>& lock, const Result& result) { _setResultAndCancelRemainingWork_inlock(result); } template <typename Result> void CallbackCompletionGuard<Result>::setResultAndCancelRemainingWork_inlock( - const stdx::unique_lock<stdx::mutex>& lock, const Result& result) { + const stdx::unique_lock<Latch>& lock, const Result& result) { invariant(lock.owns_lock()); _setResultAndCancelRemainingWork_inlock(result); } diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index a80a9160896..31e4f1c9c42 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -87,7 +87,7 @@ private: std::unique_ptr<stdx::thread> _quorumCheckThread; Status _quorumCheckStatus; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("CheckQuorumTest::_mutex"); bool _isQuorumCheckDone; }; @@ -108,13 +108,13 @@ Status CheckQuorumTest::waitForQuorumCheck() { } bool CheckQuorumTest::isQuorumCheckDone() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _isQuorumCheckDone; } void CheckQuorumTest::_runQuorumCheck(const ReplSetConfig& config, int myIndex) { _quorumCheckStatus = _runQuorumCheckImpl(config, myIndex); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _isQuorumCheckDone = true; } diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index a974144673a..a283762f140 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -57,8 +57,8 @@ namespace mongo { namespace repl { namespace { -using LockGuard = stdx::lock_guard<stdx::mutex>; -using UniqueLock = stdx::unique_lock<stdx::mutex>; +using LockGuard = stdx::lock_guard<Latch>; +using UniqueLock = stdx::unique_lock<Latch>; using executor::RemoteCommandRequest; constexpr auto kCountResponseDocumentCountFieldName = "n"_sd; @@ -199,7 +199,7 @@ bool CollectionCloner::_isActive_inlock() const { } bool CollectionCloner::_isShuttingDown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return State::kShuttingDown == _state; } @@ -230,7 +230,7 @@ Status CollectionCloner::startup() noexcept { } void CollectionCloner::shutdown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. @@ -263,12 +263,12 @@ void CollectionCloner::_cancelRemainingWork_inlock() { } CollectionCloner::Stats CollectionCloner::getStats() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _stats; } void CollectionCloner::join() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _condition.wait(lk, [this]() { return (_queryState == QueryState::kNotStarted || _queryState == QueryState::kFinished) && !_isActive_inlock(); @@ -288,7 +288,7 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWo } void CollectionCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _createClientFn = createClientFn; } @@ -478,7 +478,7 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; auto finishCallbackFn = [this](const Status& status) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _queryState = QueryState::kFinished; _clientConnection.reset(); } @@ -498,13 +498,13 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { if (!callbackData.status.isOK()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, callbackData.status); return; } bool queryStateOK = false; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); queryStateOK = _queryState == QueryState::kNotStarted; if (queryStateOK) { _queryState = QueryState::kRunning; @@ -529,12 +529,12 @@ void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& cal Status clientConnectionStatus = _clientConnection->connect(_source, StringData()); if (!clientConnectionStatus.isOK()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, clientConnectionStatus); return; } if (!replAuthenticate(_clientConnection.get())) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock( lock, {ErrorCodes::AuthenticationFailed, @@ -561,7 +561,7 @@ void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& cal } catch (const DBException& e) { auto queryStatus = e.toStatus().withContext(str::stream() << "Error querying collection '" << _sourceNss.ns()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (queryStatus.code() == ErrorCodes::OperationFailed || queryStatus.code() == ErrorCodes::CursorNotFound || queryStatus.code() == ErrorCodes::QueryPlanKilled) { @@ -581,7 +581,7 @@ void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& cal } } waitForDbWorker(); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, Status::OK()); } @@ -589,7 +589,7 @@ void CollectionCloner::_handleNextBatch(std::shared_ptr<OnCompletionGuard> onCom DBClientCursorBatchIterator& iter) { _stats.receivedBatches++; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); uassert(ErrorCodes::CallbackCanceled, "Collection cloning cancelled.", _queryState != QueryState::kCanceling); @@ -628,7 +628,7 @@ void CollectionCloner::_handleNextBatch(std::shared_ptr<OnCompletionGuard> onCom } void CollectionCloner::_verifyCollectionWasDropped( - const stdx::unique_lock<stdx::mutex>& lk, + const stdx::unique_lock<Latch>& lk, Status batchStatus, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { // If we already have a _verifyCollectionDroppedScheduler, just return; the existing @@ -691,7 +691,7 @@ void CollectionCloner::_insertDocumentsCallback( const executor::TaskExecutor::CallbackArgs& cbd, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { if (!cbd.status.isOK()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, cbd.status); return; } diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 33ca6ef6e71..1eb92679c9e 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -47,9 +47,9 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/task_runner.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/progress_meter.h" @@ -239,7 +239,7 @@ private: * Verifies that an error from the query was the result of a collection drop. If * so, cloning is stopped with no error. Otherwise it is stopped with the given error. */ - void _verifyCollectionWasDropped(const stdx::unique_lock<stdx::mutex>& lk, + void _verifyCollectionWasDropped(const stdx::unique_lock<Latch>& lk, Status batchStatus, std::shared_ptr<OnCompletionGuard> onCompletionGuard); @@ -259,7 +259,7 @@ private: // (S) Self-synchronizing; access in any way from any context. // (RT) Read-only in concurrent operation; synchronized externally by tests // - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("CollectionCloner::_mutex"); mutable stdx::condition_variable _condition; // (M) executor::TaskExecutor* _executor; // (R) Not owned by us. ThreadPool* _dbWorkThreadPool; // (R) Not owned by us. diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 09e61df9080..84cd3a8004b 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -66,7 +66,7 @@ public: : MockDBClientConnection(remote), _net(net) {} virtual ~FailableMockDBClientConnection() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _paused = false; _cond.notify_all(); _cond.wait(lk, [this] { return !_resuming; }); @@ -87,13 +87,13 @@ public: int batchSize) override { ON_BLOCK_EXIT([this]() { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _queryCount++; } _cond.notify_all(); }); { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _waiting = _paused; _cond.notify_all(); while (_paused) { @@ -119,14 +119,14 @@ public: void pause() { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _paused = true; } _cond.notify_all(); } void resume() { { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _resuming = true; _resume(&lk); _resuming = false; @@ -136,13 +136,13 @@ public: // Waits for the next query after pause() is called to start. void waitForPausedQuery() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _cond.wait(lk, [this] { return _waiting; }); } // Resumes, then waits for the next query to run after resume() is called to complete. void resumeAndWaitForResumedQuery() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _resuming = true; _resume(&lk); _cond.notify_all(); // This is to wake up the paused thread. @@ -153,7 +153,7 @@ public: private: executor::NetworkInterfaceMock* _net; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("FailableMockDBClientConnection::_mutex"); stdx::condition_variable _cond; bool _paused = false; bool _waiting = false; @@ -163,7 +163,7 @@ private: Status _failureForConnect = Status::OK(); Status _failureForQuery = Status::OK(); - void _resume(stdx::unique_lock<stdx::mutex>* lk) { + void _resume(stdx::unique_lock<Latch>* lk) { invariant(lk->owns_lock()); _paused = false; _resumedQueryCount = _queryCount; diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index 1466b7b2dc3..a8a61969809 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -57,8 +57,8 @@ MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeListCollections); namespace { -using LockGuard = stdx::lock_guard<stdx::mutex>; -using UniqueLock = stdx::unique_lock<stdx::mutex>; +using LockGuard = stdx::lock_guard<Latch>; +using UniqueLock = stdx::unique_lock<Latch>; using executor::RemoteCommandRequest; const char* kNameFieldName = "name"; @@ -206,7 +206,7 @@ Status DatabaseCloner::startup() noexcept { } void DatabaseCloner::shutdown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. @@ -254,7 +254,7 @@ void DatabaseCloner::setStartCollectionClonerFn( } DatabaseCloner::State DatabaseCloner::getState_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _state; } diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index 94b559d8278..051c0ba35a3 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -41,8 +41,8 @@ #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" @@ -201,7 +201,7 @@ private: /** * Calls the above method after unlocking. */ - void _finishCallback_inlock(stdx::unique_lock<stdx::mutex>& lk, const Status& status); + void _finishCallback_inlock(stdx::unique_lock<Latch>& lk, const Status& status); // // All member variables are labeled with one of the following codes indicating the @@ -212,7 +212,7 @@ private: // (S) Self-synchronizing; access in any way from any context. // (RT) Read-only in concurrent operation; synchronized externally by tests // - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("DatabaseCloner::_mutex"); mutable stdx::condition_variable _condition; // (M) executor::TaskExecutor* _executor; // (R) ThreadPool* _dbWorkThreadPool; // (R) diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp index 54c1b3fdfc5..1a0857ce6b7 100644 --- a/src/mongo/db/repl/databases_cloner.cpp +++ b/src/mongo/db/repl/databases_cloner.cpp @@ -56,8 +56,8 @@ namespace { using Request = executor::RemoteCommandRequest; using Response = executor::RemoteCommandResponse; -using LockGuard = stdx::lock_guard<stdx::mutex>; -using UniqueLock = stdx::unique_lock<stdx::mutex>; +using LockGuard = stdx::lock_guard<Latch>; +using UniqueLock = stdx::unique_lock<Latch>; } // namespace diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h index 8d94afe26fc..db6f1129edf 100644 --- a/src/mongo/db/repl/databases_cloner.h +++ b/src/mongo/db/repl/databases_cloner.h @@ -42,8 +42,8 @@ #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" @@ -138,10 +138,10 @@ private: void _setStatus_inlock(Status s); /** Will fail the cloner, call the completion function, and become inactive. */ - void _fail_inlock(stdx::unique_lock<stdx::mutex>* lk, Status s); + void _fail_inlock(stdx::unique_lock<Latch>* lk, Status s); /** Will call the completion function, and become inactive. */ - void _succeed_inlock(stdx::unique_lock<stdx::mutex>* lk); + void _succeed_inlock(stdx::unique_lock<Latch>* lk); /** Called each time a database clone is finished */ void _onEachDBCloneFinish(const Status& status, const std::string& name); @@ -175,7 +175,7 @@ private: // (M) Reads and writes guarded by _mutex // (S) Self-synchronizing; access in any way from any context. // - mutable stdx::mutex _mutex; // (S) + mutable Mutex _mutex = MONGO_MAKE_LATCH("DatabasesCloner::_mutex"); // (S) Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything. executor::TaskExecutor* _exec; // (R) executor to schedule things with ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index a631fff5dbc..b09146240b3 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -43,7 +43,7 @@ #include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/thread_name.h" @@ -57,9 +57,9 @@ using namespace mongo::repl; using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -using LockGuard = stdx::lock_guard<stdx::mutex>; -using UniqueLock = stdx::unique_lock<stdx::mutex>; -using mutex = stdx::mutex; +using LockGuard = stdx::lock_guard<Latch>; +using UniqueLock = stdx::unique_lock<Latch>; +using mutex = Mutex; using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard; using namespace unittest; using Responses = std::vector<std::pair<std::string, BSONObj>>; @@ -288,7 +288,7 @@ protected: void runCompleteClone(Responses responses) { Status result{Status::OK()}; bool done = false; - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); stdx::condition_variable cvDone; DatabasesCloner cloner{&getStorage(), &getExecutor(), diff --git a/src/mongo/db/repl/drop_pending_collection_reaper.cpp b/src/mongo/db/repl/drop_pending_collection_reaper.cpp index 31993c6acc2..2484bf1d892 100644 --- a/src/mongo/db/repl/drop_pending_collection_reaper.cpp +++ b/src/mongo/db/repl/drop_pending_collection_reaper.cpp @@ -78,7 +78,7 @@ DropPendingCollectionReaper::DropPendingCollectionReaper(StorageInterface* stora void DropPendingCollectionReaper::addDropPendingNamespace( const OpTime& dropOpTime, const NamespaceString& dropPendingNamespace) { invariant(dropPendingNamespace.isDropPendingNamespace()); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); const auto equalRange = _dropPendingNamespaces.equal_range(dropOpTime); const auto& lowerBound = equalRange.first; const auto& upperBound = equalRange.second; @@ -95,7 +95,7 @@ void DropPendingCollectionReaper::addDropPendingNamespace( } boost::optional<OpTime> DropPendingCollectionReaper::getEarliestDropOpTime() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto it = _dropPendingNamespaces.cbegin(); if (it == _dropPendingNamespaces.cend()) { return boost::none; @@ -110,7 +110,7 @@ bool DropPendingCollectionReaper::rollBackDropPendingCollection( const auto pendingNss = collectionNamespace.makeDropPendingNamespace(opTime); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); const auto equalRange = _dropPendingNamespaces.equal_range(opTime); const auto& lowerBound = equalRange.first; const auto& upperBound = equalRange.second; @@ -135,7 +135,7 @@ void DropPendingCollectionReaper::dropCollectionsOlderThan(OperationContext* opC const OpTime& opTime) { DropPendingNamespaces toDrop; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); for (auto it = _dropPendingNamespaces.cbegin(); it != _dropPendingNamespaces.cend() && it->first <= opTime; ++it) { @@ -175,7 +175,7 @@ void DropPendingCollectionReaper::dropCollectionsOlderThan(OperationContext* opC { // Entries must be removed AFTER drops are completed, so that getEarliestDropOpTime() // returns appropriate results. - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto it = _dropPendingNamespaces.cbegin(); while (it != _dropPendingNamespaces.cend() && it->first <= opTime) { if (toDrop.find(it->first) != toDrop.cend()) { diff --git a/src/mongo/db/repl/drop_pending_collection_reaper.h b/src/mongo/db/repl/drop_pending_collection_reaper.h index be8dd9a77d8..133c693fa0b 100644 --- a/src/mongo/db/repl/drop_pending_collection_reaper.h +++ b/src/mongo/db/repl/drop_pending_collection_reaper.h @@ -36,7 +36,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" namespace mongo { @@ -100,7 +100,7 @@ public: void dropCollectionsOlderThan(OperationContext* opCtx, const OpTime& opTime); void clearDropPendingState() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _dropPendingNamespaces.clear(); } @@ -126,7 +126,7 @@ private: // (M) Reads and writes guarded by _mutex. // Guards access to member variables. - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("DropPendingCollectionReaper::_mutex"); // Used to access the storage layer. StorageInterface* const _storageInterface; // (R) diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index b489e842b4c..a5e8860ce30 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -120,8 +120,8 @@ using Event = executor::TaskExecutor::EventHandle; using Handle = executor::TaskExecutor::CallbackHandle; using Operations = MultiApplier::Operations; using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>; -using UniqueLock = stdx::unique_lock<stdx::mutex>; -using LockGuard = stdx::lock_guard<stdx::mutex>; +using UniqueLock = stdx::unique_lock<Latch>; +using LockGuard = stdx::lock_guard<Latch>; // Used to reset the oldest timestamp during initial sync to a non-null timestamp. const Timestamp kTimestampOne(0, 1); @@ -243,7 +243,7 @@ InitialSyncer::~InitialSyncer() { } bool InitialSyncer::isActive() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _isActive_inlock(); } @@ -256,7 +256,7 @@ Status InitialSyncer::startup(OperationContext* opCtx, invariant(opCtx); invariant(initialSyncMaxAttempts >= 1U); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); switch (_state) { case State::kPreStart: _state = State::kRunning; @@ -289,7 +289,7 @@ Status InitialSyncer::startup(OperationContext* opCtx, } Status InitialSyncer::shutdown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. @@ -327,22 +327,22 @@ void InitialSyncer::_cancelRemainingWork_inlock() { } void InitialSyncer::join() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); }); } InitialSyncer::State InitialSyncer::getState_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _state; } Date_t InitialSyncer::getWallClockTime_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _lastApplied.wallTime; } bool InitialSyncer::_isShuttingDown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _isShuttingDown_inlock(); } @@ -515,7 +515,7 @@ void InitialSyncer::_startInitialSyncAttemptCallback( // Lock guard must be declared after completion guard because completion guard destructor // has to run outside lock. - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _oplogApplier = {}; @@ -569,7 +569,7 @@ void InitialSyncer::_chooseSyncSourceCallback( std::uint32_t chooseSyncSourceAttempt, std::uint32_t chooseSyncSourceMaxAttempts, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); // Cancellation should be treated the same as other errors. In this case, the most likely cause // of a failed _chooseSyncSourceCallback() task is a cancellation triggered by // InitialSyncer::shutdown() or the task executor shutting down. @@ -724,7 +724,7 @@ Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock( void InitialSyncer::_rollbackCheckerResetCallback( const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(), "error while getting base rollback ID"); if (!status.isOK()) { @@ -742,7 +742,7 @@ void InitialSyncer::_rollbackCheckerResetCallback( void InitialSyncer::_getBeginFetchingOpTimeCallback( const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error while getting oldest active transaction timestamp for begin fetching timestamp"); @@ -792,7 +792,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp( const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard, OpTime& beginFetchingOpTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error while getting last oplog entry for begin timestamp"); if (!status.isOK()) { @@ -849,7 +849,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> std::shared_ptr<OnCompletionGuard> onCompletionGuard, const OpTime& lastOpTime, OpTime& beginFetchingOpTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error while getting the remote feature compatibility version"); if (!status.isOK()) { @@ -1026,7 +1026,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); log() << "Finished fetching oplog during initial sync: " << redact(oplogFetcherFinishStatus) << ". Last fetched optime: " << _lastFetched.toString(); @@ -1073,7 +1073,7 @@ void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishS } } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(databaseClonerFinishStatus, "error cloning databases"); if (!status.isOK()) { @@ -1098,7 +1098,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { OpTimeAndWallTime resultOpTimeAndWallTime = {OpTime(), Date_t()}; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error fetching last oplog entry for stop timestamp"); if (!status.isOK()) { @@ -1145,7 +1145,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( TimestampedBSONObj{oplogSeedDoc, resultOpTimeAndWallTime.opTime.getTimestamp()}, resultOpTimeAndWallTime.opTime.getTerm()); if (!status.isOK()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; } @@ -1154,7 +1154,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( opCtx.get(), resultOpTimeAndWallTime.opTime.getTimestamp(), orderedCommit); } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _lastApplied = resultOpTimeAndWallTime; log() << "No need to apply operations. (currently at " << _initialSyncState->stopTimestamp.toBSON() << ")"; @@ -1166,7 +1166,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( void InitialSyncer::_getNextApplierBatchCallback( const executor::TaskExecutor::CallbackArgs& callbackArgs, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error getting next applier batch"); if (!status.isOK()) { @@ -1267,7 +1267,7 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, OpTimeAndWallTime lastApplied, std::uint32_t numApplied, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(multiApplierStatus, "error applying batch"); @@ -1324,7 +1324,7 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, void InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error getting last oplog entry after fetching missing documents"); if (!status.isOK()) { @@ -1354,7 +1354,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( void InitialSyncer::_rollbackCheckerCheckForRollbackCallback( const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(), "error while getting last rollback ID"); if (!status.isOK()) { @@ -1419,7 +1419,7 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime log() << "Initial sync attempt finishing up."; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); log() << "Initial Sync Attempt Statistics: " << redact(_getInitialSyncProgress_inlock()); auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0; @@ -1492,7 +1492,7 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) { // before we transition the state to Complete. decltype(_onCompletion) onCompletion; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto opCtx = makeOpCtx(); _tearDown_inlock(opCtx.get(), lastApplied); @@ -1522,7 +1522,7 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) { // before InitialSyncer::join() returns. onCompletion = {}; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_state != State::kComplete); _state = State::kComplete; _stateCondition.notify_all(); @@ -1558,8 +1558,7 @@ Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn } void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( - const stdx::lock_guard<stdx::mutex>& lock, - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { // We should check our current state because shutdown() could have been called before // we re-acquired the lock. if (_isShuttingDown_inlock()) { @@ -1614,8 +1613,7 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( } void InitialSyncer::_scheduleRollbackCheckerCheckForRollback_inlock( - const stdx::lock_guard<stdx::mutex>& lock, - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { // We should check our current state because shutdown() could have been called before // we re-acquired the lock. if (_isShuttingDown_inlock()) { diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 4b994f9ea88..6103099a435 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -51,9 +51,9 @@ #include "mongo/db/repl/rollback_checker.h" #include "mongo/db/repl/sync_source_selector.h" #include "mongo/dbtests/mock/mock_dbclient_connection.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/net/hostandport.h" @@ -535,8 +535,7 @@ private: * Passes 'lock' through to completion guard. */ void _checkApplierProgressAndScheduleGetNextApplierBatch_inlock( - const stdx::lock_guard<stdx::mutex>& lock, - std::shared_ptr<OnCompletionGuard> onCompletionGuard); + const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard); /** * Schedules a rollback checker to get the rollback ID after data cloning or applying. This @@ -546,8 +545,7 @@ private: * Passes 'lock' through to completion guard. */ void _scheduleRollbackCheckerCheckForRollback_inlock( - const stdx::lock_guard<stdx::mutex>& lock, - std::shared_ptr<OnCompletionGuard> onCompletionGuard); + const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard); /** * Checks the given status (or embedded status inside the callback args) and current data @@ -607,7 +605,7 @@ private: // (MX) Must hold _mutex and be in a callback in _exec to write; must either hold // _mutex or be in a callback in _exec to read. - mutable stdx::mutex _mutex; // (S) + mutable Mutex _mutex = MONGO_MAKE_LATCH("InitialSyncer::_mutex"); // (S) const InitialSyncerOptions _opts; // (R) std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R) executor::TaskExecutor* _exec; // (R) diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 003ec073d1d..8065edda292 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -60,7 +60,7 @@ #include "mongo/db/service_context_test_fixture.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point_service.h" @@ -104,9 +104,9 @@ using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using unittest::log; -using LockGuard = stdx::lock_guard<stdx::mutex>; +using LockGuard = stdx::lock_guard<Latch>; using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard; -using UniqueLock = stdx::unique_lock<stdx::mutex>; +using UniqueLock = stdx::unique_lock<Latch>; struct CollectionCloneInfo { std::shared_ptr<CollectionMockStats> stats = std::make_shared<CollectionMockStats>(); @@ -246,7 +246,9 @@ protected: bool upgradeNonReplicatedUniqueIndexesShouldFail = false; }; - stdx::mutex _storageInterfaceWorkDoneMutex; // protects _storageInterfaceWorkDone. + // protects _storageInterfaceWorkDone. + Mutex _storageInterfaceWorkDoneMutex = + MONGO_MAKE_LATCH("InitialSyncerTest::_storageInterfaceWorkDoneMutex"); StorageInterfaceResults _storageInterfaceWorkDone; void setUp() override { diff --git a/src/mongo/db/repl/local_oplog_info.cpp b/src/mongo/db/repl/local_oplog_info.cpp index 069c199def1..b17da6d88c5 100644 --- a/src/mongo/db/repl/local_oplog_info.cpp +++ b/src/mongo/db/repl/local_oplog_info.cpp @@ -95,7 +95,7 @@ void LocalOplogInfo::resetCollection() { } void LocalOplogInfo::setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { - stdx::lock_guard<stdx::mutex> lk(_newOpMutex); + stdx::lock_guard<Latch> lk(_newOpMutex); LogicalClock::get(service)->setClusterTimeFromTrustedSource(LogicalTime(newTime)); } @@ -120,7 +120,7 @@ std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, s // Allow the storage engine to start the transaction outside the critical section. opCtx->recoveryUnit()->preallocateSnapshot(); - stdx::lock_guard<stdx::mutex> lk(_newOpMutex); + stdx::lock_guard<Latch> lk(_newOpMutex); ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); const bool orderedCommit = false; diff --git a/src/mongo/db/repl/local_oplog_info.h b/src/mongo/db/repl/local_oplog_info.h index 67ab7e0560d..96cdb259f36 100644 --- a/src/mongo/db/repl/local_oplog_info.h +++ b/src/mongo/db/repl/local_oplog_info.h @@ -92,7 +92,7 @@ private: // Synchronizes the section where a new Timestamp is generated and when it is registered in the // storage engine. - mutable stdx::mutex _newOpMutex; + mutable Mutex _newOpMutex = MONGO_MAKE_LATCH("LocaloplogInfo::_newOpMutex"); }; } // namespace repl diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp index 99f09fa2484..02c993a0e67 100644 --- a/src/mongo/db/repl/multiapplier.cpp +++ b/src/mongo/db/repl/multiapplier.cpp @@ -60,7 +60,7 @@ MultiApplier::~MultiApplier() { } bool MultiApplier::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _isActive_inlock(); } @@ -69,7 +69,7 @@ bool MultiApplier::_isActive_inlock() const { } Status MultiApplier::startup() noexcept { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); switch (_state) { case State::kPreStart: @@ -96,7 +96,7 @@ Status MultiApplier::startup() noexcept { } void MultiApplier::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. @@ -117,12 +117,12 @@ void MultiApplier::shutdown() { } void MultiApplier::join() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _condition.wait(lk, [this]() { return !_isActive_inlock(); }); } MultiApplier::State MultiApplier::getState_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _state; } @@ -153,14 +153,14 @@ void MultiApplier::_finishCallback(const Status& result) { // destroyed outside the lock. decltype(_onCompletion) onCompletion; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_onCompletion); std::swap(_onCompletion, onCompletion); } onCompletion(result); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(State::kComplete != _state); _state = State::kComplete; _condition.notify_all(); diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index 87d34964c2f..e550316ec8e 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -42,9 +42,9 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/service_context.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" namespace mongo { namespace repl { @@ -149,7 +149,7 @@ private: CallbackFn _onCompletion; // Protects member data of this MultiApplier. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("MultiApplier::_mutex"); stdx::condition_variable _condition; diff --git a/src/mongo/db/repl/noop_writer.cpp b/src/mongo/db/repl/noop_writer.cpp index 8c0fbfaa6b9..ac0c19bcceb 100644 --- a/src/mongo/db/repl/noop_writer.cpp +++ b/src/mongo/db/repl/noop_writer.cpp @@ -70,7 +70,7 @@ public: : _thread([this, noopWrite, waitTime] { run(waitTime, std::move(noopWrite)); }) {} ~PeriodicNoopRunner() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _inShutdown = true; _cv.notify_all(); lk.unlock(); @@ -84,7 +84,7 @@ private: const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); OperationContext& opCtx = *opCtxPtr; { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); MONGO_IDLE_THREAD_BLOCK; _cv.wait_for(lk, waitTime.toSystemDuration(), [&] { return _inShutdown; }); @@ -103,7 +103,7 @@ private: /** * Mutex for the CV */ - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("PeriodicNoopRunner::_mutex"); /** * CV to wait for. @@ -126,7 +126,7 @@ NoopWriter::~NoopWriter() { } Status NoopWriter::startWritingPeriodicNoops(OpTime lastKnownOpTime) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _lastKnownOpTime = lastKnownOpTime; invariant(!_noopRunner); @@ -139,7 +139,7 @@ Status NoopWriter::startWritingPeriodicNoops(OpTime lastKnownOpTime) { } void NoopWriter::stopWritingPeriodicNoops() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _noopRunner.reset(); } diff --git a/src/mongo/db/repl/noop_writer.h b/src/mongo/db/repl/noop_writer.h index 07f664668d0..4d01a8bd18c 100644 --- a/src/mongo/db/repl/noop_writer.h +++ b/src/mongo/db/repl/noop_writer.h @@ -30,8 +30,8 @@ #pragma once #include "mongo/db/repl/optime.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/time_support.h" namespace mongo { @@ -74,7 +74,7 @@ private: * Protects member data of this class during start and stop. There is no need to synchronize * access once its running because its run by a one thread only. */ - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("NoopWriter::_mutex"); std::unique_ptr<PeriodicNoopRunner> _noopRunner; }; diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 2a1c71e9d77..e752aaa3d9e 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -107,12 +107,12 @@ Future<void> OplogApplier::startup() { void OplogApplier::shutdown() { _shutdown(); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _inShutdown = true; } bool OplogApplier::inShutdown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _inShutdown; } diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 7752554cfbd..9a3a346a9d0 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -40,7 +40,7 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/executor/task_executor.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/functional.h" #include "mongo/util/future.h" @@ -244,7 +244,7 @@ private: Observer* const _observer; // Protects member data of OplogApplier. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogApplier::_mutex"); // Set to true if shutdown() has been called. bool _inShutdown = false; diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index cfea973d17d..2e0736c82d1 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -106,7 +106,7 @@ void OplogBufferCollection::startup(OperationContext* opCtx) { return; } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // If we are starting from an existing collection, we must populate the in memory state of the // buffer. auto sizeResult = _storageInterface->getCollectionSize(opCtx, _nss); @@ -148,7 +148,7 @@ void OplogBufferCollection::startup(OperationContext* opCtx) { void OplogBufferCollection::shutdown(OperationContext* opCtx) { if (_options.dropCollectionAtShutdown) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _dropCollection(opCtx); _size = 0; _count = 0; @@ -176,7 +176,7 @@ void OplogBufferCollection::pushAllNonBlocking(OperationContext* opCtx, } size_t numDocs = std::distance(begin, end); std::vector<InsertStatement> docsToInsert(numDocs); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto ts = _lastPushedTimestamp; auto sentinelCount = _sentinelCount; std::transform(begin, end, docsToInsert.begin(), [&sentinelCount, &ts](const Value& value) { @@ -202,7 +202,7 @@ void OplogBufferCollection::pushAllNonBlocking(OperationContext* opCtx, void OplogBufferCollection::waitForSpace(OperationContext* opCtx, std::size_t size) {} bool OplogBufferCollection::isEmpty() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _count == 0; } @@ -211,17 +211,17 @@ std::size_t OplogBufferCollection::getMaxSize() const { } std::size_t OplogBufferCollection::getSize() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _size; } std::size_t OplogBufferCollection::getCount() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _count; } void OplogBufferCollection::clear(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _dropCollection(opCtx); _createCollection(opCtx); _size = 0; @@ -233,7 +233,7 @@ void OplogBufferCollection::clear(OperationContext* opCtx) { } bool OplogBufferCollection::tryPop(OperationContext* opCtx, Value* value) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_count == 0) { return false; } @@ -241,7 +241,7 @@ bool OplogBufferCollection::tryPop(OperationContext* opCtx, Value* value) { } bool OplogBufferCollection::waitForData(Seconds waitDuration) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (!_cvNoLongerEmpty.wait_for( lk, waitDuration.toSystemDuration(), [&]() { return _count != 0; })) { return false; @@ -250,7 +250,7 @@ bool OplogBufferCollection::waitForData(Seconds waitDuration) { } bool OplogBufferCollection::peek(OperationContext* opCtx, Value* value) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_count == 0) { return false; } @@ -260,7 +260,7 @@ bool OplogBufferCollection::peek(OperationContext* opCtx, Value* value) { boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( OperationContext* opCtx) const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto lastDocumentPushed = _lastDocumentPushed_inlock(opCtx); if (lastDocumentPushed) { BSONObj entryObj = extractEmbeddedOplogDocument(*lastDocumentPushed); @@ -365,23 +365,23 @@ void OplogBufferCollection::_dropCollection(OperationContext* opCtx) { } std::size_t OplogBufferCollection::getSentinelCount_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _sentinelCount; } Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _lastPushedTimestamp; } Timestamp OplogBufferCollection::getLastPoppedTimestamp_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _lastPoppedKey.isEmpty() ? Timestamp() : _lastPoppedKey[""].Obj()[kTimestampFieldName].timestamp(); } std::queue<BSONObj> OplogBufferCollection::getPeekCache_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _peekCache; } diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index b6ef88eb734..cc6af3a96cc 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -34,7 +34,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/oplog_buffer.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/util/queue.h" namespace mongo { @@ -183,7 +183,7 @@ private: stdx::condition_variable _cvNoLongerEmpty; // Protects member data below and synchronizes it with the underlying collection. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogBufferCollection::_mutex"); // Number of documents in buffer. std::size_t _count = 0; diff --git a/src/mongo/db/repl/oplog_buffer_proxy.cpp b/src/mongo/db/repl/oplog_buffer_proxy.cpp index 1a339644c91..30b2f9e785a 100644 --- a/src/mongo/db/repl/oplog_buffer_proxy.cpp +++ b/src/mongo/db/repl/oplog_buffer_proxy.cpp @@ -51,8 +51,8 @@ void OplogBufferProxy::startup(OperationContext* opCtx) { void OplogBufferProxy::shutdown(OperationContext* opCtx) { { - stdx::lock_guard<stdx::mutex> backLock(_lastPushedMutex); - stdx::lock_guard<stdx::mutex> frontLock(_lastPeekedMutex); + stdx::lock_guard<Latch> backLock(_lastPushedMutex); + stdx::lock_guard<Latch> frontLock(_lastPeekedMutex); _lastPushed.reset(); _lastPeeked.reset(); } @@ -60,13 +60,13 @@ void OplogBufferProxy::shutdown(OperationContext* opCtx) { } void OplogBufferProxy::pushEvenIfFull(OperationContext* opCtx, const Value& value) { - stdx::lock_guard<stdx::mutex> lk(_lastPushedMutex); + stdx::lock_guard<Latch> lk(_lastPushedMutex); _lastPushed = value; _target->pushEvenIfFull(opCtx, value); } void OplogBufferProxy::push(OperationContext* opCtx, const Value& value) { - stdx::lock_guard<stdx::mutex> lk(_lastPushedMutex); + stdx::lock_guard<Latch> lk(_lastPushedMutex); _lastPushed = value; _target->push(opCtx, value); } @@ -77,7 +77,7 @@ void OplogBufferProxy::pushAllNonBlocking(OperationContext* opCtx, if (begin == end) { return; } - stdx::lock_guard<stdx::mutex> lk(_lastPushedMutex); + stdx::lock_guard<Latch> lk(_lastPushedMutex); _lastPushed = *(end - 1); _target->pushAllNonBlocking(opCtx, begin, end); } @@ -103,16 +103,16 @@ std::size_t OplogBufferProxy::getCount() const { } void OplogBufferProxy::clear(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> backLock(_lastPushedMutex); - stdx::lock_guard<stdx::mutex> frontLock(_lastPeekedMutex); + stdx::lock_guard<Latch> backLock(_lastPushedMutex); + stdx::lock_guard<Latch> frontLock(_lastPeekedMutex); _lastPushed.reset(); _lastPeeked.reset(); _target->clear(opCtx); } bool OplogBufferProxy::tryPop(OperationContext* opCtx, Value* value) { - stdx::lock_guard<stdx::mutex> backLock(_lastPushedMutex); - stdx::lock_guard<stdx::mutex> frontLock(_lastPeekedMutex); + stdx::lock_guard<Latch> backLock(_lastPushedMutex); + stdx::lock_guard<Latch> frontLock(_lastPeekedMutex); if (!_target->tryPop(opCtx, value)) { return false; } @@ -126,7 +126,7 @@ bool OplogBufferProxy::tryPop(OperationContext* opCtx, Value* value) { bool OplogBufferProxy::waitForData(Seconds waitDuration) { { - stdx::unique_lock<stdx::mutex> lk(_lastPushedMutex); + stdx::unique_lock<Latch> lk(_lastPushedMutex); if (_lastPushed) { return true; } @@ -135,7 +135,7 @@ bool OplogBufferProxy::waitForData(Seconds waitDuration) { } bool OplogBufferProxy::peek(OperationContext* opCtx, Value* value) { - stdx::lock_guard<stdx::mutex> lk(_lastPeekedMutex); + stdx::lock_guard<Latch> lk(_lastPeekedMutex); if (_lastPeeked) { *value = *_lastPeeked; return true; @@ -149,7 +149,7 @@ bool OplogBufferProxy::peek(OperationContext* opCtx, Value* value) { boost::optional<OplogBuffer::Value> OplogBufferProxy::lastObjectPushed( OperationContext* opCtx) const { - stdx::lock_guard<stdx::mutex> lk(_lastPushedMutex); + stdx::lock_guard<Latch> lk(_lastPushedMutex); if (!_lastPushed) { return boost::none; } @@ -157,7 +157,7 @@ boost::optional<OplogBuffer::Value> OplogBufferProxy::lastObjectPushed( } boost::optional<OplogBuffer::Value> OplogBufferProxy::getLastPeeked_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_lastPeekedMutex); + stdx::lock_guard<Latch> lk(_lastPeekedMutex); return _lastPeeked; } diff --git a/src/mongo/db/repl/oplog_buffer_proxy.h b/src/mongo/db/repl/oplog_buffer_proxy.h index 544b5b6739f..7ef7537225b 100644 --- a/src/mongo/db/repl/oplog_buffer_proxy.h +++ b/src/mongo/db/repl/oplog_buffer_proxy.h @@ -33,7 +33,7 @@ #include <memory> #include "mongo/db/repl/oplog_buffer.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" namespace mongo { namespace repl { @@ -82,10 +82,10 @@ private: std::unique_ptr<OplogBuffer> _target; // If both mutexes have to be acquired, acquire _lastPushedMutex first. - mutable stdx::mutex _lastPushedMutex; + mutable Mutex _lastPushedMutex = MONGO_MAKE_LATCH("OplogBufferProxy::_lastPushedMutex"); boost::optional<Value> _lastPushed; - mutable stdx::mutex _lastPeekedMutex; + mutable Mutex _lastPeekedMutex = MONGO_MAKE_LATCH("OplogBufferProxy::_lastPeekedMutex"); boost::optional<Value> _lastPeeked; }; diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index 00f76f96c4d..0715f665f8c 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -44,8 +44,8 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/unittest/barrier.h" #include "mongo/util/concurrency/thread_pool.h" @@ -171,7 +171,7 @@ void _testConcurrentLogOp(const F& makeTaskFunction, // Run 2 concurrent logOp() requests using the thread pool. // Use a barrier with a thread count of 3 to ensure both logOp() tasks are complete before this // test thread can proceed with shutting the thread pool down. - stdx::mutex mtx; + auto mtx = MONGO_MAKE_LATCH(); unittest::Barrier barrier(3U); const NamespaceString nss1("test1.coll"); const NamespaceString nss2("test2.coll"); @@ -206,7 +206,7 @@ void _testConcurrentLogOp(const F& makeTaskFunction, std::reverse(oplogEntries->begin(), oplogEntries->end()); // Look up namespaces and their respective optimes (returned by logOp()) in the map. - stdx::lock_guard<stdx::mutex> lock(mtx); + stdx::lock_guard<Latch> lock(mtx); ASSERT_EQUALS(2U, opTimeNssMap->size()); } @@ -216,10 +216,10 @@ void _testConcurrentLogOp(const F& makeTaskFunction, * Returns optime of generated oplog entry. */ OpTime _logOpNoopWithMsg(OperationContext* opCtx, - stdx::mutex* mtx, + Mutex* mtx, OpTimeNamespaceStringMap* opTimeNssMap, const NamespaceString& nss) { - stdx::lock_guard<stdx::mutex> lock(*mtx); + stdx::lock_guard<Latch> lock(*mtx); // logOp() must be called while holding lock because ephemeralForTest storage engine does not // support concurrent updates to its internal state. @@ -252,7 +252,7 @@ TEST_F(OplogTest, ConcurrentLogOpWithoutDocLockingSupport) { _testConcurrentLogOp( [](const NamespaceString& nss, - stdx::mutex* mtx, + Mutex* mtx, OpTimeNamespaceStringMap* opTimeNssMap, unittest::Barrier* barrier) { return [=] { @@ -285,7 +285,7 @@ TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupport) { ForceSupportsDocLocking support(true); _testConcurrentLogOp( [](const NamespaceString& nss, - stdx::mutex* mtx, + Mutex* mtx, OpTimeNamespaceStringMap* opTimeNssMap, unittest::Barrier* barrier) { return [=] { @@ -317,7 +317,7 @@ TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertFirstOplogEntry) { ForceSupportsDocLocking support(true); _testConcurrentLogOp( [](const NamespaceString& nss, - stdx::mutex* mtx, + Mutex* mtx, OpTimeNamespaceStringMap* opTimeNssMap, unittest::Barrier* barrier) { return [=] { @@ -335,7 +335,7 @@ TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertFirstOplogEntry) { // Revert the first logOp() call and confirm that there are no holes in the // oplog after committing the oplog entry with the more recent optime. { - stdx::lock_guard<stdx::mutex> lock(*mtx); + stdx::lock_guard<Latch> lock(*mtx); auto firstOpTimeAndNss = *(opTimeNssMap->cbegin()); if (opTime == firstOpTimeAndNss.first) { ASSERT_EQUALS(nss, firstOpTimeAndNss.second) @@ -364,7 +364,7 @@ TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertLastOplogEntry) { ForceSupportsDocLocking support(true); _testConcurrentLogOp( [](const NamespaceString& nss, - stdx::mutex* mtx, + Mutex* mtx, OpTimeNamespaceStringMap* opTimeNssMap, unittest::Barrier* barrier) { return [=] { @@ -382,7 +382,7 @@ TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertLastOplogEntry) { // Revert the last logOp() call and confirm that there are no holes in the // oplog after committing the oplog entry with the earlier optime. { - stdx::lock_guard<stdx::mutex> lock(*mtx); + stdx::lock_guard<Latch> lock(*mtx); auto lastOpTimeAndNss = *(opTimeNssMap->crbegin()); if (opTime == lastOpTimeAndNss.first) { ASSERT_EQUALS(nss, lastOpTimeAndNss.second) diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp index 61f46bf0bef..5c698190445 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp @@ -36,12 +36,12 @@ namespace repl { void ReplicationConsistencyMarkersMock::initializeMinValidDocument(OperationContext* opCtx) { { - stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex); + stdx::lock_guard<Latch> lock(_initialSyncFlagMutex); _initialSyncFlag = false; } { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); _minValid = {}; _oplogTruncateAfterPoint = {}; _appliedThrough = {}; @@ -49,64 +49,64 @@ void ReplicationConsistencyMarkersMock::initializeMinValidDocument(OperationCont } bool ReplicationConsistencyMarkersMock::getInitialSyncFlag(OperationContext* opCtx) const { - stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex); + stdx::lock_guard<Latch> lock(_initialSyncFlagMutex); return _initialSyncFlag; } void ReplicationConsistencyMarkersMock::setInitialSyncFlag(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex); + stdx::lock_guard<Latch> lock(_initialSyncFlagMutex); _initialSyncFlag = true; } void ReplicationConsistencyMarkersMock::clearInitialSyncFlag(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex); + stdx::lock_guard<Latch> lock(_initialSyncFlagMutex); _initialSyncFlag = false; } OpTime ReplicationConsistencyMarkersMock::getMinValid(OperationContext* opCtx) const { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); return _minValid; } void ReplicationConsistencyMarkersMock::setMinValid(OperationContext* opCtx, const OpTime& minValid) { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); _minValid = minValid; } void ReplicationConsistencyMarkersMock::setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); _minValid = std::max(_minValid, minValid); } void ReplicationConsistencyMarkersMock::setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); _oplogTruncateAfterPoint = timestamp; } Timestamp ReplicationConsistencyMarkersMock::getOplogTruncateAfterPoint( OperationContext* opCtx) const { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); return _oplogTruncateAfterPoint; } void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCtx, const OpTime& optime, bool setTimestamp) { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); _appliedThrough = optime; } void ReplicationConsistencyMarkersMock::clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); _appliedThrough = {}; } OpTime ReplicationConsistencyMarkersMock::getAppliedThrough(OperationContext* opCtx) const { - stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + stdx::lock_guard<Latch> lock(_minValidBoundariesMutex); return _appliedThrough; } diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h index 3215264110f..3fe3c2670f5 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.h +++ b/src/mongo/db/repl/replication_consistency_markers_mock.h @@ -31,7 +31,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_consistency_markers.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" namespace mongo { @@ -73,10 +73,12 @@ public: Status createInternalCollections(OperationContext* opCtx) override; private: - mutable stdx::mutex _initialSyncFlagMutex; + mutable Mutex _initialSyncFlagMutex = + MONGO_MAKE_LATCH("ReplicationConsistencyMarkersMock::_initialSyncFlagMutex"); bool _initialSyncFlag = false; - mutable stdx::mutex _minValidBoundariesMutex; + mutable Mutex _minValidBoundariesMutex = + MONGO_MAKE_LATCH("ReplicationConsistencyMarkersMock::_minValidBoundariesMutex"); OpTime _appliedThrough; OpTime _minValid; Timestamp _oplogTruncateAfterPoint; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 54cf29caf20..6abca36f5d6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -205,7 +205,7 @@ bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationCont void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( OperationContext* opCtx, ReplicationCoordinator* replCoord) { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); + stdx::lock_guard<Latch> lk(_threadMutex); // We've shut down the external state, don't start again. if (_inShutdown) @@ -256,12 +256,12 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( } void ReplicationCoordinatorExternalStateImpl::stopDataReplication(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_threadMutex); + stdx::unique_lock<Latch> lk(_threadMutex); _stopDataReplication_inlock(opCtx, lk); } void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock( - OperationContext* opCtx, stdx::unique_lock<stdx::mutex>& lock) { + OperationContext* opCtx, stdx::unique_lock<Latch>& lock) { // Make sue no other _stopDataReplication calls are in progress. _dataReplicationStopped.wait(lock, [this]() { return !_stoppingDataReplication; }); _stoppingDataReplication = true; @@ -316,7 +316,7 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock( void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings) { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); + stdx::lock_guard<Latch> lk(_threadMutex); if (_startedThreads) { return; } @@ -339,7 +339,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s } void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_threadMutex); + stdx::unique_lock<Latch> lk(_threadMutex); _inShutdown = true; if (!_startedThreads) { return; @@ -826,21 +826,21 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook } void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); + stdx::lock_guard<Latch> lk(_threadMutex); if (_bgSync) { _bgSync->clearSyncTarget(); } } void ReplicationCoordinatorExternalStateImpl::stopProducer() { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); + stdx::lock_guard<Latch> lk(_threadMutex); if (_bgSync) { _bgSync->stop(false); } } void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); + stdx::lock_guard<Latch> lk(_threadMutex); if (_bgSync) { _bgSync->startProducerIfStopped(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index caacf96c068..8b984b454e5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -39,7 +39,7 @@ #include "mongo/db/repl/task_runner.h" #include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/snapshot_manager.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/util/concurrency/thread_pool.h" namespace mongo { @@ -120,7 +120,7 @@ private: /** * Stops data replication and returns with 'lock' locked. */ - void _stopDataReplication_inlock(OperationContext* opCtx, stdx::unique_lock<stdx::mutex>& lock); + void _stopDataReplication_inlock(OperationContext* opCtx, stdx::unique_lock<Latch>& lock); /** * Called when the instance transitions to primary in order to notify a potentially sharded host @@ -141,7 +141,7 @@ private: ServiceContext* _service; // Guards starting threads and setting _startedThreads - stdx::mutex _threadMutex; + Mutex _threadMutex = MONGO_MAKE_LATCH("ReplicationCoordinatorExternalStateImpl::_threadMutex"); // Flag for guarding against concurrent data replication stopping. bool _stoppingDataReplication = false; @@ -187,7 +187,8 @@ private: Future<void> _oplogApplierShutdownFuture; // Mutex guarding the _nextThreadId value to prevent concurrent incrementing. - stdx::mutex _nextThreadIdMutex; + Mutex _nextThreadIdMutex = + MONGO_MAKE_LATCH("ReplicationCoordinatorExternalStateImpl::_nextThreadIdMutex"); // Number used to uniquely name threads. long long _nextThreadId = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index d7cb605e61a..c742ac33db2 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -145,7 +145,7 @@ StatusWith<LastVote> ReplicationCoordinatorExternalStateMock::loadLocalLastVoteD Status ReplicationCoordinatorExternalStateMock::storeLocalLastVoteDocument( OperationContext* opCtx, const LastVote& lastVote) { { - stdx::unique_lock<stdx::mutex> lock(_shouldHangLastVoteMutex); + stdx::unique_lock<Latch> lock(_shouldHangLastVoteMutex); while (_storeLocalLastVoteDocumentShouldHang) { _shouldHangLastVoteCondVar.wait(lock); } @@ -210,7 +210,7 @@ void ReplicationCoordinatorExternalStateMock::setStoreLocalLastVoteDocumentStatu } void ReplicationCoordinatorExternalStateMock::setStoreLocalLastVoteDocumentToHang(bool hang) { - stdx::unique_lock<stdx::mutex> lock(_shouldHangLastVoteMutex); + stdx::unique_lock<Latch> lock(_shouldHangLastVoteMutex); _storeLocalLastVoteDocumentShouldHang = hang; if (!hang) { _shouldHangLastVoteCondVar.notify_all(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 772cab29b66..8676aaa8c14 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -37,8 +37,8 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/last_vote.h" #include "mongo/db/repl/replication_coordinator_external_state.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/net/hostandport.h" @@ -196,7 +196,8 @@ private: Status _storeLocalConfigDocumentStatus; Status _storeLocalLastVoteDocumentStatus; // mutex and cond var for controlling stroeLocalLastVoteDocument()'s hanging - stdx::mutex _shouldHangLastVoteMutex; + Mutex _shouldHangLastVoteMutex = + MONGO_MAKE_LATCH("ReplicationCoordinatorExternalStateMock::_shouldHangLastVoteMutex"); stdx::condition_variable _shouldHangLastVoteCondVar; bool _storeLocalLastVoteDocumentShouldHang; bool _connectionsClosed; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 7354e5e5ede..060e425238b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -81,10 +81,10 @@ #include "mongo/db/write_concern_options.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/network_interface.h" +#include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -149,7 +149,7 @@ private: const bool _initialState; }; -void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void()>& fn) { +void lockAndCall(stdx::unique_lock<Latch>* lk, const stdx::function<void()>& fn) { if (!lk->owns_lock()) { lk->lock(); } @@ -228,7 +228,7 @@ public: * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one * of these without holding _mutex */ - WaiterGuard(const stdx::unique_lock<stdx::mutex>& lock, WaiterList* list, Waiter* waiter) + WaiterGuard(const stdx::unique_lock<Latch>& lock, WaiterList* list, Waiter* waiter) : _lock(lock), _list(list), _waiter(waiter) { invariant(_lock.owns_lock()); list->add_inlock(_waiter); @@ -240,7 +240,7 @@ public: } private: - const stdx::unique_lock<stdx::mutex>& _lock; + const stdx::unique_lock<Latch>& _lock; WaiterList* _list; Waiter* _waiter; }; @@ -370,7 +370,7 @@ void ReplicationCoordinatorImpl::waitForStartUpComplete_forTest() { void ReplicationCoordinatorImpl::_waitForStartUpComplete() { CallbackHandle handle; { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { _rsConfigStateChange.wait(lk); } @@ -382,12 +382,12 @@ void ReplicationCoordinatorImpl::_waitForStartUpComplete() { } ReplSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _rsConfig; } Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_handleElectionTimeoutCbh.isValid()) { return Date_t(); } @@ -395,12 +395,12 @@ Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const { } Milliseconds ReplicationCoordinatorImpl::getRandomizedElectionOffset_forTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getRandomizedElectionOffset_inlock(); } boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_priorityTakeoverCbh.isValid()) { return boost::none; } @@ -408,7 +408,7 @@ boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest( } boost::optional<Date_t> ReplicationCoordinatorImpl::getCatchupTakeover_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_catchupTakeoverCbh.isValid()) { return boost::none; } @@ -421,12 +421,12 @@ executor::TaskExecutor::CallbackHandle ReplicationCoordinatorImpl::getCatchupTak } OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getCurrentCommittedSnapshotOpTime_inlock(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTimeAndWallTime() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getCurrentCommittedSnapshotOpTimeAndWallTime_inlock(); } @@ -477,7 +477,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) log() << "Did not find local initialized voted for document at startup."; } { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->loadLastVote(lastVote.getValue()); } @@ -538,7 +538,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) handle = CallbackHandle{}; } fassert(40446, handle); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _finishLoadLocalConfigCbh = std::move(handle.getValue()); return false; @@ -638,7 +638,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( // applied optime is never greater than the latest cluster time in the logical clock. _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); invariant(_rsConfigState == kConfigStartingUp); const PostMemberStateUpdateAction action = _setCurrentRSConfig(lock, opCtx.get(), localConfig, myIndex.getValue()); @@ -655,7 +655,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( } { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // Step down is impossible, so we don't need to wait for the returned event. _updateTerm_inlock(term); } @@ -671,7 +671,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) { std::shared_ptr<InitialSyncer> initialSyncerCopy; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _initialSyncer.swap(initialSyncerCopy); } if (initialSyncerCopy) { @@ -713,7 +713,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, auto onCompletion = [this, startCompleted](const StatusWith<OpTimeAndWallTime>& opTimeStatus) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (opTimeStatus == ErrorCodes::CallbackCanceled) { log() << "Initial Sync has been cancelled: " << opTimeStatus.getStatus(); return; @@ -754,7 +754,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, try { { // Must take the lock to set _initialSyncer, but not call it. - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_inShutdown) { log() << "Initial Sync not starting because replication is shutting down."; return; @@ -811,7 +811,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { storageGlobalParams.readOnly = true; } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _setConfigState_inlock(kConfigReplicationDisabled); return; } @@ -819,7 +819,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { invariant(!ReplSettings::shouldRecoverFromOplogAsStandalone()); { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); fassert(18822, !_inShutdown); _setConfigState_inlock(kConfigStartingUp); _topCoord->setStorageEngineSupportsReadCommitted( @@ -835,7 +835,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { if (doneLoadingConfig) { // If we're not done loading the config, then the config state will be set by // _finishLoadLocalConfig. - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_rsConfig.isInitialized()); _setConfigState_inlock(kConfigUninitialized); } @@ -861,7 +861,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { // Used to shut down outside of the lock. std::shared_ptr<InitialSyncer> initialSyncerCopy; { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); fassert(28533, !_inShutdown); _inShutdown = true; if (_rsConfigState == kConfigPreStart) { @@ -909,12 +909,12 @@ ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() co } MemberState ReplicationCoordinatorImpl::getMemberState() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getMemberState_inlock(); } std::vector<MemberData> ReplicationCoordinatorImpl::getMemberData() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _topCoord->getMemberData(); } @@ -928,7 +928,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState, return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative"); } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto pred = [this, expectedState]() { return _memberState == expectedState; }; if (!_memberStateChange.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, @@ -940,7 +940,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState, } Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_rsConfig.isInitialized()); if (_selfIndex == -1) { // We aren't currently in the set. Return 0 seconds so we can clear out the applier's @@ -951,7 +951,7 @@ Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { } void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->clearSyncSourceBlacklist(); } @@ -968,7 +968,7 @@ Status ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, const MemberState& newState) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (newState == _topCoord->getMemberState()) { return Status::OK(); } @@ -999,7 +999,7 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, } ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _applierState; } @@ -1031,7 +1031,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, // When we go to drop all temp collections, we must replicate the drops. invariant(opCtx->writesAreReplicated()); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_applierState != ApplierState::Draining) { return; } @@ -1092,7 +1092,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) { return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative"); } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto pred = [this]() { return _applierState != ApplierState::Draining; }; if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, @@ -1107,7 +1107,7 @@ void ReplicationCoordinatorImpl::signalUpstreamUpdater() { } void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg); } @@ -1118,7 +1118,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward( const auto opTime = opTimeAndWallTime.opTime; _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); if (opTime > myLastAppliedOpTime) { _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, consistency); @@ -1144,7 +1144,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward( void ReplicationCoordinatorImpl::setMyLastDurableOpTimeAndWallTimeForward( const OpTimeAndWallTime& opTimeAndWallTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (opTimeAndWallTime.opTime > _getMyLastDurableOpTime_inlock()) { _setMyLastDurableOpTimeAndWallTime(lock, opTimeAndWallTime, false); _reportUpstream_inlock(std::move(lock)); @@ -1158,7 +1158,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime( // applied optime is never greater than the latest cluster time in the logical clock. _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); // The optime passed to this function is required to represent a consistent database state. _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, DataConsistency::Consistent); _reportUpstream_inlock(std::move(lock)); @@ -1166,13 +1166,13 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime( void ReplicationCoordinatorImpl::setMyLastDurableOpTimeAndWallTime( const OpTimeAndWallTime& opTimeAndWallTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _setMyLastDurableOpTimeAndWallTime(lock, opTimeAndWallTime, false); _reportUpstream_inlock(std::move(lock)); } void ReplicationCoordinatorImpl::resetMyLastOpTimes() { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _resetMyLastOpTimes(lock); _reportUpstream_inlock(std::move(lock)); } @@ -1187,7 +1187,7 @@ void ReplicationCoordinatorImpl::_resetMyLastOpTimes(WithLock lk) { _stableOpTimeCandidates.clear(); } -void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock) { +void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<Latch> lock) { invariant(lock.owns_lock()); if (getReplicationMode() != modeReplSet) { @@ -1274,22 +1274,22 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime( } OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastAppliedOpTime_inlock(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastAppliedOpTimeAndWallTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastAppliedOpTimeAndWallTime_inlock(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastDurableOpTimeAndWallTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastDurableOpTimeAndWallTime_inlock(); } OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastDurableOpTime_inlock(); } @@ -1396,7 +1396,7 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx, } } - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (isMajorityCommittedRead && !_externalState->snapshotsEnabled()) { return {ErrorCodes::CommandNotSupported, @@ -1563,7 +1563,7 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer long long memberId, const OpTime& opTime, Date_t wallTime) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(getReplicationMode() == modeReplSet); if (wallTime == Date_t()) { @@ -1582,7 +1582,7 @@ Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer long long memberId, const OpTime& opTime, Date_t wallTime) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(getReplicationMode() == modeReplSet); if (wallTime == Date_t()) { @@ -1682,7 +1682,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitRepli OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) { Timer timer; WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _awaitReplication_inlock(&lock, opCtx, opTime, fixedWriteConcern); return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())}; } @@ -1705,7 +1705,7 @@ BSONObj ReplicationCoordinatorImpl::_getReplicationProgress(WithLock wl) const { return progress.obj(); } Status ReplicationCoordinatorImpl::_awaitReplication_inlock( - stdx::unique_lock<stdx::mutex>* lock, + stdx::unique_lock<Latch>* lock, OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) { @@ -1825,7 +1825,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( void ReplicationCoordinatorImpl::waitForStepDownAttempt_forTest() { auto isSteppingDown = [&]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // If true, we know that a stepdown is underway. return (_topCoord->isSteppingDown()); }; @@ -1924,7 +1924,7 @@ void ReplicationCoordinatorImpl::AutoGetRstlForStepUpStepDown::_killOpThreadFn() // X mode for the first time. This ensures that no writing operations will continue // after the node's term change. { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (_stopKillingOps.wait_for( lock, Milliseconds(10).toSystemDuration(), [this] { return _killSignaled; })) { log() << "Stopped killing user operations"; @@ -1940,7 +1940,7 @@ void ReplicationCoordinatorImpl::AutoGetRstlForStepUpStepDown::_stopAndWaitForKi return; { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _killSignaled = true; _stopKillingOps.notify_all(); } @@ -2000,7 +2000,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, auto deadline = force ? stepDownUntil : waitUntil; AutoGetRstlForStepUpStepDown arsd(this, opCtx, deadline); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); opCtx->checkForInterrupt(); @@ -2033,7 +2033,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, while (MONGO_FAIL_POINT(stepdownHangBeforePerformingPostMemberStateUpdateActions)) { mongo::sleepsecs(1); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_inShutdown) { break; } @@ -2139,7 +2139,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, } void ReplicationCoordinatorImpl::_performElectionHandoff() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto candidateIndex = _topCoord->chooseElectionHandoffCandidate(); if (candidateIndex < 0) { @@ -2187,7 +2187,7 @@ bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() { return true; } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(getReplicationMode() == modeReplSet); return _getMemberState_inlock().primary(); } @@ -2216,7 +2216,7 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationCont } bool ReplicationCoordinatorImpl::canAcceptNonLocalWrites() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _readWriteAbility->canAcceptNonLocalWrites(lk); } @@ -2248,7 +2248,7 @@ bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* opC return true; } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_memberState.rollback()) { return false; } @@ -2276,7 +2276,7 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext // Oplog reads are not allowed during STARTUP state, but we make an exception for internal // reads. Internal reads are required for cleaning up unfinished apply batches. if (!isPrimaryOrSecondary && getReplicationMode() == modeReplSet && ns.isOplog()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if ((_memberState.startup() && client->isFromUserConnection()) || _memberState.startup2() || _memberState.rollback()) { return Status{ErrorCodes::NotMasterOrSecondary, @@ -2320,17 +2320,17 @@ bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* o } OID ReplicationCoordinatorImpl::getElectionId() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _electionId; } int ReplicationCoordinatorImpl::getMyId() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyId_inlock(); } HostAndPort ReplicationCoordinatorImpl::getMyHostAndPort() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _rsConfig.getMemberAt(_selfIndex).getHostAndPort(); } @@ -2347,7 +2347,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait f = [&finishedEvent, this]() { _replExecutor->signalEvent(finishedEvent); }; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _resetMyLastOpTimes(lk); } // unlock before calling _startDataReplication(). @@ -2359,7 +2359,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait } StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _topCoord->prepareReplSetUpdatePositionCommand( _getCurrentCommittedSnapshotOpTime_inlock()); } @@ -2371,7 +2371,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus( if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) { std::shared_ptr<InitialSyncer> initialSyncerCopy; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); initialSyncerCopy = _initialSyncer; } @@ -2388,7 +2388,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus( BSONObj electionParticipantMetrics = ReplicationMetrics::get(getServiceContext()).getElectionParticipantMetricsBSON(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse"); _topCoord->prepareStatusResponse( TopologyCoordinator::ReplSetStatusArgs{ @@ -2409,7 +2409,7 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet( IsMasterResponse* response, const SplitHorizon::Parameters& horizonParams) { invariant(getSettings().usingReplSets()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->fillIsMasterForReplSet(response, horizonParams); OpTime lastOpTime = _getMyLastAppliedOpTime_inlock(); @@ -2432,17 +2432,17 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet( } void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _topCoord->fillMemberData(result); } ReplSetConfig ReplicationCoordinatorImpl::getConfig() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _rsConfig; } void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); result->append("config", _rsConfig.toBSON()); } @@ -2450,7 +2450,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada EventHandle evh; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); evh = _processReplSetMetadata_inlock(replMetadata); } @@ -2460,7 +2460,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada } void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _cancelAndRescheduleElectionTimeout_inlock(); } @@ -2473,7 +2473,7 @@ EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_inlock( } bool ReplicationCoordinatorImpl::getMaintenanceMode() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _topCoord->getMaintenanceCount() > 0; } @@ -2483,7 +2483,7 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) { "can only set maintenance mode on replica set members"); } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_topCoord->getRole() == TopologyCoordinator::Role::kCandidate) { return Status(ErrorCodes::NotSecondary, "currently running for election"); } @@ -2522,7 +2522,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse"); auto doResync = false; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->prepareSyncFromResponse(target, resultObj, &result); // If we are in the middle of an initial sync, do a resync. doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive(); @@ -2537,7 +2537,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) { auto result = [=]() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _topCoord->prepareFreezeResponse(_replExecutor->now(), secs, resultObj); }(); if (!result.isOK()) { @@ -2560,7 +2560,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt log() << "replSetReconfig admin command received from client; new config: " << args.newConfigObj; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { _rsConfigStateChange.wait(lk); @@ -2665,7 +2665,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx, // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. executor::TaskExecutor::EventHandle electionFinishedEvent; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); electionFinishedEvent = _cancelElectionIfNeeded_inlock(); } @@ -2680,7 +2680,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx, } boost::optional<AutoGetRstlForStepUpStepDown> arsd; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (isForceReconfig && _shouldStepDownOnReconfig(lk, newConfig, myIndex)) { _topCoord->prepareForUnconditionalStepDown(); lk.unlock(); @@ -2739,7 +2739,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt log() << "replSetInitiate admin command received from client"; const auto replEnabled = _settings.usingReplSets(); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (!replEnabled) { return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet"); } @@ -2828,7 +2828,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt void ReplicationCoordinatorImpl::_finishReplSetInitiate(OperationContext* opCtx, const ReplSetConfig& newConfig, int myIndex) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_rsConfigState == kConfigInitiating); invariant(!_rsConfig.isInitialized()); auto action = _setCurrentRSConfig(lk, opCtx, newConfig, myIndex); @@ -3059,7 +3059,7 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock() { if (!cbData.status.isOK()) { return; } - stdx::lock_guard<stdx::mutex> lk(*mutex); + stdx::lock_guard<Latch> lk(*mutex); // Check whether the callback has been cancelled while holding mutex. if (cbData.myHandle.isCanceled()) { return; @@ -3169,7 +3169,7 @@ void ReplicationCoordinatorImpl::CatchupState::incrementNumCatchUpOps_inlock(lon } Status ReplicationCoordinatorImpl::abortCatchupIfNeeded(PrimaryCatchUpConclusionReason reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_catchupState) { _catchupState->abort_inlock(reason); return Status::OK(); @@ -3178,14 +3178,14 @@ Status ReplicationCoordinatorImpl::abortCatchupIfNeeded(PrimaryCatchUpConclusion } void ReplicationCoordinatorImpl::incrementNumCatchUpOpsIfCatchingUp(long numOps) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_catchupState) { _catchupState->incrementNumCatchUpOps_inlock(numOps); } } void ReplicationCoordinatorImpl::signalDropPendingCollectionsRemovedFromStorage() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _wakeReadyWaiters(lock); } @@ -3302,7 +3302,7 @@ void ReplicationCoordinatorImpl::_wakeReadyWaiters(WithLock lk) { Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates, long long* configVersion) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); Status status = Status::OK(); bool somethingChanged = false; for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin(); @@ -3324,7 +3324,7 @@ Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePosi } bool ReplicationCoordinatorImpl::buildsIndexes() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_selfIndex == -1) { return true; } @@ -3334,12 +3334,12 @@ bool ReplicationCoordinatorImpl::buildsIndexes() { std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op, bool durablyWritten) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _topCoord->getHostsWrittenTo(op, durablyWritten); } std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_settings.usingReplSets()); std::vector<HostAndPort> nodes; @@ -3358,7 +3358,7 @@ std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() co Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions& writeConcern) const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); } @@ -3375,7 +3375,7 @@ Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock( Status ReplicationCoordinatorImpl::checkIfCommitQuorumCanBeSatisfied( const CommitQuorumOptions& commitQuorum) const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _checkIfCommitQuorumCanBeSatisfied(lock, commitQuorum); } @@ -3408,7 +3408,7 @@ StatusWith<bool> ReplicationCoordinatorImpl::checkIfCommitQuorumIsSatisfied( // If the 'commitQuorum' cannot be satisfied with all the members of this replica set, we // need to inform the caller to avoid hanging while waiting for satisfiability of the // 'commitQuorum' with 'commitReadyMembers' due to replica set reconfigurations. - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); Status status = _checkIfCommitQuorumCanBeSatisfied(lock, commitQuorum); if (!status.isOK()) { return status; @@ -3419,7 +3419,7 @@ StatusWith<bool> ReplicationCoordinatorImpl::checkIfCommitQuorumIsSatisfied( } WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_rsConfig.isInitialized()) { return _rsConfig.getDefaultWriteConcern(); } @@ -3447,7 +3447,7 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const { } HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); // Always allow chaining while in catchup and drain mode. @@ -3472,12 +3472,12 @@ void ReplicationCoordinatorImpl::_unblacklistSyncSource( if (cbData.status == ErrorCodes::CallbackCanceled) return; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _topCoord->unblacklistSyncSource(host, _replExecutor->now()); } void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _topCoord->blacklistSyncSource(host, until); _scheduleWorkAt(until, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _unblacklistSyncSource(cbData, host); @@ -3501,7 +3501,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTimeAndWallTime.opTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); bool isRollbackAllowed = true; _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed, consistency); _setMyLastDurableOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed); @@ -3512,7 +3512,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource( const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _topCoord->shouldChangeSyncSource( currentSource, replMetadata, oqMetadata, _replExecutor->now()); } @@ -3608,7 +3608,7 @@ void ReplicationCoordinatorImpl::_cleanupStableOpTimeCandidates( boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::chooseStableOpTimeFromCandidates_forTest( const std::set<OpTimeAndWallTime>& candidates, const OpTimeAndWallTime& maximumStableOpTime) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _chooseStableOpTimeFromCandidates(lk, candidates, maximumStableOpTime); } void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest( @@ -3617,12 +3617,12 @@ void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest( } std::set<OpTimeAndWallTime> ReplicationCoordinatorImpl::getStableOpTimeCandidates_forTest() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _stableOpTimeCandidates; } void ReplicationCoordinatorImpl::attemptToAdvanceStableTimestamp() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _setStableTimestampForStorage(lk); } @@ -3704,7 +3704,7 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) { void ReplicationCoordinatorImpl::advanceCommitPoint( const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _advanceCommitPoint(lk, committedOpTimeAndWallTime, fromSyncSource); } @@ -3726,12 +3726,12 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint( } OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _topCoord->getLastCommittedOpTime(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _topCoord->getLastCommittedOpTimeAndWallTime(); } @@ -3745,7 +3745,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( return termStatus; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // We should only enter terminal shutdown from global terminal exit. In that case, rather // than voting in a term we don't plan to stay alive in, refuse to vote. @@ -3812,7 +3812,7 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ invariant(-1 != rbid); } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (hasReplSetMetadata) { _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder); @@ -3847,7 +3847,7 @@ bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal_inlock() c Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { return Status(ErrorCodes::NotYetInitialized, "Received heartbeat while still initializing replication system"); @@ -3855,7 +3855,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs } Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto senderHost(args.getSenderHost()); const Date_t now = _replExecutor->now(); @@ -3888,7 +3888,7 @@ long long ReplicationCoordinatorImpl::getTerm() { EventHandle ReplicationCoordinatorImpl::updateTerm_forTest( long long term, TopologyCoordinator::UpdateTermResult* updateResult) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); EventHandle finishEvh; finishEvh = _updateTerm_inlock(term, updateResult); @@ -3907,7 +3907,7 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long EventHandle finishEvh; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); finishEvh = _updateTerm_inlock(term, &updateTermResult); } @@ -3960,7 +3960,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock( void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx, const Timestamp& untilSnapshot) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); uassert(ErrorCodes::NotYetInitialized, "Cannot use snapshots until replica set is finished initializing.", @@ -3976,7 +3976,7 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { } void ReplicationCoordinatorImpl::createWMajorityWriteAvailabilityDateWaiter(OpTime opTime) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto opTimeCB = [this, opTime]() { ReplicationMetrics::get(getServiceContext()) .setWMajorityWriteAvailabilityDate(_replExecutor->now()); @@ -4022,7 +4022,7 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot( } void ReplicationCoordinatorImpl::dropAllSnapshots() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _dropAllSnapshots_inlock(); } @@ -4068,7 +4068,7 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() { WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode( WriteConcernOptions wc) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _populateUnsetWriteConcernOptionsSyncMode(lock, wc); } @@ -4104,7 +4104,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) { EventHandle finishEvent; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); finishEvent = _electionFinishedEvent; } if (finishEvent.isValid()) { @@ -4114,7 +4114,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) { // Step up is considered successful only if we are currently a primary and we are not in the // process of stepping down. If we know we are going to step down, we should fail the // replSetStepUp command so caller can retry if necessary. - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_getMemberState_inlock().primary()) return Status(ErrorCodes::CommandFailed, "Election failed."); else if (_topCoord->isSteppingDown()) @@ -4137,7 +4137,7 @@ int64_t ReplicationCoordinatorImpl::_nextRandomInt64_inlock(int64_t limit) { } bool ReplicationCoordinatorImpl::setContainsArbiter() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _rsConfig.containsArbiter(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 503186a1e8e..07bdf226a80 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -569,7 +569,7 @@ private: // Tracks number of operations left running on step down. size_t _userOpsRunning = 0; // Protects killSignaled and stopKillingOps cond. variable. - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("AutoGetRstlForStepUpStepDown::_mutex"); // Signals thread about the change of killSignaled value. stdx::condition_variable _stopKillingOps; // Once this is set to true, the killOpThreadFn method will terminate. @@ -800,7 +800,7 @@ private: * Helper method for _awaitReplication that takes an already locked unique_lock, but leaves * operation timing to the caller. */ - Status _awaitReplication_inlock(stdx::unique_lock<stdx::mutex>* lock, + Status _awaitReplication_inlock(stdx::unique_lock<Latch>* lock, OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern); @@ -852,7 +852,7 @@ private: * * Lock will be released after this method finishes. */ - void _reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock); + void _reportUpstream_inlock(stdx::unique_lock<Latch> lock); /** * Helpers to set the last applied and durable OpTime. @@ -1135,10 +1135,10 @@ private: * * Requires "lock" to own _mutex, and returns the same unique_lock. */ - stdx::unique_lock<stdx::mutex> _handleHeartbeatResponseAction_inlock( + stdx::unique_lock<Latch> _handleHeartbeatResponseAction_inlock( const HeartbeatResponseAction& action, const StatusWith<ReplSetHeartbeatResponse>& responseStatus, - stdx::unique_lock<stdx::mutex> lock); + stdx::unique_lock<Latch> lock); /** * Updates the last committed OpTime to be 'committedOpTime' if it is more recent than the @@ -1360,7 +1360,7 @@ private: // (I) Independently synchronized, see member variable comment. // Protects member data of this ReplicationCoordinator. - mutable stdx::mutex _mutex; // (S) + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicationCoordinatorImpl::_mutex"); // (S) // Handles to actively queued heartbeats. HeartbeatHandles _heartbeatHandles; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index 04f191ad81c..cebb98aef11 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -37,7 +37,7 @@ #include "mongo/db/repl/replication_metrics.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -93,7 +93,7 @@ public: }; void ReplicationCoordinatorImpl::_startElectSelfV1(StartElectionReasonEnum reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _startElectSelfV1_inlock(reason); } @@ -181,7 +181,7 @@ void ReplicationCoordinatorImpl::_startElectSelfV1_inlock(StartElectionReasonEnu void ReplicationCoordinatorImpl::_processDryRunResult(long long originalTerm, StartElectionReasonEnum reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); LoseElectionDryRunGuardV1 lossGuard(this); invariant(_voteRequester); @@ -285,7 +285,7 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( return _externalState->storeLocalLastVoteDocument(opCtx.get(), lastVote); }(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); LoseElectionDryRunGuardV1 lossGuard(this); if (status == ErrorCodes::CallbackCanceled) { return; @@ -331,7 +331,7 @@ MONGO_FAIL_POINT_DEFINE(electionHangsBeforeUpdateMemberState); void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long newTerm, StartElectionReasonEnum reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); LoseElectionGuardV1 lossGuard(this); invariant(_voteRequester); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index faec4c34d41..5b2a7730e04 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -53,10 +53,10 @@ #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -91,7 +91,7 @@ Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset_inlock() { void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _untrackHeartbeatHandle_inlock(cbData.myHandle); if (cbData.status == ErrorCodes::CallbackCanceled) { @@ -131,7 +131,7 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd void ReplicationCoordinatorImpl::_handleHeartbeatResponse( const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // remove handle from queued heartbeats _untrackHeartbeatHandle_inlock(cbData.myHandle); @@ -263,10 +263,10 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( _handleHeartbeatResponseAction_inlock(action, hbStatusResponse, std::move(lk)); } -stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatResponseAction_inlock( +stdx::unique_lock<Latch> ReplicationCoordinatorImpl::_handleHeartbeatResponseAction_inlock( const HeartbeatResponseAction& action, const StatusWith<ReplSetHeartbeatResponse>& responseStatus, - stdx::unique_lock<stdx::mutex> lock) { + stdx::unique_lock<Latch> lock) { invariant(lock.owns_lock()); switch (action.getAction()) { case HeartbeatResponseAction::NoAction: @@ -391,7 +391,7 @@ void ReplicationCoordinatorImpl::_stepDownFinish( "Blocking until fail point is disabled."; auto inShutdown = [&] { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _inShutdown; }; @@ -406,7 +406,7 @@ void ReplicationCoordinatorImpl::_stepDownFinish( // have taken global lock in S mode and operations blocked on prepare conflict will be killed to // avoid 3-way deadlock between read, prepared transaction and step down thread. AutoGetRstlForStepUpStepDown arsd(this, opCtx.get()); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // This node has already stepped down due to reconfig. So, signal anyone who is waiting on the // step down event. @@ -512,7 +512,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( _externalState.get(), newConfig, getGlobalServiceContext()); if (myIndex.getStatus() == ErrorCodes::NodeNotFound) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // If this node absent in newConfig, and this node was not previously initialized, // return to kConfigUninitialized immediately, rather than storing the config and // transitioning into the RS_REMOVED state. See SERVER-15740. @@ -538,7 +538,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( auto status = _externalState->storeLocalConfigDocument(opCtx.get(), newConfig.toBSON()); bool isFirstConfig; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); isFirstConfig = !_rsConfig.isInitialized(); if (!status.isOK()) { error() << "Ignoring new configuration in heartbeat response because we failed to" @@ -609,7 +609,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( // we have already set our ReplicationCoordinatorImpl::_rsConfigState state to // "kConfigReconfiguring" which prevents new elections from happening. { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) { LOG_FOR_HEARTBEATS(0) << "Waiting for election to complete before finishing reconfig to version " @@ -628,7 +628,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( auto opCtx = cc().makeOperationContext(); boost::optional<AutoGetRstlForStepUpStepDown> arsd; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_shouldStepDownOnReconfig(lk, newConfig, myIndex)) { _topCoord->prepareForUnconditionalStepDown(); lk.unlock(); @@ -755,7 +755,7 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { void ReplicationCoordinatorImpl::_handleLivenessTimeout( const executor::TaskExecutor::CallbackArgs& cbData) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // Only reset the callback handle if it matches, otherwise more will be coming through if (cbData.myHandle == _handleLivenessTimeoutCbh) { _handleLivenessTimeoutCbh = CallbackHandle(); @@ -878,7 +878,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { } void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(StartElectionReasonEnum reason) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); // If it is not a single node replica set, no need to start an election after stepdown timeout. if (reason == StartElectionReasonEnum::kSingleNodePromptElection && _rsConfig.getNumMembers() != 1) { diff --git a/src/mongo/db/repl/replication_metrics.cpp b/src/mongo/db/repl/replication_metrics.cpp index e7d55c50660..addb4f7027d 100644 --- a/src/mongo/db/repl/replication_metrics.cpp +++ b/src/mongo/db/repl/replication_metrics.cpp @@ -57,7 +57,7 @@ ReplicationMetrics::ReplicationMetrics() ReplicationMetrics::~ReplicationMetrics() {} void ReplicationMetrics::incrementNumElectionsCalledForReason(StartElectionReasonEnum reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); switch (reason) { case StartElectionReasonEnum::kStepUpRequest: case StartElectionReasonEnum::kStepUpRequestSkipDryRun: { @@ -89,7 +89,7 @@ void ReplicationMetrics::incrementNumElectionsCalledForReason(StartElectionReaso } void ReplicationMetrics::incrementNumElectionsSuccessfulForReason(StartElectionReasonEnum reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); switch (reason) { case StartElectionReasonEnum::kStepUpRequest: case StartElectionReasonEnum::kStepUpRequestSkipDryRun: { @@ -121,20 +121,20 @@ void ReplicationMetrics::incrementNumElectionsSuccessfulForReason(StartElectionR } void ReplicationMetrics::incrementNumStepDownsCausedByHigherTerm() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionMetrics.setNumStepDownsCausedByHigherTerm( _electionMetrics.getNumStepDownsCausedByHigherTerm() + 1); } void ReplicationMetrics::incrementNumCatchUps() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionMetrics.setNumCatchUps(_electionMetrics.getNumCatchUps() + 1); _updateAverageCatchUpOps(lk); } void ReplicationMetrics::incrementNumCatchUpsConcludedForReason( ReplicationCoordinator::PrimaryCatchUpConclusionReason reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); switch (reason) { case ReplicationCoordinator::PrimaryCatchUpConclusionReason::kSucceeded: _electionMetrics.setNumCatchUpsSucceeded(_electionMetrics.getNumCatchUpsSucceeded() + @@ -167,97 +167,97 @@ void ReplicationMetrics::incrementNumCatchUpsConcludedForReason( } long ReplicationMetrics::getNumStepUpCmdsCalled_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getStepUpCmd().getCalled(); } long ReplicationMetrics::getNumPriorityTakeoversCalled_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getPriorityTakeover().getCalled(); } long ReplicationMetrics::getNumCatchUpTakeoversCalled_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getCatchUpTakeover().getCalled(); } long ReplicationMetrics::getNumElectionTimeoutsCalled_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getElectionTimeout().getCalled(); } long ReplicationMetrics::getNumFreezeTimeoutsCalled_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getFreezeTimeout().getCalled(); } long ReplicationMetrics::getNumStepUpCmdsSuccessful_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getStepUpCmd().getSuccessful(); } long ReplicationMetrics::getNumPriorityTakeoversSuccessful_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getPriorityTakeover().getSuccessful(); } long ReplicationMetrics::getNumCatchUpTakeoversSuccessful_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getCatchUpTakeover().getSuccessful(); } long ReplicationMetrics::getNumElectionTimeoutsSuccessful_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getElectionTimeout().getSuccessful(); } long ReplicationMetrics::getNumFreezeTimeoutsSuccessful_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getFreezeTimeout().getSuccessful(); } long ReplicationMetrics::getNumStepDownsCausedByHigherTerm_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumStepDownsCausedByHigherTerm(); } long ReplicationMetrics::getNumCatchUps_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUps(); } long ReplicationMetrics::getNumCatchUpsSucceeded_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUpsSucceeded(); } long ReplicationMetrics::getNumCatchUpsAlreadyCaughtUp_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUpsAlreadyCaughtUp(); } long ReplicationMetrics::getNumCatchUpsSkipped_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUpsSkipped(); } long ReplicationMetrics::getNumCatchUpsTimedOut_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUpsTimedOut(); } long ReplicationMetrics::getNumCatchUpsFailedWithError_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUpsFailedWithError(); } long ReplicationMetrics::getNumCatchUpsFailedWithNewTerm_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUpsFailedWithNewTerm(); } long ReplicationMetrics::getNumCatchUpsFailedWithReplSetAbortPrimaryCatchUpCmd_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.getNumCatchUpsFailedWithReplSetAbortPrimaryCatchUpCmd(); } @@ -272,7 +272,7 @@ void ReplicationMetrics::setElectionCandidateMetrics( const Milliseconds electionTimeout, const boost::optional<int> priorPrimaryMemberId) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _nodeIsCandidateOrPrimary = true; _electionCandidateMetrics.setLastElectionReason(reason); @@ -288,12 +288,12 @@ void ReplicationMetrics::setElectionCandidateMetrics( } void ReplicationMetrics::setTargetCatchupOpTime(OpTime opTime) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionCandidateMetrics.setTargetCatchupOpTime(opTime); } void ReplicationMetrics::setNumCatchUpOps(long numCatchUpOps) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(numCatchUpOps >= 0); _electionCandidateMetrics.setNumCatchUpOps(numCatchUpOps); _totalNumCatchUpOps += numCatchUpOps; @@ -301,27 +301,27 @@ void ReplicationMetrics::setNumCatchUpOps(long numCatchUpOps) { } void ReplicationMetrics::setCandidateNewTermStartDate(Date_t newTermStartDate) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionCandidateMetrics.setNewTermStartDate(newTermStartDate); } void ReplicationMetrics::setWMajorityWriteAvailabilityDate(Date_t wMajorityWriteAvailabilityDate) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionCandidateMetrics.setWMajorityWriteAvailabilityDate(wMajorityWriteAvailabilityDate); } boost::optional<OpTime> ReplicationMetrics::getTargetCatchupOpTime_forTesting() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionCandidateMetrics.getTargetCatchupOpTime(); } BSONObj ReplicationMetrics::getElectionMetricsBSON() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _electionMetrics.toBSON(); } BSONObj ReplicationMetrics::getElectionCandidateMetricsBSON() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_nodeIsCandidateOrPrimary) { return _electionCandidateMetrics.toBSON(); } @@ -329,7 +329,7 @@ BSONObj ReplicationMetrics::getElectionCandidateMetricsBSON() { } void ReplicationMetrics::clearElectionCandidateMetrics() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionCandidateMetrics.setTargetCatchupOpTime(boost::none); _electionCandidateMetrics.setNumCatchUpOps(boost::none); _electionCandidateMetrics.setNewTermStartDate(boost::none); @@ -345,7 +345,7 @@ void ReplicationMetrics::setElectionParticipantMetrics(const bool votedForCandid const OpTime lastAppliedOpTime, const OpTime maxAppliedOpTimeInSet, const double priorityAtElection) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _nodeHasVotedInElection = true; _electionParticipantMetrics.setVotedForCandidate(votedForCandidate); @@ -359,7 +359,7 @@ void ReplicationMetrics::setElectionParticipantMetrics(const bool votedForCandid } BSONObj ReplicationMetrics::getElectionParticipantMetricsBSON() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_nodeHasVotedInElection) { return _electionParticipantMetrics.toBSON(); } @@ -368,13 +368,13 @@ BSONObj ReplicationMetrics::getElectionParticipantMetricsBSON() { void ReplicationMetrics::setParticipantNewTermDates(Date_t newTermStartDate, Date_t newTermAppliedDate) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionParticipantMetrics.setNewTermStartDate(newTermStartDate); _electionParticipantMetrics.setNewTermAppliedDate(newTermAppliedDate); } void ReplicationMetrics::clearParticipantNewTermDates() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _electionParticipantMetrics.setNewTermStartDate(boost::none); _electionParticipantMetrics.setNewTermAppliedDate(boost::none); } diff --git a/src/mongo/db/repl/replication_metrics.h b/src/mongo/db/repl/replication_metrics.h index 59d27ace445..108510bbcd8 100644 --- a/src/mongo/db/repl/replication_metrics.h +++ b/src/mongo/db/repl/replication_metrics.h @@ -32,7 +32,7 @@ #include "mongo/db/repl/replication_metrics_gen.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/service_context.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" namespace mongo { namespace repl { @@ -125,7 +125,7 @@ private: void _updateAverageCatchUpOps(WithLock lk); - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicationMetrics::_mutex"); ElectionMetrics _electionMetrics; ElectionCandidateMetrics _electionCandidateMetrics; ElectionParticipantMetrics _electionParticipantMetrics; diff --git a/src/mongo/db/repl/replication_process.cpp b/src/mongo/db/repl/replication_process.cpp index d3e77314cd3..117972289af 100644 --- a/src/mongo/db/repl/replication_process.cpp +++ b/src/mongo/db/repl/replication_process.cpp @@ -84,7 +84,7 @@ ReplicationProcess::ReplicationProcess( _rbid(kUninitializedRollbackId) {} Status ReplicationProcess::refreshRollbackID(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto rbidResult = _storageInterface->getRollbackID(opCtx); if (!rbidResult.isOK()) { @@ -102,7 +102,7 @@ Status ReplicationProcess::refreshRollbackID(OperationContext* opCtx) { } int ReplicationProcess::getRollbackID() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (kUninitializedRollbackId == _rbid) { // This may happen when serverStatus is called by an internal client before we have a chance // to read the rollback ID from storage. @@ -112,7 +112,7 @@ int ReplicationProcess::getRollbackID() const { } Status ReplicationProcess::initializeRollbackID(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(kUninitializedRollbackId == _rbid); @@ -132,7 +132,7 @@ Status ReplicationProcess::initializeRollbackID(OperationContext* opCtx) { } Status ReplicationProcess::incrementRollbackID(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _storageInterface->incrementRollbackID(opCtx); diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h index 849ac7df8c4..82c298d363d 100644 --- a/src/mongo/db/repl/replication_process.h +++ b/src/mongo/db/repl/replication_process.h @@ -38,7 +38,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/replication_recovery.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" namespace mongo { @@ -103,7 +103,7 @@ private: // (M) Reads and writes guarded by _mutex. // Guards access to member variables. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicationProcess::_mutex"); // Used to access the storage layer. StorageInterface* const _storageInterface; // (R) diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp index c97746080e5..bf440816b12 100644 --- a/src/mongo/db/repl/replication_recovery_test.cpp +++ b/src/mongo/db/repl/replication_recovery_test.cpp @@ -63,47 +63,47 @@ const NamespaceString testNs("a.a"); class StorageInterfaceRecovery : public StorageInterfaceImpl { public: boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _recoveryTimestamp; } void setRecoveryTimestamp(Timestamp recoveryTimestamp) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _recoveryTimestamp = recoveryTimestamp; } bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _supportsRecoverToStableTimestamp; } void setSupportsRecoverToStableTimestamp(bool supports) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _supportsRecoverToStableTimestamp = supports; } bool supportsRecoveryTimestamp(ServiceContext* serviceCtx) const override { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _supportsRecoveryTimestamp; } void setSupportsRecoveryTimestamp(bool supports) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _supportsRecoveryTimestamp = supports; } void setPointInTimeReadTimestamp(Timestamp pointInTimeReadTimestamp) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _pointInTimeReadTimestamp = pointInTimeReadTimestamp; } Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _pointInTimeReadTimestamp; } private: - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("StorageInterfaceRecovery::_mutex"); Timestamp _initialDataTimestamp = Timestamp::min(); boost::optional<Timestamp> _recoveryTimestamp = boost::none; Timestamp _pointInTimeReadTimestamp = {}; diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index 6ad2390d3c0..451c00615f9 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -118,17 +118,17 @@ std::string Reporter::toString() const { } HostAndPort Reporter::getTarget() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _target; } Milliseconds Reporter::getKeepAliveInterval() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _keepAliveInterval; } void Reporter::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _status = Status(ErrorCodes::CallbackCanceled, "Reporter no longer valid"); @@ -152,13 +152,13 @@ void Reporter::shutdown() { } Status Reporter::join() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _condition.wait(lk, [this]() { return !_isActive_inlock(); }); return _status; } Status Reporter::trigger() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // If these was a previous error then the reporter is dead and return that error. if (!_status.isOK()) { @@ -196,7 +196,7 @@ Status Reporter::trigger() { StatusWith<BSONObj> Reporter::_prepareCommand() { auto prepareResult = _prepareReplSetUpdatePositionCommandFn(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // Reporter could have been canceled while preparing the command. if (!_status.isOK()) { @@ -239,7 +239,7 @@ void Reporter::_sendCommand_inlock(BSONObj commandRequest, Milliseconds netTimeo void Reporter::_processResponseCallback( const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd) { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // If the reporter was shut down before this callback is invoked, // return the canceled "_status". @@ -299,7 +299,7 @@ void Reporter::_processResponseCallback( // Must call without holding the lock. auto prepareResult = _prepareCommand(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_status.isOK()) { _onShutdown_inlock(); return; @@ -318,7 +318,7 @@ void Reporter::_processResponseCallback( void Reporter::_prepareAndSendCommandCallback(const executor::TaskExecutor::CallbackArgs& args, bool fromTrigger) { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_status.isOK()) { _onShutdown_inlock(); return; @@ -341,7 +341,7 @@ void Reporter::_prepareAndSendCommandCallback(const executor::TaskExecutor::Call // Must call without holding the lock. auto prepareResult = _prepareCommand(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_status.isOK()) { _onShutdown_inlock(); return; @@ -367,7 +367,7 @@ void Reporter::_onShutdown_inlock() { } bool Reporter::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _isActive_inlock(); } @@ -376,12 +376,12 @@ bool Reporter::_isActive_inlock() const { } bool Reporter::isWaitingToSendReport() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _isWaitingToSendReporter; } Date_t Reporter::getKeepAliveTimeoutWhen_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _keepAliveTimeoutWhen; } diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h index f332401ea2e..6e41083635a 100644 --- a/src/mongo/db/repl/reporter.h +++ b/src/mongo/db/repl/reporter.h @@ -34,9 +34,9 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/time_support.h" namespace mongo { @@ -187,7 +187,7 @@ private: const Milliseconds _updatePositionTimeout; // Protects member data of this Reporter declared below. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("Reporter::_mutex"); mutable stdx::condition_variable _condition; diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp index cb5e57f6ae9..9089163aae5 100644 --- a/src/mongo/db/repl/rollback_checker.cpp +++ b/src/mongo/db/repl/rollback_checker.cpp @@ -33,14 +33,13 @@ #include "mongo/db/repl/rollback_checker.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/log.h" namespace mongo { namespace repl { using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; -using UniqueLock = stdx::unique_lock<stdx::mutex>; +using UniqueLock = stdx::unique_lock<Latch>; RollbackChecker::RollbackChecker(executor::TaskExecutor* executor, HostAndPort syncSource) : _executor(executor), _syncSource(syncSource), _baseRBID(-1), _lastRBID(-1) { @@ -121,12 +120,12 @@ Status RollbackChecker::reset_sync() { } int RollbackChecker::getBaseRBID() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _baseRBID; } int RollbackChecker::getLastRBID_forTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _lastRBID; } diff --git a/src/mongo/db/repl/rollback_checker.h b/src/mongo/db/repl/rollback_checker.h index ed589e57c7c..d1397cccae4 100644 --- a/src/mongo/db/repl/rollback_checker.h +++ b/src/mongo/db/repl/rollback_checker.h @@ -31,12 +31,11 @@ #include "mongo/base/status_with.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" namespace mongo { namespace repl { -class Mutex; - /** * The RollbackChecker maintains a sync source and its baseline rollback ID (rbid). It * contains methods to check if a rollback occurred by checking if the rbid has changed since @@ -119,7 +118,7 @@ private: executor::TaskExecutor* const _executor; // Protects member data of this RollbackChecker. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("RollbackChecker::_mutex"); // The sync source to check for rollbacks against. HostAndPort _syncSource; diff --git a/src/mongo/db/repl/rollback_checker_test.cpp b/src/mongo/db/repl/rollback_checker_test.cpp index 21ff4dbd149..1dd737557d1 100644 --- a/src/mongo/db/repl/rollback_checker_test.cpp +++ b/src/mongo/db/repl/rollback_checker_test.cpp @@ -45,7 +45,7 @@ using namespace mongo::repl; using executor::NetworkInterfaceMock; using executor::RemoteCommandResponse; -using LockGuard = stdx::lock_guard<stdx::mutex>; +using LockGuard = stdx::lock_guard<Latch>; class RollbackCheckerTest : public executor::ThreadPoolExecutorTest { public: @@ -57,7 +57,7 @@ protected: std::unique_ptr<RollbackChecker> _rollbackChecker; RollbackChecker::Result _hasRolledBackResult = {ErrorCodes::NotYetInitialized, ""}; bool _hasCalledCallback; - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("RollbackCheckerTest::_mutex"); }; void RollbackCheckerTest::setUp() { @@ -65,7 +65,7 @@ void RollbackCheckerTest::setUp() { launchExecutorThread(); getNet()->enterNetwork(); _rollbackChecker = stdx::make_unique<RollbackChecker>(&getExecutor(), HostAndPort()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _hasRolledBackResult = {ErrorCodes::NotYetInitialized, ""}; _hasCalledCallback = false; } diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index 73c484ec452..ae022c6e09e 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -264,12 +264,12 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) { } void RollbackImpl::shutdown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _inShutdown = true; } bool RollbackImpl::_isInShutdown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _inShutdown; } diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h index 660231c4dbc..424b394fa95 100644 --- a/src/mongo/db/repl/rollback_impl.h +++ b/src/mongo/db/repl/rollback_impl.h @@ -448,7 +448,7 @@ private: void _resetDropPendingState(OperationContext* opCtx); // Guards access to member variables. - mutable stdx::mutex _mutex; // (S) + mutable Mutex _mutex = MONGO_MAKE_LATCH("RollbackImpl::_mutex"); // (S) // Set to true when RollbackImpl should shut down. bool _inShutdown = false; // (M) diff --git a/src/mongo/db/repl/rollback_test_fixture.h b/src/mongo/db/repl/rollback_test_fixture.h index 8f03742bfd4..a8a1e1fd690 100644 --- a/src/mongo/db/repl/rollback_test_fixture.h +++ b/src/mongo/db/repl/rollback_test_fixture.h @@ -119,7 +119,7 @@ protected: class RollbackTest::StorageInterfaceRollback : public StorageInterfaceImpl { public: void setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _stableTimestamp = snapshotName; } @@ -129,7 +129,7 @@ public: * of '_currTimestamp'. */ StatusWith<Timestamp> recoverToStableTimestamp(OperationContext* opCtx) override { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_recoverToTimestampStatus) { return _recoverToTimestampStatus.get(); } else { @@ -152,17 +152,17 @@ public: } void setRecoverToTimestampStatus(Status status) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _recoverToTimestampStatus = status; } void setCurrentTimestamp(Timestamp ts) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _currTimestamp = ts; } Timestamp getCurrentTimestamp() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _currTimestamp; } @@ -172,7 +172,7 @@ public: Status setCollectionCount(OperationContext* opCtx, const NamespaceStringOrUUID& nsOrUUID, long long newCount) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_setCollectionCountStatus && _setCollectionCountStatusUUID && nsOrUUID.uuid() == _setCollectionCountStatusUUID) { return *_setCollectionCountStatus; @@ -182,18 +182,18 @@ public: } void setSetCollectionCountStatus(UUID uuid, Status status) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _setCollectionCountStatus = status; _setCollectionCountStatusUUID = uuid; } long long getFinalCollectionCount(const UUID& uuid) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _newCounts[uuid]; } private: - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("StorageInterfaceRollback::_mutex"); Timestamp _stableTimestamp; diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp index 77c87778308..aca7d774b50 100644 --- a/src/mongo/db/repl/scatter_gather_runner.cpp +++ b/src/mongo/db/repl/scatter_gather_runner.cpp @@ -46,7 +46,7 @@ namespace mongo { namespace repl { using executor::RemoteCommandRequest; -using LockGuard = stdx::lock_guard<stdx::mutex>; +using LockGuard = stdx::lock_guard<Latch>; using CallbackHandle = executor::TaskExecutor::CallbackHandle; using EventHandle = executor::TaskExecutor::EventHandle; using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h index 4d65e417efb..831e58b44d5 100644 --- a/src/mongo/db/repl/scatter_gather_runner.h +++ b/src/mongo/db/repl/scatter_gather_runner.h @@ -32,8 +32,8 @@ #include <vector> #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" namespace mongo { @@ -134,7 +134,7 @@ private: executor::TaskExecutor::EventHandle _sufficientResponsesReceived; std::vector<executor::TaskExecutor::CallbackHandle> _callbacks; bool _started = false; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("RunnerImpl::_mutex"); }; executor::TaskExecutor* _executor; // Not owned here. diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 19e7c8840fa..5c70c521a7b 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -87,7 +87,7 @@ const char StorageInterfaceImpl::kRollbackIdFieldName[] = "rollbackId"; const char StorageInterfaceImpl::kRollbackIdDocumentId[] = "rollbackId"; namespace { -using UniqueLock = stdx::unique_lock<stdx::mutex>; +using UniqueLock = stdx::unique_lock<Latch>; const auto kIdIndexName = "_id_"_sd; diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index 77936b4453d..e9fa17504be 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -41,7 +41,7 @@ namespace mongo { namespace repl { StatusWith<int> StorageInterfaceMock::getRollbackID(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (!_rbidInitialized) { return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized"); } @@ -49,7 +49,7 @@ StatusWith<int> StorageInterfaceMock::getRollbackID(OperationContext* opCtx) { } StatusWith<int> StorageInterfaceMock::initializeRollbackID(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_rbidInitialized) { return Status(ErrorCodes::NamespaceExists, "Rollback ID already initialized"); } @@ -61,7 +61,7 @@ StatusWith<int> StorageInterfaceMock::initializeRollbackID(OperationContext* opC } StatusWith<int> StorageInterfaceMock::incrementRollbackID(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (!_rbidInitialized) { return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized"); } @@ -70,23 +70,23 @@ StatusWith<int> StorageInterfaceMock::incrementRollbackID(OperationContext* opCt } void StorageInterfaceMock::setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _stableTimestamp = snapshotName; } void StorageInterfaceMock::setInitialDataTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _initialDataTimestamp = snapshotName; } Timestamp StorageInterfaceMock::getStableTimestamp() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _stableTimestamp; } Timestamp StorageInterfaceMock::getInitialDataTimestamp() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _initialDataTimestamp; } diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index cc031904cb8..68811f01bab 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -43,7 +43,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/storage_interface.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" namespace mongo { namespace repl { @@ -420,7 +420,7 @@ public: Timestamp oldestOpenReadTimestamp = Timestamp::min(); private: - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("StorageInterfaceMock::_mutex"); int _rbid; bool _rbidInitialized = false; Timestamp _stableTimestamp = Timestamp::min(); diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index 012bad86797..03b5af98376 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -98,7 +98,7 @@ Reporter::PrepareReplSetUpdatePositionCommandFn makePrepareReplSetUpdatePosition void SyncSourceFeedback::forwardSlaveProgress() { { - stdx::unique_lock<stdx::mutex> lock(_mtx); + stdx::unique_lock<Latch> lock(_mtx); _positionChanged = true; _cond.notify_all(); if (_reporter) { @@ -133,7 +133,7 @@ Status SyncSourceFeedback::_updateUpstream(Reporter* reporter) { } void SyncSourceFeedback::shutdown() { - stdx::unique_lock<stdx::mutex> lock(_mtx); + stdx::unique_lock<Latch> lock(_mtx); if (_reporter) { _reporter->shutdown(); } @@ -161,7 +161,7 @@ void SyncSourceFeedback::run(executor::TaskExecutor* executor, // Take SyncSourceFeedback lock before calling into ReplicationCoordinator // to avoid deadlock because ReplicationCoordinator could conceivably calling back into // this class. - stdx::unique_lock<stdx::mutex> lock(_mtx); + stdx::unique_lock<Latch> lock(_mtx); while (!_positionChanged && !_shutdownSignaled) { { MONGO_IDLE_THREAD_BLOCK; @@ -184,7 +184,7 @@ void SyncSourceFeedback::run(executor::TaskExecutor* executor, } { - stdx::lock_guard<stdx::mutex> lock(_mtx); + stdx::lock_guard<Latch> lock(_mtx); MemberState state = replCoord->getMemberState(); if (state.primary() || state.startup()) { continue; @@ -220,14 +220,14 @@ void SyncSourceFeedback::run(executor::TaskExecutor* executor, keepAliveInterval, syncSourceFeedbackNetworkTimeoutSecs); { - stdx::lock_guard<stdx::mutex> lock(_mtx); + stdx::lock_guard<Latch> lock(_mtx); if (_shutdownSignaled) { break; } _reporter = &reporter; } ON_BLOCK_EXIT([this]() { - stdx::lock_guard<stdx::mutex> lock(_mtx); + stdx::lock_guard<Latch> lock(_mtx); _reporter = nullptr; }); diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index a75cb23ad64..fdec94bff72 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -32,8 +32,8 @@ #include "mongo/base/status.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" namespace mongo { struct HostAndPort; @@ -79,7 +79,7 @@ private: Status _updateUpstream(Reporter* reporter); // protects cond, _shutdownSignaled, _keepAliveInterval, and _positionChanged. - stdx::mutex _mtx; + Mutex _mtx = MONGO_MAKE_LATCH("SyncSourceFeedback::_mtx"); // used to alert our thread of changes which need to be passed up the chain stdx::condition_variable _cond; // used to indicate a position change which has not yet been pushed along diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index af82a940d35..2baa83477e1 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -84,7 +84,7 @@ SyncSourceResolver::~SyncSourceResolver() { } bool SyncSourceResolver::isActive() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _isActive_inlock(); } @@ -94,7 +94,7 @@ bool SyncSourceResolver::_isActive_inlock() const { Status SyncSourceResolver::startup() { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); switch (_state) { case State::kPreStart: _state = State::kRunning; @@ -112,7 +112,7 @@ Status SyncSourceResolver::startup() { } void SyncSourceResolver::shutdown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); // Transition directly from PreStart to Complete if not started yet. if (State::kPreStart == _state) { _state = State::kComplete; @@ -136,12 +136,12 @@ void SyncSourceResolver::shutdown() { } void SyncSourceResolver::join() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _condition.wait(lk, [this]() { return !_isActive_inlock(); }); } bool SyncSourceResolver::_isShuttingDown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return State::kShuttingDown == _state; } @@ -205,7 +205,7 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP } Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // TODO SERVER-27499 need to check if _state is kShuttingDown inside the mutex. // Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task // executor. @@ -340,7 +340,7 @@ Status SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime ea // Once a work is scheduled, nothing prevents it finishing. We need the mutex to protect the // access of member variables after scheduling, because otherwise the scheduled callback could // finish and allow the destructor to fire before we access the member variables. - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_state == State::kShuttingDown) { return Status( ErrorCodes::CallbackCanceled, @@ -529,7 +529,7 @@ Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& res << exceptionToStatus(); } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_state != State::kComplete); _state = State::kComplete; _condition.notify_all(); diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h index bf38628ac32..6f13242e5e0 100644 --- a/src/mongo/db/repl/sync_source_resolver.h +++ b/src/mongo/db/repl/sync_source_resolver.h @@ -37,9 +37,9 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -234,7 +234,7 @@ private: const OnCompletionFn _onCompletion; // Protects members of this sync source resolver defined below. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("SyncSourceResolverResponse::_mutex"); mutable stdx::condition_variable _condition; // State transitions: diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 1fced6c80f0..c3b20fbf009 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -158,7 +158,7 @@ private: void _run(); // Protects _cond, _shutdownSignaled, and _latestOpTime. - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ApplyBatchFinalizerForJournal::_mutex"); // Used to alert our thread of a new OpTime. stdx::condition_variable _cond; // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing. @@ -170,7 +170,7 @@ private: }; ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _shutdownSignaled = true; _cond.notify_all(); lock.unlock(); @@ -182,7 +182,7 @@ void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAnd ReplicationCoordinator::DataConsistency consistency) { _recordApplied(newOpTimeAndWallTime, consistency); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _latestOpTimeAndWallTime = newOpTimeAndWallTime; _cond.notify_all(); } @@ -194,7 +194,7 @@ void ApplyBatchFinalizerForJournal::_run() { OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()}; { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); while (_latestOpTimeAndWallTime.opTime.isNull() && !_shutdownSignaled) { _cond.wait(lock); } @@ -601,7 +601,7 @@ public: } OpQueue getNextBatch(Seconds maxWaitTime) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // _ops can indicate the following cases: // 1. A new batch is ready to consume. // 2. Shutdown. @@ -713,7 +713,7 @@ private: } } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // Block until the previous batch has been taken. _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); }); _ops = std::move(ops); @@ -730,7 +730,7 @@ private: OplogBuffer* const _oplogBuffer; OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn; - stdx::mutex _mutex; // Guards _ops. + Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex"); // Guards _ops. stdx::condition_variable _cv; OpQueue _ops; @@ -881,12 +881,12 @@ void SyncTail::shutdown() { fassertFailedNoTrace(40304); } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _inShutdown = true; } bool SyncTail::inShutdown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _inShutdown; } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index c4923544c29..1bff8061adb 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -42,8 +42,8 @@ #include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/session_update_tracker.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" namespace mongo { @@ -277,7 +277,7 @@ private: const OplogApplier::Options _options; // Protects member data of SyncTail. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("SyncTail::_mutex"); // Set to true if shutdown() has been called. bool _inShutdown = false; diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 188e00875a9..22024e7d35c 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -64,7 +64,7 @@ #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/stats/counters.h" #include "mongo/db/transaction_participant_gen.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" @@ -498,7 +498,7 @@ protected: _insertOp2->getOpTime()); _opObserver->onInsertsFn = [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - stdx::lock_guard<stdx::mutex> lock(_insertMutex); + stdx::lock_guard<Latch> lock(_insertMutex); if (nss.isOplog() || nss == _nss1 || nss == _nss2 || nss == NamespaceString::kSessionTransactionsTableNamespace) { _insertedDocs[nss].insert(_insertedDocs[nss].end(), docs.begin(), docs.end()); @@ -545,7 +545,7 @@ protected: std::unique_ptr<ThreadPool> _writerPool; private: - stdx::mutex _insertMutex; + Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntrySyncTailTest::_insertMutex"); }; TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { @@ -881,7 +881,7 @@ protected: _abortSinglePrepareApplyOp; private: - stdx::mutex _insertMutex; + Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntryPreparedTransactionTest::_insertMutex"); }; TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionSteadyState) { diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index 4c53b558aa1..86edc6da9c5 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -50,8 +50,8 @@ namespace mongo { namespace repl { namespace { -using UniqueLock = stdx::unique_lock<stdx::mutex>; -using LockGuard = stdx::lock_guard<stdx::mutex>; +using UniqueLock = stdx::unique_lock<Latch>; +using LockGuard = stdx::lock_guard<Latch>; /** @@ -87,7 +87,7 @@ TaskRunner::~TaskRunner() { } std::string TaskRunner::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); str::stream output; output << "TaskRunner"; output << " scheduled tasks: " << _tasks.size(); @@ -97,14 +97,14 @@ std::string TaskRunner::getDiagnosticString() const { } bool TaskRunner::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _active; } void TaskRunner::schedule(Task task) { invariant(task); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _tasks.push_back(std::move(task)); _condition.notify_all(); @@ -123,7 +123,7 @@ void TaskRunner::schedule(Task task) { } void TaskRunner::cancel() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _cancelRequested = true; _condition.notify_all(); } @@ -159,7 +159,7 @@ void TaskRunner::_runTasks() { // Release thread back to pool after disposing if no scheduled tasks in queue. if (nextAction == NextAction::kDisposeOperationContext || nextAction == NextAction::kInvalid) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_tasks.empty()) { _finishRunTasks_inlock(); return; diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h index a63a428177f..202b64d6286 100644 --- a/src/mongo/db/repl/task_runner.h +++ b/src/mongo/db/repl/task_runner.h @@ -32,9 +32,9 @@ #include <list> #include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/functional.h" @@ -151,7 +151,7 @@ private: ThreadPool* _threadPool; // Protects member data of this TaskRunner. - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("TaskRunner::_mutex"); stdx::condition_variable _condition; diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp index 6953f4900ec..d71dc3c42e8 100644 --- a/src/mongo/db/repl/task_runner_test.cpp +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -34,8 +34,8 @@ #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/task_runner.h" #include "mongo/db/repl/task_runner_test_fixture.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/unittest/barrier.h" #include "mongo/util/concurrency/thread_pool.h" @@ -57,12 +57,12 @@ TEST_F(TaskRunnerTest, GetDiagnosticString) { } TEST_F(TaskRunnerTest, CallbackValues) { - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); bool called = false; OperationContext* opCtx = nullptr; Status status = getDetectableErrorStatus(); auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); called = true; opCtx = theTxn; status = theStatus; @@ -72,7 +72,7 @@ TEST_F(TaskRunnerTest, CallbackValues) { getThreadPool().waitForIdle(); ASSERT_FALSE(getTaskRunner().isActive()); - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); ASSERT_TRUE(called); ASSERT(opCtx); ASSERT_OK(status); @@ -84,11 +84,11 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction, unique_function<void(Task task)> schedule) { unittest::Barrier barrier(2U); - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); std::vector<OperationContext*> txns; OpIdVector txnIds; auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); if (txns.size() >= 2U) { return TaskRunner::NextAction::kInvalid; } @@ -111,7 +111,7 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test, test.getThreadPool().waitForIdle(); ASSERT_FALSE(test.getTaskRunner().isActive()); - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); ASSERT_EQUALS(2U, txns.size()); ASSERT(txns[0]); ASSERT(txns[1]); @@ -148,14 +148,14 @@ TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) { } TEST_F(TaskRunnerTest, SkipSecondTask) { - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); int i = 0; OperationContext* opCtx[2] = {nullptr, nullptr}; Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; stdx::condition_variable condition; bool schedulingDone = false; auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::unique_lock<stdx::mutex> lk(mutex); + stdx::unique_lock<Latch> lk(mutex); int j = i++; if (j >= 2) { return TaskRunner::NextAction::kCancel; @@ -174,14 +174,14 @@ TEST_F(TaskRunnerTest, SkipSecondTask) { ASSERT_TRUE(getTaskRunner().isActive()); getTaskRunner().schedule(task); { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); schedulingDone = true; condition.notify_all(); } getThreadPool().waitForIdle(); ASSERT_FALSE(getTaskRunner().isActive()); - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); ASSERT_EQUALS(2, i); ASSERT(opCtx[0]); ASSERT_OK(status[0]); @@ -190,14 +190,14 @@ TEST_F(TaskRunnerTest, SkipSecondTask) { } TEST_F(TaskRunnerTest, FirstTaskThrowsException) { - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); int i = 0; OperationContext* opCtx[2] = {nullptr, nullptr}; Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; stdx::condition_variable condition; bool schedulingDone = false; auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::unique_lock<stdx::mutex> lk(mutex); + stdx::unique_lock<Latch> lk(mutex); int j = i++; if (j >= 2) { return TaskRunner::NextAction::kCancel; @@ -223,14 +223,14 @@ TEST_F(TaskRunnerTest, FirstTaskThrowsException) { ASSERT_TRUE(getTaskRunner().isActive()); getTaskRunner().schedule(task); { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); schedulingDone = true; condition.notify_all(); } getThreadPool().waitForIdle(); ASSERT_FALSE(getTaskRunner().isActive()); - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); ASSERT_EQUALS(2, i); ASSERT(opCtx[0]); ASSERT_OK(status[0]); @@ -239,7 +239,7 @@ TEST_F(TaskRunnerTest, FirstTaskThrowsException) { } TEST_F(TaskRunnerTest, Cancel) { - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); stdx::condition_variable condition; Status status = getDetectableErrorStatus(); bool taskRunning = false; @@ -247,7 +247,7 @@ TEST_F(TaskRunnerTest, Cancel) { // Running this task causes the task runner to wait for another task that // is never scheduled. auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); status = theStatus; taskRunning = true; condition.notify_all(); @@ -261,7 +261,7 @@ TEST_F(TaskRunnerTest, Cancel) { getTaskRunner().schedule(task); ASSERT_TRUE(getTaskRunner().isActive()); { - stdx::unique_lock<stdx::mutex> lk(mutex); + stdx::unique_lock<Latch> lk(mutex); while (!taskRunning) { condition.wait(lk); } @@ -276,13 +276,13 @@ TEST_F(TaskRunnerTest, Cancel) { // This status will not be OK if canceling the task runner // before scheduling the task results in the task being canceled. - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); ASSERT_OK(status); } TEST_F(TaskRunnerTest, JoinShouldWaitForTasksToComplete) { unittest::Barrier barrier(2U); - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); Status status1 = getDetectableErrorStatus(); Status status2 = getDetectableErrorStatus(); @@ -290,7 +290,7 @@ TEST_F(TaskRunnerTest, JoinShouldWaitForTasksToComplete) { // Upon completion, "task1" requests the task runner to retain the operation context. This has // effect of keeping the task runner active. auto task1 = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); barrier.countDownAndWait(); status1 = theStatus; return TaskRunner::NextAction::kKeepOperationContext; @@ -300,7 +300,7 @@ TEST_F(TaskRunnerTest, JoinShouldWaitForTasksToComplete) { // Upon completion, "task2" requests the task runner to dispose the operation context. After the // operation context is destroyed, the task runner will go into an inactive state. auto task2 = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); status2 = theStatus; return TaskRunner::NextAction::kDisposeOperationContext; }; @@ -314,13 +314,13 @@ TEST_F(TaskRunnerTest, JoinShouldWaitForTasksToComplete) { // This status should be OK because we ensured that the task // was scheduled and invoked before we called cancel(). - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); ASSERT_OK(status1); ASSERT_OK(status2); } TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { - stdx::mutex mutex; + auto mutex = MONGO_MAKE_LATCH(); stdx::condition_variable condition; Status status = getDetectableErrorStatus(); bool taskRunning = false; @@ -328,7 +328,7 @@ TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { // Running this task causes the task runner to wait for another task that // is never scheduled. auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); status = theStatus; taskRunning = true; condition.notify_all(); @@ -338,7 +338,7 @@ TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { getTaskRunner().schedule(task); ASSERT_TRUE(getTaskRunner().isActive()); { - stdx::unique_lock<stdx::mutex> lk(mutex); + stdx::unique_lock<Latch> lk(mutex); while (!taskRunning) { condition.wait(lk); } @@ -350,7 +350,7 @@ TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { // This status will not be OK if canceling the task runner // before scheduling the task results in the task being canceled. - stdx::lock_guard<stdx::mutex> lk(mutex); + stdx::lock_guard<Latch> lk(mutex); ASSERT_OK(status); } diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 170bfc1587b..f6483cc3197 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -1529,7 +1529,6 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { Date_t appliedWallTime = Date_t() + Seconds(oplogProgress.getSecs()); OpTime oplogDurable(Timestamp(1, 1), 19); Date_t durableWallTime = Date_t() + Seconds(oplogDurable.getSecs()); - ; OpTime lastCommittedOpTime(Timestamp(5, 1), 20); Date_t lastCommittedWallTime = Date_t() + Seconds(lastCommittedOpTime.getSecs()); OpTime readConcernMajorityOpTime(Timestamp(4, 1), 20); |