author      William Schultz <william.schultz@mongodb.com>    2017-11-21 15:52:48 -0500
committer   William Schultz <william.schultz@mongodb.com>    2017-11-21 16:19:35 -0500
commit      3867aecb8eb2a0d8c4835f9adf3e76c83e607a10 (patch)
tree        669f3a1fc040a7b0bc89b1d72fa9f04b16b0b6f7
parent      01be30b1e364f10f3b0ba7e7b00fd81337bae434 (diff)
download    mongo-3867aecb8eb2a0d8c4835f9adf3e76c83e607a10.tar.gz
SERVER-30577 Don't update the stable timestamp if database is in an inconsistent state
(cherry picked from commit 30de2f7c46a9aa0914fe91cba2075b244e9b516b)
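
At a high level, this patch threads a new ReplicationCoordinator::DataConsistency argument through every code path that advances the node's last applied optime, so that only optimes representing consistent database states can become stable-timestamp candidates. The following is a minimal, self-contained sketch of that core rule; the types and names are simplified stand-ins for illustration, not the actual MongoDB source:

    #include <set>

    // Illustrative stand-ins for repl::OpTime and the enum this patch adds.
    using OpTime = long long;
    enum class DataConsistency { Consistent, Inconsistent };

    class StableTimestampTracker {
    public:
        void setMyLastAppliedOpTimeForward(OpTime opTime, DataConsistency consistency) {
            if (opTime <= _lastApplied) {
                return;  // this setter only ever moves the applied optime forward
            }
            _lastApplied = opTime;
            // The core of SERVER-30577: optimes that do not represent a consistent
            // state (e.g. mid-batch on a secondary, or RECOVERING after a
            // refetch-based rollback) never become stable-timestamp candidates.
            if (consistency == DataConsistency::Consistent) {
                _candidates.insert(opTime);
            }
        }

        const std::set<OpTime>& candidates() const {
            return _candidates;
        }

    private:
        OpTime _lastApplied = 0;
        std::set<OpTime> _candidates;
    };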
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp                      |   4
-rw-r--r--  src/mongo/db/repl/initial_syncer.h                        |   3
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp                 |  20
-rw-r--r--  src/mongo/db/repl/oplog.cpp                               |   4
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h               |  27
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp        | 136
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h          |  16
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp   |  95
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp        |   6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h          |   4
-rw-r--r--  src/mongo/db/repl/rollback_impl.cpp                       |  18
-rw-r--r--  src/mongo/db/repl/rollback_test_fixture.cpp               |   2
-rw-r--r--  src/mongo/db/repl/rollback_test_fixture.h                 |   3
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                         |  24
-rw-r--r--  src/mongo/db/repl/rs_rollback_no_uuid.cpp                 |  23
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                           | 101
16 files changed, 389 insertions(+), 97 deletions(-)
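
The headers changed below document that the stable optime is the greatest candidate that is less than or equal to the commit point (see the doc comment for _calculateStableOpTime in replication_coordinator_impl.h). A minimal sketch of that selection rule over an ordered candidate set; the function name and integer optimes are illustrative assumptions, not the actual implementation:

    #include <iterator>
    #include <optional>
    #include <set>

    using OpTime = long long;

    std::optional<OpTime> calculateStableOpTime(const std::set<OpTime>& candidates,
                                                OpTime commitPoint) {
        // upper_bound returns the first candidate strictly greater than the commit
        // point, so the element just before it (if any) is the greatest candidate
        // that is <= the commit point.
        auto it = candidates.upper_bound(commitPoint);
        if (it == candidates.begin()) {
            return std::nullopt;  // every candidate is ahead of the commit point
        }
        return *std::prev(it);
    }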
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 66f01046277..14f5a8c1f21 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -388,7 +388,6 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
     _storage->setInitialDataTimestamp(opCtx->getServiceContext(),
                                       SnapshotName(lastApplied.getValue().opTime.getTimestamp()));
     _replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx);
-    _opts.setMyLastOptime(lastApplied.getValue().opTime);
     log() << "initial sync done; took "
           << duration_cast<Seconds>(_stats.initialSyncEnd - _stats.initialSyncStart) << ".";
     initialSyncCompletes.increment();
@@ -1022,7 +1021,8 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus,
 
     _initialSyncState->appliedOps += numApplied;
     _lastApplied = lastApplied;
-    _opts.setMyLastOptime(_lastApplied.opTime);
+    _opts.setMyLastOptime(_lastApplied.opTime,
+                          ReplicationCoordinator::DataConsistency::Inconsistent);
 
     auto fetchCount = _fetchCount.load();
     if (fetchCount > 0) {
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 157ffcc7249..7d7b43e3727 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -79,7 +79,8 @@ struct InitialSyncerOptions {
     using GetMyLastOptimeFn = stdx::function<OpTime()>;
 
     /** Function to update optime of last operation applied on this node */
-    using SetMyLastOptimeFn = stdx::function<void(const OpTime&)>;
+    using SetMyLastOptimeFn =
+        stdx::function<void(const OpTime&, ReplicationCoordinator::DataConsistency consistency)>;
 
     /** Function to reset all optimes on this node (e.g. applied & durable). */
     using ResetOptimesFn = stdx::function<void()>;
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 9df6094127b..504003827bd 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -121,7 +121,10 @@ public:
      * clear/reset state
      */
     void reset() {
-        _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
+        _setMyLastOptime = [this](const OpTime& opTime,
+                                  ReplicationCoordinator::DataConsistency consistency) {
+            _myLastOpTime = opTime;
+        };
         _myLastOpTime = OpTime();
         _syncSourceSelector = stdx::make_unique<SyncSourceSelectorMock>();
     }
@@ -357,8 +360,11 @@ protected:
         InitialSyncerOptions options;
         options.initialSyncRetryWait = Milliseconds(1);
         options.getMyLastOptime = [this]() { return _myLastOpTime; };
-        options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
-        options.resetOptimes = [this]() { _setMyLastOptime(OpTime()); };
+        options.setMyLastOptime = [this](const OpTime& opTime,
+                                         ReplicationCoordinator::DataConsistency consistency) {
+            _setMyLastOptime(opTime, consistency);
+        };
+        options.resetOptimes = [this]() { _myLastOpTime = OpTime(); };
         options.getSlaveDelay = [this]() { return Seconds(0); };
         options.syncSourceSelector = this;
@@ -627,7 +633,8 @@ void InitialSyncerTest::processSuccessfulFCVFetcherResponse(std::vector<BSONObj>
 TEST_F(InitialSyncerTest, InvalidConstruction) {
     InitialSyncerOptions options;
     options.getMyLastOptime = []() { return OpTime(); };
-    options.setMyLastOptime = [](const OpTime&) {};
+    options.setMyLastOptime = [](const OpTime&,
+                                 ReplicationCoordinator::DataConsistency consistency) {};
     options.resetOptimes = []() {};
     options.getSlaveDelay = []() { return Seconds(0); };
     options.syncSourceSelector = this;
@@ -858,9 +865,10 @@
 TEST_F(InitialSyncerTest, InitialSyncerResetsOptimesOnNewAttempt) {
     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
 
-    // Set the last optime to an arbitrary nonzero value.
+    // Set the last optime to an arbitrary nonzero value. The value of the 'consistency' argument
+    // doesn't matter.
     auto origOptime = OpTime(Timestamp(1000, 1), 1);
-    _setMyLastOptime(origOptime);
+    _setMyLastOptime(origOptime, ReplicationCoordinator::DataConsistency::Inconsistent);
 
     // Start initial sync.
     const std::uint32_t initialSyncMaxAttempts = 1U;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ce512effa10..a49b3b1be38 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -398,7 +398,9 @@ void _logOpsInner(OperationContext* opCtx,
 
     // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
     opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
-        replCoord->setMyLastAppliedOpTimeForward(finalOpTime);
+        // Optimes on the primary should always represent consistent database states.
+        replCoord->setMyLastAppliedOpTimeForward(
+            finalOpTime, ReplicationCoordinator::DataConsistency::Consistent);
         ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
     });
 }
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 403792a0f99..94585d67caa 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -312,7 +312,7 @@ public:
      * it is the caller's job to properly synchronize this behavior. The exception to this rule
      * is that after calls to resetLastOpTimesFromOplog(), the minimum acceptable value for
      * "opTime" is reset based on the contents of the oplog, and may go backwards due to
-     * rollback.
+     * rollback. Additionally, the optime given MUST represent a consistent database state.
      */
     virtual void setMyLastAppliedOpTime(const OpTime& opTime) = 0;
@@ -328,14 +328,27 @@ public:
     virtual void setMyLastDurableOpTime(const OpTime& opTime) = 0;
 
     /**
+     * This type is used to represent the "consistency" of a current database state. In
+     * replication, there may be times when our database data is not represented by a single optime,
+     * because we have fetched remote data from different points in time. For example, when we are
+     * in RECOVERING following a refetch based rollback. We never allow external clients to read
+     * from the database if it is not consistent.
+     */
+    enum class DataConsistency { Consistent, Inconsistent };
+
+    /**
      * Updates our internal tracking of the last OpTime applied to this node, but only
      * if the supplied optime is later than the current last OpTime known to the replication
-     * coordinator.
+     * coordinator. The 'consistency' argument must tell whether or not the optime argument
+     * represents a consistent database state.
      *
      * This function is used by logOp() on a primary, since the ops in the oplog do not
-     * necessarily commit in sequential order.
+     * necessarily commit in sequential order. It is also used when we finish oplog batch
+     * application on secondaries, to avoid any potential race conditions around setting the
+     * applied optime from more than one thread.
     */
-    virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime) = 0;
+    virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime,
+                                               DataConsistency consistency) = 0;
 
     /**
      * Updates our internal tracking of the last OpTime durable to this node, but only
@@ -741,9 +754,11 @@ public:
     /**
      * Loads the optime from the last op in the oplog into the coordinator's lastAppliedOpTime and
-     * lastDurableOpTime values.
+     * lastDurableOpTime values. The 'consistency' argument must tell whether or not the optime of
+     * the op in the oplog represents a consistent database state.
     */
-    virtual void resetLastOpTimesFromOplog(OperationContext* opCtx) = 0;
+    virtual void resetLastOpTimesFromOplog(OperationContext* opCtx,
+                                           DataConsistency consistency) = 0;
 
     /**
      * Returns the OpTime of the latest replica set-committed op known to this server.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f50edc09f3c..dfb3259b097 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -315,8 +315,9 @@ InitialSyncerOptions createInitialSyncerOptions(
     ReplicationCoordinator* replCoord, ReplicationCoordinatorExternalState* externalState) {
     InitialSyncerOptions options;
     options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
-    options.setMyLastOptime = [replCoord, externalState](const OpTime& opTime) {
-        replCoord->setMyLastAppliedOpTime(opTime);
+    options.setMyLastOptime = [replCoord, externalState](
+        const OpTime& opTime, ReplicationCoordinator::DataConsistency consistency) {
+        replCoord->setMyLastAppliedOpTimeForward(opTime, consistency);
         externalState->setGlobalTimestamp(replCoord->getServiceContext(), opTime.getTimestamp());
     };
     options.resetOptimes = [replCoord]() { replCoord->resetMyLastOpTimes(); };
@@ -585,9 +586,21 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
     invariant(_rsConfigState == kConfigStartingUp);
     const PostMemberStateUpdateAction action =
         _setCurrentRSConfig_inlock(opCtx.get(), localConfig, myIndex.getValue());
+
+    // Set our last applied and durable optimes to the top of the oplog, if we have one.
     if (!lastOpTime.isNull()) {
-        _setMyLastAppliedOpTime_inlock(lastOpTime, false);
-        _setMyLastDurableOpTime_inlock(lastOpTime, false);
+        bool isRollbackAllowed = false;
+
+        // If we have an oplog, it is still possible that our data is not in a consistent state. For
+        // example, if we are starting up after a crash following a post-rollback RECOVERING state.
+        // To detect this, we see if our last optime is >= the 'minValid' optime, which
+        // should be persistent across node crashes.
+        OpTime minValid = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
+        auto consistency =
+            (lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent;
+
+        _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
+        _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
         _reportUpstream_inlock(std::move(lock));  // unlocks _mutex.
     } else {
         lock.unlock();
@@ -669,7 +682,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
         }
 
         const auto lastApplied = status.getValue();
-        _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false);
+        _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false, DataConsistency::Consistent);
     }
 
     // Clear maint. mode.
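
Note on the _finishLoadLocalConfig hunk above: at startup, consistency is derived by comparing the top of the oplog against the persistent 'minValid' marker. A condensed sketch of that check, using simplified stand-in types rather than the actual implementation:

    using OpTime = long long;
    enum class DataConsistency { Consistent, Inconsistent };

    // If the node crashed while still applying ops toward 'minValid' (e.g. in a
    // post-rollback RECOVERING state), the top of its oplog is behind minValid
    // and its data must be treated as inconsistent until it catches up.
    DataConsistency consistencyAtStartup(OpTime topOfOplog, OpTime minValid) {
        return topOfOplog >= minValid ? DataConsistency::Consistent
                                      : DataConsistency::Inconsistent;
    }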
@@ -1037,11 +1050,11 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
     _topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg);
 }
 
-void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime) {
+void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime,
+                                                               DataConsistency consistency) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     if (opTime > _getMyLastAppliedOpTime_inlock()) {
-        const bool allowRollback = false;
-        _setMyLastAppliedOpTime_inlock(opTime, allowRollback);
+        _setMyLastAppliedOpTime_inlock(opTime, false, consistency);
         _reportUpstream_inlock(std::move(lock));
     }
 }
@@ -1056,7 +1069,8 @@ void ReplicationCoordinatorImpl::setMyLastDurableOpTimeForward(const OpTime& opT
 
 void ReplicationCoordinatorImpl::setMyLastAppliedOpTime(const OpTime& opTime) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
-    _setMyLastAppliedOpTime_inlock(opTime, false);
+    // The optime passed to this function is required to represent a consistent database state.
+    _setMyLastAppliedOpTime_inlock(opTime, false, DataConsistency::Consistent);
     _reportUpstream_inlock(std::move(lock));
 }
@@ -1075,8 +1089,9 @@ void ReplicationCoordinatorImpl::resetMyLastOpTimes() {
 void ReplicationCoordinatorImpl::_resetMyLastOpTimes_inlock() {
     LOG(1) << "resetting durable/applied optimes.";
     // Reset to uninitialized OpTime
-    _setMyLastAppliedOpTime_inlock(OpTime(), true);
-    _setMyLastDurableOpTime_inlock(OpTime(), true);
+    bool isRollbackAllowed = true;
+    _setMyLastAppliedOpTime_inlock(OpTime(), isRollbackAllowed, DataConsistency::Inconsistent);
+    _setMyLastDurableOpTime_inlock(OpTime(), isRollbackAllowed);
     _stableOpTimeCandidates.clear();
 }
@@ -1097,27 +1112,38 @@ void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::
 }
 
 void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& opTime,
-                                                                bool isRollbackAllowed) {
+                                                                bool isRollbackAllowed,
+                                                                DataConsistency consistency) {
     auto* myMemberData = _topCoord->getMyMemberData();
-    invariant(isRollbackAllowed || myMemberData->getLastAppliedOpTime() <= opTime);
+    invariant(isRollbackAllowed || opTime >= myMemberData->getLastAppliedOpTime());
     myMemberData->setLastAppliedOpTime(opTime, _replExecutor->now());
     _updateLastCommittedOpTime_inlock();
 
-    // Add the new applied optime to the list of stable optime candidates and then set the
-    // last stable optime. Stable optimes are used to determine the last optime that it is
-    // safe to revert the database to, in the event of a rollback. Note that master-slave mode has
-    // no automatic fail over, and so rollbacks never occur. Additionally, the commit point for a
-    // master-slave set will never advance, since it doesn't use any consensus protocol. Since the
-    // set of stable optime candidates can only get cleaned up when the commit point advances, we
-    // should refrain from updating stable optime candidates in master-slave mode, to avoid the
-    // candidates list from growing unbounded.
-    if (!opTime.isNull() && getReplicationMode() == Mode::modeReplSet) {
+    // Signal anyone waiting on optime changes.
+    _opTimeWaiterList.signalAndRemoveIf_inlock(
+        [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
+
+
+    // Note that master-slave mode has no automatic fail over, and so rollbacks never occur.
+    // Additionally, the commit point for a master-slave set will never advance, since it doesn't
+    // use any consensus protocol. Since the set of stable optime candidates can only get cleaned up
+    // when the commit point advances, we should refrain from updating stable optime candidates in
+    // master-slave mode, to avoid the candidates list from growing unbounded.
+    if (opTime.isNull() || getReplicationMode() != Mode::modeReplSet) {
+        return;
+    }
+
+    // Add the new applied optime to the list of stable optime candidates and then set the last
+    // stable optime. Stable optimes are used to determine the last optime that it is safe to revert
+    // the database to, in the event of a rollback via the 'recover to timestamp' method. If we are
+    // setting our applied optime to a value that doesn't represent a consistent database state, we
+    // should not add it to the set of stable optime candidates. For example, if we are in
+    // RECOVERING after a rollback using the 'rollbackViaRefetch' algorithm, we will be inconsistent
+    // until we reach the 'minValid' optime.
+    if (consistency == DataConsistency::Consistent) {
         _stableOpTimeCandidates.insert(opTime);
         _setStableTimestampForStorage_inlock();
     }
-
-    _opTimeWaiterList.signalAndRemoveIf_inlock(
-        [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
 }
 
 void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
@@ -2618,7 +2644,16 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock(
         _dropAllSnapshots_inlock();
     }
 
+    // Upon transitioning out of ROLLBACK, we must clear any stable optime candidates that may have
+    // been rolled back.
     if (_memberState.rollback()) {
+        // Our 'lastApplied' optime at this point should be the rollback common point. We should
+        // remove any stable optime candidates greater than the common point.
+        auto lastApplied = _getMyLastAppliedOpTime_inlock();
+        // The upper bound will give us the first optime T such that T > lastApplied.
+        auto deletePoint = _stableOpTimeCandidates.upper_bound(lastApplied);
+        _stableOpTimeCandidates.erase(deletePoint, _stableOpTimeCandidates.end());
+
         // Ensure that no snapshots were created while we were in rollback.
         invariant(!_currentCommittedSnapshot);
     }
@@ -3132,7 +3167,8 @@ void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Da
         host));
 }
 
-void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx) {
+void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx,
+                                                           DataConsistency consistency) {
     StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
     OpTime lastOpTime;
     if (!lastOpTimeStatus.isOK()) {
@@ -3143,8 +3179,9 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC
     }
 
     stdx::unique_lock<stdx::mutex> lock(_mutex);
-    _setMyLastAppliedOpTime_inlock(lastOpTime, true);
-    _setMyLastDurableOpTime_inlock(lastOpTime, true);
+    bool isRollbackAllowed = true;
+    _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
+    _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
     _reportUpstream_inlock(std::move(lock));  // Unlocked below.
@@ -3222,13 +3259,33 @@ std::set<OpTime> ReplicationCoordinatorImpl::getStableOpTimeCandidates_forTest()
     return _stableOpTimeCandidates;
 }
 
-void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() {
+boost::optional<OpTime> ReplicationCoordinatorImpl::getStableOpTime_forTest() {
+    return _getStableOpTime_inlock();
+}
 
-    // Get the current stable optime.
+boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() {
     auto commitPoint = _topCoord->getLastCommittedOpTime();
+    if (_currentCommittedSnapshot) {
+        auto snapshotOpTime = _currentCommittedSnapshot->opTime;
+        invariant(snapshotOpTime.getTimestamp() <= commitPoint.getTimestamp());
+        invariant(snapshotOpTime <= commitPoint);
+    }
+
+    // Compute the current stable optime.
     auto stableOpTime = _calculateStableOpTime(_stableOpTimeCandidates, commitPoint);
+    if (stableOpTime) {
+        // By definition, the stable optime should never be greater than the commit point.
+        invariant(stableOpTime->getTimestamp() <= commitPoint.getTimestamp());
+        invariant(*stableOpTime <= commitPoint);
+    }
 
-    invariant(stableOpTime <= commitPoint);
+    return stableOpTime;
+}
+
+void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() {
+
+    // Get the current stable optime.
+    auto stableOpTime = _getStableOpTime_inlock();
 
     // If there is a valid stable optime, set it for the storage engine, and then remove any
     // old, unneeded stable optime candidates.
@@ -3256,7 +3313,9 @@ void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTim
 void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) {
     if (_topCoord->advanceLastCommittedOpTime(committedOpTime)) {
         if (_getMemberState_inlock().arbiter()) {
-            _setMyLastAppliedOpTime_inlock(committedOpTime, false);
+            // Arbiters do not store replicated data, so we consider their data trivially
+            // consistent.
+            _setMyLastAppliedOpTime_inlock(committedOpTime, false, DataConsistency::Consistent);
         }
 
         _setStableTimestampForStorage_inlock();
@@ -3548,13 +3607,20 @@ void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
     // If we are in ROLLBACK state, do not set any new _currentCommittedSnapshot, as it will be
     // cleared at the end of rollback anyway.
     if (_memberState.rollback()) {
-        log() << "not updating committed snapshot because we are in rollback";
+        log() << "Not updating committed snapshot because we are in rollback";
         return;
     }
     invariant(!newCommittedSnapshot.opTime.isNull());
-    invariant(newCommittedSnapshot.opTime.getTimestamp() <=
-              _topCoord->getLastCommittedOpTime().getTimestamp());
+
+    // The new committed snapshot should be <= the current replication commit point.
+    OpTime lastCommittedOpTime = _topCoord->getLastCommittedOpTime();
+    invariant(newCommittedSnapshot.opTime.getTimestamp() <= lastCommittedOpTime.getTimestamp());
+    invariant(newCommittedSnapshot.opTime <= lastCommittedOpTime);
+
+    // The new committed snapshot should be >= the current snapshot.
     if (_currentCommittedSnapshot) {
+        invariant(newCommittedSnapshot.opTime.getTimestamp() >=
+                  _currentCommittedSnapshot->opTime.getTimestamp());
         invariant(newCommittedSnapshot.opTime >= _currentCommittedSnapshot->opTime);
     }
     if (MONGO_FAIL_POINT(disableSnapshotting))
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index ff6053ce5a7..62f9594f226 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -157,7 +157,7 @@ public:
     virtual void setMyLastAppliedOpTime(const OpTime& opTime);
     virtual void setMyLastDurableOpTime(const OpTime& opTime);
 
-    virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime);
+    virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency);
     virtual void setMyLastDurableOpTimeForward(const OpTime& opTime);
 
     virtual void resetMyLastOpTimes();
@@ -265,7 +265,8 @@ public:
     virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
 
-    virtual void resetLastOpTimesFromOplog(OperationContext* opCtx) override;
+    virtual void resetLastOpTimesFromOplog(OperationContext* opCtx,
+                                           DataConsistency consistency) override;
 
     virtual bool shouldChangeSyncSource(
         const HostAndPort& currentSource,
@@ -387,6 +388,7 @@ public:
                                          const OpTime& commitPoint);
     void cleanupStableOpTimeCandidates_forTest(std::set<OpTime>* candidates, OpTime stableOpTime);
     std::set<OpTime> getStableOpTimeCandidates_forTest();
+    boost::optional<OpTime> getStableOpTime_forTest();
 
     /**
      * Non-blocking version of updateTerm.
@@ -682,7 +684,9 @@ private:
     /**
     * Helpers to set the last applied and durable OpTime.
     */
-    void _setMyLastAppliedOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);
+    void _setMyLastAppliedOpTime_inlock(const OpTime& opTime,
+                                        bool isRollbackAllowed,
+                                        DataConsistency consistency);
     void _setMyLastDurableOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);
 
     /**
@@ -1001,6 +1005,12 @@ private:
     void _updateCommittedSnapshot_inlock(SnapshotInfo newCommittedSnapshot);
 
     /**
+     * A helper method that returns the current stable optime based on the current commit point and
+     * set of stable optime candidates.
+     */
+    boost::optional<OpTime> _getStableOpTime_inlock();
+
+    /**
     * Calculates the 'stable' replication optime given a set of optime candidates and the
     * current commit point. The stable optime is the greatest optime in 'candidates' that is
     * also less than or equal to 'commitPoint'.
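
Note on the _updateMemberStateFromTopologyCoordinator_inlock hunk above: when leaving ROLLBACK, every stable optime candidate past the common point is erased, since it may have been rolled back. A minimal sketch of that cleanup, with integer optimes as an illustrative stand-in:

    #include <set>

    using OpTime = long long;

    // On transition out of ROLLBACK, 'lastApplied' is the rollback common point;
    // any candidate greater than it may have been rolled back and must go.
    // upper_bound yields the first candidate strictly greater than lastApplied.
    void clearCandidatesPastCommonPoint(std::set<OpTime>& candidates, OpTime lastApplied) {
        candidates.erase(candidates.upper_bound(lastApplied), candidates.end());
    }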
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 5b8ce8441a5..90a764ea447 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -3944,6 +3944,96 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedDoesntAddTimestampCandidateInMasterSlav
     ASSERT(repl->getStableOpTimeCandidates_forTest().empty());
 }
 
+TEST_F(StableOpTimeTest, ClearOpTimeCandidatesPastCommonPointAfterRollback) {
+
+    assertStartSuccess(BSON("_id"
+                            << "mySet"
+                            << "version"
+                            << 1
+                            << "members"
+                            << BSON_ARRAY(BSON("host"
+                                               << "node1:12345"
+                                               << "_id"
+                                               << 0))
+                            << "protocolVersion"
+                            << 1),
+                       HostAndPort("node1", 12345));
+
+    auto repl = getReplCoord();
+    long long term = 0;
+    ASSERT_OK(repl->setFollowerMode(MemberState::RS_SECONDARY));
+
+    OpTime rollbackCommonPoint = OpTime({1, 2}, term);
+    OpTime commitPoint = OpTime({1, 2}, term);
+    repl->advanceCommitPoint(commitPoint);
+    ASSERT_EQUALS(SnapshotName::min(), getStorageInterface()->getStableTimestamp());
+
+    repl->setMyLastAppliedOpTime(OpTime({1, 1}, term));
+    repl->setMyLastAppliedOpTime(OpTime({1, 2}, term));
+    repl->setMyLastAppliedOpTime(OpTime({1, 3}, term));
+    repl->setMyLastAppliedOpTime(OpTime({1, 4}, term));
+
+    // The stable timestamp should be equal to the commit point timestamp.
+    const Timestamp stableTimestamp =
+        Timestamp(getStorageInterface()->getStableTimestamp().asU64());
+    Timestamp expectedStableTimestamp = commitPoint.getTimestamp();
+    ASSERT_EQUALS(expectedStableTimestamp, stableTimestamp);
+
+    // The stable optime candidate set should contain optimes >= the stable optime.
+    std::set<OpTime> opTimeCandidates = repl->getStableOpTimeCandidates_forTest();
+    std::set<OpTime> expectedOpTimeCandidates = {
+        OpTime({1, 2}, term), OpTime({1, 3}, term), OpTime({1, 4}, term)};
+    ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates);
+
+    // Transition to ROLLBACK. The set of stable optime candidates should not have changed.
+    ASSERT_OK(repl->setFollowerMode(MemberState::RS_ROLLBACK));
+    opTimeCandidates = repl->getStableOpTimeCandidates_forTest();
+    ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates);
+
+    // Simulate a rollback to the common point.
+    auto opCtx = makeOperationContext();
+    getExternalState()->setLastOpTime(rollbackCommonPoint);
+    repl->resetLastOpTimesFromOplog(opCtx.get(),
+                                    ReplicationCoordinator::DataConsistency::Inconsistent);
+
+    // Transition to RECOVERING from ROLLBACK.
+    ASSERT_OK(repl->setFollowerMode(MemberState::RS_RECOVERING));
+
+    // Make sure the stable optime candidate set has been cleared of all entries past the common
+    // point.
+    opTimeCandidates = repl->getStableOpTimeCandidates_forTest();
+    auto stableOpTime = repl->getStableOpTime_forTest();
+    ASSERT(stableOpTime);
+    expectedOpTimeCandidates = {*stableOpTime};
+    ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates);
+}
+
+TEST_F(StableOpTimeTest, OpTimeCandidatesAreNotAddedWhenStateIsNotConsistent) {
+
+    initReplSetMode();
+    auto repl = getReplCoord();
+    long long term = 0;
+
+    OpTime consistentOpTime = OpTime({1, 1}, term);
+    OpTime inconsistentOpTime = OpTime({1, 2}, term);
+    std::set<OpTime> expectedOpTimeCandidates = {OpTime({1, 1}, term)};
+
+    // Set the lastApplied optime forward when data is consistent, and check that it was added to
+    // the candidate set.
+    repl->setMyLastAppliedOpTimeForward(consistentOpTime,
+                                        ReplicationCoordinator::DataConsistency::Consistent);
+    ASSERT_EQUALS(consistentOpTime, repl->getMyLastAppliedOpTime());
+    ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest());
+
+    // Set the lastApplied optime forward when data is not consistent, and check that it wasn't
+    // added to the candidate set.
+    repl->setMyLastAppliedOpTimeForward(inconsistentOpTime,
+                                        ReplicationCoordinator::DataConsistency::Inconsistent);
+    ASSERT_EQUALS(inconsistentOpTime, repl->getMyLastAppliedOpTime());
+    ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest());
+}
+
+
 TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDuringShutdown) {
     assertStartSuccess(BSON("_id"
                             << "mySet"
@@ -5081,11 +5171,12 @@ TEST_F(ReplCoordTest,
     OpTime time2(Timestamp(100, 2), 1);
     OpTime time3(Timestamp(100, 3), 1);
 
+    auto consistency = ReplicationCoordinator::DataConsistency::Consistent;
     getReplCoord()->setMyLastAppliedOpTime(time1);
     ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime());
-    getReplCoord()->setMyLastAppliedOpTimeForward(time3);
+    getReplCoord()->setMyLastAppliedOpTimeForward(time3, consistency);
     ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime());
-    getReplCoord()->setMyLastAppliedOpTimeForward(time2);
+    getReplCoord()->setMyLastAppliedOpTimeForward(time2, consistency);
     getReplCoord()->setMyLastDurableOpTimeForward(time2);
     ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime());
 }
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 58a3efd664d..2dd8a3ff71f 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -200,7 +200,8 @@ void ReplicationCoordinatorMock::setMyLastDurableOpTime(const OpTime& opTime) {
     _myLastDurableOpTime = opTime;
 }
 
-void ReplicationCoordinatorMock::setMyLastAppliedOpTimeForward(const OpTime& opTime) {
+void ReplicationCoordinatorMock::setMyLastAppliedOpTimeForward(const OpTime& opTime,
+                                                               DataConsistency consistency) {
     if (opTime > _myLastAppliedOpTime) {
         _myLastAppliedOpTime = opTime;
     }
@@ -412,7 +413,8 @@ HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const OpTime& lastOp
 
 void ReplicationCoordinatorMock::blacklistSyncSource(const HostAndPort& host, Date_t until) {}
 
-void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* opCtx) {
+void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* opCtx,
+                                                           DataConsistency consistency) {
     invariant(false);
 }
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 12bf54c1f6c..47a945b85d2 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -119,7 +119,7 @@ public:
     virtual void setMyLastAppliedOpTime(const OpTime& opTime);
     virtual void setMyLastDurableOpTime(const OpTime& opTime);
 
-    virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime);
+    virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency);
     virtual void setMyLastDurableOpTimeForward(const OpTime& opTime);
 
     virtual void resetMyLastOpTimes();
@@ -220,7 +220,7 @@ public:
     virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
 
-    virtual void resetLastOpTimesFromOplog(OperationContext* opCtx);
+    virtual void resetLastOpTimesFromOplog(OperationContext* opCtx, DataConsistency consistency);
 
     virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
                                         const rpc::ReplSetMetadata& replMetadata,
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 31c34c016ba..fa03b891526 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -198,11 +198,23 @@ StatusWith<Timestamp> RollbackImpl::_findCommonPoint() {
         return commonPointSW.getStatus();
     }
 
-    log() << "Rollback common point is " << commonPointSW.getValue().first;
+    OpTime commonPoint = commonPointSW.getValue().first;
+    OpTime lastCommittedOpTime = _replicationCoordinator->getLastCommittedOpTime();
+    OpTime committedSnapshot = _replicationCoordinator->getCurrentCommittedSnapshotOpTime();
+
+    log() << "Rollback common point is " << commonPoint;
+
+    // Rollback common point should be >= the replication commit point.
     invariant(!_replicationCoordinator->isV1ElectionProtocol() ||
-              commonPointSW.getValue().first >= _replicationCoordinator->getLastCommittedOpTime());
+              commonPoint.getTimestamp() >= lastCommittedOpTime.getTimestamp());
+    invariant(!_replicationCoordinator->isV1ElectionProtocol() ||
+              commonPoint >= lastCommittedOpTime);
+
+    // Rollback common point should be >= the committed snapshot optime.
+    invariant(commonPoint.getTimestamp() >= committedSnapshot.getTimestamp());
+    invariant(commonPoint >= committedSnapshot);
 
-    return commonPointSW.getValue().first.getTimestamp();
+    return commonPoint.getTimestamp();
 }
 
 Status RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 4724fc9da8d..c017bb6110b 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -118,7 +118,7 @@ RollbackTest::ReplicationCoordinatorRollbackMock::ReplicationCoordinatorRollback
     : ReplicationCoordinatorMock(service, createReplSettings()) {}
 
 void RollbackTest::ReplicationCoordinatorRollbackMock::resetLastOpTimesFromOplog(
-    OperationContext* opCtx) {}
+    OperationContext* opCtx, ReplicationCoordinator::DataConsistency consistency) {}
 
 void RollbackTest::ReplicationCoordinatorRollbackMock::failSettingFollowerMode(
     const MemberState& transitionToFail, ErrorCodes::Error codeToFailWith) {
diff --git a/src/mongo/db/repl/rollback_test_fixture.h b/src/mongo/db/repl/rollback_test_fixture.h
index c31aeed617e..29a301b8e1c 100644
--- a/src/mongo/db/repl/rollback_test_fixture.h
+++ b/src/mongo/db/repl/rollback_test_fixture.h
@@ -114,7 +114,8 @@ public:
     * Base class implementation triggers an invariant. This function is overridden to be a no-op
     * for rollback tests.
     */
-    void resetLastOpTimesFromOplog(OperationContext* opCtx) override;
+    void resetLastOpTimesFromOplog(OperationContext* opCtx,
+                                   ReplicationCoordinator::DataConsistency consistency) override;
 
     /**
     * Returns IllegalOperation (does not forward call to
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 5fdbe0ad1b9..3ba1d2ef5b9 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -935,9 +935,20 @@ Status _syncRollback(OperationContext* opCtx,
                                     << e.what());
     }
 
-    log() << "Rollback common point is " << how.commonPoint;
+    OpTime commonPoint = how.commonPoint;
+    OpTime lastCommittedOpTime = replCoord->getLastCommittedOpTime();
+    OpTime committedSnapshot = replCoord->getCurrentCommittedSnapshotOpTime();
+
+    log() << "Rollback common point is " << commonPoint;
+
+    // Rollback common point should be >= the replication commit point.
     invariant(!replCoord->isV1ElectionProtocol() ||
-              how.commonPoint >= replCoord->getLastCommittedOpTime());
+              commonPoint.getTimestamp() >= lastCommittedOpTime.getTimestamp());
+    invariant(!replCoord->isV1ElectionProtocol() || commonPoint >= lastCommittedOpTime);
+
+    // Rollback common point should be >= the committed snapshot optime.
+    invariant(commonPoint.getTimestamp() >= committedSnapshot.getTimestamp());
+    invariant(commonPoint >= committedSnapshot);
 
     try {
         ON_BLOCK_EXIT([&] {
@@ -1418,8 +1429,13 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
     }
 
     // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
-    // lastAppliedHash value in bgsync to reflect our new last op.
-    replCoord->resetLastOpTimesFromOplog(opCtx);
+    // lastAppliedHash value in bgsync to reflect our new last op. The rollback common point does
+    // not necessarily represent a consistent database state. For example, on a secondary, we may
+    // have rolled back to an optime that fell in the middle of an oplog application batch. We make
+    // the database consistent again after rollback by applying ops forward until we reach
+    // 'minValid'.
+    replCoord->resetLastOpTimesFromOplog(opCtx,
+                                         ReplicationCoordinator::DataConsistency::Inconsistent);
 }
 
 Status syncRollback(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/rs_rollback_no_uuid.cpp b/src/mongo/db/repl/rs_rollback_no_uuid.cpp
index 5e1409b2470..765041a0638 100644
--- a/src/mongo/db/repl/rs_rollback_no_uuid.cpp
+++ b/src/mongo/db/repl/rs_rollback_no_uuid.cpp
@@ -935,8 +935,13 @@ void syncFixUp(OperationContext* opCtx,
     }
 
     // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
-    // lastAppliedHash value in bgsync to reflect our new last op.
-    replCoord->resetLastOpTimesFromOplog(opCtx);
+    // lastAppliedHash value in bgsync to reflect our new last op. The rollback common point does
+    // not necessarily represent a consistent database state. For example, on a secondary, we may
+    // have rolled back to an optime that fell in the middle of an oplog application batch. We make
+    // the database consistent again after rollback by applying ops forward until we reach
+    // 'minValid'.
+    replCoord->resetLastOpTimesFromOplog(opCtx,
+                                         ReplicationCoordinator::DataConsistency::Inconsistent);
 }
 
 Status _syncRollback(OperationContext* opCtx,
@@ -986,7 +991,21 @@ Status _syncRollback(OperationContext* opCtx,
                                     << e.what());
     }
 
+    OpTime commonPoint = how.commonPoint;
+    OpTime lastCommittedOpTime = replCoord->getLastCommittedOpTime();
+    OpTime committedSnapshot = replCoord->getCurrentCommittedSnapshotOpTime();
+
     log() << "Rollback common point is " << how.commonPoint;
+
+    // Rollback common point should be >= the replication commit point.
+    invariant(!replCoord->isV1ElectionProtocol() ||
+              commonPoint.getTimestamp() >= lastCommittedOpTime.getTimestamp());
+    invariant(!replCoord->isV1ElectionProtocol() || commonPoint >= lastCommittedOpTime);
+
+    // Rollback common point should be >= the committed snapshot optime.
+    invariant(commonPoint.getTimestamp() >= committedSnapshot.getTimestamp());
+    invariant(commonPoint >= committedSnapshot);
+
     try {
         ON_BLOCK_EXIT([&] {
             auto status = replicationProcess->incrementRollbackID(opCtx);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 35362355aad..f3af941fbc5 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -62,6 +62,7 @@
 #include "mongo/db/repl/oplogreader.h"
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/repl/replication_coordinator_global.h"
 #include "mongo/db/repl/replication_process.h"
 #include "mongo/db/repl/storage_interface.h"
@@ -172,15 +173,17 @@ public:
     ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {}
     virtual ~ApplyBatchFinalizer(){};
 
-    virtual void record(const OpTime& newOpTime) {
-        _recordApplied(newOpTime);
+    virtual void record(const OpTime& newOpTime,
+                        ReplicationCoordinator::DataConsistency consistency) {
+        _recordApplied(newOpTime, consistency);
     };
 
 protected:
-    void _recordApplied(const OpTime& newOpTime) {
+    void _recordApplied(const OpTime& newOpTime,
+                        ReplicationCoordinator::DataConsistency consistency) {
         // We have to use setMyLastAppliedOpTimeForward since this thread races with
         // ReplicationExternalStateImpl::onTransitionToPrimary.
-        _replCoord->setMyLastAppliedOpTimeForward(newOpTime);
+        _replCoord->setMyLastAppliedOpTimeForward(newOpTime, consistency);
     }
 
     void _recordDurable(const OpTime& newOpTime) {
@@ -201,7 +204,8 @@ public:
           _waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {};
     ~ApplyBatchFinalizerForJournal();
 
-    void record(const OpTime& newOpTime) override;
+    void record(const OpTime& newOpTime,
+                ReplicationCoordinator::DataConsistency consistency) override;
 
 private:
     /**
@@ -232,8 +236,9 @@ ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
     _waiterThread.join();
 }
 
-void ApplyBatchFinalizerForJournal::record(const OpTime& newOpTime) {
-    _recordApplied(newOpTime);
+void ApplyBatchFinalizerForJournal::record(const OpTime& newOpTime,
+                                           ReplicationCoordinator::DataConsistency consistency) {
+    _recordApplied(newOpTime, consistency);
 
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     _latestOpTime = newOpTime;
@@ -695,8 +700,14 @@ void fillWriterVectorsAndLastestSessionRecords(
 }
 }  // namespace
 
-// Applies a batch of oplog entries, by writing the oplog entries to the local oplog
-// and then using a set of threads to apply the operations.
+/**
+ * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then using
+ * a set of threads to apply the operations. If the batch application is successful, returns the
+ * optime of the last op applied, which should be the last op in the batch. To provide crash
+ * resilience, this function will advance the persistent value of 'minValid' to at least the
+ * last optime of the batch. If 'minValid' is already greater than or equal to the last optime of
+ * this batch, it will not be updated.
+ */
 OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
     auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status {
         _applyFunc(ops, this);
@@ -709,7 +720,9 @@ OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations op
 }
 
 namespace {
-void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* replCoord) {
+void tryToGoLiveAsASecondary(OperationContext* opCtx,
+                             ReplicationCoordinator* replCoord,
+                             OpTime minValid) {
     if (replCoord->isInPrimaryOrSecondaryState()) {
         return;
     }
@@ -718,27 +731,35 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* re
     ON_BLOCK_EXIT([] { attemptsToBecomeSecondary.increment(); });
 
     // Need global X lock to transition to SECONDARY
-    Lock::GlobalWrite readLock(opCtx);
+    Lock::GlobalWrite writeLock(opCtx);
 
+    // Maintenance mode will force us to remain in RECOVERING state, no matter what.
     if (replCoord->getMaintenanceMode()) {
-        LOG(1) << "Can't go live (tryToGoLiveAsASecondary) as maintenance mode is active.";
-        // we're not actually going live
+        LOG(1) << "We cannot transition to SECONDARY state while in maintenance mode.";
         return;
     }
 
-    // Only state RECOVERING can transition to SECONDARY.
+    // We can only transition to SECONDARY from RECOVERING state.
     MemberState state(replCoord->getMemberState());
     if (!state.recovering()) {
-        LOG(2) << "Can't go live (tryToGoLiveAsASecondary) as state != recovering.";
+        LOG(2) << "We cannot transition to SECONDARY state since we are not currently in "
+                  "RECOVERING state. Current state: "
+               << state.toString();
         return;
     }
 
-    // We can't go to SECONDARY until we reach minvalid.
-    if (replCoord->getMyLastAppliedOpTime() <
-        ReplicationProcess::get(opCtx)->getConsistencyMarkers()->getMinValid(opCtx)) {
+    // We can't go to SECONDARY state until we reach 'minValid', since the database may be in an
+    // inconsistent state before this point. If our state is inconsistent, we need to disallow reads
+    // from clients, which is why we stay in RECOVERING state.
+    auto lastApplied = replCoord->getMyLastAppliedOpTime();
+    if (lastApplied < minValid) {
+        LOG(2) << "We cannot transition to SECONDARY state because our 'lastApplied' optime is "
+                  "less than the 'minValid' optime. minValid optime: "
+               << minValid << ", lastApplied optime: " << lastApplied;
         return;
     }
 
+    // Execute the transition to SECONDARY.
     auto status = replCoord->setFollowerMode(MemberState::RS_SECONDARY);
     if (!status.isOK()) {
         warning() << "Failed to transition into " << MemberState(MemberState::RS_SECONDARY)
@@ -859,6 +880,11 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
                                       ? new ApplyBatchFinalizerForJournal(replCoord)
                                       : new ApplyBatchFinalizer(replCoord)};
 
+    // Get replication consistency markers.
+    ReplicationProcess* replProcess = ReplicationProcess::get(replCoord->getServiceContext());
+    ReplicationConsistencyMarkers* consistencyMarkers = replProcess->getConsistencyMarkers();
+    OpTime minValid;
+
     while (true) {  // Exits on message from OpQueueBatcher.
         // Use a new operation context each iteration, as otherwise we may appear to use a single
         // collection name to refer to collections with different UUIDs.
@@ -880,7 +906,11 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
             }
        }
 
-        tryToGoLiveAsASecondary(&opCtx, replCoord);
+        // Get the current value of 'minValid'.
+        minValid = consistencyMarkers->getMinValid(&opCtx);
+
+        // Transition to SECONDARY state, if possible.
+        tryToGoLiveAsASecondary(&opCtx, replCoord, minValid);
 
         long long termWhenBufferIsEmpty = replCoord->getTerm();
         // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
@@ -888,6 +918,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
         OpQueue ops = batcher.getNextBatch(Seconds(1));
         if (ops.empty()) {
             if (ops.mustShutdown()) {
+                // Shut down and exit oplog application loop.
                 return;
             }
             if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
@@ -918,16 +949,34 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
         // Don't allow the fsync+lock thread to see intermediate states of batch application.
         stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
 
-        // Do the work.
-        multiApply(&opCtx, ops.releaseBatch());
+        // Apply the operations in this batch. 'multiApply' returns the optime of the last op that
+        // was applied, which should be the last optime in the batch.
+        auto lastOpTimeAppliedInBatch = multiApply(&opCtx, ops.releaseBatch());
+        invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
+
+        // In order to provide resilience in the event of a crash in the middle of batch
+        // application, 'multiApply' will update 'minValid' so that it is at least as great as the
+        // last optime that it applied in this batch. If 'minValid' was moved forward, we make sure
+        // to update our view of it here.
+        if (lastOpTimeInBatch > minValid) {
+            minValid = lastOpTimeInBatch;
+        }
 
         // Update various things that care about our last applied optime. Tests rely on 2 happening
         // before 3 even though it isn't strictly necessary. The order of 1 doesn't matter.
-        setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp());  // 1
-        ReplicationProcess::get(&opCtx)->getConsistencyMarkers()->setAppliedThrough(
-            &opCtx,
-            lastOpTimeInBatch);  // 2
-        finalizer->record(lastOpTimeInBatch);  // 3
+
+        // 1. Update the global timestamp.
+        setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp());
+
+        // 2. Persist our "applied through" optime to disk.
+        consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch);
+
+        // 3. Finalize this batch. We are at a consistent optime if our current optime is >= the
+        //    current 'minValid' optime.
+        auto consistency = (lastOpTimeInBatch >= minValid)
+            ? ReplicationCoordinator::DataConsistency::Consistent
+            : ReplicationCoordinator::DataConsistency::Inconsistent;
+        finalizer->record(lastOpTimeInBatch, consistency);
    }
}
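
The sync_tail.cpp changes above decide, after each applied batch, whether the node has reached 'minValid' and is therefore consistent again. A compact sketch of that finalization rule; the integer optimes and function name are illustrative assumptions, not the actual MongoDB code:

    using OpTime = long long;
    enum class DataConsistency { Consistent, Inconsistent };

    // A secondary becomes consistent again once the last batch it applied has
    // reached the persistent 'minValid' marker; before that point, reads stay
    // disallowed and the applied optime must not become a stable-timestamp
    // candidate.
    DataConsistency consistencyAfterBatch(OpTime lastOpTimeInBatch, OpTime minValid) {
        return lastOpTimeInBatch >= minValid ? DataConsistency::Consistent
                                             : DataConsistency::Inconsistent;
    }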