diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 65 |
1 files changed, 23 insertions, 42 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 20480170b71..d44e28f117a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -269,10 +269,9 @@ InitialSyncerOptions createInitialSyncerOptions( InitialSyncerOptions options; options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); }; options.setMyLastOptime = [replCoord, - externalState](const OpTimeAndWallTime& opTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) { + externalState](const OpTimeAndWallTime& opTimeAndWallTime) { // Note that setting the last applied opTime forward also advances the global timestamp. - replCoord->setMyLastAppliedOpTimeAndWallTimeForward(opTimeAndWallTime, consistency); + replCoord->setMyLastAppliedOpTimeAndWallTimeForward(opTimeAndWallTime); // The oplog application phase of initial sync starts timestamping writes, causing // WiredTiger to pin this data in memory. Advancing the oldest timestamp in step with the // last applied optime here will permit WiredTiger to evict this data as it sees fit. @@ -649,7 +648,6 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( } auto opCtx = cc().makeOperationContext(); - auto consistency = DataConsistency::Inconsistent; if (!lastOpTime.isNull()) { // If we have an oplog, it is still possible that our data is not in a consistent state. For @@ -657,8 +655,6 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( // To detect this, we see if our last optime is >= the 'minValid' optime, which // should be persistent across node crashes. OpTime minValid = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get()); - consistency = - (lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent; // It is not safe to take stable checkpoints until we reach minValid, so we set our // initialDataTimestamp to prevent this. It is expected that this is only necessary when @@ -686,8 +682,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( // Set our last applied and durable optimes to the top of the oplog, if we have one. if (!lastOpTime.isNull()) { bool isRollbackAllowed = false; - _setMyLastAppliedOpTimeAndWallTime( - lock, lastOpTimeAndWallTime, isRollbackAllowed, consistency); + _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed); _setMyLastDurableOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed); _reportUpstream_inlock(std::move(lock)); // unlocks _mutex. } else { @@ -786,8 +781,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, } const auto lastApplied = opTimeStatus.getValue(); - _setMyLastAppliedOpTimeAndWallTime( - lock, lastApplied, false, DataConsistency::Consistent); + _setMyLastAppliedOpTimeAndWallTime(lock, lastApplied, false); } // Clear maint. mode. @@ -1281,7 +1275,7 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { } void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward( - const OpTimeAndWallTime& opTimeAndWallTime, DataConsistency consistency) { + const OpTimeAndWallTime& opTimeAndWallTime) { // Update the global timestamp before setting the last applied opTime forward so the last // applied optime is never greater than the latest cluster time in the logical clock. const auto opTime = opTimeAndWallTime.opTime; @@ -1290,7 +1284,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward( stdx::unique_lock<Latch> lock(_mutex); auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); if (opTime > myLastAppliedOpTime) { - _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, consistency); + _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false); _reportUpstream_inlock(std::move(lock)); } else { if (opTime != myLastAppliedOpTime) { @@ -1302,8 +1296,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward( opTime.getTimestamp() < myLastAppliedOpTime.getTimestamp()); } - if (consistency == DataConsistency::Consistent && - _readWriteAbility->canAcceptNonLocalWrites(lock) && _rsConfig.getWriteMajority() == 1) { + if (_readWriteAbility->canAcceptNonLocalWrites(lock) && _rsConfig.getWriteMajority() == 1) { // Single vote primaries may have a lagged stable timestamp due to paring back the // stable timestamp to the all committed timestamp. _setStableTimestampForStorage(lock); @@ -1334,7 +1327,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime( stdx::unique_lock<Latch> lock(_mutex); // The optime passed to this function is required to represent a consistent database state. - _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, DataConsistency::Consistent); + _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false); _reportUpstream_inlock(std::move(lock)); } @@ -1355,8 +1348,7 @@ void ReplicationCoordinatorImpl::_resetMyLastOpTimes(WithLock lk) { LOGV2_DEBUG(21332, 1, "Resetting durable/applied optimes"); // Reset to uninitialized OpTime bool isRollbackAllowed = true; - _setMyLastAppliedOpTimeAndWallTime( - lk, OpTimeAndWallTime(), isRollbackAllowed, DataConsistency::Inconsistent); + _setMyLastAppliedOpTimeAndWallTime(lk, OpTimeAndWallTime(), isRollbackAllowed); _setMyLastDurableOpTimeAndWallTime(lk, OpTimeAndWallTime(), isRollbackAllowed); } @@ -1377,10 +1369,7 @@ void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<Latch> } void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime( - WithLock lk, - const OpTimeAndWallTime& opTimeAndWallTime, - bool isRollbackAllowed, - DataConsistency consistency) { + WithLock lk, const OpTimeAndWallTime& opTimeAndWallTime, bool isRollbackAllowed) { const auto opTime = opTimeAndWallTime.opTime; // The last applied opTime should never advance beyond the global timestamp (i.e. the latest @@ -1419,22 +1408,16 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime( return; } - // Add the new applied optime to the list of stable optime candidates and then set the last - // stable optime. Stable optimes are used to determine the last optime that it is safe to revert - // the database to, in the event of a rollback via the 'recover to timestamp' method. If we are - // setting our applied optime to a value that doesn't represent a consistent database state, we - // should not add it to the set of stable optime candidates. For example, if we are in - // RECOVERING after a rollback using the 'rollbackViaRefetch' algorithm, we will be inconsistent - // until we reach the 'minValid' optime. - if (consistency == DataConsistency::Consistent) { - invariant(opTime.getTimestamp().getInc() > 0, - str::stream() << "Impossible optime received: " << opTime.toString()); - // If we are lagged behind the commit optime, set a new stable timestamp here. When majority - // read concern is disabled, the stable timestamp is set to lastApplied. - if (opTime <= _topCoord->getLastCommittedOpTime() || - !serverGlobalParams.enableMajorityReadConcern) { - _setStableTimestampForStorage(lk); - } + // Advance the stable timestamp if necessary. Stable timestamps are used to determine the latest + // timestamp that it is safe to revert the database to, in the event of a rollback via the + // 'recover to timestamp' method. + invariant(opTime.getTimestamp().getInc() > 0, + str::stream() << "Impossible optime received: " << opTime.toString()); + // If we are lagged behind the commit optime, set a new stable timestamp here. When majority + // read concern is disabled, the stable timestamp is set to lastApplied. + if (opTime <= _topCoord->getLastCommittedOpTime() || + !serverGlobalParams.enableMajorityReadConcern) { + _setStableTimestampForStorage(lk); } } @@ -4775,8 +4758,7 @@ void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Da }); } -void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx, - DataConsistency consistency) { +void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx) { auto lastOpTimeAndWallTimeStatus = _externalState->loadLastOpTimeAndWallTime(opCtx); OpTimeAndWallTime lastOpTimeAndWallTime = {OpTime(), Date_t()}; if (!lastOpTimeAndWallTimeStatus.getStatus().isOK()) { @@ -4797,7 +4779,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC stdx::unique_lock<Latch> lock(_mutex); bool isRollbackAllowed = true; - _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed, consistency); + _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed); _setMyLastDurableOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed); _reportUpstream_inlock(std::move(lock)); } @@ -5067,8 +5049,7 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint( if (_getMemberState_inlock().arbiter()) { // Arbiters do not store replicated data, so we consider their data trivially // consistent. - _setMyLastAppliedOpTimeAndWallTime( - lk, committedOpTimeAndWallTime, false, DataConsistency::Consistent); + _setMyLastAppliedOpTimeAndWallTime(lk, committedOpTimeAndWallTime, false); } _setStableTimestampForStorage(lk); |