diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 237 |
1 files changed, 116 insertions, 121 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index ba1739049fb..8587b3a56a0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -82,9 +82,9 @@ #include "mongo/db/write_concern_options.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/network_interface.h" +#include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -154,7 +154,7 @@ private: const bool _initialState; }; -void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const std::function<void()>& fn) { +void lockAndCall(stdx::unique_lock<Latch>* lk, const std::function<void()>& fn) { if (!lk->owns_lock()) { lk->lock(); } @@ -233,7 +233,7 @@ public: * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one * of these without holding _mutex */ - WaiterGuard(const stdx::unique_lock<stdx::mutex>& lock, WaiterList* list, Waiter* waiter) + WaiterGuard(const stdx::unique_lock<Latch>& lock, WaiterList* list, Waiter* waiter) : _lock(lock), _list(list), _waiter(waiter) { invariant(_lock.owns_lock()); list->add_inlock(_waiter); @@ -245,7 +245,7 @@ public: } private: - const stdx::unique_lock<stdx::mutex>& _lock; + const stdx::unique_lock<Latch>& _lock; WaiterList* _list; Waiter* _waiter; }; @@ -374,7 +374,7 @@ void ReplicationCoordinatorImpl::waitForStartUpComplete_forTest() { void ReplicationCoordinatorImpl::_waitForStartUpComplete() { CallbackHandle handle; { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { _rsConfigStateChange.wait(lk); } @@ -386,12 +386,12 @@ void ReplicationCoordinatorImpl::_waitForStartUpComplete() { } ReplSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _rsConfig; } Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_handleElectionTimeoutCbh.isValid()) { return Date_t(); } @@ -399,12 +399,12 @@ Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const { } Milliseconds ReplicationCoordinatorImpl::getRandomizedElectionOffset_forTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getRandomizedElectionOffset_inlock(); } boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_priorityTakeoverCbh.isValid()) { return boost::none; } @@ -412,7 +412,7 @@ boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest( } boost::optional<Date_t> ReplicationCoordinatorImpl::getCatchupTakeover_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_catchupTakeoverCbh.isValid()) { return boost::none; } @@ -425,12 +425,12 @@ executor::TaskExecutor::CallbackHandle ReplicationCoordinatorImpl::getCatchupTak } OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getCurrentCommittedSnapshotOpTime_inlock(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTimeAndWallTime() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getCurrentCommittedSnapshotOpTimeAndWallTime_inlock(); } @@ -481,7 +481,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) log() << "Did not find local initialized voted for document at startup."; } { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->loadLastVote(lastVote.getValue()); } @@ -542,7 +542,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) handle = CallbackHandle{}; } fassert(40446, handle); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _finishLoadLocalConfigCbh = std::move(handle.getValue()); return false; @@ -644,7 +644,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( // applied optime is never greater than the latest cluster time in the logical clock. _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); invariant(_rsConfigState == kConfigStartingUp); const PostMemberStateUpdateAction action = _setCurrentRSConfig(lock, opCtx.get(), localConfig, myIndex.getValue()); @@ -661,7 +661,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( } { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // Step down is impossible, so we don't need to wait for the returned event. _updateTerm_inlock(term); } @@ -677,7 +677,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) { std::shared_ptr<InitialSyncer> initialSyncerCopy; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _initialSyncer.swap(initialSyncerCopy); } if (initialSyncerCopy) { @@ -719,7 +719,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, auto onCompletion = [this, startCompleted](const StatusWith<OpTimeAndWallTime>& opTimeStatus) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (opTimeStatus == ErrorCodes::CallbackCanceled) { log() << "Initial Sync has been cancelled: " << opTimeStatus.getStatus(); return; @@ -760,11 +760,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, try { { // Must take the lock to set _initialSyncer, but not call it. - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_inShutdown) { - log() << "Initial Sync not starting because replication is shutting down."; - return; - } + stdx::lock_guard<Latch> lock(_mutex); initialSyncerCopy = std::make_shared<InitialSyncer>( createInitialSyncerOptions(this, _externalState.get()), std::make_unique<DataReplicatorExternalStateInitialSync>(this, @@ -817,7 +813,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { storageGlobalParams.readOnly = true; } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _setConfigState_inlock(kConfigReplicationDisabled); return; } @@ -828,7 +824,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { _storage->initializeStorageControlsForReplication(opCtx->getServiceContext()); { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); fassert(18822, !_inShutdown); _setConfigState_inlock(kConfigStartingUp); _topCoord->setStorageEngineSupportsReadCommitted( @@ -844,7 +840,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { if (doneLoadingConfig) { // If we're not done loading the config, then the config state will be set by // _finishLoadLocalConfig. - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(!_rsConfig.isInitialized()); _setConfigState_inlock(kConfigUninitialized); } @@ -870,7 +866,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { // Used to shut down outside of the lock. std::shared_ptr<InitialSyncer> initialSyncerCopy; { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); fassert(28533, !_inShutdown); _inShutdown = true; if (_rsConfigState == kConfigPreStart) { @@ -918,12 +914,12 @@ ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() co } MemberState ReplicationCoordinatorImpl::getMemberState() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _getMemberState_inlock(); } std::vector<MemberData> ReplicationCoordinatorImpl::getMemberData() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _topCoord->getMemberData(); } @@ -937,7 +933,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState, return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative"); } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto pred = [this, expectedState]() { return _memberState == expectedState; }; if (!_memberStateChange.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, @@ -949,7 +945,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState, } Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_rsConfig.isInitialized()); if (_selfIndex == -1) { // We aren't currently in the set. Return 0 seconds so we can clear out the applier's @@ -960,7 +956,7 @@ Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { } void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->clearSyncSourceBlacklist(); } @@ -977,7 +973,7 @@ Status ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, const MemberState& newState) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (newState == _topCoord->getMemberState()) { return Status::OK(); } @@ -1008,7 +1004,7 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, } ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _applierState; } @@ -1040,7 +1036,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, // When we go to drop all temp collections, we must replicate the drops. invariant(opCtx->writesAreReplicated()); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_applierState != ApplierState::Draining) { return; } @@ -1101,7 +1097,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) { return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative"); } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); auto pred = [this]() { return _applierState != ApplierState::Draining; }; if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, @@ -1116,7 +1112,7 @@ void ReplicationCoordinatorImpl::signalUpstreamUpdater() { } void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg); } @@ -1127,7 +1123,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward( const auto opTime = opTimeAndWallTime.opTime; _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); if (opTime > myLastAppliedOpTime) { _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, consistency); @@ -1153,7 +1149,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward( void ReplicationCoordinatorImpl::setMyLastDurableOpTimeAndWallTimeForward( const OpTimeAndWallTime& opTimeAndWallTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (opTimeAndWallTime.opTime > _getMyLastDurableOpTime_inlock()) { _setMyLastDurableOpTimeAndWallTime(lock, opTimeAndWallTime, false); _reportUpstream_inlock(std::move(lock)); @@ -1167,7 +1163,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime( // applied optime is never greater than the latest cluster time in the logical clock. _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); // The optime passed to this function is required to represent a consistent database state. _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, DataConsistency::Consistent); _reportUpstream_inlock(std::move(lock)); @@ -1175,13 +1171,13 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime( void ReplicationCoordinatorImpl::setMyLastDurableOpTimeAndWallTime( const OpTimeAndWallTime& opTimeAndWallTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _setMyLastDurableOpTimeAndWallTime(lock, opTimeAndWallTime, false); _reportUpstream_inlock(std::move(lock)); } void ReplicationCoordinatorImpl::resetMyLastOpTimes() { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _resetMyLastOpTimes(lock); _reportUpstream_inlock(std::move(lock)); } @@ -1196,7 +1192,7 @@ void ReplicationCoordinatorImpl::_resetMyLastOpTimes(WithLock lk) { _stableOpTimeCandidates.clear(); } -void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock) { +void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<Latch> lock) { invariant(lock.owns_lock()); if (getReplicationMode() != modeReplSet) { @@ -1283,22 +1279,22 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime( } OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastAppliedOpTime_inlock(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastAppliedOpTimeAndWallTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastAppliedOpTimeAndWallTime_inlock(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastDurableOpTimeAndWallTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastDurableOpTimeAndWallTime_inlock(); } OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyLastDurableOpTime_inlock(); } @@ -1405,7 +1401,7 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx, } } - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (isMajorityCommittedRead && !_externalState->snapshotsEnabled()) { return {ErrorCodes::CommandNotSupported, @@ -1572,7 +1568,7 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer long long memberId, const OpTime& opTime, Date_t wallTime) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(getReplicationMode() == modeReplSet); if (wallTime == Date_t()) { @@ -1591,7 +1587,7 @@ Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer long long memberId, const OpTime& opTime, Date_t wallTime) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(getReplicationMode() == modeReplSet); if (wallTime == Date_t()) { @@ -1691,7 +1687,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitRepli OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) { Timer timer; WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _awaitReplication_inlock(&lock, opCtx, opTime, fixedWriteConcern); return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())}; } @@ -1714,7 +1710,7 @@ BSONObj ReplicationCoordinatorImpl::_getReplicationProgress(WithLock wl) const { return progress.obj(); } Status ReplicationCoordinatorImpl::_awaitReplication_inlock( - stdx::unique_lock<stdx::mutex>* lock, + stdx::unique_lock<Latch>* lock, OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) { @@ -1834,7 +1830,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( void ReplicationCoordinatorImpl::waitForStepDownAttempt_forTest() { auto isSteppingDown = [&]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); // If true, we know that a stepdown is underway. return (_topCoord->isSteppingDown()); }; @@ -1933,7 +1929,7 @@ void ReplicationCoordinatorImpl::AutoGetRstlForStepUpStepDown::_killOpThreadFn() // X mode for the first time. This ensures that no writing operations will continue // after the node's term change. { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); if (_stopKillingOps.wait_for( lock, Milliseconds(10).toSystemDuration(), [this] { return _killSignaled; })) { log() << "Stopped killing user operations"; @@ -1949,7 +1945,7 @@ void ReplicationCoordinatorImpl::AutoGetRstlForStepUpStepDown::_stopAndWaitForKi return; { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); _killSignaled = true; _stopKillingOps.notify_all(); } @@ -2009,7 +2005,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, auto deadline = force ? stepDownUntil : waitUntil; AutoGetRstlForStepUpStepDown arsd(this, opCtx, deadline); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); opCtx->checkForInterrupt(); @@ -2043,7 +2039,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, stepdownHangBeforePerformingPostMemberStateUpdateActions.shouldFail())) { mongo::sleepsecs(1); { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_inShutdown) { break; } @@ -2149,7 +2145,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, } void ReplicationCoordinatorImpl::_performElectionHandoff() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto candidateIndex = _topCoord->chooseElectionHandoffCandidate(); if (candidateIndex < 0) { @@ -2198,7 +2194,7 @@ bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() { return true; } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(getReplicationMode() == modeReplSet); return _getMemberState_inlock().primary(); } @@ -2227,7 +2223,7 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationCont } bool ReplicationCoordinatorImpl::canAcceptNonLocalWrites() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _readWriteAbility->canAcceptNonLocalWrites(lk); } @@ -2259,7 +2255,7 @@ bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* opC return true; } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_memberState.rollback()) { return false; } @@ -2287,7 +2283,7 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext // Oplog reads are not allowed during STARTUP state, but we make an exception for internal // reads. Internal reads are required for cleaning up unfinished apply batches. if (!isPrimaryOrSecondary && getReplicationMode() == modeReplSet && ns.isOplog()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if ((_memberState.startup() && client->isFromUserConnection()) || _memberState.startup2() || _memberState.rollback()) { return Status{ErrorCodes::NotMasterOrSecondary, @@ -2331,17 +2327,17 @@ bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* o } OID ReplicationCoordinatorImpl::getElectionId() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _electionId; } int ReplicationCoordinatorImpl::getMyId() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _getMyId_inlock(); } HostAndPort ReplicationCoordinatorImpl::getMyHostAndPort() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _rsConfig.getMemberAt(_selfIndex).getHostAndPort(); } @@ -2358,7 +2354,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait f = [&finishedEvent, this]() { _replExecutor->signalEvent(finishedEvent); }; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _resetMyLastOpTimes(lk); } // unlock before calling _startDataReplication(). @@ -2370,7 +2366,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait } StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _topCoord->prepareReplSetUpdatePositionCommand( _getCurrentCommittedSnapshotOpTime_inlock()); } @@ -2382,7 +2378,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus( if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) { std::shared_ptr<InitialSyncer> initialSyncerCopy; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); initialSyncerCopy = _initialSyncer; } @@ -2397,7 +2393,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus( BSONObj electionCandidateMetrics = ReplicationMetrics::get(getServiceContext()).getElectionCandidateMetricsBSON(); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse"); _topCoord->prepareStatusResponse( TopologyCoordinator::ReplSetStatusArgs{ @@ -2417,7 +2413,7 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet( IsMasterResponse* response, const SplitHorizon::Parameters& horizonParams) { invariant(getSettings().usingReplSets()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->fillIsMasterForReplSet(response, horizonParams); OpTime lastOpTime = _getMyLastAppliedOpTime_inlock(); @@ -2440,17 +2436,17 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet( } void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _topCoord->fillMemberData(result); } ReplSetConfig ReplicationCoordinatorImpl::getConfig() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _rsConfig; } void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); result->append("config", _rsConfig.toBSON()); } @@ -2458,7 +2454,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada EventHandle evh; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); evh = _processReplSetMetadata_inlock(replMetadata); } @@ -2468,7 +2464,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada } void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _cancelAndRescheduleElectionTimeout_inlock(); } @@ -2481,7 +2477,7 @@ EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_inlock( } bool ReplicationCoordinatorImpl::getMaintenanceMode() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _topCoord->getMaintenanceCount() > 0; } @@ -2491,7 +2487,7 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) { "can only set maintenance mode on replica set members"); } - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (_topCoord->getRole() == TopologyCoordinator::Role::kCandidate) { return Status(ErrorCodes::NotSecondary, "currently running for election"); } @@ -2530,7 +2526,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse"); auto doResync = false; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _topCoord->prepareSyncFromResponse(target, resultObj, &result); // If we are in the middle of an initial sync, do a resync. doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive(); @@ -2545,7 +2541,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) { auto result = [=]() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _topCoord->prepareFreezeResponse(_replExecutor->now(), secs, resultObj); }(); if (!result.isOK()) { @@ -2569,7 +2565,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt log() << "replSetReconfig admin command received from client; new config: " << args.newConfigObj; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { _rsConfigStateChange.wait(lk); @@ -2625,7 +2621,6 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt if (!status.isOK()) { error() << "replSetReconfig got " << status << " while parsing " << newConfigObj; return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason()); - ; } if (newConfig.getReplSetName() != _settings.ourSetName()) { str::stream errmsg; @@ -2674,7 +2669,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx, // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. executor::TaskExecutor::EventHandle electionFinishedEvent; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); electionFinishedEvent = _cancelElectionIfNeeded_inlock(); } @@ -2689,7 +2684,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx, } boost::optional<AutoGetRstlForStepUpStepDown> arsd; - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (isForceReconfig && _shouldStepDownOnReconfig(lk, newConfig, myIndex)) { _topCoord->prepareForUnconditionalStepDown(); lk.unlock(); @@ -2748,7 +2743,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt log() << "replSetInitiate admin command received from client"; const auto replEnabled = _settings.usingReplSets(); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); if (!replEnabled) { return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet"); } @@ -2837,7 +2832,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt void ReplicationCoordinatorImpl::_finishReplSetInitiate(OperationContext* opCtx, const ReplSetConfig& newConfig, int myIndex) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); invariant(_rsConfigState == kConfigInitiating); invariant(!_rsConfig.isInitialized()); auto action = _setCurrentRSConfig(lk, opCtx, newConfig, myIndex); @@ -3065,7 +3060,7 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock() { if (!cbData.status.isOK()) { return; } - stdx::lock_guard<stdx::mutex> lk(*mutex); + stdx::lock_guard<Latch> lk(*mutex); // Check whether the callback has been cancelled while holding mutex. if (cbData.myHandle.isCanceled()) { return; @@ -3177,7 +3172,7 @@ void ReplicationCoordinatorImpl::CatchupState::incrementNumCatchUpOps_inlock(int } Status ReplicationCoordinatorImpl::abortCatchupIfNeeded(PrimaryCatchUpConclusionReason reason) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_catchupState) { _catchupState->abort_inlock(reason); return Status::OK(); @@ -3186,14 +3181,14 @@ Status ReplicationCoordinatorImpl::abortCatchupIfNeeded(PrimaryCatchUpConclusion } void ReplicationCoordinatorImpl::incrementNumCatchUpOpsIfCatchingUp(int numOps) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_catchupState) { _catchupState->incrementNumCatchUpOps_inlock(numOps); } } void ReplicationCoordinatorImpl::signalDropPendingCollectionsRemovedFromStorage() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _wakeReadyWaiters(lock); } @@ -3310,7 +3305,7 @@ void ReplicationCoordinatorImpl::_wakeReadyWaiters(WithLock lk) { Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates, long long* configVersion) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); Status status = Status::OK(); bool somethingChanged = false; for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin(); @@ -3332,7 +3327,7 @@ Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePosi } bool ReplicationCoordinatorImpl::buildsIndexes() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (_selfIndex == -1) { return true; } @@ -3342,12 +3337,12 @@ bool ReplicationCoordinatorImpl::buildsIndexes() { std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op, bool durablyWritten) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _topCoord->getHostsWrittenTo(op, durablyWritten); } std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); invariant(_settings.usingReplSets()); std::vector<HostAndPort> nodes; @@ -3366,7 +3361,7 @@ std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() co Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions& writeConcern) const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); } @@ -3383,7 +3378,7 @@ Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock( Status ReplicationCoordinatorImpl::checkIfCommitQuorumCanBeSatisfied( const CommitQuorumOptions& commitQuorum) const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _checkIfCommitQuorumCanBeSatisfied(lock, commitQuorum); } @@ -3416,7 +3411,7 @@ StatusWith<bool> ReplicationCoordinatorImpl::checkIfCommitQuorumIsSatisfied( // If the 'commitQuorum' cannot be satisfied with all the members of this replica set, we // need to inform the caller to avoid hanging while waiting for satisfiability of the // 'commitQuorum' with 'commitReadyMembers' due to replica set reconfigurations. - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); Status status = _checkIfCommitQuorumCanBeSatisfied(lock, commitQuorum); if (!status.isOK()) { return status; @@ -3427,7 +3422,7 @@ StatusWith<bool> ReplicationCoordinatorImpl::checkIfCommitQuorumIsSatisfied( } WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_rsConfig.isInitialized()) { return _rsConfig.getDefaultWriteConcern(); } @@ -3455,7 +3450,7 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const { } HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); // Always allow chaining while in catchup and drain mode. @@ -3480,12 +3475,12 @@ void ReplicationCoordinatorImpl::_unblacklistSyncSource( if (cbData.status == ErrorCodes::CallbackCanceled) return; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _topCoord->unblacklistSyncSource(host, _replExecutor->now()); } void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _topCoord->blacklistSyncSource(host, until); _scheduleWorkAt(until, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _unblacklistSyncSource(cbData, host); @@ -3509,7 +3504,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTimeAndWallTime.opTime.getTimestamp()); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); bool isRollbackAllowed = true; _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed, consistency); _setMyLastDurableOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed); @@ -3520,7 +3515,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource( const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _topCoord->shouldChangeSyncSource( currentSource, replMetadata, oqMetadata, _replExecutor->now()); } @@ -3615,7 +3610,7 @@ void ReplicationCoordinatorImpl::_cleanupStableOpTimeCandidates( boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::chooseStableOpTimeFromCandidates_forTest( const std::set<OpTimeAndWallTime>& candidates, const OpTimeAndWallTime& maximumStableOpTime) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _chooseStableOpTimeFromCandidates(lk, candidates, maximumStableOpTime); } void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest( @@ -3624,12 +3619,12 @@ void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest( } std::set<OpTimeAndWallTime> ReplicationCoordinatorImpl::getStableOpTimeCandidates_forTest() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _stableOpTimeCandidates; } void ReplicationCoordinatorImpl::attemptToAdvanceStableTimestamp() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _setStableTimestampForStorage(lk); } @@ -3757,7 +3752,7 @@ void ReplicationCoordinatorImpl::finishRecoveryIfEligible(OperationContext* opCt void ReplicationCoordinatorImpl::advanceCommitPoint( const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _advanceCommitPoint(lk, committedOpTimeAndWallTime, fromSyncSource); } @@ -3779,12 +3774,12 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint( } OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _topCoord->getLastCommittedOpTime(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); return _topCoord->getLastCommittedOpTimeAndWallTime(); } @@ -3798,7 +3793,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( return termStatus; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); // We should only enter terminal shutdown from global terminal exit. In that case, rather // than voting in a term we don't plan to stay alive in, refuse to vote. @@ -3839,7 +3834,7 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ invariant(-1 != rbid); } - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (hasReplSetMetadata) { _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder); @@ -3874,7 +3869,7 @@ bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal_inlock() c Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) { { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { return Status(ErrorCodes::NotYetInitialized, "Received heartbeat while still initializing replication system"); @@ -3882,7 +3877,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs } Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto senderHost(args.getSenderHost()); const Date_t now = _replExecutor->now(); @@ -3915,7 +3910,7 @@ long long ReplicationCoordinatorImpl::getTerm() { EventHandle ReplicationCoordinatorImpl::updateTerm_forTest( long long term, TopologyCoordinator::UpdateTermResult* updateResult) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); EventHandle finishEvh; finishEvh = _updateTerm_inlock(term, updateResult); @@ -3934,7 +3929,7 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long EventHandle finishEvh; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); finishEvh = _updateTerm_inlock(term, &updateTermResult); } @@ -3983,7 +3978,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock( void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx, const Timestamp& untilSnapshot) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); uassert(ErrorCodes::NotYetInitialized, "Cannot use snapshots until replica set is finished initializing.", @@ -3999,7 +3994,7 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { } void ReplicationCoordinatorImpl::createWMajorityWriteAvailabilityDateWaiter(OpTime opTime) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); auto opTimeCB = [this, opTime]() { ReplicationMetrics::get(getServiceContext()) .setWMajorityWriteAvailabilityDate(_replExecutor->now()); @@ -4045,7 +4040,7 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot( } void ReplicationCoordinatorImpl::dropAllSnapshots() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _dropAllSnapshots_inlock(); } @@ -4091,7 +4086,7 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() { WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode( WriteConcernOptions wc) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _populateUnsetWriteConcernOptionsSyncMode(lock, wc); } @@ -4127,7 +4122,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) { EventHandle finishEvent; { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); finishEvent = _electionFinishedEvent; } if (finishEvent.isValid()) { @@ -4137,7 +4132,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) { // Step up is considered successful only if we are currently a primary and we are not in the // process of stepping down. If we know we are going to step down, we should fail the // replSetStepUp command so caller can retry if necessary. - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); if (!_getMemberState_inlock().primary()) return Status(ErrorCodes::CommandFailed, "Election failed."); else if (_topCoord->isSteppingDown()) @@ -4160,7 +4155,7 @@ int64_t ReplicationCoordinatorImpl::_nextRandomInt64_inlock(int64_t limit) { } bool ReplicationCoordinatorImpl::setContainsArbiter() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _rsConfig.containsArbiter(); } |