Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 65
1 file changed, 23 insertions(+), 42 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 20480170b71..d44e28f117a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -269,10 +269,9 @@ InitialSyncerOptions createInitialSyncerOptions(
InitialSyncerOptions options;
options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
options.setMyLastOptime = [replCoord,
- externalState](const OpTimeAndWallTime& opTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) {
+ externalState](const OpTimeAndWallTime& opTimeAndWallTime) {
// Note that setting the last applied opTime forward also advances the global timestamp.
- replCoord->setMyLastAppliedOpTimeAndWallTimeForward(opTimeAndWallTime, consistency);
+ replCoord->setMyLastAppliedOpTimeAndWallTimeForward(opTimeAndWallTime);
// The oplog application phase of initial sync starts timestamping writes, causing
// WiredTiger to pin this data in memory. Advancing the oldest timestamp in step with the
// last applied optime here will permit WiredTiger to evict this data as it sees fit.
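Review note: this hunk narrows the setMyLastOptime callback from two parameters to one. A minimal sketch of the new shape, with stand-in types (the real InitialSyncerOptions lives elsewhere in the repl sources):

    #include <functional>

    struct OpTimeAndWallTimeSketch {};  // stand-in for the real repl type

    struct InitialSyncerOptionsSketch {
        // Before: std::function<void(const OpTimeAndWallTimeSketch&, DataConsistency)>
        // After: one argument; initial sync no longer reports a consistency level.
        std::function<void(const OpTimeAndWallTimeSketch&)> setMyLastOptime;
    };
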
@@ -649,7 +648,6 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
}
auto opCtx = cc().makeOperationContext();
- auto consistency = DataConsistency::Inconsistent;
if (!lastOpTime.isNull()) {
// If we have an oplog, it is still possible that our data is not in a consistent state. For
@@ -657,8 +655,6 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
// To detect this, we see if our last optime is >= the 'minValid' optime, which
// should be persistent across node crashes.
OpTime minValid = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
- consistency =
- (lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent;
// It is not safe to take stable checkpoints until we reach minValid, so we set our
// initialDataTimestamp to prevent this. It is expected that this is only necessary when
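Review note: the deleted lines computed a DataConsistency value from the 'minValid' comparison described in the comment above. An illustrative sketch of that retired predicate, using a stand-in OpTime collapsed to a single logical timestamp (the real type also carries a term):

    #include <cstdint>

    struct OpTimeSketch {
        uint64_t ts = 0;
        bool operator>=(const OpTimeSketch& rhs) const { return ts >= rhs.ts; }
    };

    // The retired classification: startup data counted as consistent iff the
    // oplog had been applied at least through 'minValid'.
    bool wasConsistentAtStartup(const OpTimeSketch& lastOpTime, const OpTimeSketch& minValid) {
        return lastOpTime >= minValid;
    }
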
@@ -686,8 +682,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
// Set our last applied and durable optimes to the top of the oplog, if we have one.
if (!lastOpTime.isNull()) {
bool isRollbackAllowed = false;
- _setMyLastAppliedOpTimeAndWallTime(
- lock, lastOpTimeAndWallTime, isRollbackAllowed, consistency);
+ _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed);
_setMyLastDurableOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed);
_reportUpstream_inlock(std::move(lock)); // unlocks _mutex.
} else {
@@ -786,8 +781,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
}
const auto lastApplied = opTimeStatus.getValue();
- _setMyLastAppliedOpTimeAndWallTime(
- lock, lastApplied, false, DataConsistency::Consistent);
+ _setMyLastAppliedOpTimeAndWallTime(lock, lastApplied, false);
}
// Clear maint. mode.
@@ -1281,7 +1275,7 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
}
void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward(
- const OpTimeAndWallTime& opTimeAndWallTime, DataConsistency consistency) {
+ const OpTimeAndWallTime& opTimeAndWallTime) {
// Update the global timestamp before setting the last applied opTime forward so the last
// applied optime is never greater than the latest cluster time in the logical clock.
const auto opTime = opTimeAndWallTime.opTime;
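Review note: the comment above encodes an ordering invariant: bump the global (cluster) timestamp first, then lastApplied, so lastApplied never exceeds the latest cluster time. A hedged sketch under a single-writer-per-optime assumption; the real code routes through the logical clock, and these atomics are stand-ins:

    #include <algorithm>
    #include <atomic>
    #include <cstdint>

    std::atomic<uint64_t> globalTimestamp{0};  // stand-in for the logical clock
    std::atomic<uint64_t> lastApplied{0};

    void advanceLastApplied(uint64_t opTimeTs) {
        // 1) Advance the global timestamp first...
        uint64_t seen = globalTimestamp.load();
        while (seen < opTimeTs && !globalTimestamp.compare_exchange_weak(seen, opTimeTs)) {
        }
        // 2) ...then lastApplied, so a reader never observes
        // lastApplied > globalTimestamp.
        lastApplied.store(std::max(lastApplied.load(), opTimeTs));
    }
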
@@ -1290,7 +1284,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward(
stdx::unique_lock<Latch> lock(_mutex);
auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
if (opTime > myLastAppliedOpTime) {
- _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, consistency);
+ _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false);
_reportUpstream_inlock(std::move(lock));
} else {
if (opTime != myLastAppliedOpTime) {
@@ -1302,8 +1296,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTimeForward(
opTime.getTimestamp() < myLastAppliedOpTime.getTimestamp());
}
- if (consistency == DataConsistency::Consistent &&
- _readWriteAbility->canAcceptNonLocalWrites(lock) && _rsConfig.getWriteMajority() == 1) {
+ if (_readWriteAbility->canAcceptNonLocalWrites(lock) && _rsConfig.getWriteMajority() == 1) {
// Single vote primaries may have a lagged stable timestamp due to paring back the
// stable timestamp to the all committed timestamp.
_setStableTimestampForStorage(lock);
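Review note: with the consistency flag gone, the single-voter fast path keys only on write-accepting state and majority size. A sketch of the simplified guard, with plain bool/int stand-ins for the real checks:

    // The old 'consistency == DataConsistency::Consistent' conjunct is gone.
    bool shouldNudgeStableTimestamp(bool canAcceptNonLocalWrites, int writeMajority) {
        return canAcceptNonLocalWrites && writeMajority == 1;
    }
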
@@ -1334,7 +1327,7 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeAndWallTime(
stdx::unique_lock<Latch> lock(_mutex);
// The optime passed to this function is required to represent a consistent database state.
- _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false, DataConsistency::Consistent);
+ _setMyLastAppliedOpTimeAndWallTime(lock, opTimeAndWallTime, false);
_reportUpstream_inlock(std::move(lock));
}
@@ -1355,8 +1348,7 @@ void ReplicationCoordinatorImpl::_resetMyLastOpTimes(WithLock lk) {
LOGV2_DEBUG(21332, 1, "Resetting durable/applied optimes");
// Reset to uninitialized OpTime
bool isRollbackAllowed = true;
- _setMyLastAppliedOpTimeAndWallTime(
- lk, OpTimeAndWallTime(), isRollbackAllowed, DataConsistency::Inconsistent);
+ _setMyLastAppliedOpTimeAndWallTime(lk, OpTimeAndWallTime(), isRollbackAllowed);
_setMyLastDurableOpTimeAndWallTime(lk, OpTimeAndWallTime(), isRollbackAllowed);
}
@@ -1377,10 +1369,7 @@ void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<Latch>
}
void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime(
- WithLock lk,
- const OpTimeAndWallTime& opTimeAndWallTime,
- bool isRollbackAllowed,
- DataConsistency consistency) {
+ WithLock lk, const OpTimeAndWallTime& opTimeAndWallTime, bool isRollbackAllowed) {
const auto opTime = opTimeAndWallTime.opTime;
// The last applied opTime should never advance beyond the global timestamp (i.e. the latest
@@ -1419,22 +1408,16 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime(
return;
}
- // Add the new applied optime to the list of stable optime candidates and then set the last
- // stable optime. Stable optimes are used to determine the last optime that it is safe to revert
- // the database to, in the event of a rollback via the 'recover to timestamp' method. If we are
- // setting our applied optime to a value that doesn't represent a consistent database state, we
- // should not add it to the set of stable optime candidates. For example, if we are in
- // RECOVERING after a rollback using the 'rollbackViaRefetch' algorithm, we will be inconsistent
- // until we reach the 'minValid' optime.
- if (consistency == DataConsistency::Consistent) {
- invariant(opTime.getTimestamp().getInc() > 0,
- str::stream() << "Impossible optime received: " << opTime.toString());
- // If we are lagged behind the commit optime, set a new stable timestamp here. When majority
- // read concern is disabled, the stable timestamp is set to lastApplied.
- if (opTime <= _topCoord->getLastCommittedOpTime() ||
- !serverGlobalParams.enableMajorityReadConcern) {
- _setStableTimestampForStorage(lk);
- }
+ // Advance the stable timestamp if necessary. Stable timestamps are used to determine the latest
+ // timestamp that it is safe to revert the database to, in the event of a rollback via the
+ // 'recover to timestamp' method.
+ invariant(opTime.getTimestamp().getInc() > 0,
+ str::stream() << "Impossible optime received: " << opTime.toString());
+ // If we are lagged behind the commit optime, set a new stable timestamp here. When majority
+ // read concern is disabled, the stable timestamp is set to lastApplied.
+ if (opTime <= _topCoord->getLastCommittedOpTime() ||
+ !serverGlobalParams.enableMajorityReadConcern) {
+ _setStableTimestampForStorage(lk);
}
}
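Review note: this hunk is the heart of the change. Every last applied optime is now assumed to represent a consistent state, so the DataConsistency gate around the stable-timestamp update disappears. A compilable sketch of the remaining decision, with a stand-in timestamp type and a plain bool standing in for the enableMajorityReadConcern server parameter:

    #include <cassert>

    struct Ts {
        unsigned secs = 0;
        unsigned inc = 0;  // increment within a second; 0 is not a real optime
    };

    bool shouldSetStableTimestamp(const Ts& opTime,
                                  const Ts& lastCommitted,
                                  bool enableMajorityReadConcern) {
        assert(opTime.inc > 0 && "Impossible optime received");
        const bool laggedBehindCommitPoint =
            opTime.secs < lastCommitted.secs ||
            (opTime.secs == lastCommitted.secs && opTime.inc <= lastCommitted.inc);
        // With majority read concern disabled, the stable timestamp tracks
        // lastApplied, so it is advanced on every update.
        return laggedBehindCommitPoint || !enableMajorityReadConcern;
    }
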
@@ -4775,8 +4758,7 @@ void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Da
});
}
-void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx,
- DataConsistency consistency) {
+void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx) {
auto lastOpTimeAndWallTimeStatus = _externalState->loadLastOpTimeAndWallTime(opCtx);
OpTimeAndWallTime lastOpTimeAndWallTime = {OpTime(), Date_t()};
if (!lastOpTimeAndWallTimeStatus.getStatus().isOK()) {
@@ -4797,7 +4779,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC
stdx::unique_lock<Latch> lock(_mutex);
bool isRollbackAllowed = true;
- _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed, consistency);
+ _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed);
_setMyLastDurableOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed);
_reportUpstream_inlock(std::move(lock));
}
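Review note: resetLastOpTimesFromOplog now unconditionally trusts the top of the oplog. A sketch of the reset flow with stand-in types; isRollbackAllowed = true means the setters may move the optimes backwards:

    #include <cstdint>
    #include <optional>

    struct OpTimeAndWallTimeSketch {
        uint64_t opTime = 0;  // 0 models the null/uninitialized optime
    };

    void resetLastOpTimesSketch(const std::optional<OpTimeAndWallTimeSketch>& topOfOplog,
                                OpTimeAndWallTimeSketch& applied,
                                OpTimeAndWallTimeSketch& durable) {
        // Fall back to a null optime when the oplog could not be read,
        // mirroring the error branch above.
        const auto last = topOfOplog.value_or(OpTimeAndWallTimeSketch{});
        // isRollbackAllowed = true: both optimes may move backwards here.
        applied = last;
        durable = last;
    }
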
@@ -5067,8 +5049,7 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint(
if (_getMemberState_inlock().arbiter()) {
// Arbiters do not store replicated data, so we consider their data trivially
// consistent.
- _setMyLastAppliedOpTimeAndWallTime(
- lk, committedOpTimeAndWallTime, false, DataConsistency::Consistent);
+ _setMyLastAppliedOpTimeAndWallTime(lk, committedOpTimeAndWallTime, false);
}
_setStableTimestampForStorage(lk);
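Review note: arbiters store no replicated data, so their last applied optime can simply track the commit point, which is why the Consistent argument was redundant there. A stand-in sketch:

    #include <cstdint>

    // Arbiters hold no data, so their lastApplied is trivially consistent and
    // can be dragged along by the commit point. Names are stand-ins.
    void advanceCommitPointOnArbiter(uint64_t committedOpTime, uint64_t& lastApplied) {
        if (committedOpTime > lastApplied) {
            lastApplied = committedOpTime;  // forward-only, rollback not allowed
        }
    }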