Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 237
1 file changed, 116 insertions(+), 121 deletions(-)
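
This commit mechanically replaces stdx::mutex with the Latch type from mongo/platform/mutex.h at every lock site in the replication coordinator, swapping the include to match. The call sites need no other changes because the new type still models the standard Lockable concept, so stdx::lock_guard and stdx::unique_lock keep working as-is. Below is a minimal standalone sketch of that idea; the Mutex class and Latch alias here are simplified stand-ins written against the standard library, not MongoDB's actual implementation.

#include <iostream>
#include <mutex>

// Sketch only: an instrumented mutex that still models Lockable,
// so existing lock guards compile unchanged after the rename.
class Mutex {
public:
    void lock() {
        _impl.lock();
        ++_acquisitions;  // a real instrumented latch might record diagnostics here
    }
    void unlock() {
        _impl.unlock();
    }
    bool try_lock() {
        if (!_impl.try_lock())
            return false;
        ++_acquisitions;
        return true;
    }
    unsigned long long acquisitions() const {
        return _acquisitions;
    }

private:
    std::mutex _impl;
    unsigned long long _acquisitions = 0;
};

using Latch = Mutex;  // the alias that makes the rename in this diff mechanical

int main() {
    Latch mutex;
    {
        std::lock_guard<Latch> lk(mutex);  // same shape as the new call sites
    }
    std::cout << "lock acquired " << mutex.acquisitions() << " time(s)\n";
    return 0;
}

With an alias like this in place, every stdx::lock_guard<stdx::mutex> in the diff becomes stdx::lock_guard<Latch> with no behavioral change at the call site.
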
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index ba1739049fb..8587b3a56a0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -82,9 +82,9 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/platform/mutex.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -154,7 +154,7 @@ private:
const bool _initialState;
};
-void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const std::function<void()>& fn) {
+void lockAndCall(stdx::unique_lock<Latch>* lk, const std::function<void()>& fn) {
if (!lk->owns_lock()) {
lk->lock();
}
@@ -233,7 +233,7 @@ public:
* _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
* of these without holding _mutex
*/
- WaiterGuard(const stdx::unique_lock<stdx::mutex>& lock, WaiterList* list, Waiter* waiter)
+ WaiterGuard(const stdx::unique_lock<Latch>& lock, WaiterList* list, Waiter* waiter)
: _lock(lock), _list(list), _waiter(waiter) {
invariant(_lock.owns_lock());
list->add_inlock(_waiter);
@@ -245,7 +245,7 @@ public:
}
private:
- const stdx::unique_lock<stdx::mutex>& _lock;
+ const stdx::unique_lock<Latch>& _lock;
WaiterList* _list;
Waiter* _waiter;
};
@@ -374,7 +374,7 @@ void ReplicationCoordinatorImpl::waitForStartUpComplete_forTest() {
void ReplicationCoordinatorImpl::_waitForStartUpComplete() {
CallbackHandle handle;
{
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
_rsConfigStateChange.wait(lk);
}
@@ -386,12 +386,12 @@ void ReplicationCoordinatorImpl::_waitForStartUpComplete() {
}
ReplSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _rsConfig;
}
Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (!_handleElectionTimeoutCbh.isValid()) {
return Date_t();
}
@@ -399,12 +399,12 @@ Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const {
}
Milliseconds ReplicationCoordinatorImpl::getRandomizedElectionOffset_forTest() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _getRandomizedElectionOffset_inlock();
}
boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (!_priorityTakeoverCbh.isValid()) {
return boost::none;
}
@@ -412,7 +412,7 @@ boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest(
}
boost::optional<Date_t> ReplicationCoordinatorImpl::getCatchupTakeover_forTest() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (!_catchupTakeoverCbh.isValid()) {
return boost::none;
}
@@ -425,12 +425,12 @@ executor::TaskExecutor::CallbackHandle ReplicationCoordinatorImpl::getCatchupTak
}
OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _getCurrentCommittedSnapshotOpTime_inlock();
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTimeAndWallTime() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _getCurrentCommittedSnapshotOpTimeAndWallTime_inlock();
}
@@ -481,7 +481,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
log() << "Did not find local initialized voted for document at startup.";
}
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_topCoord->loadLastVote(lastVote.getValue());
}
@@ -542,7 +542,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
handle = CallbackHandle{};
}
fassert(40446, handle);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_finishLoadLocalConfigCbh = std::move(handle.getValue());
return false;
@@ -644,7 +644,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
// applied optime is never greater than the latest cluster time in the logical clock.
_externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp());
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
invariant(_rsConfigState == kConfigStartingUp);
const PostMemberStateUpdateAction action =
_setCurrentRSConfig(lock, opCtx.get(), localConfig, myIndex.getValue());
@@ -661,7 +661,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
}
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
// Step down is impossible, so we don't need to wait for the returned event.
_updateTerm_inlock(term);
}
@@ -677,7 +677,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) {
std::shared_ptr<InitialSyncer> initialSyncerCopy;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_initialSyncer.swap(initialSyncerCopy);
}
if (initialSyncerCopy) {
@@ -719,7 +719,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
auto onCompletion = [this, startCompleted](const StatusWith<OpTimeAndWallTime>& opTimeStatus) {
{
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
if (opTimeStatus == ErrorCodes::CallbackCanceled) {
log() << "Initial Sync has been cancelled: " << opTimeStatus.getStatus();
return;
@@ -760,11 +760,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
try {
{
// Must take the lock to set _initialSyncer, but not call it.
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_inShutdown) {
- log() << "Initial Sync not starting because replication is shutting down.";
- return;
- }
+ stdx::lock_guard<Latch> lock(_mutex);
initialSyncerCopy = std::make_shared<InitialSyncer>(
createInitialSyncerOptions(this, _externalState.get()),
std::make_unique<DataReplicatorExternalStateInitialSync>(this,
@@ -817,7 +813,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
storageGlobalParams.readOnly = true;
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_setConfigState_inlock(kConfigReplicationDisabled);
return;
}
@@ -828,7 +824,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
_storage->initializeStorageControlsForReplication(opCtx->getServiceContext());
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
fassert(18822, !_inShutdown);
_setConfigState_inlock(kConfigStartingUp);
_topCoord->setStorageEngineSupportsReadCommitted(
@@ -844,7 +840,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
if (doneLoadingConfig) {
// If we're not done loading the config, then the config state will be set by
// _finishLoadLocalConfig.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(!_rsConfig.isInitialized());
_setConfigState_inlock(kConfigUninitialized);
}
@@ -870,7 +866,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
// Used to shut down outside of the lock.
std::shared_ptr<InitialSyncer> initialSyncerCopy;
{
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
fassert(28533, !_inShutdown);
_inShutdown = true;
if (_rsConfigState == kConfigPreStart) {
@@ -918,12 +914,12 @@ ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() co
}
MemberState ReplicationCoordinatorImpl::getMemberState() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _getMemberState_inlock();
}
std::vector<MemberData> ReplicationCoordinatorImpl::getMemberData() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _topCoord->getMemberData();
}
@@ -937,7 +933,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState,
return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative");
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
auto pred = [this, expectedState]() { return _memberState == expectedState; };
if (!_memberStateChange.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
@@ -949,7 +945,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState,
}
Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_rsConfig.isInitialized());
if (_selfIndex == -1) {
// We aren't currently in the set. Return 0 seconds so we can clear out the applier's
@@ -960,7 +956,7 @@ Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
}
void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_topCoord->clearSyncSourceBlacklist();
}
@@ -977,7 +973,7 @@ Status ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState)
Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx,
const MemberState& newState) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (newState == _topCoord->getMemberState()) {
return Status::OK();
}
@@ -1008,7 +1004,7 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx,
}
ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _applierState;
}
@@ -1040,7 +1036,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
// When we go to drop all temp collections, we must replicate the drops.
invariant(opCtx->writesAreReplicated());
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_applierState != ApplierState::Draining) {
return;
}
@@ -1101,7 +1097,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative");
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
auto pred = [this]() { return _applierState != ApplierState::Draining; };
if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
@@ -1116,7 +1112,7 @@ void ReplicationCoordinatorImpl::signalUpstreamUpdater() {
}
void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
_topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg);
}
@@ -1127,7 +1123,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward(
const auto opTime = opTimeAndWallTime.opTime;
_externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp());
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
if (opTime > myLastAppliedOpTime) {
_setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, consistency);
@@ -1153,7 +1149,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward(
void ReplicationCoordinatorImpl::setMyLastDurableOpTimeAndWallTimeForward(
const OpTimeAndWallTime& opTimeAndWallTime) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
if (opTimeAndWallTime.opTime > _getMyLastDurableOpTime_inlock()) {
_setMyLastDurableOpTimeAndWallTime(lock, opTimeAndWallTime, false);
_reportUpstream_inlock(std::move(lock));
@@ -1167,7 +1163,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime(
// applied optime is never greater than the latest cluster time in the logical clock.
_externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp());
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
// The optime passed to this function is required to represent a consistent database state.
_setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, DataConsistency::Consistent);
_reportUpstream_inlock(std::move(lock));
@@ -1175,13 +1171,13 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime(
void ReplicationCoordinatorImpl::setMyLastDurableOpTimeAndWallTime(
const OpTimeAndWallTime& opTimeAndWallTime) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
_setMyLastDurableOpTimeAndWallTime(lock, opTimeAndWallTime, false);
_reportUpstream_inlock(std::move(lock));
}
void ReplicationCoordinatorImpl::resetMyLastOpTimes() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
_resetMyLastOpTimes(lock);
_reportUpstream_inlock(std::move(lock));
}
@@ -1196,7 +1192,7 @@ void ReplicationCoordinatorImpl::_resetMyLastOpTimes(WithLock lk) {
_stableOpTimeCandidates.clear();
}
-void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock) {
+void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<Latch> lock) {
invariant(lock.owns_lock());
if (getReplicationMode() != modeReplSet) {
@@ -1283,22 +1279,22 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime(
}
OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _getMyLastAppliedOpTime_inlock();
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastAppliedOpTimeAndWallTime() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _getMyLastAppliedOpTimeAndWallTime_inlock();
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastDurableOpTimeAndWallTime() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _getMyLastDurableOpTimeAndWallTime_inlock();
}
OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _getMyLastDurableOpTime_inlock();
}
@@ -1405,7 +1401,7 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
}
}
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
if (isMajorityCommittedRead && !_externalState->snapshotsEnabled()) {
return {ErrorCodes::CommandNotSupported,
@@ -1572,7 +1568,7 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer
long long memberId,
const OpTime& opTime,
Date_t wallTime) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
invariant(getReplicationMode() == modeReplSet);
if (wallTime == Date_t()) {
@@ -1591,7 +1587,7 @@ Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer
long long memberId,
const OpTime& opTime,
Date_t wallTime) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
invariant(getReplicationMode() == modeReplSet);
if (wallTime == Date_t()) {
@@ -1691,7 +1687,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitRepli
OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
Timer timer;
WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
auto status = _awaitReplication_inlock(&lock, opCtx, opTime, fixedWriteConcern);
return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}
@@ -1714,7 +1710,7 @@ BSONObj ReplicationCoordinatorImpl::_getReplicationProgress(WithLock wl) const {
return progress.obj();
}
Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
- stdx::unique_lock<stdx::mutex>* lock,
+ stdx::unique_lock<Latch>* lock,
OperationContext* opCtx,
const OpTime& opTime,
const WriteConcernOptions& writeConcern) {
@@ -1834,7 +1830,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
void ReplicationCoordinatorImpl::waitForStepDownAttempt_forTest() {
auto isSteppingDown = [&]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
// If true, we know that a stepdown is underway.
return (_topCoord->isSteppingDown());
};
@@ -1933,7 +1929,7 @@ void ReplicationCoordinatorImpl::AutoGetRstlForStepUpStepDown::_killOpThreadFn()
// X mode for the first time. This ensures that no writing operations will continue
// after the node's term change.
{
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
if (_stopKillingOps.wait_for(
lock, Milliseconds(10).toSystemDuration(), [this] { return _killSignaled; })) {
log() << "Stopped killing user operations";
@@ -1949,7 +1945,7 @@ void ReplicationCoordinatorImpl::AutoGetRstlForStepUpStepDown::_stopAndWaitForKi
return;
{
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
_killSignaled = true;
_stopKillingOps.notify_all();
}
@@ -2009,7 +2005,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
auto deadline = force ? stepDownUntil : waitUntil;
AutoGetRstlForStepUpStepDown arsd(this, opCtx, deadline);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
opCtx->checkForInterrupt();
@@ -2043,7 +2039,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
stepdownHangBeforePerformingPostMemberStateUpdateActions.shouldFail())) {
mongo::sleepsecs(1);
{
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
if (_inShutdown) {
break;
}
@@ -2149,7 +2145,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
}
void ReplicationCoordinatorImpl::_performElectionHandoff() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto candidateIndex = _topCoord->chooseElectionHandoffCandidate();
if (candidateIndex < 0) {
@@ -2198,7 +2194,7 @@ bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() {
return true;
}
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
invariant(getReplicationMode() == modeReplSet);
return _getMemberState_inlock().primary();
}
@@ -2227,7 +2223,7 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationCont
}
bool ReplicationCoordinatorImpl::canAcceptNonLocalWrites() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _readWriteAbility->canAcceptNonLocalWrites(lk);
}
@@ -2259,7 +2255,7 @@ bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* opC
return true;
}
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
if (_memberState.rollback()) {
return false;
}
@@ -2287,7 +2283,7 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext
// Oplog reads are not allowed during STARTUP state, but we make an exception for internal
// reads. Internal reads are required for cleaning up unfinished apply batches.
if (!isPrimaryOrSecondary && getReplicationMode() == modeReplSet && ns.isOplog()) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
if ((_memberState.startup() && client->isFromUserConnection()) || _memberState.startup2() ||
_memberState.rollback()) {
return Status{ErrorCodes::NotMasterOrSecondary,
@@ -2331,17 +2327,17 @@ bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* o
}
OID ReplicationCoordinatorImpl::getElectionId() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _electionId;
}
int ReplicationCoordinatorImpl::getMyId() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _getMyId_inlock();
}
HostAndPort ReplicationCoordinatorImpl::getMyHostAndPort() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _rsConfig.getMemberAt(_selfIndex).getHostAndPort();
}
@@ -2358,7 +2354,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait
f = [&finishedEvent, this]() { _replExecutor->signalEvent(finishedEvent); };
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_resetMyLastOpTimes(lk);
}
// unlock before calling _startDataReplication().
@@ -2370,7 +2366,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait
}
StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _topCoord->prepareReplSetUpdatePositionCommand(
_getCurrentCommittedSnapshotOpTime_inlock());
}
@@ -2382,7 +2378,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus(
if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) {
std::shared_ptr<InitialSyncer> initialSyncerCopy;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
initialSyncerCopy = _initialSyncer;
}
@@ -2397,7 +2393,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus(
BSONObj electionCandidateMetrics =
ReplicationMetrics::get(getServiceContext()).getElectionCandidateMetricsBSON();
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
_topCoord->prepareStatusResponse(
TopologyCoordinator::ReplSetStatusArgs{
@@ -2417,7 +2413,7 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(
IsMasterResponse* response, const SplitHorizon::Parameters& horizonParams) {
invariant(getSettings().usingReplSets());
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_topCoord->fillIsMasterForReplSet(response, horizonParams);
OpTime lastOpTime = _getMyLastAppliedOpTime_inlock();
@@ -2440,17 +2436,17 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(
}
void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_topCoord->fillMemberData(result);
}
ReplSetConfig ReplicationCoordinatorImpl::getConfig() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _rsConfig;
}
void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
result->append("config", _rsConfig.toBSON());
}
@@ -2458,7 +2454,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada
EventHandle evh;
{
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
evh = _processReplSetMetadata_inlock(replMetadata);
}
@@ -2468,7 +2464,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada
}
void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_cancelAndRescheduleElectionTimeout_inlock();
}
@@ -2481,7 +2477,7 @@ EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_inlock(
}
bool ReplicationCoordinatorImpl::getMaintenanceMode() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _topCoord->getMaintenanceCount() > 0;
}
@@ -2491,7 +2487,7 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
"can only set maintenance mode on replica set members");
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (_topCoord->getRole() == TopologyCoordinator::Role::kCandidate) {
return Status(ErrorCodes::NotSecondary, "currently running for election");
}
@@ -2530,7 +2526,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt
Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
auto doResync = false;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_topCoord->prepareSyncFromResponse(target, resultObj, &result);
// If we are in the middle of an initial sync, do a resync.
doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive();
@@ -2545,7 +2541,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt
Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
auto result = [=]() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _topCoord->prepareFreezeResponse(_replExecutor->now(), secs, resultObj);
}();
if (!result.isOK()) {
@@ -2569,7 +2565,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt
log() << "replSetReconfig admin command received from client; new config: "
<< args.newConfigObj;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
_rsConfigStateChange.wait(lk);
@@ -2625,7 +2621,6 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt
if (!status.isOK()) {
error() << "replSetReconfig got " << status << " while parsing " << newConfigObj;
return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());
- ;
}
if (newConfig.getReplSetName() != _settings.ourSetName()) {
str::stream errmsg;
@@ -2674,7 +2669,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx,
// Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
executor::TaskExecutor::EventHandle electionFinishedEvent;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
electionFinishedEvent = _cancelElectionIfNeeded_inlock();
}
@@ -2689,7 +2684,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx,
}
boost::optional<AutoGetRstlForStepUpStepDown> arsd;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (isForceReconfig && _shouldStepDownOnReconfig(lk, newConfig, myIndex)) {
_topCoord->prepareForUnconditionalStepDown();
lk.unlock();
@@ -2748,7 +2743,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt
log() << "replSetInitiate admin command received from client";
const auto replEnabled = _settings.usingReplSets();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
if (!replEnabled) {
return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet");
}
@@ -2837,7 +2832,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt
void ReplicationCoordinatorImpl::_finishReplSetInitiate(OperationContext* opCtx,
const ReplSetConfig& newConfig,
int myIndex) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
invariant(_rsConfigState == kConfigInitiating);
invariant(!_rsConfig.isInitialized());
auto action = _setCurrentRSConfig(lk, opCtx, newConfig, myIndex);
@@ -3065,7 +3060,7 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
if (!cbData.status.isOK()) {
return;
}
- stdx::lock_guard<stdx::mutex> lk(*mutex);
+ stdx::lock_guard<Latch> lk(*mutex);
// Check whether the callback has been cancelled while holding mutex.
if (cbData.myHandle.isCanceled()) {
return;
@@ -3177,7 +3172,7 @@ void ReplicationCoordinatorImpl::CatchupState::incrementNumCatchUpOps_inlock(int
}
Status ReplicationCoordinatorImpl::abortCatchupIfNeeded(PrimaryCatchUpConclusionReason reason) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (_catchupState) {
_catchupState->abort_inlock(reason);
return Status::OK();
@@ -3186,14 +3181,14 @@ Status ReplicationCoordinatorImpl::abortCatchupIfNeeded(PrimaryCatchUpConclusion
}
void ReplicationCoordinatorImpl::incrementNumCatchUpOpsIfCatchingUp(int numOps) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (_catchupState) {
_catchupState->incrementNumCatchUpOps_inlock(numOps);
}
}
void ReplicationCoordinatorImpl::signalDropPendingCollectionsRemovedFromStorage() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_wakeReadyWaiters(lock);
}
@@ -3310,7 +3305,7 @@ void ReplicationCoordinatorImpl::_wakeReadyWaiters(WithLock lk) {
Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates,
long long* configVersion) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
Status status = Status::OK();
bool somethingChanged = false;
for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin();
@@ -3332,7 +3327,7 @@ Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePosi
}
bool ReplicationCoordinatorImpl::buildsIndexes() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (_selfIndex == -1) {
return true;
}
@@ -3342,12 +3337,12 @@ bool ReplicationCoordinatorImpl::buildsIndexes() {
std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op,
bool durablyWritten) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _topCoord->getHostsWrittenTo(op, durablyWritten);
}
std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
invariant(_settings.usingReplSets());
std::vector<HostAndPort> nodes;
@@ -3366,7 +3361,7 @@ std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() co
Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied(
const WriteConcernOptions& writeConcern) const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
}
@@ -3383,7 +3378,7 @@ Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock(
Status ReplicationCoordinatorImpl::checkIfCommitQuorumCanBeSatisfied(
const CommitQuorumOptions& commitQuorum) const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _checkIfCommitQuorumCanBeSatisfied(lock, commitQuorum);
}
@@ -3416,7 +3411,7 @@ StatusWith<bool> ReplicationCoordinatorImpl::checkIfCommitQuorumIsSatisfied(
// If the 'commitQuorum' cannot be satisfied with all the members of this replica set, we
// need to inform the caller to avoid hanging while waiting for satisfiability of the
// 'commitQuorum' with 'commitReadyMembers' due to replica set reconfigurations.
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
Status status = _checkIfCommitQuorumCanBeSatisfied(lock, commitQuorum);
if (!status.isOK()) {
return status;
@@ -3427,7 +3422,7 @@ StatusWith<bool> ReplicationCoordinatorImpl::checkIfCommitQuorumIsSatisfied(
}
WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
if (_rsConfig.isInitialized()) {
return _rsConfig.getDefaultWriteConcern();
}
@@ -3455,7 +3450,7 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
}
HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
// Always allow chaining while in catchup and drain mode.
@@ -3480,12 +3475,12 @@ void ReplicationCoordinatorImpl::_unblacklistSyncSource(
if (cbData.status == ErrorCodes::CallbackCanceled)
return;
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_topCoord->unblacklistSyncSource(host, _replExecutor->now());
}
void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_topCoord->blacklistSyncSource(host, until);
_scheduleWorkAt(until, [=](const executor::TaskExecutor::CallbackArgs& cbData) {
_unblacklistSyncSource(cbData, host);
@@ -3509,7 +3504,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC
_externalState->setGlobalTimestamp(opCtx->getServiceContext(),
lastOpTimeAndWallTime.opTime.getTimestamp());
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
bool isRollbackAllowed = true;
_setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed, consistency);
_setMyLastDurableOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed);
@@ -3520,7 +3515,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
const HostAndPort& currentSource,
const rpc::ReplSetMetadata& replMetadata,
boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _topCoord->shouldChangeSyncSource(
currentSource, replMetadata, oqMetadata, _replExecutor->now());
}
@@ -3615,7 +3610,7 @@ void ReplicationCoordinatorImpl::_cleanupStableOpTimeCandidates(
boost::optional<OpTimeAndWallTime>
ReplicationCoordinatorImpl::chooseStableOpTimeFromCandidates_forTest(
const std::set<OpTimeAndWallTime>& candidates, const OpTimeAndWallTime& maximumStableOpTime) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _chooseStableOpTimeFromCandidates(lk, candidates, maximumStableOpTime);
}
void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest(
@@ -3624,12 +3619,12 @@ void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest(
}
std::set<OpTimeAndWallTime> ReplicationCoordinatorImpl::getStableOpTimeCandidates_forTest() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
return _stableOpTimeCandidates;
}
void ReplicationCoordinatorImpl::attemptToAdvanceStableTimestamp() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_setStableTimestampForStorage(lk);
}
@@ -3757,7 +3752,7 @@ void ReplicationCoordinatorImpl::finishRecoveryIfEligible(OperationContext* opCt
void ReplicationCoordinatorImpl::advanceCommitPoint(
const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_advanceCommitPoint(lk, committedOpTimeAndWallTime, fromSyncSource);
}
@@ -3779,12 +3774,12 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint(
}
OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
return _topCoord->getLastCommittedOpTime();
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
return _topCoord->getLastCommittedOpTimeAndWallTime();
}
@@ -3798,7 +3793,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
return termStatus;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
// We should only enter terminal shutdown from global terminal exit. In that case, rather
// than voting in a term we don't plan to stay alive in, refuse to vote.
@@ -3839,7 +3834,7 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ
invariant(-1 != rbid);
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (hasReplSetMetadata) {
_prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder);
@@ -3874,7 +3869,7 @@ bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal_inlock() c
Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) {
{
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
return Status(ErrorCodes::NotYetInitialized,
"Received heartbeat while still initializing replication system");
@@ -3882,7 +3877,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
}
Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
auto senderHost(args.getSenderHost());
const Date_t now = _replExecutor->now();
@@ -3915,7 +3910,7 @@ long long ReplicationCoordinatorImpl::getTerm() {
EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(
long long term, TopologyCoordinator::UpdateTermResult* updateResult) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
EventHandle finishEvh;
finishEvh = _updateTerm_inlock(term, updateResult);
@@ -3934,7 +3929,7 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long
EventHandle finishEvh;
{
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
finishEvh = _updateTerm_inlock(term, &updateTermResult);
}
@@ -3983,7 +3978,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock(
void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx,
const Timestamp& untilSnapshot) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
uassert(ErrorCodes::NotYetInitialized,
"Cannot use snapshots until replica set is finished initializing.",
@@ -3999,7 +3994,7 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
}
void ReplicationCoordinatorImpl::createWMajorityWriteAvailabilityDateWaiter(OpTime opTime) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
auto opTimeCB = [this, opTime]() {
ReplicationMetrics::get(getServiceContext())
.setWMajorityWriteAvailabilityDate(_replExecutor->now());
@@ -4045,7 +4040,7 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot(
}
void ReplicationCoordinatorImpl::dropAllSnapshots() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_dropAllSnapshots_inlock();
}
@@ -4091,7 +4086,7 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() {
WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode(
WriteConcernOptions wc) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _populateUnsetWriteConcernOptionsSyncMode(lock, wc);
}
@@ -4127,7 +4122,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) {
EventHandle finishEvent;
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
finishEvent = _electionFinishedEvent;
}
if (finishEvent.isValid()) {
@@ -4137,7 +4132,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) {
// Step up is considered successful only if we are currently a primary and we are not in the
// process of stepping down. If we know we are going to step down, we should fail the
// replSetStepUp command so caller can retry if necessary.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
if (!_getMemberState_inlock().primary())
return Status(ErrorCodes::CommandFailed, "Election failed.");
else if (_topCoord->isSteppingDown())
@@ -4160,7 +4155,7 @@ int64_t ReplicationCoordinatorImpl::_nextRandomInt64_inlock(int64_t limit) {
}
bool ReplicationCoordinatorImpl::setContainsArbiter() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _rsConfig.containsArbiter();
}
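
Several of the hunks above also convert condition-variable wait sites, for example _rsConfigStateChange.wait(lk) under a stdx::unique_lock<Latch> in _waitForStartUpComplete(). As a rough illustration of why those sites survive the type swap, the sketch below waits on an arbitrary Lockable via std::condition_variable_any; the names (configStateChange, startupComplete) are hypothetical stand-ins, and MongoDB's own condition-variable plumbing may differ.

#include <condition_variable>
#include <mutex>
#include <thread>

using Latch = std::mutex;  // simplified; any Lockable works with condition_variable_any

int main() {
    Latch mutex;
    std::condition_variable_any configStateChange;  // hypothetical stand-in name
    bool startupComplete = false;

    std::thread signaller([&] {
        {
            std::lock_guard<Latch> lk(mutex);
            startupComplete = true;
        }
        configStateChange.notify_all();
    });

    // Mirrors the wait sites in the diff: take a unique_lock over the Latch
    // and block until the predicate holds.
    std::unique_lock<Latch> lk(mutex);
    configStateChange.wait(lk, [&] { return startupComplete; });

    signaller.join();
    return 0;
}
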