Diffstat (limited to 'src/mongo/db/s')
49 files changed, 251 insertions, 249 deletions
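Every hunk in this patch follows the same mechanical pattern: an stdx::mutex member is replaced by the Mutex type from mongo/platform/mutex.h, constructed through MONGO_MAKE_LATCH with a descriptive "Class::_member" name, and the lock guards are re-templated on Latch; the include of mongo/stdx/mutex.h is dropped in favor of mongo/platform/mutex.h. A minimal before/after sketch of that shape (the class and member names here are illustrative only, not taken from the patch):

// Before: anonymous stdx::mutex, guards templated on stdx::mutex.
//   #include "mongo/stdx/mutex.h"
//   stdx::mutex _mutex;
//   stdx::lock_guard<stdx::mutex> lk(_mutex);

// After: a named latch, guards templated on Latch.
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"

namespace mongo {

class ExampleRegistry {  // hypothetical class; mirrors the pattern used throughout this patch
public:
    void setActive(bool active) {
        stdx::lock_guard<Latch> lk(_mutex);  // Latch replaces stdx::mutex as the guard parameter
        _active = active;
        _condVar.notify_all();
    }

    void waitUntilInactive() {
        stdx::unique_lock<Latch> lk(_mutex);
        _condVar.wait(lk, [&] { return !_active; });
    }

private:
    // The string names the latch so it can be identified in lock diagnostics.
    Mutex _mutex = MONGO_MAKE_LATCH("ExampleRegistry::_mutex");
    stdx::condition_variable _condVar;
    bool _active{false};
};

}  // namespace mongo

As the hunks below show, only the member declaration and the guard template arguments change; condition variables, notifications, and the code run under the lock are left as they were.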
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index a3854cb9038..def2a02bac2 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -60,7 +60,7 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx) StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( const MoveChunkRequest& args) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_activeReceiveChunkState) { return _activeReceiveChunkState->constructErrorStatus(); } @@ -80,7 +80,7 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_activeReceiveChunkState) { return _activeReceiveChunkState->constructErrorStatus(); } @@ -95,7 +95,7 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( } boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_activeMoveChunkState) { return _activeMoveChunkState->args.getNss(); } @@ -106,7 +106,7 @@ boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkN BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContext* opCtx) { boost::optional<NamespaceString> nss; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_activeMoveChunkState) { nss = _activeMoveChunkState->args.getNss(); @@ -132,13 +132,13 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex } void ActiveMigrationsRegistry::_clearDonateChunk() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_activeMoveChunkState); _activeMoveChunkState.reset(); } void ActiveMigrationsRegistry::_clearReceiveChunk() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_activeReceiveChunkState); _activeReceiveChunkState.reset(); } diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index d2e3f4b2ad0..205060d0ab2 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -32,9 +32,9 @@ #include <boost/optional.hpp> #include "mongo/db/s/migration_session_id.h" +#include "mongo/platform/mutex.h" #include "mongo/s/request_types/move_chunk_request.h" #include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/notification.h" namespace mongo { @@ -152,7 +152,7 @@ private: void _clearReceiveChunk(); // Protects the state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ActiveMigrationsRegistry::_mutex"); // If there is an active moveChunk operation, this field contains the original request boost::optional<ActiveMoveChunkState> _activeMoveChunkState; diff --git a/src/mongo/db/s/active_move_primaries_registry.cpp b/src/mongo/db/s/active_move_primaries_registry.cpp index f71f7a63d80..827b7912506 100644 --- a/src/mongo/db/s/active_move_primaries_registry.cpp +++ b/src/mongo/db/s/active_move_primaries_registry.cpp @@ -56,7 +56,7 @@ ActiveMovePrimariesRegistry& ActiveMovePrimariesRegistry::get(OperationContext* 
StatusWith<ScopedMovePrimary> ActiveMovePrimariesRegistry::registerMovePrimary( const ShardMovePrimary& requestArgs) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_activeMovePrimaryState) { if (_activeMovePrimaryState->requestArgs == requestArgs) { return {ScopedMovePrimary(nullptr, false, _activeMovePrimaryState->notification)}; @@ -71,7 +71,7 @@ StatusWith<ScopedMovePrimary> ActiveMovePrimariesRegistry::registerMovePrimary( } boost::optional<NamespaceString> ActiveMovePrimariesRegistry::getActiveMovePrimaryNss() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_activeMovePrimaryState) { return _activeMovePrimaryState->requestArgs.get_movePrimary(); } @@ -80,7 +80,7 @@ boost::optional<NamespaceString> ActiveMovePrimariesRegistry::getActiveMovePrima } void ActiveMovePrimariesRegistry::_clearMovePrimary() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_activeMovePrimaryState); _activeMovePrimaryState.reset(); } diff --git a/src/mongo/db/s/active_move_primaries_registry.h b/src/mongo/db/s/active_move_primaries_registry.h index 38b19a6c94f..94f55657cba 100644 --- a/src/mongo/db/s/active_move_primaries_registry.h +++ b/src/mongo/db/s/active_move_primaries_registry.h @@ -99,7 +99,7 @@ private: void _clearMovePrimary(); // Protects the state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ActiveMovePrimariesRegistry::_mutex"); // If there is an active movePrimary operation going on, this field contains the request that // initiated it. diff --git a/src/mongo/db/s/active_shard_collection_registry.cpp b/src/mongo/db/s/active_shard_collection_registry.cpp index 6a01fdd90ee..d2bda7ece20 100644 --- a/src/mongo/db/s/active_shard_collection_registry.cpp +++ b/src/mongo/db/s/active_shard_collection_registry.cpp @@ -91,7 +91,7 @@ ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(OperationConte StatusWith<ScopedShardCollection> ActiveShardCollectionRegistry::registerShardCollection( const ShardsvrShardCollection& request) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); std::string nss = request.get_shardsvrShardCollection().get().ns(); auto iter = _activeShardCollectionMap.find(nss); @@ -114,7 +114,7 @@ StatusWith<ScopedShardCollection> ActiveShardCollectionRegistry::registerShardCo } void ActiveShardCollectionRegistry::_clearShardCollection(std::string nss) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto iter = _activeShardCollectionMap.find(nss); invariant(iter != _activeShardCollectionMap.end()); _activeShardCollectionMap.erase(nss); @@ -122,7 +122,7 @@ void ActiveShardCollectionRegistry::_clearShardCollection(std::string nss) { void ActiveShardCollectionRegistry::_setUUIDOrError(std::string nss, StatusWith<boost::optional<UUID>> swUUID) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto iter = _activeShardCollectionMap.find(nss); invariant(iter != _activeShardCollectionMap.end()); auto activeShardCollectionState = iter->second; diff --git a/src/mongo/db/s/active_shard_collection_registry.h b/src/mongo/db/s/active_shard_collection_registry.h index cab710b5ee5..0a8d05d52e2 100644 --- a/src/mongo/db/s/active_shard_collection_registry.h +++ b/src/mongo/db/s/active_shard_collection_registry.h @@ -31,9 +31,9 @@ #include <boost/optional.hpp> +#include "mongo/platform/mutex.h" #include "mongo/s/request_types/shard_collection_gen.h" #include 
"mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/notification.h" namespace mongo { @@ -107,7 +107,7 @@ private: void _setUUIDOrError(std::string nss, StatusWith<boost::optional<UUID>> swUUID); // Protects the state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ActiveShardCollectionRegistry::_mutex"); // Map containing any collections currently being sharded StringMap<std::shared_ptr<ActiveShardCollectionState>> _activeShardCollectionMap; diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 43f924132aa..bcb2fb1d389 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -161,7 +161,7 @@ Balancer::Balancer(ServiceContext* serviceContext) Balancer::~Balancer() { // The balancer thread must have been stopped - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); invariant(_state == kStopped); } @@ -179,7 +179,7 @@ Balancer* Balancer::get(OperationContext* operationContext) { } void Balancer::initiateBalancer(OperationContext* opCtx) { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); invariant(_state == kStopped); _state = kRunning; @@ -191,7 +191,7 @@ void Balancer::initiateBalancer(OperationContext* opCtx) { } void Balancer::interruptBalancer() { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); if (_state != kRunning) return; @@ -215,7 +215,7 @@ void Balancer::interruptBalancer() { void Balancer::waitForBalancerToStop() { { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); if (_state == kStopped) return; @@ -225,7 +225,7 @@ void Balancer::waitForBalancerToStop() { _thread.join(); - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); _state = kStopped; _thread = {}; @@ -233,7 +233,7 @@ void Balancer::waitForBalancerToStop() { } void Balancer::joinCurrentRound(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> scopedLock(_mutex); + stdx::unique_lock<Latch> scopedLock(_mutex); const auto numRoundsAtStart = _numBalancerRounds; opCtx->waitForConditionOrInterrupt(_condVar, scopedLock, [&] { return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart; @@ -286,7 +286,7 @@ void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) { const auto mode = balancerConfig->getBalancerMode(); - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); builder->append("mode", BalancerSettingsType::kBalancerModes[mode]); builder->append("inBalancerRound", _inBalancerRound); builder->append("numBalancerRounds", _numBalancerRounds); @@ -300,7 +300,7 @@ void Balancer::_mainThread() { log() << "CSRS balancer is starting"; { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); _threadOperationContext = opCtx.get(); } @@ -413,7 +413,7 @@ void Balancer::_mainThread() { } { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); invariant(_state == kStopping); invariant(_migrationManagerInterruptThread.joinable()); } @@ -422,7 +422,7 @@ void Balancer::_mainThread() { _migrationManager.drainActiveMigrations(); { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); _migrationManagerInterruptThread = {}; _threadOperationContext = nullptr; } @@ -431,19 +431,19 @@ 
void Balancer::_mainThread() { } bool Balancer::_stopRequested() { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); return (_state != kRunning); } void Balancer::_beginRound(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _inBalancerRound = true; _condVar.notify_all(); } void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _inBalancerRound = false; _numBalancerRounds++; _condVar.notify_all(); @@ -454,7 +454,7 @@ void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) { } void Balancer::_sleepFor(OperationContext* opCtx, Seconds waitTimeout) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; }); } @@ -663,7 +663,7 @@ void Balancer::_splitOrMarkJumbo(OperationContext* opCtx, } void Balancer::notifyPersistedBalancerSettingsChanged() { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _condVar.notify_all(); } diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h index 2b6738def19..7f1b6cd5b75 100644 --- a/src/mongo/db/s/balancer/balancer.h +++ b/src/mongo/db/s/balancer/balancer.h @@ -32,8 +32,8 @@ #include "mongo/db/s/balancer/balancer_chunk_selection_policy.h" #include "mongo/db/s/balancer/balancer_random.h" #include "mongo/db/s/balancer/migration_manager.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" namespace mongo { @@ -208,7 +208,7 @@ private: const BSONObj& minKey); // Protects the state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("Balancer::_mutex"); // Indicates the current state of the balancer State _state{kStopped}; diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 0a988cf1b13..4af124368e4 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -210,7 +210,7 @@ Status MigrationManager::executeManualMigration( void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* opCtx) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_state == State::kStopped); invariant(_migrationRecoveryMap.empty()); _state = State::kRecovering; @@ -285,7 +285,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state == State::kStopping) { _migrationRecoveryMap.clear(); return; @@ -367,7 +367,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, scopedGuard.dismiss(); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state == State::kRecovering) { _state = State::kEnabled; _condVar.notify_all(); @@ -383,7 +383,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, void MigrationManager::interruptAndDisableMigrations() { auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_state == State::kEnabled 
|| _state == State::kRecovering); _state = State::kStopping; @@ -402,7 +402,7 @@ void MigrationManager::interruptAndDisableMigrations() { } void MigrationManager::drainActiveMigrations() { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (_state == State::kStopped) return; @@ -421,7 +421,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( // Ensure we are not stopped in order to avoid doing the extra work { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state != State::kEnabled && _state != State::kRecovering) { return std::make_shared<Notification<RemoteCommandResponse>>( Status(ErrorCodes::BalancerInterrupted, @@ -457,7 +457,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( secondaryThrottle, waitForDelete); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_state != State::kEnabled && _state != State::kRecovering) { return std::make_shared<Notification<RemoteCommandResponse>>( @@ -522,7 +522,7 @@ void MigrationManager::_schedule(WithLock lock, ThreadClient tc(getThreadName(), service); auto opCtx = cc().makeOperationContext(); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _complete(lock, opCtx.get(), itMigration, args.response); }); @@ -573,12 +573,12 @@ void MigrationManager::_checkDrained(WithLock) { } void MigrationManager::_waitForRecovery() { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _condVar.wait(lock, [this] { return _state != State::kRecovering; }); } void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (_state == State::kStopping) { // The balancer was interrupted. Let the next balancer recover the state. return; @@ -605,7 +605,7 @@ Status MigrationManager::_processRemoteCommandResponse( const RemoteCommandResponse& remoteCommandResponse, ScopedMigrationRequest* scopedMigrationRequest) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); Status commandStatus(ErrorCodes::InternalError, "Uninitialized value."); // Check for local errors sending the remote command caused by stepdown. diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h index 4f6c1288571..b321b361e79 100644 --- a/src/mongo/db/s/balancer/migration_manager.h +++ b/src/mongo/db/s/balancer/migration_manager.h @@ -38,10 +38,10 @@ #include "mongo/db/s/balancer/balancer_policy.h" #include "mongo/db/s/balancer/type_migration.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/request_types/migration_secondary_throttle_options.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/with_lock.h" @@ -260,7 +260,7 @@ private: stdx::unordered_map<NamespaceString, std::list<MigrationType>> _migrationRecoveryMap; // Protects the class state below. - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("MigrationManager::_mutex"); // Always start the migration manager in a stopped state. 
State _state{State::kStopped}; diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index 049ab0ae261..c7dd1e22250 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -234,12 +234,12 @@ ChunkSplitter& ChunkSplitter::get(ServiceContext* serviceContext) { } void ChunkSplitter::onShardingInitialization(bool isPrimary) { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + stdx::lock_guard<Latch> scopedLock(_mutex); _isPrimary = isPrimary; } void ChunkSplitter::onStepUp() { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (_isPrimary) { return; } @@ -249,7 +249,7 @@ void ChunkSplitter::onStepUp() { } void ChunkSplitter::onStepDown() { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (!_isPrimary) { return; } diff --git a/src/mongo/db/s/chunk_splitter.h b/src/mongo/db/s/chunk_splitter.h index ef774dc017c..a05683fc6e7 100644 --- a/src/mongo/db/s/chunk_splitter.h +++ b/src/mongo/db/s/chunk_splitter.h @@ -107,7 +107,7 @@ private: long dataWritten); // Protects the state below. - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ChunkSplitter::_mutex"); // The ChunkSplitter is only active on a primary node. bool _isPrimary{false}; diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index d5affc26cc0..92e8b65a4fa 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -134,7 +134,7 @@ boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange( bool writeOpLog = false; { - stdx::lock_guard<stdx::mutex> scopedLock(csr->_metadataManager->_managerLock); + stdx::lock_guard<Latch> scopedLock(csr->_metadataManager->_managerLock); if (self->isEmpty()) { LOG(1) << "No further range deletions scheduled on " << nss.ns(); return boost::none; @@ -181,7 +181,7 @@ boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange( << "ns" << nss.ns() << "epoch" << epoch << "min" << range->getMin() << "max" << range->getMax())); } catch (const DBException& e) { - stdx::lock_guard<stdx::mutex> scopedLock(csr->_metadataManager->_managerLock); + stdx::lock_guard<Latch> scopedLock(csr->_metadataManager->_managerLock); csr->_metadataManager->_clearAllCleanups( scopedLock, e.toStatus("cannot push startRangeDeletion record to Op Log," @@ -254,7 +254,7 @@ boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange( auto* const self = forTestOnly ? 
forTestOnly : &metadataManager->_rangesToClean; - stdx::lock_guard<stdx::mutex> scopedLock(csr->_metadataManager->_managerLock); + stdx::lock_guard<Latch> scopedLock(csr->_metadataManager->_managerLock); if (!replicationStatus.isOK()) { LOG(0) << "Error when waiting for write concern after removing " << nss << " range " @@ -304,7 +304,7 @@ bool CollectionRangeDeleter::_checkCollectionMetadataStillValid( if (!scopedCollectionMetadata) { LOG(0) << "Abandoning any range deletions because the metadata for " << nss.ns() << " was reset"; - stdx::lock_guard<stdx::mutex> lk(metadataManager->_managerLock); + stdx::lock_guard<Latch> lk(metadataManager->_managerLock); metadataManager->_clearAllCleanups(lk); return false; } @@ -319,7 +319,7 @@ bool CollectionRangeDeleter::_checkCollectionMetadataStillValid( << nss.ns(); } - stdx::lock_guard<stdx::mutex> lk(metadataManager->_managerLock); + stdx::lock_guard<Latch> lk(metadataManager->_managerLock); metadataManager->_clearAllCleanups(lk); return false; } diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index feb519090e3..540b87c49ef 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -54,7 +54,7 @@ public: : _factory(std::move(factory)) {} CollectionShardingState& getOrCreate(const NamespaceString& nss) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); auto it = _collections.find(nss.ns()); if (it == _collections.end()) { @@ -70,7 +70,7 @@ public: BSONObjBuilder versionB(builder->subobjStart("versions")); { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); for (auto& coll : _collections) { const auto optMetadata = coll.second->getCurrentMetadataIfKnown(); @@ -89,7 +89,7 @@ private: std::unique_ptr<CollectionShardingStateFactory> _factory; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("CollectionShardingStateMap::_mutex"); CollectionsMap _collections; }; diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp index a7f0f5f8dc5..336ca1f1761 100644 --- a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp +++ b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp @@ -58,7 +58,7 @@ public: private: executor::TaskExecutor* _getExecutor() { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (!_taskExecutor) { const std::string kExecName("CollectionRangeDeleter-TaskExecutor"); @@ -75,7 +75,7 @@ private: } // Serializes the instantiation of the task executor - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("CollectionShardingStateFactoryShard::_mutex"); std::unique_ptr<executor::TaskExecutor> _taskExecutor{nullptr}; }; diff --git a/src/mongo/db/s/config/namespace_serializer.cpp b/src/mongo/db/s/config/namespace_serializer.cpp index c132fe177b2..6c69eaa668d 100644 --- a/src/mongo/db/s/config/namespace_serializer.cpp +++ b/src/mongo/db/s/config/namespace_serializer.cpp @@ -49,7 +49,7 @@ NamespaceSerializer::ScopedLock::ScopedLock(StringData ns, NamespaceSerializer& : _ns(ns.toString()), _nsSerializer(nsSerializer) {} NamespaceSerializer::ScopedLock::~ScopedLock() { - stdx::unique_lock<stdx::mutex> lock(_nsSerializer._mutex); + stdx::unique_lock<Latch> lock(_nsSerializer._mutex); auto iter = _nsSerializer._inProgressMap.find(_ns); iter->second->numWaiting--; @@ -62,7 +62,7 @@ NamespaceSerializer::ScopedLock::~ScopedLock() { } 
NamespaceSerializer::ScopedLock NamespaceSerializer::lock(OperationContext* opCtx, StringData nss) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto iter = _inProgressMap.find(nss); if (iter == _inProgressMap.end()) { diff --git a/src/mongo/db/s/config/namespace_serializer.h b/src/mongo/db/s/config/namespace_serializer.h index 7b7832ebbe7..aa50552e3f8 100644 --- a/src/mongo/db/s/config/namespace_serializer.h +++ b/src/mongo/db/s/config/namespace_serializer.h @@ -36,8 +36,8 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/namespace_string.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" namespace mongo { @@ -71,7 +71,7 @@ private: bool isInProgress = true; }; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("NamespaceSerializer::_mutex"); StringMap<std::shared_ptr<NSLock>> _inProgressMap; }; diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp index 424db73a9d0..557529099ff 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp @@ -100,7 +100,7 @@ ShardingCatalogManager::~ShardingCatalogManager() { } void ShardingCatalogManager::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_started) { return; } @@ -114,7 +114,7 @@ void ShardingCatalogManager::startup() { void ShardingCatalogManager::shutDown() { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _inShutdown = true; } @@ -126,7 +126,7 @@ void ShardingCatalogManager::shutDown() { Status ShardingCatalogManager::initializeConfigDatabaseIfNeeded(OperationContext* opCtx) { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_configInitialized) { return {ErrorCodes::AlreadyInitialized, "Config database was previously loaded into memory"}; @@ -146,14 +146,14 @@ Status ShardingCatalogManager::initializeConfigDatabaseIfNeeded(OperationContext return status; } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _configInitialized = true; return Status::OK(); } void ShardingCatalogManager::discardCachedConfigDatabaseInitializationState() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _configInitialized = false; } diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index ef6aa80be6a..26a43e966b8 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -34,13 +34,13 @@ #include "mongo/db/repl/optime_with.h" #include "mongo/db/s/config/namespace_serializer.h" #include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/shard_key_pattern.h" -#include "mongo/stdx/mutex.h" namespace mongo { @@ -506,7 +506,7 @@ private: // (S) Self-synchronizing; access in any way from any context. // - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ShardingCatalogManager::_mutex"); // True if shutDown() has been called. False, otherwise. 
bool _inShutdown{false}; // (M) diff --git a/src/mongo/db/s/implicit_create_collection.cpp b/src/mongo/db/s/implicit_create_collection.cpp index 7ea8c1e1345..a0a3d6068f9 100644 --- a/src/mongo/db/s/implicit_create_collection.cpp +++ b/src/mongo/db/s/implicit_create_collection.cpp @@ -46,8 +46,8 @@ #include "mongo/s/grid.h" #include "mongo/s/request_types/create_collection_gen.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -73,7 +73,7 @@ public: invariant(!opCtx->lockState()->isLocked()); { - stdx::unique_lock<stdx::mutex> lg(_mutex); + stdx::unique_lock<Latch> lg(_mutex); while (_isInProgress) { auto status = opCtx->waitForConditionOrInterruptNoAssert(_cvIsInProgress, lg); if (!status.isOK()) { @@ -85,7 +85,7 @@ public: } ON_BLOCK_EXIT([&] { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _isInProgress = false; _cvIsInProgress.notify_one(); }); @@ -128,7 +128,7 @@ public: private: const NamespaceString _ns; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("CreateCollectionSerializer::_mutex"); stdx::condition_variable _cvIsInProgress; bool _isInProgress = false; }; @@ -136,7 +136,7 @@ private: class CreateCollectionSerializerMap { public: std::shared_ptr<CreateCollectionSerializer> getForNs(const NamespaceString& ns) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); auto iter = _inProgressMap.find(ns.ns()); if (iter == _inProgressMap.end()) { std::tie(iter, std::ignore) = @@ -147,12 +147,12 @@ public: } void cleanupNs(const NamespaceString& ns) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _inProgressMap.erase(ns.ns()); } private: - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("CreateCollectionSerializerMap::_mutex"); std::map<std::string, std::shared_ptr<CreateCollectionSerializer>> _inProgressMap; }; diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index 4926fe86508..8364929ad36 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -186,7 +186,7 @@ public: } ~RangePreserver() { - stdx::lock_guard<stdx::mutex> managerLock(_metadataManager->_managerLock); + stdx::lock_guard<Latch> managerLock(_metadataManager->_managerLock); invariant(_metadataTracker->usageCounter != 0); if (--_metadataTracker->usageCounter == 0) { @@ -245,7 +245,7 @@ void MetadataManager::_clearAllCleanups(WithLock, Status status) { boost::optional<ScopedCollectionMetadata> MetadataManager::getActiveMetadata( std::shared_ptr<MetadataManager> self, const boost::optional<LogicalTime>& atClusterTime) { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); if (_metadata.empty()) { return boost::none; @@ -282,7 +282,7 @@ boost::optional<ScopedCollectionMetadata> MetadataManager::getActiveMetadata( } size_t MetadataManager::numberOfMetadataSnapshots() const { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); if (_metadata.empty()) return 0; @@ -290,7 +290,7 @@ size_t MetadataManager::numberOfMetadataSnapshots() const { } int MetadataManager::numberOfEmptyMetadataSnapshots() const { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); int emptyMetadataSnapshots = 0; for (const auto& collMetadataTracker : _metadata) { @@ -302,7 +302,7 @@ int MetadataManager::numberOfEmptyMetadataSnapshots() 
const { } void MetadataManager::setFilteringMetadata(CollectionMetadata remoteMetadata) { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); // Collection is becoming sharded if (_metadata.empty()) { @@ -365,7 +365,7 @@ void MetadataManager::setFilteringMetadata(CollectionMetadata remoteMetadata) { } void MetadataManager::clearFilteringMetadata() { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); _receivingChunks.clear(); _clearAllCleanups(lg); _metadata.clear(); @@ -407,7 +407,7 @@ void MetadataManager::_retireExpiredMetadata(WithLock lock) { } void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); for (auto it = _receivingChunks.begin(); it != _receivingChunks.end(); ++it) { BSONArrayBuilder pendingBB(bb.subarrayStart()); @@ -418,7 +418,7 @@ void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const { } void MetadataManager::append(BSONObjBuilder* builder) const { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); _rangesToClean.append(builder); @@ -463,7 +463,7 @@ void MetadataManager::_pushListToClean(WithLock, std::list<Deletion> ranges) { } auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotification { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); invariant(!_metadata.empty()); if (_overlapsInUseChunk(lg, range)) { @@ -480,7 +480,7 @@ auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotificati } void MetadataManager::forgetReceive(ChunkRange const& range) { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); invariant(!_metadata.empty()); // This is potentially a partially received chunk, which needs to be cleaned up. 
We know none @@ -499,7 +499,7 @@ void MetadataManager::forgetReceive(ChunkRange const& range) { auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete) -> CleanupNotification { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); invariant(!_metadata.empty()); auto* const activeMetadata = _metadata.back().get(); @@ -536,7 +536,7 @@ auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete) } size_t MetadataManager::numberOfRangesToCleanStillInUse() const { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); size_t count = 0; for (auto& tracker : _metadata) { count += tracker->orphans.size(); @@ -545,13 +545,13 @@ size_t MetadataManager::numberOfRangesToCleanStillInUse() const { } size_t MetadataManager::numberOfRangesToClean() const { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); return _rangesToClean.size(); } auto MetadataManager::trackOrphanedDataCleanup(ChunkRange const& range) const -> boost::optional<CleanupNotification> { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); auto overlaps = _overlapsInUseCleanups(lg, range); if (overlaps) { return overlaps; @@ -604,7 +604,7 @@ auto MetadataManager::_overlapsInUseCleanups(WithLock, ChunkRange const& range) } boost::optional<ChunkRange> MetadataManager::getNextOrphanRange(BSONObj const& from) const { - stdx::lock_guard<stdx::mutex> lg(_managerLock); + stdx::lock_guard<Latch> lg(_managerLock); invariant(!_metadata.empty()); return _metadata.back()->metadata->getNextOrphanRange(_receivingChunks, from); } diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h index 26cb452ac28..ecea706ce41 100644 --- a/src/mongo/db/s/metadata_manager.h +++ b/src/mongo/db/s/metadata_manager.h @@ -240,7 +240,7 @@ private: executor::TaskExecutor* const _executor; // Mutex to protect the state below - mutable stdx::mutex _managerLock; + mutable Mutex _managerLock = MONGO_MAKE_LATCH("MetadataManager::_managerLock"); // Contains a list of collection metadata for the same collection epoch, ordered in // chronological order based on the refreshes that occurred. The entry at _metadata.back() is diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 3d5ee943804..1d9eca8dbb4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -292,7 +292,7 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { // between cancellations for different migration sessions. It is thus possible that a second // migration from different donor, but the same recipient would certainly abort an already // running migration. 
- stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); _state = kCloning; return Status::OK(); @@ -321,7 +321,7 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( } iteration++; - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); const std::size_t cloneLocsRemaining = _cloneLocs.size(); @@ -551,14 +551,14 @@ void MigrationChunkClonerSourceLegacy::_addToTransferModsQueue( const repl::OpTime& prePostImageOpTime) { switch (op) { case 'd': { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); _deleted.push_back(idObj); _memoryUsed += idObj.firstElement().size() + 5; } break; case 'i': case 'u': { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); _reload.push_back(idObj); _memoryUsed += idObj.firstElement().size() + 5; } break; @@ -574,7 +574,7 @@ void MigrationChunkClonerSourceLegacy::_addToTransferModsQueue( } bool MigrationChunkClonerSourceLegacy::_addedOperationToOutstandingOperationTrackRequests() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (!_acceptingNewOperationTrackRequests) { return false; } @@ -584,7 +584,7 @@ bool MigrationChunkClonerSourceLegacy::_addedOperationToOutstandingOperationTrac } void MigrationChunkClonerSourceLegacy::_drainAllOutstandingOperationTrackRequests( - stdx::unique_lock<stdx::mutex>& lk) { + stdx::unique_lock<Latch>& lk) { invariant(_state == kDone); _acceptingNewOperationTrackRequests = false; _allOutstandingOperationTrackRequestsDrained.wait( @@ -598,7 +598,7 @@ void MigrationChunkClonerSourceLegacy::_incrementOutstandingOperationTrackReques } void MigrationChunkClonerSourceLegacy::_decrementOutstandingOperationTrackRequests() { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); --_outstandingOperationTrackRequests; if (_outstandingOperationTrackRequests == 0) { _allOutstandingOperationTrackRequestsDrained.notify_all(); @@ -606,7 +606,7 @@ void MigrationChunkClonerSourceLegacy::_decrementOutstandingOperationTrackReques } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); return std::min(static_cast<uint64_t>(BSONObjMaxUserSize), _averageObjectSizeForCloneLocs * _cloneLocs.size()); @@ -621,7 +621,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto iter = _cloneLocs.begin(); for (; iter != _cloneLocs.end(); ++iter) { @@ -666,7 +666,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, { // All clone data must have been drained before starting to fetch the incremental changes. - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_cloneLocs.empty()); // The "snapshot" for delete and update list must be taken under a single lock. 
This is to @@ -685,7 +685,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, builder->append("size", totalDocSize); // Put back remaining ids we didn't consume - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _deleted.splice(_deleted.cbegin(), deleteList); _reload.splice(_reload.cbegin(), updateList); @@ -693,7 +693,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, } void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _state = kDone; _drainAllOutstandingOperationTrackRequests(lk); @@ -800,7 +800,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC } if (!isLargeChunk) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _cloneLocs.insert(recordId); } @@ -829,7 +829,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC << _args.getMaxKey()}; } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _averageObjectSizeForCloneLocs = collectionAverageObjectSize + 12; return Status::OK(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 2818b8f538b..e5263466c11 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -39,11 +39,11 @@ #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/session_catalog_migration_source.h" +#include "mongo/platform/mutex.h" #include "mongo/s/request_types/move_chunk_request.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -285,7 +285,7 @@ private: * function. Should only be used in the cleanup for this class. Should use a lock wrapped * around this class's mutex. */ - void _drainAllOutstandingOperationTrackRequests(stdx::unique_lock<stdx::mutex>& lk); + void _drainAllOutstandingOperationTrackRequests(stdx::unique_lock<Latch>& lk); /** * Appends to the builder the list of _id of documents that were deleted during migration. 
@@ -325,7 +325,7 @@ private: std::unique_ptr<SessionCatalogMigrationSource> _sessionCatalogSource; // Protects the entries below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSourceLegacy::_mutex"); // The current state of the cloner State _state{kNew}; diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index b526b23816b..44f7ca273bc 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -223,12 +223,12 @@ MigrationDestinationManager* MigrationDestinationManager::get(OperationContext* } MigrationDestinationManager::State MigrationDestinationManager::getState() const { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); return _state; } void MigrationDestinationManager::setState(State newState) { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); _state = newState; _stateChangedCV.notify_all(); } @@ -236,7 +236,7 @@ void MigrationDestinationManager::setState(State newState) { void MigrationDestinationManager::_setStateFail(StringData msg) { log() << msg; { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); _errmsg = msg.toString(); _state = FAIL; _stateChangedCV.notify_all(); @@ -248,7 +248,7 @@ void MigrationDestinationManager::_setStateFail(StringData msg) { void MigrationDestinationManager::_setStateFailWarn(StringData msg) { warning() << msg; { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); _errmsg = msg.toString(); _state = FAIL; _stateChangedCV.notify_all(); @@ -258,7 +258,7 @@ void MigrationDestinationManager::_setStateFailWarn(StringData msg) { } bool MigrationDestinationManager::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _isActive(lk); } @@ -270,7 +270,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, OperationContext* opCtx, bool waitForSteadyOrDone) { if (waitForSteadyOrDone) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); try { opCtx->waitForConditionOrInterruptFor(_stateChangedCV, lock, Seconds(1), [&]() -> bool { return _state != READY && _state != CLONE && _state != CATCHUP; @@ -281,7 +281,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, } b.append("waited", true); } - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); b.appendBool("active", _sessionId.is_initialized()); @@ -312,7 +312,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, } BSONObj MigrationDestinationManager::getMigrationStatusReport() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_isActive(lk)) { return migrationutil::makeMigrationStatusDocument( _nss, _fromShard, _toShard, false, _min, _max); @@ -327,7 +327,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, const StartChunkCloneRequest cloneRequest, const OID& epoch, const WriteConcernOptions& writeConcern) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_sessionId); invariant(!_scopedReceiveChunk); @@ -435,7 +435,7 @@ repl::OpTime MigrationDestinationManager::cloneDocumentsFromDonor( } Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); if (!_sessionId) { return Status::OK(); @@ 
-456,7 +456,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { } void MigrationDestinationManager::abortWithoutSessionIdCheck() { - stdx::lock_guard<stdx::mutex> sl(_mutex); + stdx::lock_guard<Latch> sl(_mutex); _state = ABORT; _stateChangedCV.notify_all(); _errmsg = "aborted without session id check"; @@ -464,7 +464,7 @@ void MigrationDestinationManager::abortWithoutSessionIdCheck() { Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (_state != STEADY) { return {ErrorCodes::CommandFailed, @@ -710,7 +710,7 @@ void MigrationDestinationManager::_migrateThread() { _forgetPending(opCtx.get(), ChunkRange(_min, _max)); } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _sessionId.reset(); _scopedReceiveChunk.reset(); _isActiveCV.notify_all(); @@ -822,7 +822,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { } { - stdx::lock_guard<stdx::mutex> statsLock(_mutex); + stdx::lock_guard<Latch> statsLock(_mutex); _numCloned += batchNumCloned; ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch( batchNumCloned); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index afdc5c2f125..1833a024dcf 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -41,9 +41,9 @@ #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/session_catalog_migration_destination.h" +#include "mongo/platform/mutex.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/timer.h" @@ -178,7 +178,7 @@ private: bool _isActive(WithLock) const; // Mutex to guard all fields - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationDestinationManager::_mutex"); // Migration session ID uniquely identifies the migration and indicates whether the prepare // method has been called. 
diff --git a/src/mongo/db/s/namespace_metadata_change_notifications.cpp b/src/mongo/db/s/namespace_metadata_change_notifications.cpp index 6a288834ce7..ecf63039105 100644 --- a/src/mongo/db/s/namespace_metadata_change_notifications.cpp +++ b/src/mongo/db/s/namespace_metadata_change_notifications.cpp @@ -36,7 +36,7 @@ namespace mongo { NamespaceMetadataChangeNotifications::NamespaceMetadataChangeNotifications() = default; NamespaceMetadataChangeNotifications::~NamespaceMetadataChangeNotifications() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_notificationsList.empty()); } @@ -44,7 +44,7 @@ NamespaceMetadataChangeNotifications::ScopedNotification NamespaceMetadataChangeNotifications::createNotification(const NamespaceString& nss) { auto notifToken = std::make_shared<NotificationToken>(nss); - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); auto& notifList = _notificationsList[nss]; notifToken->itToErase = notifList.insert(notifList.end(), notifToken); @@ -53,7 +53,7 @@ NamespaceMetadataChangeNotifications::createNotification(const NamespaceString& } void NamespaceMetadataChangeNotifications::notifyChange(const NamespaceString& nss) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto mapIt = _notificationsList.find(nss); if (mapIt == _notificationsList.end()) { @@ -70,7 +70,7 @@ void NamespaceMetadataChangeNotifications::notifyChange(const NamespaceString& n void NamespaceMetadataChangeNotifications::_unregisterNotificationToken( std::shared_ptr<NotificationToken> token) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (!token->itToErase) { return; diff --git a/src/mongo/db/s/namespace_metadata_change_notifications.h b/src/mongo/db/s/namespace_metadata_change_notifications.h index ba7c51e86a0..12df62bfb95 100644 --- a/src/mongo/db/s/namespace_metadata_change_notifications.h +++ b/src/mongo/db/s/namespace_metadata_change_notifications.h @@ -33,7 +33,7 @@ #include <map> #include "mongo/db/namespace_string.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/util/concurrency/notification.h" namespace mongo { @@ -114,7 +114,7 @@ private: void _unregisterNotificationToken(std::shared_ptr<NotificationToken> token); - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("NamespaceMetadataChangeNotifications::_mutex"); std::map<NamespaceString, NotificationsList> _notificationsList; }; diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 9c5576c21be..7c4ab7d50a9 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -334,7 +334,7 @@ SessionCatalogMigrationDestination::~SessionCatalogMigrationDestination() { void SessionCatalogMigrationDestination::start(ServiceContext* service) { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_state == State::NotStarted); _state = State::Migrating; _isStateChanged.notify_all(); @@ -358,7 +358,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { } void SessionCatalogMigrationDestination::finish() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_state != State::ErrorOccurred) { _state = State::Committing; _isStateChanged.notify_all(); @@ -393,7 +393,7 @@ void 
SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service while (true) { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_state == State::ErrorOccurred) { return; } @@ -411,7 +411,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service if (oplogArray.isEmpty()) { { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_state == State::Committing) { // The migration is considered done only when it gets an empty result from // the source shard while this is in state committing. This is to make sure @@ -432,7 +432,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // We depleted the buffer at least once, transition to ready for commit. { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // Note: only transition to "ready to commit" if state is not error/force stop. if (_state == State::Migrating) { _state = State::ReadyToCommit; @@ -473,19 +473,19 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service waitForWriteConcern(uniqueOpCtx.get(), lastResult.oplogTime, kMajorityWC, &unusedWCResult)); { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _state = State::Done; _isStateChanged.notify_all(); } } std::string SessionCatalogMigrationDestination::getErrMsg() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _errMsg; } void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _state = State::ErrorOccurred; _errMsg = errMsg.toString(); @@ -493,7 +493,7 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) { } SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _state; } diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h index 89c43be2e62..b5a85fd6998 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.h +++ b/src/mongo/db/s/session_catalog_migration_destination.h @@ -36,9 +36,9 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/migration_session_id.h" +#include "mongo/platform/mutex.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/with_lock.h" @@ -116,7 +116,7 @@ private: stdx::thread _thread; // Protects _state and _errMsg. - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("SessionCatalogMigrationDestination::_mutex"); stdx::condition_variable _isStateChanged; State _state = State::NotStarted; std::string _errMsg; // valid only if _state == ErrorOccurred. 
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 86f8a8a6cf6..30bf462209c 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -180,12 +180,12 @@ bool SessionCatalogMigrationSource::hasMoreOplog() { return true; } - stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + stdx::lock_guard<Latch> lk(_newOplogMutex); return _hasNewWrites(lk); } void SessionCatalogMigrationSource::onCommitCloneStarted() { - stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + stdx::lock_guard<Latch> _lk(_newOplogMutex); _state = State::kCommitStarted; if (_newOplogNotification) { @@ -195,7 +195,7 @@ void SessionCatalogMigrationSource::onCommitCloneStarted() { } void SessionCatalogMigrationSource::onCloneCleanup() { - stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + stdx::lock_guard<Latch> _lk(_newOplogMutex); _state = State::kCleanup; if (_newOplogNotification) { @@ -206,14 +206,14 @@ void SessionCatalogMigrationSource::onCloneCleanup() { SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() { { - stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex); + stdx::lock_guard<Latch> _lk(_sessionCloneMutex); if (_lastFetchedOplog) { return OplogResult(_lastFetchedOplog, false); } } { - stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + stdx::lock_guard<Latch> _lk(_newOplogMutex); return OplogResult(_lastFetchedNewWriteOplog, true); } } @@ -229,7 +229,7 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) { std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificationForNewOplog() { invariant(!_hasMoreOplogFromSessionCatalog()); - stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + stdx::lock_guard<Latch> lk(_newOplogMutex); if (_newOplogNotification) { return _newOplogNotification; @@ -292,13 +292,13 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte } bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() { - stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex); + stdx::lock_guard<Latch> _lk(_sessionCloneMutex); return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty() || !_sessionOplogIterators.empty() || _currentOplogIterator; } bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_sessionCloneMutex); + stdx::unique_lock<Latch> lk(_sessionCloneMutex); if (!_lastFetchedOplogBuffer.empty()) { _lastFetchedOplog = _lastFetchedOplogBuffer.back(); @@ -333,7 +333,7 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op EntryAtOpTimeType entryAtOpTimeType; { - stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + stdx::lock_guard<Latch> lk(_newOplogMutex); if (_newWriteOpTimeList.empty()) { _lastFetchedNewWriteOplog.reset(); @@ -368,7 +368,7 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op } { - stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + stdx::lock_guard<Latch> lk(_newOplogMutex); _lastFetchedNewWriteOplog = newWriteOplogEntry; _newWriteOpTimeList.pop_front(); } @@ -378,7 +378,7 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime, EntryAtOpTimeType entryAtOpTimeType) { - stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + stdx::lock_guard<Latch> lk(_newOplogMutex); 
_newWriteOpTimeList.emplace_back(opTime, entryAtOpTimeType); if (_newOplogNotification) { diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 06093d4c8e8..df0d9d80259 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -37,9 +37,9 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" +#include "mongo/platform/mutex.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/shard_key_pattern.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/with_lock.h" @@ -231,7 +231,8 @@ private: // Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator, // _lastFetchedOplogBuffer, _lastFetchedOplog - stdx::mutex _sessionCloneMutex; + Mutex _sessionCloneMutex = + MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_sessionCloneMutex"); // List of remaining session records that needs to be cloned. std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators; @@ -248,7 +249,7 @@ private: boost::optional<repl::OplogEntry> _lastFetchedOplog; // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification - stdx::mutex _newOplogMutex; + Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex"); // Stores oplog opTime of new writes that are coming in. diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index dec279a747f..2cbbb83480c 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -341,7 +341,7 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() { // Prevent further scheduling, then interrupt ongoing tasks. 
_threadPool.shutdown(); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _contexts.interrupt(ErrorCodes::InterruptedAtShutdown); ++_term; } @@ -355,7 +355,7 @@ void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const Namesp } void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_role == ReplicaSetRole::None); if (isPrimary) { @@ -366,7 +366,7 @@ void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { } void ShardServerCatalogCacheLoader::onStepDown() { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_role != ReplicaSetRole::None); _contexts.interrupt(ErrorCodes::PrimarySteppedDown); ++_term; @@ -374,7 +374,7 @@ void ShardServerCatalogCacheLoader::onStepDown() { } void ShardServerCatalogCacheLoader::onStepUp() { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_role != ReplicaSetRole::None); ++_term; _role = ReplicaSetRole::Primary; @@ -387,7 +387,7 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc bool isPrimary; long long term; std::tie(isPrimary, term) = [&] { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return std::make_tuple(_role == ReplicaSetRole::Primary, _term); }(); @@ -403,7 +403,7 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc // We may have missed an OperationContextGroup interrupt since this operation // began but before the OperationContext was added to the group. So we'll check // that we're still in the same _term. - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); uassert(ErrorCodes::InterruptedDueToReplStateChange, "Unable to refresh routing table because replica set state changed or " "the node is shutting down.", @@ -430,7 +430,7 @@ void ShardServerCatalogCacheLoader::getDatabase( bool isPrimary; long long term; std::tie(isPrimary, term) = [&] { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return std::make_tuple(_role == ReplicaSetRole::Primary, _term); }(); @@ -446,7 +446,7 @@ void ShardServerCatalogCacheLoader::getDatabase( // We may have missed an OperationContextGroup interrupt since this operation began // but before the OperationContext was added to the group. So we'll check that we're // still in the same _term. 
- stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); uassert(ErrorCodes::InterruptedDueToReplStateChange, "Unable to refresh database because replica set state changed or the node " "is shutting down.", @@ -466,7 +466,7 @@ void ShardServerCatalogCacheLoader::getDatabase( void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) { - stdx::unique_lock<stdx::mutex> lg(_mutex); + stdx::unique_lock<Latch> lg(_mutex); const auto initialTerm = _term; boost::optional<uint64_t> taskNumToWait; @@ -517,7 +517,7 @@ void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opC void ShardServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) { - stdx::unique_lock<stdx::mutex> lg(_mutex); + stdx::unique_lock<Latch> lg(_mutex); const auto initialTerm = _term; boost::optional<uint64_t> taskNumToWait; @@ -599,7 +599,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( // Get the max version the loader has. const ChunkVersion maxLoaderVersion = [&] { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto taskListIt = _collAndChunkTaskLists.find(nss); if (taskListIt != _collAndChunkTaskLists.end() && @@ -670,7 +670,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( } const auto termAfterRefresh = [&] { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _term; }(); @@ -827,7 +827,7 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, const long long term) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto taskListIt = _collAndChunkTaskLists.find(nss); if (taskListIt == _collAndChunkTaskLists.end()) { @@ -862,7 +862,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto& list = _collAndChunkTaskLists[nss]; auto wasEmpty = list.empty(); @@ -884,7 +884,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper DBTask task) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto& list = _dbTaskLists[dbName.toString()]; auto wasEmpty = list.empty(); @@ -918,7 +918,7 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString } { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); // If task completed successfully, remove it from work queue if (taskFinished) { @@ -940,7 +940,7 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString << " caller to refresh this namespace."; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _collAndChunkTaskLists.erase(nss); } return; @@ -967,7 +967,7 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { } { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); // If task completed successfully, remove it from work queue if (taskFinished) { @@ -989,7 +989,7 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { << " caller to refresh this namespace."; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> 
lock(_mutex); _dbTaskLists.erase(name); } return; @@ -1002,7 +1002,7 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata( OperationContext* opCtx, const NamespaceString& nss) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); const collAndChunkTask& task = _collAndChunkTaskLists[nss].front(); invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty()); @@ -1038,7 +1038,7 @@ void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata( void ShardServerCatalogCacheLoader::_updatePersistedDbMetadata(OperationContext* opCtx, StringData dbName) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); const DBTask& task = _dbTaskLists[dbName.toString()].front(); @@ -1203,7 +1203,7 @@ void ShardServerCatalogCacheLoader::DbTaskList::pop_front() { } void ShardServerCatalogCacheLoader::CollAndChunkTaskList::waitForActiveTaskCompletion( - stdx::unique_lock<stdx::mutex>& lg) { + stdx::unique_lock<Latch>& lg) { // Increase the use_count of the condition variable shared pointer, because the entire task list // might get deleted during the unlocked interval auto condVar = _activeTaskCompletedCondVar; @@ -1211,7 +1211,7 @@ void ShardServerCatalogCacheLoader::CollAndChunkTaskList::waitForActiveTaskCompl } void ShardServerCatalogCacheLoader::DbTaskList::waitForActiveTaskCompletion( - stdx::unique_lock<stdx::mutex>& lg) { + stdx::unique_lock<Latch>& lg) { // Increase the use_count of the condition variable shared pointer, because the entire task list // might get deleted during the unlocked interval auto condVar = _activeTaskCompletedCondVar; diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index 9e998415793..86b1739254b 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -202,7 +202,7 @@ private: * same task object on which it was called because it might have been deleted during the * unlocked period. */ - void waitForActiveTaskCompletion(stdx::unique_lock<stdx::mutex>& lg); + void waitForActiveTaskCompletion(stdx::unique_lock<Latch>& lg); /** * Checks whether 'term' matches the term of the latest task in the task list. This is @@ -312,7 +312,7 @@ private: * same task object on which it was called because it might have been deleted during the * unlocked period. */ - void waitForActiveTaskCompletion(stdx::unique_lock<stdx::mutex>& lg); + void waitForActiveTaskCompletion(stdx::unique_lock<Latch>& lg); /** * Checks whether 'term' matches the term of the latest task in the task list. This is @@ -482,7 +482,7 @@ private: NamespaceMetadataChangeNotifications _namespaceNotifications; // Protects the class state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ShardServerCatalogCacheLoader::_mutex"); // This value is bumped every time the set of currently scheduled tasks should no longer be // running. This includes, replica set state transitions and shutdown. 
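Because several loader helpers receive the caller's lock by reference, the change is not confined to the member declaration: any signature spelled stdx::unique_lock<stdx::mutex>& has to be re-typed to stdx::unique_lock<Latch>& in the same commit, as the waitForActiveTaskCompletion changes above show, or the class stops compiling. Below is a rough sketch of that pattern with hypothetical names, under the assumption the diff itself relies on, namely that stdx::condition_variable can wait on a Latch-typed unique_lock:

#include <memory>

#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"

class TaskList {
public:
    // The caller acquires the lock and passes it down, so the parameter type
    // must track the member's type (Latch rather than stdx::mutex).
    void waitForActiveTaskCompletion(stdx::unique_lock<Latch>& lg) {
        // Hold an extra reference to the condition variable across the wait,
        // since the owning list could be destroyed while the lock is released.
        auto condVar = _activeTaskCompletedCondVar;
        condVar->wait(lg);
    }

    void signalActiveTaskDone() {
        stdx::lock_guard<Latch> lk(_mutex);
        _activeTaskCompletedCondVar->notify_all();
    }

private:
    Mutex _mutex = MONGO_MAKE_LATCH("TaskList::_mutex");
    std::shared_ptr<stdx::condition_variable> _activeTaskCompletedCondVar =
        std::make_shared<stdx::condition_variable>();
};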
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 0abb64e96cc..b3f38be0881 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -308,7 +308,7 @@ void ShardingInitializationMongoD::initializeFromShardIdentity( auto const shardingState = ShardingState::get(opCtx); auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); - stdx::unique_lock<stdx::mutex> ul(_initSynchronizationMutex); + stdx::unique_lock<Latch> ul(_initSynchronizationMutex); if (shardingState->enabled()) { uassert(40371, "", shardingState->shardId() == shardIdentity.getShardName()); diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h index a205d68d1b2..496a6e072fe 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.h +++ b/src/mongo/db/s/sharding_initialization_mongod.h @@ -113,7 +113,8 @@ public: private: // This mutex ensures that only one thread at a time executes the sharding // initialization/teardown sequence - stdx::mutex _initSynchronizationMutex; + Mutex _initSynchronizationMutex = + MONGO_MAKE_LATCH("ShardingInitializationMongod::_initSynchronizationMutex"); // Function for initializing the sharding environment components (i.e. everything on the Grid) ShardingEnvironmentInitFunc _initFunc; diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index b9c7e634a53..37e5f8930fa 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -57,7 +57,7 @@ ShardingState* ShardingState::get(OperationContext* operationContext) { } void ShardingState::setInitialized(ShardId shardId, OID clusterId) { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); invariant(_getInitializationState() == InitializationState::kNew); _shardId = std::move(shardId); @@ -71,7 +71,7 @@ void ShardingState::setInitialized(Status failedStatus) { invariant(!failedStatus.isOK()); log() << "Failed to initialize sharding components" << causedBy(failedStatus); - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); invariant(_getInitializationState() == InitializationState::kNew); _initializationStatus = std::move(failedStatus); @@ -79,7 +79,7 @@ void ShardingState::setInitialized(Status failedStatus) { } boost::optional<Status> ShardingState::initializationStatus() { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); if (_getInitializationState() == InitializationState::kNew) return boost::none; @@ -105,13 +105,13 @@ Status ShardingState::canAcceptShardedCommands() const { ShardId ShardingState::shardId() { invariant(enabled()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _shardId; } OID ShardingState::clusterId() { invariant(enabled()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _clusterId; } diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 4b78d0bdfb4..ab3430fb5ec 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -32,8 +32,8 @@ #include <string> #include "mongo/bson/oid.h" +#include "mongo/platform/mutex.h" #include "mongo/s/shard_id.h" -#include "mongo/stdx/mutex.h" namespace mongo { @@ -136,7 +136,7 @@ private: } // Protects state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("ShardingState::_mutex"); // State of the 
initialization of the sharding state along with any potential errors AtomicWord<unsigned> _initializationState{static_cast<uint32_t>(InitializationState::kNew)}; diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index e45bb27a0f4..3e5d531618e 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -140,7 +140,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex // _participantsDurable (optional) // Output: _participantsDurable = true { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_participants); _step = Step::kWritingParticipantList; @@ -167,7 +167,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex .thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) .then([this] { { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _participantsDurable = true; } @@ -178,7 +178,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex // _decision (optional) // Output: _decision is set { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_participantsDurable); _step = Step::kWaitingForVotes; @@ -196,7 +196,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex _serviceContext, *_sendPrepareScheduler, _lsid, _txnNumber, *_participants) .then([this](PrepareVoteConsensus consensus) mutable { { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _decision = consensus.decision(); } @@ -219,7 +219,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex // _decisionDurable (optional) // Output: _decisionDurable = true { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_decision); _step = Step::kWritingDecision; @@ -243,7 +243,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex }) .then([this] { { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _decisionDurable = true; } @@ -251,7 +251,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex // Input: _decisionDurable // Output: (none) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_decisionDurable); _step = Step::kWaitingForDecisionAcks; @@ -292,7 +292,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex // Do a best-effort attempt (i.e., writeConcern w:1) to delete the coordinator's durable // state. 
{ - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _step = Step::kDeletingCoordinatorDoc; @@ -354,7 +354,7 @@ SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() const { } Future<void> TransactionCoordinator::onCompletion() { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (_completionPromisesFired) return Future<void>::makeReady(); @@ -373,7 +373,7 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() { } bool TransactionCoordinator::_reserveKickOffCommitPromise() { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (_kickOffCommitPromiseSet) return false; @@ -394,7 +394,7 @@ void TransactionCoordinator::_done(Status status) { LOG(3) << txn::txnIdToString(_lsid, _txnNumber) << " Two-phase commit completed with " << redact(status); - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); const auto tickSource = _serviceContext->getTickSource(); @@ -507,7 +507,7 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog( } TransactionCoordinator::Step TransactionCoordinator::getStep() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _step; } @@ -516,7 +516,7 @@ void TransactionCoordinator::reportState(BSONObjBuilder& parent) const { TickSource* tickSource = _serviceContext->getTickSource(); TickSource::Tick currentTick = tickSource->getTicks(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); BSONObjBuilder lsidBuilder(doc.subobjStart("lsid")); _lsid.serialize(&lsidBuilder); @@ -563,7 +563,7 @@ std::string TransactionCoordinator::toString(Step step) const { } void TransactionCoordinator::_updateAssociatedClient(Client* client) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _transactionCoordinatorMetricsObserver->updateLastClientInfo(client); } diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index e52cdb12fa3..ebc055bd575 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -166,7 +166,7 @@ private: std::unique_ptr<txn::AsyncWorkScheduler> _sendPrepareScheduler; // Protects the state below - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("TransactionCoordinator::_mutex"); // Tracks which step of the 2PC coordination is currently (or was most recently) executing Step _step{Step::kInactive}; diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp index 87e2252435e..2ff5ca5be39 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp @@ -51,14 +51,14 @@ void TransactionCoordinatorCatalog::exitStepUp(Status status) { << causedBy(status); } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_stepUpCompletionStatus); _stepUpCompletionStatus = std::move(status); _stepUpCompleteCV.notify_all(); } void TransactionCoordinatorCatalog::onStepDown() { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); std::vector<std::shared_ptr<TransactionCoordinator>> coordinatorsToCancel; for (auto&& [sessionId, coordinatorsForSession] : _coordinatorsBySession) { @@ -82,7 +82,7 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, LOG(3) << "Inserting coordinator " << lsid.getId() << ':' << txnNumber << " 
into in-memory catalog"; - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); if (!forStepUp) { _waitForStepUpToComplete(ul, opCtx); } @@ -110,7 +110,7 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get( OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); _waitForStepUpToComplete(ul, opCtx); std::shared_ptr<TransactionCoordinator> coordinatorToReturn; @@ -130,7 +130,7 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get( boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>> TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, const LogicalSessionId& lsid) { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); _waitForStepUpToComplete(ul, opCtx); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); @@ -153,7 +153,7 @@ void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNum LOG(3) << "Removing coordinator " << lsid.getId() << ':' << txnNumber << " from in-memory catalog"; - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); @@ -178,7 +178,7 @@ void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNum } void TransactionCoordinatorCatalog::join() { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); while (!_noActiveCoordinatorsCV.wait_for( ul, stdx::chrono::seconds{5}, [this] { return _coordinatorsBySession.empty(); })) { @@ -189,11 +189,11 @@ void TransactionCoordinatorCatalog::join() { } std::string TransactionCoordinatorCatalog::toString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _toString(lk); } -void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk, +void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<Latch>& lk, OperationContext* opCtx) { invariant(lk.owns_lock()); opCtx->waitForConditionOrInterrupt( @@ -216,7 +216,7 @@ std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const { } void TransactionCoordinatorCatalog::filter(FilterPredicate predicate, FilterVisitor visitor) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); for (auto sessionIt = _coordinatorsBySession.begin(); sessionIt != _coordinatorsBySession.end(); ++sessionIt) { auto& lsid = sessionIt->first; diff --git a/src/mongo/db/s/transaction_coordinator_catalog.h b/src/mongo/db/s/transaction_coordinator_catalog.h index 5768c69bb3c..057c5dfb575 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.h +++ b/src/mongo/db/s/transaction_coordinator_catalog.h @@ -125,7 +125,7 @@ private: * Blocks in an interruptible wait until the catalog is not marked as having a stepup in * progress. */ - void _waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk, OperationContext* opCtx); + void _waitForStepUpToComplete(stdx::unique_lock<Latch>& lk, OperationContext* opCtx); /** * Removes the coordinator with the given session id and transaction number from the catalog, if @@ -142,7 +142,7 @@ private: std::string _toString(WithLock wl) const; // Protects the state below. 
- mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("TransactionCoordinatorCatalog::_mutex"); // Contains TransactionCoordinator objects by session id and transaction number. May contain // more than one coordinator per session. All coordinators for a session that do not correspond diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp index 9c0d51b07b4..627b4fd2aab 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp @@ -60,14 +60,14 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext) AsyncWorkScheduler::~AsyncWorkScheduler() { { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(_quiesced(lg)); } if (!_parent) return; - stdx::lock_guard<stdx::mutex> lg(_parent->_mutex); + stdx::lock_guard<Latch> lg(_parent->_mutex); _parent->_childSchedulers.erase(_itToRemove); _parent->_notifyAllTasksComplete(lg); _parent = nullptr; @@ -129,7 +129,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot auto pf = makePromiseFuture<ResponseStatus>(); - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); uassertStatusOK(_shutdownStatus); auto scheduledCommandHandle = @@ -157,7 +157,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot } else { promise->setError([&] { if (status == ErrorCodes::CallbackCanceled) { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); return _shutdownStatus.isOK() ? status : _shutdownStatus; } return status; @@ -172,7 +172,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot return std::move(pf.future).tapAll( [this, it = std::move(it)](StatusWith<ResponseStatus> s) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _activeHandles.erase(it); _notifyAllTasksComplete(lg); }); @@ -182,7 +182,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot std::unique_ptr<AsyncWorkScheduler> AsyncWorkScheduler::makeChildScheduler() { auto child = stdx::make_unique<AsyncWorkScheduler>(_serviceContext); - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (!_shutdownStatus.isOK()) child->shutdown(_shutdownStatus); @@ -195,7 +195,7 @@ std::unique_ptr<AsyncWorkScheduler> AsyncWorkScheduler::makeChildScheduler() { void AsyncWorkScheduler::shutdown(Status status) { invariant(!status.isOK()); - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (!_shutdownStatus.isOK()) return; @@ -216,7 +216,7 @@ void AsyncWorkScheduler::shutdown(Status status) { } void AsyncWorkScheduler::join() { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); _allListsEmptyCV.wait(ul, [&] { return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty(); }); diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index eb769319aad..a1f25c84744 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -78,7 +78,7 @@ public: auto pf = makePromiseFuture<ReturnType>(); auto taskCompletionPromise = std::make_shared<Promise<ReturnType>>(std::move(pf.promise)); try { - stdx::unique_lock<stdx::mutex> ul(_mutex); + 
stdx::unique_lock<Latch> ul(_mutex); uassertStatusOK(_shutdownStatus); auto scheduledWorkHandle = uassertStatusOK(_executor->scheduleWorkAt( @@ -119,7 +119,7 @@ public: return std::move(pf.future).tapAll( [this, it = std::move(it)](StatusOrStatusWith<ReturnType> s) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); _activeHandles.erase(it); _notifyAllTasksComplete(lg); }); @@ -210,7 +210,7 @@ private: ChildIteratorsList::iterator _itToRemove; // Mutex to protect the shared state below - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("AsyncWorkScheduler::_mutex"); // If shutdown() is called, this contains the first status that was passed to it and is an // indication that no more operations can be scheduled @@ -294,7 +294,7 @@ Future<GlobalResult> collect(std::vector<Future<IndividualResult>>&& futures, * The first few fields have fixed values. * ******************************************************/ // Protects all state in the SharedBlock. - stdx::mutex mutex; + Mutex mutex = MONGO_MAKE_LATCH("SharedBlock::mutex"); // If any response returns an error prior to a response setting shouldStopIteration to // ShouldStopIteration::kYes, the promise will be set with that error rather than the global @@ -332,7 +332,7 @@ Future<GlobalResult> collect(std::vector<Future<IndividualResult>>&& futures, for (auto&& localFut : futures) { std::move(localFut) .then([sharedBlock](IndividualResult res) { - stdx::unique_lock<stdx::mutex> lk(sharedBlock->mutex); + stdx::unique_lock<Latch> lk(sharedBlock->mutex); if (sharedBlock->shouldStopIteration == ShouldStopIteration::kNo && sharedBlock->status.isOK()) { sharedBlock->shouldStopIteration = @@ -340,14 +340,14 @@ Future<GlobalResult> collect(std::vector<Future<IndividualResult>>&& futures, } }) .onError([sharedBlock](Status s) { - stdx::unique_lock<stdx::mutex> lk(sharedBlock->mutex); + stdx::unique_lock<Latch> lk(sharedBlock->mutex); if (sharedBlock->shouldStopIteration == ShouldStopIteration::kNo && sharedBlock->status.isOK()) { sharedBlock->status = s; } }) .getAsync([sharedBlock](Status s) { - stdx::unique_lock<stdx::mutex> lk(sharedBlock->mutex); + stdx::unique_lock<Latch> lk(sharedBlock->mutex); sharedBlock->numOutstandingResponses--; if (sharedBlock->numOutstandingResponses == 0) { // Unlock before emplacing the result in case any continuations do expensive diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 969bfd4338d..f033e14c14a 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -173,7 +173,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, Milliseconds recoveryDelayForTesting) { joinPreviousRound(); - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(!_catalogAndScheduler); _catalogAndScheduler = std::make_shared<CatalogAndScheduler>(opCtx->getServiceContext()); @@ -236,7 +236,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, void TransactionCoordinatorService::onStepDown() { { - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (!_catalogAndScheduler) return; @@ -251,7 +251,7 @@ void TransactionCoordinatorService::onShardingInitialization(OperationContext* o if (!isPrimary) return; - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<Latch> lg(_mutex); invariant(!_catalogAndScheduler); _catalogAndScheduler = 
std::make_shared<CatalogAndScheduler>(opCtx->getServiceContext()); @@ -262,7 +262,7 @@ void TransactionCoordinatorService::onShardingInitialization(OperationContext* o std::shared_ptr<TransactionCoordinatorService::CatalogAndScheduler> TransactionCoordinatorService::_getCatalogAndScheduler(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> ul(_mutex); + stdx::unique_lock<Latch> ul(_mutex); uassert( ErrorCodes::NotMaster, "Transaction coordinator is not a primary", _catalogAndScheduler); diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h index 89031214474..aa1a8bfdd12 100644 --- a/src/mongo/db/s/transaction_coordinator_service.h +++ b/src/mongo/db/s/transaction_coordinator_service.h @@ -146,7 +146,7 @@ private: std::shared_ptr<CatalogAndScheduler> _catalogAndSchedulerToCleanup; // Protects the state below - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("TransactionCoordinatorService::_mutex"); // The catalog + scheduler instantiated at the last step-up attempt. When nullptr, it means // onStepUp has not been called yet after the last stepDown (or construction). diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp index 0625a84b611..f41ed83c630 100644 --- a/src/mongo/db/s/wait_for_majority_service.cpp +++ b/src/mongo/db/s/wait_for_majority_service.cpp @@ -141,7 +141,7 @@ SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpT void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { ThreadClient tc("waitForMajority", service); - stdx::unique_lock lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); while (!_inShutDown) { auto opCtx = tc->makeOperationContext(); diff --git a/src/mongo/db/s/wait_for_majority_service.h b/src/mongo/db/s/wait_for_majority_service.h index 970b475d0d3..90ec771bd40 100644 --- a/src/mongo/db/s/wait_for_majority_service.h +++ b/src/mongo/db/s/wait_for_majority_service.h @@ -36,7 +36,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/service_context.h" #include "mongo/executor/task_executor.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/future.h" @@ -74,7 +74,7 @@ private: */ void _periodicallyWaitForMajority(ServiceContext* service); - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex"); // Contains an ordered list of opTimes to wait to be majority committed.
OpTimeWaitingMap _queuedOpTimes; diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp index d904d253af1..ca89ac04c8b 100644 --- a/src/mongo/db/s/wait_for_majority_service_test.cpp +++ b/src/mongo/db/s/wait_for_majority_service_test.cpp @@ -32,7 +32,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/mutex.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -64,7 +64,7 @@ public: } void finishWaitingOneOpTime() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _isTestReady = true; _isTestReadyCV.notify_one(); @@ -74,7 +74,7 @@ public: } Status waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _waitForMajorityCallCount++; _callCountChangedCV.notify_one(); @@ -97,7 +97,7 @@ public: } const repl::OpTime& getLastOpTimeWaited() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _lastOpTimeWaited; } @@ -109,7 +109,7 @@ public: private: WaitForMajorityService _waitForMajorityService; - stdx::mutex _mutex; + Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityServiceTest::_mutex"); stdx::condition_variable _isTestReadyCV; stdx::condition_variable _finishWaitingOneOpTimeCV; stdx::condition_variable _callCountChangedCV; |
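The test fixture changes at the end follow the same recipe as the production code: the mongo/stdx/mutex.h include becomes mongo/platform/mutex.h, the fixture's mutex becomes a named latch, and the stdx::condition_variable members and their predicate waits are left alone. A small self-contained sketch of that fixture shape, again assuming (as the diff does) that stdx::condition_variable accepts a Latch-typed lock; the names are illustrative, not from the test:

#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"

class ReadySignal {
public:
    void markReady() {
        stdx::lock_guard<Latch> lk(_mutex);
        _isReady = true;
        _isReadyCV.notify_one();
    }

    void waitUntilReady() {
        stdx::unique_lock<Latch> lk(_mutex);
        // Predicate waits are unchanged by the Latch migration; only the lock type moves.
        _isReadyCV.wait(lk, [&] { return _isReady; });
    }

private:
    Mutex _mutex = MONGO_MAKE_LATCH("ReadySignal::_mutex");
    stdx::condition_variable _isReadyCV;
    bool _isReady = false;
};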