author    William Schultz <william.schultz@mongodb.com>  2017-11-21 15:52:48 -0500
committer William Schultz <william.schultz@mongodb.com>  2017-11-21 16:19:35 -0500
commit    3867aecb8eb2a0d8c4835f9adf3e76c83e607a10 (patch)
tree      669f3a1fc040a7b0bc89b1d72fa9f04b16b0b6f7
parent    01be30b1e364f10f3b0ba7e7b00fd81337bae434 (diff)
SERVER-30577 Don't update the stable timestamp if database is in an inconsistent state
(cherry picked from commit 30de2f7c46a9aa0914fe91cba2075b244e9b516b)
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp                     |   4
-rw-r--r--  src/mongo/db/repl/initial_syncer.h                       |   3
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp                |  20
-rw-r--r--  src/mongo/db/repl/oplog.cpp                              |   4
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h              |  27
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp       | 136
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h         |  16
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp  |  95
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp       |   6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h         |   4
-rw-r--r--  src/mongo/db/repl/rollback_impl.cpp                      |  18
-rw-r--r--  src/mongo/db/repl/rollback_test_fixture.cpp              |   2
-rw-r--r--  src/mongo/db/repl/rollback_test_fixture.h                |   3
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                        |  24
-rw-r--r--  src/mongo/db/repl/rs_rollback_no_uuid.cpp                |  23
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                          | 101
16 files changed, 389 insertions(+), 97 deletions(-)
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 66f01046277..14f5a8c1f21 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -388,7 +388,6 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
_storage->setInitialDataTimestamp(opCtx->getServiceContext(),
SnapshotName(lastApplied.getValue().opTime.getTimestamp()));
_replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx);
- _opts.setMyLastOptime(lastApplied.getValue().opTime);
log() << "initial sync done; took "
<< duration_cast<Seconds>(_stats.initialSyncEnd - _stats.initialSyncStart) << ".";
initialSyncCompletes.increment();
@@ -1022,7 +1021,8 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus,
_initialSyncState->appliedOps += numApplied;
_lastApplied = lastApplied;
- _opts.setMyLastOptime(_lastApplied.opTime);
+ _opts.setMyLastOptime(_lastApplied.opTime,
+ ReplicationCoordinator::DataConsistency::Inconsistent);
auto fetchCount = _fetchCount.load();
if (fetchCount > 0) {
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 157ffcc7249..7d7b43e3727 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -79,7 +79,8 @@ struct InitialSyncerOptions {
using GetMyLastOptimeFn = stdx::function<OpTime()>;
/** Function to update optime of last operation applied on this node */
- using SetMyLastOptimeFn = stdx::function<void(const OpTime&)>;
+ using SetMyLastOptimeFn =
+ stdx::function<void(const OpTime&, ReplicationCoordinator::DataConsistency consistency)>;
/** Function to reset all optimes on this node (e.g. applied & durable). */
using ResetOptimesFn = stdx::function<void()>;
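The widened SetMyLastOptimeFn type now carries a DataConsistency flag alongside the optime. As a rough sketch of a callback matching the new signature (hypothetical wiring; the commit's actual binding lives in createInitialSyncerOptions in replication_coordinator_impl.cpp):

    // Sketch: a callback matching the new SetMyLastOptimeFn type. 'replCoord' is assumed
    // to be a ReplicationCoordinator* available to the caller.
    InitialSyncerOptions options;
    options.setMyLastOptime = [replCoord](const OpTime& opTime,
                                          ReplicationCoordinator::DataConsistency consistency) {
        // Initial sync copies remote data, so callers pass Inconsistent until sync completes.
        replCoord->setMyLastAppliedOpTimeForward(opTime, consistency);
    };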
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 9df6094127b..504003827bd 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -121,7 +121,10 @@ public:
* clear/reset state
*/
void reset() {
- _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
+ _setMyLastOptime = [this](const OpTime& opTime,
+ ReplicationCoordinator::DataConsistency consistency) {
+ _myLastOpTime = opTime;
+ };
_myLastOpTime = OpTime();
_syncSourceSelector = stdx::make_unique<SyncSourceSelectorMock>();
}
@@ -357,8 +360,11 @@ protected:
InitialSyncerOptions options;
options.initialSyncRetryWait = Milliseconds(1);
options.getMyLastOptime = [this]() { return _myLastOpTime; };
- options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
- options.resetOptimes = [this]() { _setMyLastOptime(OpTime()); };
+ options.setMyLastOptime = [this](const OpTime& opTime,
+ ReplicationCoordinator::DataConsistency consistency) {
+ _setMyLastOptime(opTime, consistency);
+ };
+ options.resetOptimes = [this]() { _myLastOpTime = OpTime(); };
options.getSlaveDelay = [this]() { return Seconds(0); };
options.syncSourceSelector = this;
@@ -627,7 +633,8 @@ void InitialSyncerTest::processSuccessfulFCVFetcherResponse(std::vector<BSONObj>
TEST_F(InitialSyncerTest, InvalidConstruction) {
InitialSyncerOptions options;
options.getMyLastOptime = []() { return OpTime(); };
- options.setMyLastOptime = [](const OpTime&) {};
+ options.setMyLastOptime = [](const OpTime&,
+ ReplicationCoordinator::DataConsistency consistency) {};
options.resetOptimes = []() {};
options.getSlaveDelay = []() { return Seconds(0); };
options.syncSourceSelector = this;
@@ -858,9 +865,10 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOptimesOnNewAttempt) {
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
- // Set the last optime to an arbitrary nonzero value.
+ // Set the last optime to an arbitrary nonzero value. The value of the 'consistency' argument
+ // doesn't matter.
auto origOptime = OpTime(Timestamp(1000, 1), 1);
- _setMyLastOptime(origOptime);
+ _setMyLastOptime(origOptime, ReplicationCoordinator::DataConsistency::Inconsistent);
// Start initial sync.
const std::uint32_t initialSyncMaxAttempts = 1U;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ce512effa10..a49b3b1be38 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -398,7 +398,9 @@ void _logOpsInner(OperationContext* opCtx,
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
- replCoord->setMyLastAppliedOpTimeForward(finalOpTime);
+ // Optimes on the primary should always represent consistent database states.
+ replCoord->setMyLastAppliedOpTimeForward(
+ finalOpTime, ReplicationCoordinator::DataConsistency::Consistent);
ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
});
}
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 403792a0f99..94585d67caa 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -312,7 +312,7 @@ public:
* it is the caller's job to properly synchronize this behavior. The exception to this rule
* is that after calls to resetLastOpTimesFromOplog(), the minimum acceptable value for
* "opTime" is reset based on the contents of the oplog, and may go backwards due to
- * rollback.
+ * rollback. Additionally, the optime given MUST represent a consistent database state.
*/
virtual void setMyLastAppliedOpTime(const OpTime& opTime) = 0;
@@ -328,14 +328,27 @@ public:
virtual void setMyLastDurableOpTime(const OpTime& opTime) = 0;
/**
+ * This type is used to represent the "consistency" of a current database state. In
+ * replication, there may be times when our database data is not represented by a single optime,
+ * because we have fetched remote data from different points in time. For example, when we are
+ * in RECOVERING following a refetch based rollback. We never allow external clients to read
+ * from the database if it is not consistent.
+ */
+ enum class DataConsistency { Consistent, Inconsistent };
+
+ /**
* Updates our internal tracking of the last OpTime applied to this node, but only
* if the supplied optime is later than the current last OpTime known to the replication
- * coordinator.
+ * coordinator. The 'consistency' argument must tell whether or not the optime argument
+ * represents a consistent database state.
*
* This function is used by logOp() on a primary, since the ops in the oplog do not
- * necessarily commit in sequential order.
+ * necessarily commit in sequential order. It is also used when we finish oplog batch
+ * application on secondaries, to avoid any potential race conditions around setting the
+ * applied optime from more than one thread.
*/
- virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime) = 0;
+ virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime,
+ DataConsistency consistency) = 0;
/**
* Updates our internal tracking of the last OpTime durable to this node, but only
@@ -741,9 +754,11 @@ public:
/**
* Loads the optime from the last op in the oplog into the coordinator's lastAppliedOpTime and
- * lastDurableOpTime values.
+ * lastDurableOpTime values. The 'consistency' argument must tell whether or not the optime of
+ * the op in the oplog represents a consistent database state.
*/
- virtual void resetLastOpTimesFromOplog(OperationContext* opCtx) = 0;
+ virtual void resetLastOpTimesFromOplog(OperationContext* opCtx,
+ DataConsistency consistency) = 0;
/**
* Returns the OpTime of the latest replica set-committed op known to this server.
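For callers outside the primary's logOp() path, the intended way to pick a DataConsistency value is to compare the optime against the persisted 'minValid' marker. A minimal sketch of that decision, assuming 'opCtx', 'replCoord', and the consistency markers are already in scope (not code from this commit):

    // Sketch: a node's data is consistent only once its last applied optime has
    // reached the persisted 'minValid' optime.
    using DataConsistency = ReplicationCoordinator::DataConsistency;

    OpTime minValid = consistencyMarkers->getMinValid(opCtx);
    OpTime lastApplied = replCoord->getMyLastAppliedOpTime();
    auto consistency =
        (lastApplied >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent;
    replCoord->setMyLastAppliedOpTimeForward(lastApplied, consistency);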
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f50edc09f3c..dfb3259b097 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -315,8 +315,9 @@ InitialSyncerOptions createInitialSyncerOptions(
ReplicationCoordinator* replCoord, ReplicationCoordinatorExternalState* externalState) {
InitialSyncerOptions options;
options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
- options.setMyLastOptime = [replCoord, externalState](const OpTime& opTime) {
- replCoord->setMyLastAppliedOpTime(opTime);
+ options.setMyLastOptime = [replCoord, externalState](
+ const OpTime& opTime, ReplicationCoordinator::DataConsistency consistency) {
+ replCoord->setMyLastAppliedOpTimeForward(opTime, consistency);
externalState->setGlobalTimestamp(replCoord->getServiceContext(), opTime.getTimestamp());
};
options.resetOptimes = [replCoord]() { replCoord->resetMyLastOpTimes(); };
@@ -585,9 +586,21 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
invariant(_rsConfigState == kConfigStartingUp);
const PostMemberStateUpdateAction action =
_setCurrentRSConfig_inlock(opCtx.get(), localConfig, myIndex.getValue());
+
+ // Set our last applied and durable optimes to the top of the oplog, if we have one.
if (!lastOpTime.isNull()) {
- _setMyLastAppliedOpTime_inlock(lastOpTime, false);
- _setMyLastDurableOpTime_inlock(lastOpTime, false);
+ bool isRollbackAllowed = false;
+
+ // If we have an oplog, it is still possible that our data is not in a consistent state. For
+ // example, if we are starting up after a crash following a post-rollback RECOVERING state.
+ // To detect this, we see if our last optime is >= the 'minValid' optime, which
+ // should be persistent across node crashes.
+ OpTime minValid = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
+ auto consistency =
+ (lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent;
+
+ _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
+ _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
_reportUpstream_inlock(std::move(lock)); // unlocks _mutex.
} else {
lock.unlock();
@@ -669,7 +682,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
}
const auto lastApplied = status.getValue();
- _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false);
+ _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false, DataConsistency::Consistent);
}
// Clear maint. mode.
@@ -1037,11 +1050,11 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
_topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg);
}
-void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime) {
+void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime,
+ DataConsistency consistency) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (opTime > _getMyLastAppliedOpTime_inlock()) {
- const bool allowRollback = false;
- _setMyLastAppliedOpTime_inlock(opTime, allowRollback);
+ _setMyLastAppliedOpTime_inlock(opTime, false, consistency);
_reportUpstream_inlock(std::move(lock));
}
}
@@ -1056,7 +1069,8 @@ void ReplicationCoordinatorImpl::setMyLastDurableOpTimeForward(const OpTime& opT
void ReplicationCoordinatorImpl::setMyLastAppliedOpTime(const OpTime& opTime) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- _setMyLastAppliedOpTime_inlock(opTime, false);
+ // The optime passed to this function is required to represent a consistent database state.
+ _setMyLastAppliedOpTime_inlock(opTime, false, DataConsistency::Consistent);
_reportUpstream_inlock(std::move(lock));
}
@@ -1075,8 +1089,9 @@ void ReplicationCoordinatorImpl::resetMyLastOpTimes() {
void ReplicationCoordinatorImpl::_resetMyLastOpTimes_inlock() {
LOG(1) << "resetting durable/applied optimes.";
// Reset to uninitialized OpTime
- _setMyLastAppliedOpTime_inlock(OpTime(), true);
- _setMyLastDurableOpTime_inlock(OpTime(), true);
+ bool isRollbackAllowed = true;
+ _setMyLastAppliedOpTime_inlock(OpTime(), isRollbackAllowed, DataConsistency::Inconsistent);
+ _setMyLastDurableOpTime_inlock(OpTime(), isRollbackAllowed);
_stableOpTimeCandidates.clear();
}
@@ -1097,27 +1112,38 @@ void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::
}
void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& opTime,
- bool isRollbackAllowed) {
+ bool isRollbackAllowed,
+ DataConsistency consistency) {
auto* myMemberData = _topCoord->getMyMemberData();
- invariant(isRollbackAllowed || myMemberData->getLastAppliedOpTime() <= opTime);
+ invariant(isRollbackAllowed || opTime >= myMemberData->getLastAppliedOpTime());
myMemberData->setLastAppliedOpTime(opTime, _replExecutor->now());
_updateLastCommittedOpTime_inlock();
- // Add the new applied optime to the list of stable optime candidates and then set the
- // last stable optime. Stable optimes are used to determine the last optime that it is
- // safe to revert the database to, in the event of a rollback. Note that master-slave mode has
- // no automatic fail over, and so rollbacks never occur. Additionally, the commit point for a
- // master-slave set will never advance, since it doesn't use any consensus protocol. Since the
- // set of stable optime candidates can only get cleaned up when the commit point advances, we
- // should refrain from updating stable optime candidates in master-slave mode, to avoid the
- // candidates list from growing unbounded.
- if (!opTime.isNull() && getReplicationMode() == Mode::modeReplSet) {
+ // Signal anyone waiting on optime changes.
+ _opTimeWaiterList.signalAndRemoveIf_inlock(
+ [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
+
+
+ // Note that master-slave mode has no automatic fail over, and so rollbacks never occur.
+ // Additionally, the commit point for a master-slave set will never advance, since it doesn't
+ // use any consensus protocol. Since the set of stable optime candidates can only get cleaned up
+ // when the commit point advances, we should refrain from updating stable optime candidates in
+ // master-slave mode, to avoid the candidates list from growing unbounded.
+ if (opTime.isNull() || getReplicationMode() != Mode::modeReplSet) {
+ return;
+ }
+
+ // Add the new applied optime to the list of stable optime candidates and then set the last
+ // stable optime. Stable optimes are used to determine the last optime that it is safe to revert
+ // the database to, in the event of a rollback via the 'recover to timestamp' method. If we are
+ // setting our applied optime to a value that doesn't represent a consistent database state, we
+ // should not add it to the set of stable optime candidates. For example, if we are in
+ // RECOVERING after a rollback using the 'rollbackViaRefetch' algorithm, we will be inconsistent
+ // until we reach the 'minValid' optime.
+ if (consistency == DataConsistency::Consistent) {
_stableOpTimeCandidates.insert(opTime);
_setStableTimestampForStorage_inlock();
}
-
- _opTimeWaiterList.signalAndRemoveIf_inlock(
- [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
}
void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
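The effect of the consistency flag above is to filter what enters the stable optime candidate set. A standalone sketch of that filtering, with the candidate set modeled as a plain std::set<OpTime> (simplified; not the coordinator's actual members):

    // Sketch: only non-null optimes from consistent states become stable-timestamp candidates.
    std::set<OpTime> stableOpTimeCandidates;

    void noteAppliedOpTime(const OpTime& opTime,
                           ReplicationCoordinator::DataConsistency consistency) {
        if (opTime.isNull() ||
            consistency != ReplicationCoordinator::DataConsistency::Consistent) {
            return;  // Never let an inconsistent state become a stable timestamp.
        }
        stableOpTimeCandidates.insert(opTime);
    }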
@@ -2618,7 +2644,16 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock(
_dropAllSnapshots_inlock();
}
+ // Upon transitioning out of ROLLBACK, we must clear any stable optime candidates that may have
+ // been rolled back.
if (_memberState.rollback()) {
+ // Our 'lastApplied' optime at this point should be the rollback common point. We should
+ // remove any stable optime candidates greater than the common point.
+ auto lastApplied = _getMyLastAppliedOpTime_inlock();
+ // The upper bound will give us the first optime T such that T > lastApplied.
+ auto deletePoint = _stableOpTimeCandidates.upper_bound(lastApplied);
+ _stableOpTimeCandidates.erase(deletePoint, _stableOpTimeCandidates.end());
+
// Ensure that no snapshots were created while we were in rollback.
invariant(!_currentCommittedSnapshot);
}
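The cleanup above leans on std::set ordering: upper_bound(lastApplied) points at the first candidate strictly greater than the rollback common point, so erasing from there to end() drops exactly the rolled-back optimes while keeping the common point itself. A minimal standalone illustration of the idiom:

    // Illustration of the upper_bound/erase idiom used when leaving ROLLBACK.
    void removeCandidatesPast(std::set<OpTime>& candidates, const OpTime& lastApplied) {
        auto deletePoint = candidates.upper_bound(lastApplied);  // first optime > lastApplied
        candidates.erase(deletePoint, candidates.end());
    }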
@@ -3132,7 +3167,8 @@ void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Da
host));
}
-void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx) {
+void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx,
+ DataConsistency consistency) {
StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
OpTime lastOpTime;
if (!lastOpTimeStatus.isOK()) {
@@ -3143,8 +3179,9 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC
}
stdx::unique_lock<stdx::mutex> lock(_mutex);
- _setMyLastAppliedOpTime_inlock(lastOpTime, true);
- _setMyLastDurableOpTime_inlock(lastOpTime, true);
+ bool isRollbackAllowed = true;
+ _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
+ _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
_reportUpstream_inlock(std::move(lock));
// Unlocked below.
@@ -3222,13 +3259,33 @@ std::set<OpTime> ReplicationCoordinatorImpl::getStableOpTimeCandidates_forTest()
return _stableOpTimeCandidates;
}
-void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() {
+boost::optional<OpTime> ReplicationCoordinatorImpl::getStableOpTime_forTest() {
+ return _getStableOpTime_inlock();
+}
- // Get the current stable optime.
+boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() {
auto commitPoint = _topCoord->getLastCommittedOpTime();
+ if (_currentCommittedSnapshot) {
+ auto snapshotOpTime = _currentCommittedSnapshot->opTime;
+ invariant(snapshotOpTime.getTimestamp() <= commitPoint.getTimestamp());
+ invariant(snapshotOpTime <= commitPoint);
+ }
+
+ // Compute the current stable optime.
auto stableOpTime = _calculateStableOpTime(_stableOpTimeCandidates, commitPoint);
+ if (stableOpTime) {
+ // By definition, the stable optime should never be greater than the commit point.
+ invariant(stableOpTime->getTimestamp() <= commitPoint.getTimestamp());
+ invariant(*stableOpTime <= commitPoint);
+ }
- invariant(stableOpTime <= commitPoint);
+ return stableOpTime;
+}
+
+void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() {
+
+ // Get the current stable optime.
+ auto stableOpTime = _getStableOpTime_inlock();
// If there is a valid stable optime, set it for the storage engine, and then remove any
// old, unneeded stable optime candidates.
@@ -3256,7 +3313,9 @@ void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTim
void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) {
if (_topCoord->advanceLastCommittedOpTime(committedOpTime)) {
if (_getMemberState_inlock().arbiter()) {
- _setMyLastAppliedOpTime_inlock(committedOpTime, false);
+ // Arbiters do not store replicated data, so we consider their data trivially
+ // consistent.
+ _setMyLastAppliedOpTime_inlock(committedOpTime, false, DataConsistency::Consistent);
}
_setStableTimestampForStorage_inlock();
@@ -3548,13 +3607,20 @@ void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
// If we are in ROLLBACK state, do not set any new _currentCommittedSnapshot, as it will be
// cleared at the end of rollback anyway.
if (_memberState.rollback()) {
- log() << "not updating committed snapshot because we are in rollback";
+ log() << "Not updating committed snapshot because we are in rollback";
return;
}
invariant(!newCommittedSnapshot.opTime.isNull());
- invariant(newCommittedSnapshot.opTime.getTimestamp() <=
- _topCoord->getLastCommittedOpTime().getTimestamp());
+
+ // The new committed snapshot should be <= the current replication commit point.
+ OpTime lastCommittedOpTime = _topCoord->getLastCommittedOpTime();
+ invariant(newCommittedSnapshot.opTime.getTimestamp() <= lastCommittedOpTime.getTimestamp());
+ invariant(newCommittedSnapshot.opTime <= lastCommittedOpTime);
+
+ // The new committed snapshot should be >= the current snapshot.
if (_currentCommittedSnapshot) {
+ invariant(newCommittedSnapshot.opTime.getTimestamp() >=
+ _currentCommittedSnapshot->opTime.getTimestamp());
invariant(newCommittedSnapshot.opTime >= _currentCommittedSnapshot->opTime);
}
if (MONGO_FAIL_POINT(disableSnapshotting))
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index ff6053ce5a7..62f9594f226 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -157,7 +157,7 @@ public:
virtual void setMyLastAppliedOpTime(const OpTime& opTime);
virtual void setMyLastDurableOpTime(const OpTime& opTime);
- virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime);
+ virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency);
virtual void setMyLastDurableOpTimeForward(const OpTime& opTime);
virtual void resetMyLastOpTimes();
@@ -265,7 +265,8 @@ public:
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
- virtual void resetLastOpTimesFromOplog(OperationContext* opCtx) override;
+ virtual void resetLastOpTimesFromOplog(OperationContext* opCtx,
+ DataConsistency consistency) override;
virtual bool shouldChangeSyncSource(
const HostAndPort& currentSource,
@@ -387,6 +388,7 @@ public:
const OpTime& commitPoint);
void cleanupStableOpTimeCandidates_forTest(std::set<OpTime>* candidates, OpTime stableOpTime);
std::set<OpTime> getStableOpTimeCandidates_forTest();
+ boost::optional<OpTime> getStableOpTime_forTest();
/**
* Non-blocking version of updateTerm.
@@ -682,7 +684,9 @@ private:
/**
* Helpers to set the last applied and durable OpTime.
*/
- void _setMyLastAppliedOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);
+ void _setMyLastAppliedOpTime_inlock(const OpTime& opTime,
+ bool isRollbackAllowed,
+ DataConsistency consistency);
void _setMyLastDurableOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);
/**
@@ -1001,6 +1005,12 @@ private:
void _updateCommittedSnapshot_inlock(SnapshotInfo newCommittedSnapshot);
/**
+ * A helper method that returns the current stable optime based on the current commit point and
+ * set of stable optime candidates.
+ */
+ boost::optional<OpTime> _getStableOpTime_inlock();
+
+ /**
* Calculates the 'stable' replication optime given a set of optime candidates and the
* current commit point. The stable optime is the greatest optime in 'candidates' that is
* also less than or equal to 'commitPoint'.
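A compact sketch of that selection over the ordered candidate set, i.e. the greatest candidate that is <= the commit point (a simplified stand-in, not the body of _calculateStableOpTime):

    // Sketch: return the greatest candidate <= commitPoint, or none if all are ahead of it.
    boost::optional<OpTime> greatestCandidateNotAfter(const std::set<OpTime>& candidates,
                                                      const OpTime& commitPoint) {
        auto it = candidates.upper_bound(commitPoint);  // first candidate > commitPoint
        if (it == candidates.begin()) {
            return boost::none;
        }
        return *std::prev(it);
    }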
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 5b8ce8441a5..90a764ea447 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -3944,6 +3944,96 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedDoesntAddTimestampCandidateInMasterSlav
ASSERT(repl->getStableOpTimeCandidates_forTest().empty());
}
+TEST_F(StableOpTimeTest, ClearOpTimeCandidatesPastCommonPointAfterRollback) {
+
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id"
+ << 0))
+ << "protocolVersion"
+ << 1),
+ HostAndPort("node1", 12345));
+
+ auto repl = getReplCoord();
+ long long term = 0;
+ ASSERT_OK(repl->setFollowerMode(MemberState::RS_SECONDARY));
+
+ OpTime rollbackCommonPoint = OpTime({1, 2}, term);
+ OpTime commitPoint = OpTime({1, 2}, term);
+ repl->advanceCommitPoint(commitPoint);
+ ASSERT_EQUALS(SnapshotName::min(), getStorageInterface()->getStableTimestamp());
+
+ repl->setMyLastAppliedOpTime(OpTime({1, 1}, term));
+ repl->setMyLastAppliedOpTime(OpTime({1, 2}, term));
+ repl->setMyLastAppliedOpTime(OpTime({1, 3}, term));
+ repl->setMyLastAppliedOpTime(OpTime({1, 4}, term));
+
+ // The stable timestamp should be equal to the commit point timestamp.
+ const Timestamp stableTimestamp =
+ Timestamp(getStorageInterface()->getStableTimestamp().asU64());
+ Timestamp expectedStableTimestamp = commitPoint.getTimestamp();
+ ASSERT_EQUALS(expectedStableTimestamp, stableTimestamp);
+
+ // The stable optime candidate set should contain optimes >= the stable optime.
+ std::set<OpTime> opTimeCandidates = repl->getStableOpTimeCandidates_forTest();
+ std::set<OpTime> expectedOpTimeCandidates = {
+ OpTime({1, 2}, term), OpTime({1, 3}, term), OpTime({1, 4}, term)};
+ ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates);
+
+ // Transition to ROLLBACK. The set of stable optime candidates should not have changed.
+ ASSERT_OK(repl->setFollowerMode(MemberState::RS_ROLLBACK));
+ opTimeCandidates = repl->getStableOpTimeCandidates_forTest();
+ ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates);
+
+ // Simulate a rollback to the common point.
+ auto opCtx = makeOperationContext();
+ getExternalState()->setLastOpTime(rollbackCommonPoint);
+ repl->resetLastOpTimesFromOplog(opCtx.get(),
+ ReplicationCoordinator::DataConsistency::Inconsistent);
+
+ // Transition to RECOVERING from ROLLBACK.
+ ASSERT_OK(repl->setFollowerMode(MemberState::RS_RECOVERING));
+
+ // Make sure the stable optime candidate set has been cleared of all entries past the common
+ // point.
+ opTimeCandidates = repl->getStableOpTimeCandidates_forTest();
+ auto stableOpTime = repl->getStableOpTime_forTest();
+ ASSERT(stableOpTime);
+ expectedOpTimeCandidates = {*stableOpTime};
+ ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates);
+}
+
+TEST_F(StableOpTimeTest, OpTimeCandidatesAreNotAddedWhenStateIsNotConsistent) {
+
+ initReplSetMode();
+ auto repl = getReplCoord();
+ long long term = 0;
+
+ OpTime consistentOpTime = OpTime({1, 1}, term);
+ OpTime inconsistentOpTime = OpTime({1, 2}, term);
+ std::set<OpTime> expectedOpTimeCandidates = {OpTime({1, 1}, term)};
+
+ // Set the lastApplied optime forward when data is consistent, and check that it was added to
+ // the candidate set.
+ repl->setMyLastAppliedOpTimeForward(consistentOpTime,
+ ReplicationCoordinator::DataConsistency::Consistent);
+ ASSERT_EQUALS(consistentOpTime, repl->getMyLastAppliedOpTime());
+ ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest());
+
+ // Set the lastApplied optime forward when data is not consistent, and check that it wasn't
+ // added to the candidate set.
+ repl->setMyLastAppliedOpTimeForward(inconsistentOpTime,
+ ReplicationCoordinator::DataConsistency::Inconsistent);
+ ASSERT_EQUALS(inconsistentOpTime, repl->getMyLastAppliedOpTime());
+ ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest());
+}
+
+
TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDuringShutdown) {
assertStartSuccess(BSON("_id"
<< "mySet"
@@ -5081,11 +5171,12 @@ TEST_F(ReplCoordTest,
OpTime time2(Timestamp(100, 2), 1);
OpTime time3(Timestamp(100, 3), 1);
+ auto consistency = ReplicationCoordinator::DataConsistency::Consistent;
getReplCoord()->setMyLastAppliedOpTime(time1);
ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime());
- getReplCoord()->setMyLastAppliedOpTimeForward(time3);
+ getReplCoord()->setMyLastAppliedOpTimeForward(time3, consistency);
ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime());
- getReplCoord()->setMyLastAppliedOpTimeForward(time2);
+ getReplCoord()->setMyLastAppliedOpTimeForward(time2, consistency);
getReplCoord()->setMyLastDurableOpTimeForward(time2);
ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime());
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 58a3efd664d..2dd8a3ff71f 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -200,7 +200,8 @@ void ReplicationCoordinatorMock::setMyLastDurableOpTime(const OpTime& opTime) {
_myLastDurableOpTime = opTime;
}
-void ReplicationCoordinatorMock::setMyLastAppliedOpTimeForward(const OpTime& opTime) {
+void ReplicationCoordinatorMock::setMyLastAppliedOpTimeForward(const OpTime& opTime,
+ DataConsistency consistency) {
if (opTime > _myLastAppliedOpTime) {
_myLastAppliedOpTime = opTime;
}
@@ -412,7 +413,8 @@ HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const OpTime& lastOp
void ReplicationCoordinatorMock::blacklistSyncSource(const HostAndPort& host, Date_t until) {}
-void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* opCtx) {
+void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* opCtx,
+ DataConsistency consistency) {
invariant(false);
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 12bf54c1f6c..47a945b85d2 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -119,7 +119,7 @@ public:
virtual void setMyLastAppliedOpTime(const OpTime& opTime);
virtual void setMyLastDurableOpTime(const OpTime& opTime);
- virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime);
+ virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency);
virtual void setMyLastDurableOpTimeForward(const OpTime& opTime);
virtual void resetMyLastOpTimes();
@@ -220,7 +220,7 @@ public:
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
- virtual void resetLastOpTimesFromOplog(OperationContext* opCtx);
+ virtual void resetLastOpTimesFromOplog(OperationContext* opCtx, DataConsistency consistency);
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const rpc::ReplSetMetadata& replMetadata,
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 31c34c016ba..fa03b891526 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -198,11 +198,23 @@ StatusWith<Timestamp> RollbackImpl::_findCommonPoint() {
return commonPointSW.getStatus();
}
- log() << "Rollback common point is " << commonPointSW.getValue().first;
+ OpTime commonPoint = commonPointSW.getValue().first;
+ OpTime lastCommittedOpTime = _replicationCoordinator->getLastCommittedOpTime();
+ OpTime committedSnapshot = _replicationCoordinator->getCurrentCommittedSnapshotOpTime();
+
+ log() << "Rollback common point is " << commonPoint;
+
+ // Rollback common point should be >= the replication commit point.
invariant(!_replicationCoordinator->isV1ElectionProtocol() ||
- commonPointSW.getValue().first >= _replicationCoordinator->getLastCommittedOpTime());
+ commonPoint.getTimestamp() >= lastCommittedOpTime.getTimestamp());
+ invariant(!_replicationCoordinator->isV1ElectionProtocol() ||
+ commonPoint >= lastCommittedOpTime);
+
+ // Rollback common point should be >= the committed snapshot optime.
+ invariant(commonPoint.getTimestamp() >= committedSnapshot.getTimestamp());
+ invariant(commonPoint >= committedSnapshot);
- return commonPointSW.getValue().first.getTimestamp();
+ return commonPoint.getTimestamp();
}
Status RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
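The paired invariants above (and the matching ones added to rs_rollback.cpp and rs_rollback_no_uuid.cpp) assert the relationship both on the raw Timestamp and on the full OpTime, which also carries the term. A small helper capturing that pattern might look like this (hypothetical; not part of the commit):

    // Sketch: assert 'later' is not behind 'earlier', checking both the timestamp
    // component and the full (term, timestamp) optime ordering.
    void invariantNotBehind(const OpTime& later, const OpTime& earlier) {
        invariant(later.getTimestamp() >= earlier.getTimestamp());
        invariant(later >= earlier);
    }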
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 4724fc9da8d..c017bb6110b 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -118,7 +118,7 @@ RollbackTest::ReplicationCoordinatorRollbackMock::ReplicationCoordinatorRollback
: ReplicationCoordinatorMock(service, createReplSettings()) {}
void RollbackTest::ReplicationCoordinatorRollbackMock::resetLastOpTimesFromOplog(
- OperationContext* opCtx) {}
+ OperationContext* opCtx, ReplicationCoordinator::DataConsistency consistency) {}
void RollbackTest::ReplicationCoordinatorRollbackMock::failSettingFollowerMode(
const MemberState& transitionToFail, ErrorCodes::Error codeToFailWith) {
diff --git a/src/mongo/db/repl/rollback_test_fixture.h b/src/mongo/db/repl/rollback_test_fixture.h
index c31aeed617e..29a301b8e1c 100644
--- a/src/mongo/db/repl/rollback_test_fixture.h
+++ b/src/mongo/db/repl/rollback_test_fixture.h
@@ -114,7 +114,8 @@ public:
* Base class implementation triggers an invariant. This function is overridden to be a no-op
* for rollback tests.
*/
- void resetLastOpTimesFromOplog(OperationContext* opCtx) override;
+ void resetLastOpTimesFromOplog(OperationContext* opCtx,
+ ReplicationCoordinator::DataConsistency consistency) override;
/**
* Returns IllegalOperation (does not forward call to
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 5fdbe0ad1b9..3ba1d2ef5b9 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -935,9 +935,20 @@ Status _syncRollback(OperationContext* opCtx,
<< e.what());
}
- log() << "Rollback common point is " << how.commonPoint;
+ OpTime commonPoint = how.commonPoint;
+ OpTime lastCommittedOpTime = replCoord->getLastCommittedOpTime();
+ OpTime committedSnapshot = replCoord->getCurrentCommittedSnapshotOpTime();
+
+ log() << "Rollback common point is " << commonPoint;
+
+ // Rollback common point should be >= the replication commit point.
invariant(!replCoord->isV1ElectionProtocol() ||
- how.commonPoint >= replCoord->getLastCommittedOpTime());
+ commonPoint.getTimestamp() >= lastCommittedOpTime.getTimestamp());
+ invariant(!replCoord->isV1ElectionProtocol() || commonPoint >= lastCommittedOpTime);
+
+ // Rollback common point should be >= the committed snapshot optime.
+ invariant(commonPoint.getTimestamp() >= committedSnapshot.getTimestamp());
+ invariant(commonPoint >= committedSnapshot);
try {
ON_BLOCK_EXIT([&] {
@@ -1418,8 +1429,13 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
}
// Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
- // lastAppliedHash value in bgsync to reflect our new last op.
- replCoord->resetLastOpTimesFromOplog(opCtx);
+ // lastAppliedHash value in bgsync to reflect our new last op. The rollback common point does
+ // not necessarily represent a consistent database state. For example, on a secondary, we may
+ // have rolled back to an optime that fell in the middle of an oplog application batch. We make
+ // the database consistent again after rollback by applying ops forward until we reach
+ // 'minValid'.
+ replCoord->resetLastOpTimesFromOplog(opCtx,
+ ReplicationCoordinator::DataConsistency::Inconsistent);
}
Status syncRollback(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/rs_rollback_no_uuid.cpp b/src/mongo/db/repl/rs_rollback_no_uuid.cpp
index 5e1409b2470..765041a0638 100644
--- a/src/mongo/db/repl/rs_rollback_no_uuid.cpp
+++ b/src/mongo/db/repl/rs_rollback_no_uuid.cpp
@@ -935,8 +935,13 @@ void syncFixUp(OperationContext* opCtx,
}
// Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
- // lastAppliedHash value in bgsync to reflect our new last op.
- replCoord->resetLastOpTimesFromOplog(opCtx);
+ // lastAppliedHash value in bgsync to reflect our new last op. The rollback common point does
+ // not necessarily represent a consistent database state. For example, on a secondary, we may
+ // have rolled back to an optime that fell in the middle of an oplog application batch. We make
+ // the database consistent again after rollback by applying ops forward until we reach
+ // 'minValid'.
+ replCoord->resetLastOpTimesFromOplog(opCtx,
+ ReplicationCoordinator::DataConsistency::Inconsistent);
}
Status _syncRollback(OperationContext* opCtx,
@@ -986,7 +991,21 @@ Status _syncRollback(OperationContext* opCtx,
<< e.what());
}
+ OpTime commonPoint = how.commonPoint;
+ OpTime lastCommittedOpTime = replCoord->getLastCommittedOpTime();
+ OpTime committedSnapshot = replCoord->getCurrentCommittedSnapshotOpTime();
+
log() << "Rollback common point is " << how.commonPoint;
+
+ // Rollback common point should be >= the replication commit point.
+ invariant(!replCoord->isV1ElectionProtocol() ||
+ commonPoint.getTimestamp() >= lastCommittedOpTime.getTimestamp());
+ invariant(!replCoord->isV1ElectionProtocol() || commonPoint >= lastCommittedOpTime);
+
+ // Rollback common point should be >= the committed snapshot optime.
+ invariant(commonPoint.getTimestamp() >= committedSnapshot.getTimestamp());
+ invariant(commonPoint >= committedSnapshot);
+
try {
ON_BLOCK_EXIT([&] {
auto status = replicationProcess->incrementRollbackID(opCtx);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 35362355aad..f3af941fbc5 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -62,6 +62,7 @@
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
@@ -172,15 +173,17 @@ public:
ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {}
virtual ~ApplyBatchFinalizer(){};
- virtual void record(const OpTime& newOpTime) {
- _recordApplied(newOpTime);
+ virtual void record(const OpTime& newOpTime,
+ ReplicationCoordinator::DataConsistency consistency) {
+ _recordApplied(newOpTime, consistency);
};
protected:
- void _recordApplied(const OpTime& newOpTime) {
+ void _recordApplied(const OpTime& newOpTime,
+ ReplicationCoordinator::DataConsistency consistency) {
// We have to use setMyLastAppliedOpTimeForward since this thread races with
// ReplicationExternalStateImpl::onTransitionToPrimary.
- _replCoord->setMyLastAppliedOpTimeForward(newOpTime);
+ _replCoord->setMyLastAppliedOpTimeForward(newOpTime, consistency);
}
void _recordDurable(const OpTime& newOpTime) {
@@ -201,7 +204,8 @@ public:
_waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {};
~ApplyBatchFinalizerForJournal();
- void record(const OpTime& newOpTime) override;
+ void record(const OpTime& newOpTime,
+ ReplicationCoordinator::DataConsistency consistency) override;
private:
/**
@@ -232,8 +236,9 @@ ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
_waiterThread.join();
}
-void ApplyBatchFinalizerForJournal::record(const OpTime& newOpTime) {
- _recordApplied(newOpTime);
+void ApplyBatchFinalizerForJournal::record(const OpTime& newOpTime,
+ ReplicationCoordinator::DataConsistency consistency) {
+ _recordApplied(newOpTime, consistency);
stdx::unique_lock<stdx::mutex> lock(_mutex);
_latestOpTime = newOpTime;
@@ -695,8 +700,14 @@ void fillWriterVectorsAndLastestSessionRecords(
} // namespace
-// Applies a batch of oplog entries, by writing the oplog entries to the local oplog
-// and then using a set of threads to apply the operations.
+/**
+ * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then using
+ * a set of threads to apply the operations. If the batch application is successful, returns the
+ * optime of the last op applied, which should be the last op in the batch. To provide crash
+ * resilience, this function will advance the persistent value of 'minValid' to at least the
+ * last optime of the batch. If 'minValid' is already greater than or equal to the last optime of
+ * this batch, it will not be updated.
+ */
OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status {
_applyFunc(ops, this);
@@ -709,7 +720,9 @@ OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations op
}
namespace {
-void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* replCoord) {
+void tryToGoLiveAsASecondary(OperationContext* opCtx,
+ ReplicationCoordinator* replCoord,
+ OpTime minValid) {
if (replCoord->isInPrimaryOrSecondaryState()) {
return;
}
@@ -718,27 +731,35 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* re
ON_BLOCK_EXIT([] { attemptsToBecomeSecondary.increment(); });
// Need global X lock to transition to SECONDARY
- Lock::GlobalWrite readLock(opCtx);
+ Lock::GlobalWrite writeLock(opCtx);
+ // Maintenance mode will force us to remain in RECOVERING state, no matter what.
if (replCoord->getMaintenanceMode()) {
- LOG(1) << "Can't go live (tryToGoLiveAsASecondary) as maintenance mode is active.";
- // we're not actually going live
+ LOG(1) << "We cannot transition to SECONDARY state while in maintenance mode.";
return;
}
- // Only state RECOVERING can transition to SECONDARY.
+ // We can only transition to SECONDARY from RECOVERING state.
MemberState state(replCoord->getMemberState());
if (!state.recovering()) {
- LOG(2) << "Can't go live (tryToGoLiveAsASecondary) as state != recovering.";
+ LOG(2) << "We cannot transition to SECONDARY state since we are not currently in "
+ "RECOVERING state. Current state: "
+ << state.toString();
return;
}
- // We can't go to SECONDARY until we reach minvalid.
- if (replCoord->getMyLastAppliedOpTime() <
- ReplicationProcess::get(opCtx)->getConsistencyMarkers()->getMinValid(opCtx)) {
+ // We can't go to SECONDARY state until we reach 'minValid', since the database may be in an
+ // inconsistent state before this point. If our state is inconsistent, we need to disallow reads
+ // from clients, which is why we stay in RECOVERING state.
+ auto lastApplied = replCoord->getMyLastAppliedOpTime();
+ if (lastApplied < minValid) {
+ LOG(2) << "We cannot transition to SECONDARY state because our 'lastApplied' optime is "
+ "less than the 'minValid' optime. minValid optime: "
+ << minValid << ", lastApplied optime: " << lastApplied;
return;
}
+ // Execute the transition to SECONDARY.
auto status = replCoord->setFollowerMode(MemberState::RS_SECONDARY);
if (!status.isOK()) {
warning() << "Failed to transition into " << MemberState(MemberState::RS_SECONDARY)
@@ -859,6 +880,11 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
? new ApplyBatchFinalizerForJournal(replCoord)
: new ApplyBatchFinalizer(replCoord)};
+ // Get replication consistency markers.
+ ReplicationProcess* replProcess = ReplicationProcess::get(replCoord->getServiceContext());
+ ReplicationConsistencyMarkers* consistencyMarkers = replProcess->getConsistencyMarkers();
+ OpTime minValid;
+
while (true) { // Exits on message from OpQueueBatcher.
// Use a new operation context each iteration, as otherwise we may appear to use a single
// collection name to refer to collections with different UUIDs.
@@ -880,7 +906,11 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
}
}
- tryToGoLiveAsASecondary(&opCtx, replCoord);
+ // Get the current value of 'minValid'.
+ minValid = consistencyMarkers->getMinValid(&opCtx);
+
+ // Transition to SECONDARY state, if possible.
+ tryToGoLiveAsASecondary(&opCtx, replCoord, minValid);
long long termWhenBufferIsEmpty = replCoord->getTerm();
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
@@ -888,6 +918,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
OpQueue ops = batcher.getNextBatch(Seconds(1));
if (ops.empty()) {
if (ops.mustShutdown()) {
+ // Shut down and exit oplog application loop.
return;
}
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
@@ -918,16 +949,34 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
// Don't allow the fsync+lock thread to see intermediate states of batch application.
stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
- // Do the work.
- multiApply(&opCtx, ops.releaseBatch());
+ // Apply the operations in this batch. 'multiApply' returns the optime of the last op that
+ // was applied, which should be the last optime in the batch.
+ auto lastOpTimeAppliedInBatch = multiApply(&opCtx, ops.releaseBatch());
+ invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
+
+ // In order to provide resilience in the event of a crash in the middle of batch
+ // application, 'multiApply' will update 'minValid' so that it is at least as great as the
+ // last optime that it applied in this batch. If 'minValid' was moved forward, we make sure
+ // to update our view of it here.
+ if (lastOpTimeInBatch > minValid) {
+ minValid = lastOpTimeInBatch;
+ }
// Update various things that care about our last applied optime. Tests rely on 2 happening
// before 3 even though it isn't strictly necessary. The order of 1 doesn't matter.
- setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp()); // 1
- ReplicationProcess::get(&opCtx)->getConsistencyMarkers()->setAppliedThrough(
- &opCtx,
- lastOpTimeInBatch); // 2
- finalizer->record(lastOpTimeInBatch); // 3
+
+ // 1. Update the global timestamp.
+ setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp());
+
+ // 2. Persist our "applied through" optime to disk.
+ consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch);
+
+ // 3. Finalize this batch. We are at a consistent optime if our current optime is >= the
+ // current 'minValid' optime.
+ auto consistency = (lastOpTimeInBatch >= minValid)
+ ? ReplicationCoordinator::DataConsistency::Consistent
+ : ReplicationCoordinator::DataConsistency::Inconsistent;
+ finalizer->record(lastOpTimeInBatch, consistency);
}
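Taken together, the sync_tail changes make the consistency decision a per-batch computation against the locally cached 'minValid'. A condensed sketch of the tail end of the loop (names as in the surrounding code; simplified):

    // Condensed sketch of the per-batch consistency decision in oplogApplication().
    OpTime minValid = consistencyMarkers->getMinValid(&opCtx);
    auto lastOpTimeInBatch = multiApply(&opCtx, ops.releaseBatch());

    // multiApply() persists a 'minValid' at least as large as this batch's last optime,
    // so keep the local copy in step with it.
    if (lastOpTimeInBatch > minValid) {
        minValid = lastOpTimeInBatch;
    }

    // The batch ends in a consistent state unless it still falls short of 'minValid',
    // e.g. while re-applying ops in RECOVERING after a rollbackViaRefetch.
    auto consistency = (lastOpTimeInBatch >= minValid)
        ? ReplicationCoordinator::DataConsistency::Consistent
        : ReplicationCoordinator::DataConsistency::Inconsistent;
    finalizer->record(lastOpTimeInBatch, consistency);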
}