19 files changed, 593 insertions, 249 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 07acdf5be45..f3393400981 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -499,7 +499,8 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
     }
 
     // Read the last op from the oplog after cleaning up any partially applied batches.
-    _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx);
+    const auto stableTimestamp = boost::none;
+    _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx, stableTimestamp);
     auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
 
     // Use a callback here, because _finishLoadLocalConfig calls isself() which requires
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 1c2cef4dadd..6e676942929 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -42,122 +42,101 @@ namespace mongo {
 namespace repl {
 
-namespace {
-
-/**
- * Returns the timestamp at which we should start oplog application. Returns boost::none if
- * there are no oplog entries to apply.
- */
-boost::optional<Timestamp> _getOplogApplicationStartPoint(Timestamp checkpointTimestamp,
-                                                          OpTime appliedThrough) {
-    if (!checkpointTimestamp.isNull() && !appliedThrough.isNull()) {
-        // In versions that support "recover to stable timestamp" you should never see a
-        // non-null appliedThrough in a checkpoint, since we never take checkpoints in the middle
-        // of a secondary batch application, and a node that does not support "recover to stable
-        // timestamp" should never see a non-null checkpointTimestamp.
-        severe() << "checkpointTimestamp (" << checkpointTimestamp.toBSON()
-                 << ") and appliedThrough (" << appliedThrough << ") cannot both be non-null.";
-        fassertFailedNoTrace(40603);
-
-    } else if (!checkpointTimestamp.isNull()) {
-        // If appliedThrough is null and the checkpointTimestamp is not null, then we recovered
-        // to a checkpoint and should use that checkpoint timestamp as the oplog application
-        // start point.
-        log() << "Starting recovery oplog application at the checkpointTimestamp: "
-              << checkpointTimestamp.toBSON();
-        return checkpointTimestamp;
-
-    } else if (!appliedThrough.isNull()) {
-        // If the checkpointTimestamp is null and the appliedThrough is not null, then we did not
-        // recover to a checkpoint and we should use the appliedThrough as the oplog application
-        // start point.
-        log() << "Starting recovery oplog application at the appliedThrough: " << appliedThrough;
-        return appliedThrough.getTimestamp();
-
-    } else {
-        log() << "No oplog entries to apply for recovery. appliedThrough and "
-                 "checkpointTimestamp are both null.";
-        // No follow-up work to do.
-        return boost::none;
-    }
-    MONGO_UNREACHABLE;
-}
-
-}  // namespace
-
 ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface,
                                                  ReplicationConsistencyMarkers* consistencyMarkers)
     : _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {}
 
-void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx) try {
+void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
+                                               boost::optional<Timestamp> stableTimestamp) try {
     if (_consistencyMarkers->getInitialSyncFlag(opCtx)) {
         log() << "No recovery needed. Initial sync flag set.";
         return;  // Initial Sync will take over so no cleanup is needed.
     }
 
     const auto truncateAfterPoint = _consistencyMarkers->getOplogTruncateAfterPoint(opCtx);
-    const auto appliedThrough = _consistencyMarkers->getAppliedThrough(opCtx);
-
     if (!truncateAfterPoint.isNull()) {
         log() << "Removing unapplied entries starting at: " << truncateAfterPoint.toBSON();
         _truncateOplogTo(opCtx, truncateAfterPoint);
-    }
 
-    // Clear the truncateAfterPoint so that we don't truncate the next batch of oplog entries
-    // erroneously.
-    _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {});
-
-    // TODO (SERVER-30556): Delete this line since the old oplog delete from point cannot exist.
-    _consistencyMarkers->removeOldOplogDeleteFromPointField(opCtx);
-
-    auto topOfOplogSW = _getLastAppliedOpTime(opCtx);
-    boost::optional<OpTime> topOfOplog = boost::none;
-    if (topOfOplogSW.getStatus() != ErrorCodes::CollectionIsEmpty &&
-        topOfOplogSW.getStatus() != ErrorCodes::NamespaceNotFound) {
-        fassert(40290, topOfOplogSW);
-        topOfOplog = topOfOplogSW.getValue();
+        // Clear the truncateAfterPoint so that we don't truncate the next batch of oplog entries
+        // erroneously.
+        _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {});
     }
 
-    // If we have a checkpoint timestamp, then we recovered to a timestamp and should set the
-    // initial data timestamp to that. Otherwise, we simply recovered the data on disk so we should
-    // set the initial data timestamp to the top OpTime in the oplog once the data is consistent
-    // there. If there is nothing in the oplog, then we do not set the initial data timestamp.
-    auto checkpointTimestamp = _consistencyMarkers->getCheckpointTimestamp(opCtx);
-    if (!checkpointTimestamp.isNull()) {
-
-        // If we have a checkpoint timestamp, we set the initial data timestamp now so that
-        // the operations we apply below can be given the proper timestamps.
-        _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(), checkpointTimestamp);
-    }
-
-    // Oplog is empty. There are no oplog entries to apply, so we exit recovery. If there was a
-    // checkpointTimestamp then we already set the initial data timestamp. Otherwise, there is
-    // nothing to set it to.
-    if (!topOfOplog) {
-        log() << "No oplog entries to apply for recovery. Oplog is empty.";
+    auto topOfOplogSW = _getTopOfOplog(opCtx);
+    if (topOfOplogSW.getStatus() == ErrorCodes::CollectionIsEmpty ||
+        topOfOplogSW.getStatus() == ErrorCodes::NamespaceNotFound) {
+        // Oplog is empty. There are no oplog entries to apply, so we exit recovery and go into
+        // initial sync.
+        log() << "No oplog entries to apply for recovery. Oplog is empty. Entering initial sync.";
         return;
     }
+    fassert(40290, topOfOplogSW);
+    const auto topOfOplog = topOfOplogSW.getValue();
 
-    if (auto startPoint = _getOplogApplicationStartPoint(checkpointTimestamp, appliedThrough)) {
-        _applyToEndOfOplog(opCtx, startPoint.get(), topOfOplog->getTimestamp());
+    const auto appliedThrough = _consistencyMarkers->getAppliedThrough(opCtx);
+    invariant(!stableTimestamp || appliedThrough.isNull() ||
+                  *stableTimestamp == appliedThrough.getTimestamp(),
+              str::stream() << "Stable timestamp " << stableTimestamp->toString()
+                            << " does not equal appliedThrough timestamp "
+                            << appliedThrough.toString());
+
+    // If we were passed in a stable timestamp, we are in rollback recovery and should recover from
+    // that stable timestamp. Otherwise, we're recovering at startup. If this storage engine
+    // supports recover to stable timestamp, we ask it for the recovery timestamp. If the storage
+    // engine returns a timestamp, we recover from that point. However, if the storage engine
+    // returns "none", the storage engine does not have a stable checkpoint and we must recover from
+    // an unstable checkpoint instead.
+    const bool supportsRecoverToStableTimestamp =
+        _storageInterface->supportsRecoverToStableTimestamp(opCtx->getServiceContext());
+    if (!stableTimestamp && supportsRecoverToStableTimestamp) {
+        stableTimestamp = _storageInterface->getRecoveryTimestamp(opCtx->getServiceContext());
     }
 
-    // If we don't have a checkpoint timestamp, then we are either not running a storage engine
-    // that supports "recover to stable timestamp" or we just upgraded from a version that didn't.
-    // In both cases, the data on disk is not consistent until we have applied all oplog entries to
-    // the end of the oplog, since we do not know which ones actually got applied before shutdown.
-    // As a result, we do not set the initial data timestamp until after we have applied to the end
-    // of the oplog.
-    if (checkpointTimestamp.isNull()) {
-        _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(),
-                                                   topOfOplog->getTimestamp());
+    if (stableTimestamp) {
+        invariant(supportsRecoverToStableTimestamp);
+        _recoverFromStableTimestamp(opCtx, *stableTimestamp, appliedThrough, topOfOplog);
+    } else {
+        _recoverFromUnstableCheckpoint(opCtx, appliedThrough, topOfOplog);
     }
-
 } catch (...) {
     severe() << "Caught exception during replication recovery: " << exceptionToStatus();
     std::terminate();
 }
 
+void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCtx,
+                                                          Timestamp stableTimestamp,
+                                                          OpTime appliedThrough,
+                                                          OpTime topOfOplog) {
+    invariant(!stableTimestamp.isNull());
+    invariant(!topOfOplog.isNull());
+    log() << "Recovering from stable timestamp: " << stableTimestamp
+          << " (top of oplog: " << topOfOplog << ", appliedThrough: " << appliedThrough << ")";
+
+    log() << "Starting recovery oplog application at the stable timestamp: " << stableTimestamp;
+    _applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp());
+}
+
+void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* opCtx,
+                                                             OpTime appliedThrough,
+                                                             OpTime topOfOplog) {
+    invariant(!topOfOplog.isNull());
+    log() << "Recovering from an unstable checkpoint (top of oplog: " << topOfOplog
+          << ", appliedThrough: " << appliedThrough << ")";
+
+    if (appliedThrough.isNull()) {
+        // The appliedThrough would be null if we shut down cleanly or crashed as a primary. Either
+        // way we are consistent at the top of the oplog.
+        log() << "No oplog entries to apply for recovery. appliedThrough is null.";
+    } else {
+        // If the appliedThrough is not null, then we shut down uncleanly during secondary oplog
+        // application and must apply from the appliedThrough to the top of the oplog.
+        log() << "Starting recovery oplog application at the appliedThrough: " << appliedThrough
+              << ", through the top of the oplog: " << topOfOplog;
+        _applyToEndOfOplog(opCtx, appliedThrough.getTimestamp(), topOfOplog.getTimestamp());
+    }
+}
+
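Taken together, the new recoverFromOplog() chooses among three recovery sources. A minimal sketch of that decision, using the StorageInterface methods introduced later in this patch; the RecoverySource enum and the free function are illustrative, not part of the change:

    // Sketch only: condenses the branching in recoverFromOplog() above.
    enum class RecoverySource { kStableTimestamp, kStableCheckpoint, kUnstableCheckpoint };

    RecoverySource chooseRecoverySource(boost::optional<Timestamp>& stableTimestamp,
                                        StorageInterface* storage,
                                        ServiceContext* serviceCtx) {
        if (stableTimestamp) {
            // Rollback passed in the timestamp it already recovered the data to.
            return RecoverySource::kStableTimestamp;
        }
        if (storage->supportsRecoverToStableTimestamp(serviceCtx)) {
            // Startup: ask the engine which timestamp (if any) its checkpoint reflects.
            stableTimestamp = storage->getRecoveryTimestamp(serviceCtx);
            if (stableTimestamp) {
                return RecoverySource::kStableCheckpoint;
            }
        }
        // No stable checkpoint; the on-disk data may reflect a partially applied batch.
        return RecoverySource::kUnstableCheckpoint;
    }
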
 void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
                                                  Timestamp oplogApplicationStartPoint,
                                                  Timestamp topOfOplog) {
@@ -167,8 +146,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
     // Check if we have any unapplied ops in our oplog. It is important that this is done after
     // deleting the ragged end of the oplog.
     if (oplogApplicationStartPoint == topOfOplog) {
-        log()
-            << "No oplog entries to apply for recovery. appliedThrough is at the top of the oplog.";
+        log() << "No oplog entries to apply for recovery. Start point is at the top of the oplog.";
         return;  // We've applied all the valid oplog we have.
     } else if (oplogApplicationStartPoint > topOfOplog) {
         severe() << "Applied op " << oplogApplicationStartPoint.toBSON()
@@ -209,15 +187,23 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
 
     // Apply remaining ops one at at time, but don't log them because they are already logged.
     UnreplicatedWritesBlock uwb(opCtx);
+    BSONObj entry;
     while (cursor->more()) {
-        auto entry = cursor->nextSafe();
+        entry = cursor->nextSafe();
         fassert(40294, SyncTail::syncApply(opCtx, entry, OplogApplication::Mode::kRecovering));
-        _consistencyMarkers->setAppliedThrough(opCtx,
-                                               fassert(40295, OpTime::parseFromOplogEntry(entry)));
     }
+
+    // We may crash before setting appliedThrough. If we have a stable checkpoint, we will recover
+    // to that checkpoint at a replication consistent point, and applying the oplog is safe.
+    // If we don't have a stable checkpoint, then we must be in startup recovery, and not rollback
+    // recovery, because we only roll back to a stable timestamp when we have a stable checkpoint.
+    // Startup recovery from an unstable checkpoint only ever applies a single batch and it is safe
+    // to replay the batch from any point.
+    _consistencyMarkers->setAppliedThrough(opCtx,
+                                           fassert(40295, OpTime::parseFromOplogEntry(entry)));
 }
 
-StatusWith<OpTime> ReplicationRecoveryImpl::_getLastAppliedOpTime(OperationContext* opCtx) const {
+StatusWith<OpTime> ReplicationRecoveryImpl::_getTopOfOplog(OperationContext* opCtx) const {
     const auto docsSW = _storageInterface->findDocuments(opCtx,
                                                          NamespaceString::kRsOplogNamespace,
                                                          boost::none,  // Collection scan
diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h
index 6748d695fa5..76d34671519 100644
--- a/src/mongo/db/repl/replication_recovery.h
+++ b/src/mongo/db/repl/replication_recovery.h
@@ -50,9 +50,11 @@ public:
     virtual ~ReplicationRecovery() = default;
 
     /**
-     * Recovers the data on disk from the oplog.
+     * Recovers the data on disk from the oplog. If the provided stable timestamp is not "none",
+     * this function assumes the data reflects that timestamp.
     */
-    virtual void recoverFromOplog(OperationContext* opCtx) = 0;
+    virtual void recoverFromOplog(OperationContext* opCtx,
+                                  boost::optional<Timestamp> stableTimestamp) = 0;
 };
 
 class ReplicationRecoveryImpl : public ReplicationRecovery {
@@ -62,10 +64,28 @@ public:
     ReplicationRecoveryImpl(StorageInterface* storageInterface,
                            ReplicationConsistencyMarkers* consistencyMarkers);
 
-    void recoverFromOplog(OperationContext* opCtx) override;
+    void recoverFromOplog(OperationContext* opCtx,
+                          boost::optional<Timestamp> stableTimestamp) override;
 
 private:
     /**
+     * After truncating the oplog, completes recovery if we're recovering from a stable timestamp
+     * or a stable checkpoint.
+     */
+    void _recoverFromStableTimestamp(OperationContext* opCtx,
+                                     Timestamp stableTimestamp,
+                                     OpTime appliedThrough,
+                                     OpTime topOfOplog);
+
+    /**
+     * After truncating the oplog, completes recovery if we're recovering from an unstable
+     * checkpoint.
+     */
+    void _recoverFromUnstableCheckpoint(OperationContext* opCtx,
+                                        OpTime appliedThrough,
+                                        OpTime topOfOplog);
+
+    /**
     * Applies all oplog entries from oplogApplicationStartPoint (exclusive) to topOfOplog
     * (inclusive). This fasserts if oplogApplicationStartPoint is not in the oplog.
     */
@@ -77,7 +97,7 @@ private:
     * Gets the last applied OpTime from the end of the oplog. Returns CollectionIsEmpty if there is
     * no oplog.
     */
-    StatusWith<OpTime> _getLastAppliedOpTime(OperationContext* opCtx) const;
+    StatusWith<OpTime> _getTopOfOplog(OperationContext* opCtx) const;
 
     /**
     * Truncates the oplog after and including the "truncateTimestamp" entry.
diff --git a/src/mongo/db/repl/replication_recovery_mock.h b/src/mongo/db/repl/replication_recovery_mock.h
index 220030ab0b5..e6149706d42 100644
--- a/src/mongo/db/repl/replication_recovery_mock.h
+++ b/src/mongo/db/repl/replication_recovery_mock.h
@@ -41,7 +41,8 @@ class ReplicationRecoveryMock : public ReplicationRecovery {
 public:
     ReplicationRecoveryMock() = default;
 
-    void recoverFromOplog(OperationContext* opCtx) override {}
+    void recoverFromOplog(OperationContext* opCtx,
+                          boost::optional<Timestamp> stableTimestamp) override {}
 };
 
 }  // namespace repl
diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp
index 5b87f3223be..192b25d0c6a 100644
--- a/src/mongo/db/repl/replication_recovery_test.cpp
+++ b/src/mongo/db/repl/replication_recovery_test.cpp
@@ -31,8 +31,10 @@
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog_entry.h"
 #include "mongo/db/repl/oplog_interface_local.h"
 #include "mongo/db/repl/replication_consistency_markers_mock.h"
 #include "mongo/db/repl/replication_coordinator_mock.h"
@@ -54,28 +56,31 @@ const NamespaceString testNs("a.a");
 
 class StorageInterfaceRecovery : public StorageInterfaceImpl {
 public:
-    using OnSetInitialDataTimestampFn = stdx::function<void()>;
+    boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+        return _recoveryTimestamp;
+    }
 
-    void setInitialDataTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override {
+    void setRecoveryTimestamp(Timestamp recoveryTimestamp) {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
-        _initialDataTimestamp = snapshotName;
-        _onSetInitialDataTimestampFn();
+        _recoveryTimestamp = recoveryTimestamp;
     }
 
-    Timestamp getInitialDataTimestamp() const {
+    bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
-        return _initialDataTimestamp;
+        return _supportsRecoverToStableTimestamp;
     }
 
-    void setOnSetInitialDataTimestampFn(OnSetInitialDataTimestampFn onSetInitialDataTimestampFn) {
+    void setSupportsRecoverToStableTimestamp(bool supports) {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
-        _onSetInitialDataTimestampFn = onSetInitialDataTimestampFn;
+        _supportsRecoverToStableTimestamp = supports;
     }
 
 private:
     mutable stdx::mutex _mutex;
     Timestamp _initialDataTimestamp = Timestamp::min();
-    OnSetInitialDataTimestampFn _onSetInitialDataTimestampFn = []() {};
+    boost::optional<Timestamp> _recoveryTimestamp = boost::none;
+    bool _supportsRecoverToStableTimestamp = true;
 };
 
 class ReplicationRecoveryTest : public ServiceContextMongoDTest {
@@ -107,6 +112,11 @@ protected:
         return options;
     }
 
+    void testRecoveryAppliesDocumentsWhenAppliedThroughIsBehind(bool hasStableTimestamp,
+                                                                bool hasStableCheckpoint);
+    void testRecoveryToStableAppliesDocumentsWithNoAppliedThrough(bool hasStableTimestamp);
+    void testRecoveryAppliesDocumentsWithNoAppliedThroughAfterTruncation(bool hasStableTimestamp);
+
 private:
     void setUp() override {
         ServiceContextMongoDTest::setUp();
@@ -146,14 +156,61 @@ BSONObj _makeInsertDocument(int t) {
 }
 
 /**
+ * Creates an OplogEntry with given parameters and preset defaults for this test suite.
+ */
+repl::OplogEntry _makeOplogEntry(repl::OpTime opTime,
+                                 repl::OpTypeEnum opType,
+                                 BSONObj object,
+                                 boost::optional<BSONObj> object2 = boost::none) {
+    return repl::OplogEntry(opTime,                           // optime
+                            1LL,                              // hash
+                            opType,                           // opType
+                            testNs,                           // namespace
+                            boost::none,                      // uuid
+                            boost::none,                      // fromMigrate
+                            repl::OplogEntry::kOplogVersion,  // version
+                            object,                           // o
+                            object2,                          // o2
+                            {},                               // sessionInfo
+                            boost::none,                      // isUpsert
+                            boost::none,                      // wall clock time
+                            boost::none,                      // statement id
+                            boost::none,  // optime of previous write within same transaction
+                            boost::none,  // pre-image optime
+                            boost::none);  // post-image optime
+}
+
+/**
 * Generates oplog entries with the given number used for the timestamp.
 */
-TimestampedBSONObj _makeOplogEntry(int t) {
-    return {BSON("ts" << Timestamp(t, t) << "h" << t << "ns" << testNs.ns() << "v" << 2 << "op"
-                      << "i"
-                      << "o"
-                      << _makeInsertDocument(t)),
-            Timestamp(t)};
+TimestampedBSONObj _makeInsertOplogEntry(int t) {
+    auto entry = _makeOplogEntry(OpTime(Timestamp(t, t), 1),  // optime
+                                 OpTypeEnum::kInsert,         // op type
+                                 _makeInsertDocument(t),      // o
+                                 boost::none);                // o2
+    return {entry.toBSON(), Timestamp(t)};
+}
+
+/**
+ * Creates a delete oplog entry with the given number used for the timestamp.
+ */
+OplogEntry _makeDeleteOplogEntry(int t, const BSONObj& documentToDelete) {
+    return _makeOplogEntry(OpTime(Timestamp(t, t), 1),  // optime
+                           OpTypeEnum::kDelete,         // op type
+                           documentToDelete,            // o
+                           boost::none);                // o2
+}
+
+/**
+ * Creates an update oplog entry with the given number used for the timestamp.
+ */
+OplogEntry _makeUpdateOplogEntry(int t,
+                                 const BSONObj& documentToUpdate,
+                                 const BSONObj& updatedDocument) {
+    return _makeOplogEntry(OpTime(Timestamp(t, t), 1),  // optime
+                           OpTypeEnum::kUpdate,         // op type
+                           updatedDocument,             // o
+                           documentToUpdate);           // o2
 }
 
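For orientation, a hypothetical call site for these helpers; the timestamps and documents here are invented for illustration and do not appear in the tests below:

    // Hypothetical usage of the helpers above; values are arbitrary.
    auto insertEntry = _makeInsertOplogEntry(2);  // insert at Timestamp(2, 2)
    auto updateEntry = _makeUpdateOplogEntry(3,
                                             BSON("_id" << 2),                 // o2: doc to update
                                             BSON("$set" << BSON("a" << 7)));  // o: update to apply
    auto deleteEntry = _makeDeleteOplogEntry(4, BSON("_id" << 2));  // o: document to delete
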
 /**
@@ -176,7 +233,7 @@ void _setUpOplog(OperationContext* opCtx, StorageInterface* storage, std::vector
 
     for (int ts : timestamps) {
         ASSERT_OK(storage->insertDocument(
-            opCtx, oplogNs, _makeOplogEntry(ts), OpTime::kUninitializedTerm));
+            opCtx, oplogNs, _makeInsertOplogEntry(ts), OpTime::kUninitializedTerm));
     }
 }
 
@@ -202,7 +259,7 @@ void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
 void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) {
     std::vector<BSONObj> expectedOplog(timestamps.size());
     std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) {
-        return _makeOplogEntry(ts).obj;
+        return _makeInsertOplogEntry(ts).obj;
     });
     _assertDocumentsInCollectionEquals(opCtx, oplogNs, expectedOplog);
 }
@@ -226,11 +283,25 @@ TEST_F(ReplicationRecoveryTest, RecoveryWithNoOplogSucceeds) {
     ASSERT_OK(getStorageInterface()->createCollection(
         opCtx, NamespaceString("local.other"), generateOptionsWithUuid()));
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
+
+    _assertDocsInOplog(opCtx, {});
+    _assertDocsInTestCollection(opCtx, {});
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryWithNoOplogSucceedsWithStableTimestamp) {
+    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+    auto opCtx = getOperationContext();
+
+    // Create the database.
+    ASSERT_OK(getStorageInterface()->createCollection(
+        opCtx, NamespaceString("local.other"), generateOptionsWithUuid()));
+
+    Timestamp stableTimestamp(3, 3);
+    recovery.recoverFromOplog(opCtx, stableTimestamp);
 
     _assertDocsInOplog(opCtx, {});
     _assertDocsInTestCollection(opCtx, {});
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp::min());
 }
 
 TEST_F(ReplicationRecoveryTest, RecoveryWithEmptyOplogSucceeds) {
@@ -239,11 +310,23 @@ TEST_F(ReplicationRecoveryTest, RecoveryWithEmptyOplogSucceeds) {
 
     _setUpOplog(opCtx, getStorageInterface(), {});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
+
+    _assertDocsInOplog(opCtx, {});
+    _assertDocsInTestCollection(opCtx, {});
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryWithEmptyOplogSucceedsWithStableTimestamp) {
+    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+    auto opCtx = getOperationContext();
+
+    _setUpOplog(opCtx, getStorageInterface(), {});
+
+    Timestamp stableTimestamp(3, 3);
+    recovery.recoverFromOplog(opCtx, stableTimestamp);
 
     _assertDocsInOplog(opCtx, {});
     _assertDocsInTestCollection(opCtx, {});
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp::min());
 }
 
 DEATH_TEST_F(ReplicationRecoveryTest,
@@ -259,7 +342,19 @@ DEATH_TEST_F(ReplicationRecoveryTest,
     ASSERT_OK(getStorageInterface()->createCollection(
         opCtx, NamespaceString("local.other"), generateOptionsWithUuid()));
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
+}
+
+DEATH_TEST_F(ReplicationRecoveryTest,
+             RecoveryInvariantsIfStableTimestampAndDoesNotSupportRTT,
+             "Invariant failure") {
+    getStorageInterfaceRecovery()->setSupportsRecoverToStableTimestamp(false);
+    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+    auto opCtx = getOperationContext();
+
+    _setUpOplog(opCtx, getStorageInterface(), {1});
+
+    recovery.recoverFromOplog(opCtx, Timestamp(1, 1));
 }
 
 DEATH_TEST_F(ReplicationRecoveryTest, TruncateEntireOplogFasserts, "Fatal Assertion 40296") {
@@ -270,7 +365,7 @@ DEATH_TEST_F(ReplicationRecoveryTest, TruncateEntireOplogFasserts, "Fatal Assert
     getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
     _setUpOplog(opCtx, getStorageInterface(), {7, 8, 9});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 }
 
 TEST_F(ReplicationRecoveryTest, RecoveryTruncatesOplogAtOplogTruncateAfterPoint) {
@@ -281,13 +376,44 @@ TEST_F(ReplicationRecoveryTest, RecoveryTruncatesOplogAtOplogTruncateAfterPoint)
     getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 
     _assertDocsInOplog(opCtx, {1, 2, 3});
     _assertDocsInTestCollection(opCtx, {});
     ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
     ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(3, 3));
+}
+
+TEST_F(ReplicationRecoveryTest, RecoverySucceedsWithOplogTruncatePointTooHigh) {
+    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+    auto opCtx = getOperationContext();
+
+    getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(6, 6));
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
+    _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+    recovery.recoverFromOplog(opCtx, boost::none);
+
+    _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
+    _assertDocsInTestCollection(opCtx, {4, 5});
+    ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1));
+}
+
+TEST_F(ReplicationRecoveryTest, RecoverySucceedsWithOplogTruncatePointInGap) {
+    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+    auto opCtx = getOperationContext();
+
+    getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(2, 2), 1));
+    _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 5, 6});
+
+    recovery.recoverFromOplog(opCtx, boost::none);
+
+    _assertDocsInOplog(opCtx, {1, 2, 3});
+    _assertDocsInTestCollection(opCtx, {3});
+    ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
 }
 
 TEST_F(ReplicationRecoveryTest, RecoverySkipsEverythingIfInitialSyncFlagIsSet) {
@@ -299,80 +425,195 @@ TEST_F(ReplicationRecoveryTest, RecoverySkipsEverythingIfInitialSyncFlagIsSet) {
     getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 
     _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
     _assertDocsInTestCollection(opCtx, {});
     ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp(4, 4));
     ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(1, 1), 1));
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp::min());
+}
+
+void ReplicationRecoveryTest::testRecoveryAppliesDocumentsWhenAppliedThroughIsBehind(
+    bool hasStableTimestamp, bool hasStableCheckpoint) {
+    ASSERT(!(hasStableTimestamp && hasStableCheckpoint));
+
+    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+    auto opCtx = getOperationContext();
+
+    auto appliedThroughTS = Timestamp(3, 3);
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(appliedThroughTS, 1));
+    _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+    boost::optional<Timestamp> stableTimestamp = boost::none;
+    if (hasStableCheckpoint) {
+        getStorageInterfaceRecovery()->setRecoveryTimestamp(appliedThroughTS);
+    } else if (hasStableTimestamp) {
+        stableTimestamp = appliedThroughTS;
+    }
+
+    recovery.recoverFromOplog(opCtx, stableTimestamp);
+
+    _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
+    _assertDocsInTestCollection(opCtx, {4, 5});
+    ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+
+    auto topTS = Timestamp(5, 5);
+    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(topTS, 1));
 }
 
 TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehind) {
+    getStorageInterfaceRecovery()->setSupportsRecoverToStableTimestamp(true);
+    bool hasStableTimestamp = false;
+    bool hasStableCheckpoint = false;
+    testRecoveryAppliesDocumentsWhenAppliedThroughIsBehind(hasStableTimestamp, hasStableCheckpoint);
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehindNoRTT) {
+    getStorageInterfaceRecovery()->setSupportsRecoverToStableTimestamp(false);
+    bool hasStableTimestamp = false;
+    bool hasStableCheckpoint = false;
+    testRecoveryAppliesDocumentsWhenAppliedThroughIsBehind(hasStableTimestamp, hasStableCheckpoint);
+}
+
+TEST_F(ReplicationRecoveryTest,
+       RecoveryAppliesDocumentsWhenAppliedThroughIsBehindWithStableTimestamp) {
+    bool hasStableTimestamp = true;
+    bool hasStableCheckpoint = false;
+    testRecoveryAppliesDocumentsWhenAppliedThroughIsBehind(hasStableTimestamp, hasStableCheckpoint);
+}
+
+TEST_F(ReplicationRecoveryTest,
+       RecoveryAppliesDocumentsWhenAppliedThroughIsBehindWithStableCheckpoint) {
+    bool hasStableTimestamp = false;
+    bool hasStableCheckpoint = true;
+    testRecoveryAppliesDocumentsWhenAppliedThroughIsBehind(hasStableTimestamp, hasStableCheckpoint);
+}
+
+void ReplicationRecoveryTest::testRecoveryToStableAppliesDocumentsWithNoAppliedThrough(
+    bool hasStableTimestamp) {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
-    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    auto startingTS = Timestamp(3, 3);
+    boost::optional<Timestamp> stableTimestamp = boost::none;
+    if (hasStableTimestamp) {
+        stableTimestamp = startingTS;
+    } else {
+        getStorageInterfaceRecovery()->setRecoveryTimestamp(startingTS);
+    }
+    recovery.recoverFromOplog(opCtx, stableTimestamp);
 
     _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
     _assertDocsInTestCollection(opCtx, {4, 5});
     ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
     ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1));
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(5, 5));
 }
 
-TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehindAfterTruncation) {
+TEST_F(ReplicationRecoveryTest,
+       RecoveryAppliesDocumentsWithNoAppliedThroughAndStableTimestampIsBehind) {
+    testRecoveryToStableAppliesDocumentsWithNoAppliedThrough(true);
+}
+
+TEST_F(ReplicationRecoveryTest,
+       RecoveryAppliesDocumentsWithNoAppliedThroughAndStableCheckpointIsBehind) {
+    testRecoveryToStableAppliesDocumentsWithNoAppliedThrough(false);
+}
+
+TEST_F(ReplicationRecoveryTest,
+       RecoveryAppliesDocumentsWithUnmatchedAppliedThroughAndStableCheckpointIsBehind) {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
-    getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
-    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    // Fake applying op 3, which will be reapplied.
+    auto appliedThroughTS = Timestamp(4, 4);
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(appliedThroughTS, 1));
+    ASSERT_OK(getStorageInterface()->insertDocument(opCtx, testNs, {_makeInsertDocument(3)}, 1));
+    _assertDocsInTestCollection(opCtx, {3});
 
-    _assertDocsInOplog(opCtx, {1, 2, 3});
-    _assertDocsInTestCollection(opCtx, {2, 3});
+    getStorageInterfaceRecovery()->setRecoveryTimestamp(Timestamp(2, 2));
+    recovery.recoverFromOplog(opCtx, boost::none);
+
+    _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
+    _assertDocsInTestCollection(opCtx, {3, 4, 5});
     ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
-    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(3, 3));
+    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1));
 }
 
-TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenCheckpointTimestampIsBehind) {
+TEST_F(ReplicationRecoveryTest, RecoveryIgnoresDroppedCollections) {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
-    getConsistencyMarkers()->writeCheckpointTimestamp(opCtx, Timestamp(3, 3));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    ASSERT_OK(getStorageInterface()->dropCollection(opCtx, testNs));
+    {
+        AutoGetCollectionForReadCommand autoColl(opCtx, testNs);
+        ASSERT_FALSE(autoColl.getCollection());
+    }
+
+    getStorageInterfaceRecovery()->setRecoveryTimestamp(Timestamp(2, 2));
+    recovery.recoverFromOplog(opCtx, boost::none);
 
     _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
-    _assertDocsInTestCollection(opCtx, {4, 5});
+    {
+        AutoGetCollectionForReadCommand autoColl(opCtx, testNs);
+        ASSERT_FALSE(autoColl.getCollection());
+    }
     ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
     ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1));
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(3, 3));
 }
 
-TEST_F(ReplicationRecoveryTest,
-       RecoveryAppliesDocumentsWhenCheckpointTimestampIsBehindAfterTruncation) {
+TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehindAfterTruncation) {
+    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+    auto opCtx = getOperationContext();
+
+    getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
+    _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+    recovery.recoverFromOplog(opCtx, boost::none);
+
+    _assertDocsInOplog(opCtx, {1, 2, 3});
+    _assertDocsInTestCollection(opCtx, {2, 3});
+    ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
+}
+
+void ReplicationRecoveryTest::testRecoveryAppliesDocumentsWithNoAppliedThroughAfterTruncation(
+    bool hasStableTimestamp) {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
     getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
-    getConsistencyMarkers()->writeCheckpointTimestamp(opCtx, Timestamp(1, 1));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    auto startingTS = Timestamp(1, 1);
+    boost::optional<Timestamp> stableTimestamp = boost::none;
+    if (hasStableTimestamp) {
+        stableTimestamp = startingTS;
+    } else {
+        getStorageInterfaceRecovery()->setRecoveryTimestamp(startingTS);
+    }
+    recovery.recoverFromOplog(opCtx, stableTimestamp);
 
     _assertDocsInOplog(opCtx, {1, 2, 3});
     _assertDocsInTestCollection(opCtx, {2, 3});
     ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
     ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(1, 1));
+}
+
+TEST_F(ReplicationRecoveryTest,
+       RecoveryAppliesDocumentsWhenStableTimestampIsBehindAfterTruncation) {
+    testRecoveryAppliesDocumentsWithNoAppliedThroughAfterTruncation(true);
+}
+
+TEST_F(ReplicationRecoveryTest,
+       RecoveryAppliesDocumentsWhenRecoveryTimestampIsBehindAfterTruncation) {
+    testRecoveryAppliesDocumentsWithNoAppliedThroughAfterTruncation(false);
 }
 
 DEATH_TEST_F(ReplicationRecoveryTest, AppliedThroughBehindOplogFasserts, "Fatal Assertion 40292") {
@@ -382,7 +623,7 @@ DEATH_TEST_F(ReplicationRecoveryTest, AppliedThroughBehindOplogFasserts, "Fatal
     getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
     _setUpOplog(opCtx, getStorageInterface(), {3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 }
 
 DEATH_TEST_F(ReplicationRecoveryTest,
@@ -394,7 +635,7 @@ DEATH_TEST_F(ReplicationRecoveryTest,
     getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(9, 9), 1));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 }
 
 DEATH_TEST_F(ReplicationRecoveryTest,
@@ -406,99 +647,109 @@ DEATH_TEST_F(ReplicationRecoveryTest,
     getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
     _setUpOplog(opCtx, getStorageInterface(), {1, 2, 4, 5});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 }
 
-TEST_F(ReplicationRecoveryTest, RecoverySetsInitialDataTimestampToCheckpointTimestampIfItExists) {
-    ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
-    auto opCtx = getOperationContext();
-
-    // Assert that we set the initial data timestamp before we apply operations.
-    getStorageInterfaceRecovery()->setOnSetInitialDataTimestampFn(
-        [&]() { ASSERT(getConsistencyMarkers()->getAppliedThrough(opCtx).isNull()); });
-
-    getConsistencyMarkers()->writeCheckpointTimestamp(opCtx, Timestamp(4, 4));
-    _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5, 6});
-
-    recovery.recoverFromOplog(opCtx);
-
-    _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5, 6});
-    _assertDocsInTestCollection(opCtx, {5, 6});
-    ASSERT(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
-    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(6, 6), 6));
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(4, 4));
-}
-
-TEST_F(ReplicationRecoveryTest,
-       RecoverySetsInitialDataTimestampToTopOfOplogIfNoCheckpointTimestampAndSingleOp) {
+TEST_F(ReplicationRecoveryTest, RecoveryDoesNotApplyOperationsIfAppliedThroughIsNull) {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
     _setUpOplog(opCtx, getStorageInterface(), {5});
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 
     _assertDocsInOplog(opCtx, {5});
     _assertDocsInTestCollection(opCtx, {});
     ASSERT(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
     ASSERT(getConsistencyMarkers()->getAppliedThrough(opCtx).isNull());
-    ASSERT(getConsistencyMarkers()->getCheckpointTimestamp(opCtx).isNull());
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(5, 5));
 }
 
-TEST_F(ReplicationRecoveryTest,
-       RecoverySetsInitialDataTimestampToTopOfOplogIfNoCheckpointTimestampAndMultipleOps) {
+DEATH_TEST_F(ReplicationRecoveryTest,
+             RecoveryInvariantsWithUnequalStableTimestampAndAppliedThrough,
+             "Invariant failure") {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
-    // Assert that we set the initial data timestamp after we apply operations.
-    getStorageInterfaceRecovery()->setOnSetInitialDataTimestampFn([&]() {
-        ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(6, 6), 6));
-    });
-    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(5, 5), 5));
-
-    _setUpOplog(opCtx, getStorageInterface(), {5, 6});
+    _setUpOplog(opCtx, getStorageInterface(), {5});
 
-    recovery.recoverFromOplog(opCtx);
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
 
-    _assertDocsInOplog(opCtx, {5, 6});
-    _assertDocsInTestCollection(opCtx, {6});
-    ASSERT(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
-    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(6, 6), 6));
-    ASSERT(getConsistencyMarkers()->getCheckpointTimestamp(opCtx).isNull());
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp(6, 6));
+    recovery.recoverFromOplog(opCtx, Timestamp(4, 4));
 }
 
-TEST_F(ReplicationRecoveryTest,
-       RecoveryDoesNotSetInitialDataTimestampIfNoCheckpointTimestampOrOplog) {
+TEST_F(ReplicationRecoveryTest, RecoveryAppliesUpdatesIdempotently) {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
-    _setUpOplog(opCtx, getStorageInterface(), {});
+    ASSERT_OK(getStorageInterface()->insertDocument(opCtx, testNs, {_makeInsertDocument(2)}, 1));
+    ASSERT_OK(getStorageInterface()->insertDocument(opCtx, testNs, {_makeInsertDocument(3)}, 1));
+    _assertDocsInTestCollection(opCtx, {2, 3});
 
-    recovery.recoverFromOplog(opCtx);
+    auto ts = 1;
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(ts, ts), 1));
+    _setUpOplog(opCtx, getStorageInterface(), {1});
+
+    // Test that updates and deletes on a missing document succeed.
+    ts++;
+    ASSERT_OK(getStorageInterface()->insertDocument(
+        opCtx,
+        oplogNs,
+        {_makeUpdateOplogEntry(ts, BSON("_id" << 1), BSON("$set" << BSON("a" << 7))).toBSON(),
+         Timestamp(ts, ts)},
+        OpTime::kUninitializedTerm));
+    ts++;
+    ASSERT_OK(getStorageInterface()->insertDocument(
+        opCtx,
+        oplogNs,
+        {_makeDeleteOplogEntry(ts, BSON("_id" << 1)).toBSON(), Timestamp(ts, ts)},
+        OpTime::kUninitializedTerm));
+    // Test that updates and deletes on a document succeed.
+    ts++;
+    ASSERT_OK(getStorageInterface()->insertDocument(
+        opCtx,
+        oplogNs,
+        {_makeUpdateOplogEntry(ts, BSON("_id" << 2), BSON("$set" << BSON("a" << 7))).toBSON(),
+         Timestamp(ts, ts)},
+        OpTime::kUninitializedTerm));
+    ts++;
+    ASSERT_OK(getStorageInterface()->insertDocument(
+        opCtx,
+        oplogNs,
+        {_makeDeleteOplogEntry(ts, BSON("_id" << 2)).toBSON(), Timestamp(ts, ts)},
+        OpTime::kUninitializedTerm));
+    // Test that updates on a document succeed.
+    ts++;
+    ASSERT_OK(getStorageInterface()->insertDocument(
+        opCtx,
+        oplogNs,
+        {_makeUpdateOplogEntry(ts, BSON("_id" << 3), BSON("$set" << BSON("a" << 7))).toBSON(),
+         Timestamp(ts, ts)},
+        OpTime::kUninitializedTerm));
+
+    recovery.recoverFromOplog(opCtx, boost::none);
+
+    std::vector<BSONObj> expectedColl{BSON("_id" << 3 << "a" << 7)};
+    _assertDocumentsInCollectionEquals(opCtx, testNs, expectedColl);
 
-    _assertDocsInOplog(opCtx, {});
-    _assertDocsInTestCollection(opCtx, {});
-    ASSERT(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
-    ASSERT(getConsistencyMarkers()->getAppliedThrough(opCtx).isNull());
-    ASSERT(getConsistencyMarkers()->getCheckpointTimestamp(opCtx).isNull());
-    ASSERT_EQ(getStorageInterfaceRecovery()->getInitialDataTimestamp(), Timestamp::min());
+    ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+    ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(ts, ts), 1));
 }
 
-DEATH_TEST_F(ReplicationRecoveryTest,
-             RecoveryFassertsWithNonNullCheckpointTimestampAndAppliedThrough,
-             "Fatal Assertion 40603") {
+DEATH_TEST_F(ReplicationRecoveryTest, RecoveryFailsWithBadOp, "terminate() called") {
     ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
     auto opCtx = getOperationContext();
 
-    _setUpOplog(opCtx, getStorageInterface(), {5});
+    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
+    _setUpOplog(opCtx, getStorageInterface(), {1});
 
-    getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
-    getConsistencyMarkers()->writeCheckpointTimestamp(opCtx, Timestamp(4, 4));
+    ASSERT_OK(getStorageInterface()->insertDocument(
+        opCtx,
+        oplogNs,
+        {_makeUpdateOplogEntry(2, BSON("bad_op" << 1), BSON("$set" << BSON("a" << 7))).toBSON(),
+         Timestamp(2, 2)},
+        OpTime::kUninitializedTerm));
 
-    recovery.recoverFromOplog(opCtx);
+    recovery.recoverFromOplog(opCtx, boost::none);
 }
-
 }  // namespace
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 1878d2cc679..cb332e44e16 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -149,14 +149,14 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) {
     }
 
     // Recover to the stable timestamp.
-    status = _recoverToStableTimestamp(opCtx);
-    if (!status.isOK()) {
-        return status;
+    auto stableTimestampSW = _recoverToStableTimestamp(opCtx);
+    if (!stableTimestampSW.isOK()) {
+        return stableTimestampSW.getStatus();
     }
-    _listener->onRecoverToStableTimestamp();
+    _listener->onRecoverToStableTimestamp(stableTimestampSW.getValue());
 
     // Run the oplog recovery logic.
-    status = _oplogRecovery(opCtx);
+    status = _oplogRecovery(opCtx, stableTimestampSW.getValue());
     if (!status.isOK()) {
         return status;
     }
@@ -449,7 +449,7 @@ Timestamp RollbackImpl::_findTruncateTimestamp(
     return truncatePointTime.getValue().getTimestamp();
 }
 
-Status RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
+StatusWith<Timestamp> RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
     if (_isInShutdown()) {
         return Status(ErrorCodes::ShutdownInProgress, "rollback shutting down");
     }
@@ -465,12 +465,12 @@ Status RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
     }
 }
 
-Status RollbackImpl::_oplogRecovery(OperationContext* opCtx) {
+Status RollbackImpl::_oplogRecovery(OperationContext* opCtx, Timestamp stableTimestamp) {
     if (_isInShutdown()) {
         return Status(ErrorCodes::ShutdownInProgress, "rollback shutting down");
     }
     // Run the recovery process.
-    _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx);
+    _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx, stableTimestamp);
     return Status::OK();
 }
diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h
index 6fa56bc8427..aa086c1d708 100644
--- a/src/mongo/db/repl/rollback_impl.h
+++ b/src/mongo/db/repl/rollback_impl.h
@@ -115,7 +115,7 @@ public:
         /**
         * Function called after we recover to the stable timestamp.
         */
-        virtual void onRecoverToStableTimestamp() noexcept {}
+        virtual void onRecoverToStableTimestamp(Timestamp stableTimestamp) noexcept {}
 
         /**
         * Function called after we recover from the oplog.
@@ -219,14 +219,15 @@ private:
     /**
     * Recovers to the stable timestamp while holding the global exclusive lock.
+     * Returns the stable timestamp that the storage engine recovered to.
     */
-    Status _recoverToStableTimestamp(OperationContext* opCtx);
+    StatusWith<Timestamp> _recoverToStableTimestamp(OperationContext* opCtx);
 
     /**
     * Runs the oplog recovery logic. This involves applying oplog operations between the stable
    * timestamp and the common point.
     */
-    Status _oplogRecovery(OperationContext* opCtx);
+    Status _oplogRecovery(OperationContext* opCtx, Timestamp stableTimestamp);
 
     /**
    * Process a single oplog entry that is getting rolled back and update the necessary rollback
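A sketch of how a Listener implementation might consume the new callback argument; the class is hypothetical, but the override signature matches the header above:

    // Hypothetical listener: receives the exact timestamp the storage engine
    // recovered to, rather than assuming the last value passed to setStableTimestamp().
    class ExampleRollbackListener : public RollbackImpl::Listener {
    public:
        void onRecoverToStableTimestamp(Timestamp stableTimestamp) noexcept override {
            // 'stableTimestamp' is authoritative: it is the value returned by
            // StorageInterface::recoverToStableTimestamp().
            _recoveredTo = stableTimestamp;
        }

    private:
        Timestamp _recoveredTo;
    };
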
diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp
index 62524d09db2..bb060b1493a 100644
--- a/src/mongo/db/repl/rollback_impl_test.cpp
+++ b/src/mongo/db/repl/rollback_impl_test.cpp
@@ -56,15 +56,16 @@ public:
 
     /**
     * If '_recoverToTimestampStatus' is non-empty, returns it. If '_recoverToTimestampStatus' is
-     * empty, updates '_currTimestamp' to be equal to '_stableTimestamp' and returns an OK status.
+     * empty, updates '_currTimestamp' to be equal to '_stableTimestamp' and returns the new value
+     * of '_currTimestamp'.
     */
-    Status recoverToStableTimestamp(ServiceContext* serviceCtx) override {
+    StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) override {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
         if (_recoverToTimestampStatus) {
             return _recoverToTimestampStatus.get();
         } else {
             _currTimestamp = _stableTimestamp;
-            return Status::OK();
+            return _currTimestamp;
         }
     }
 
@@ -125,9 +126,12 @@ protected:
     stdx::function<void()> _onTransitionToRollbackFn = [this]() { _transitionedToRollback = true; };
 
     bool _recoveredToStableTimestamp = false;
-    stdx::function<void()> _onRecoverToStableTimestampFn = [this]() {
-        _recoveredToStableTimestamp = true;
-    };
+    Timestamp _stableTimestamp;
+    stdx::function<void(Timestamp)> _onRecoverToStableTimestampFn =
+        [this](Timestamp stableTimestamp) {
+            _recoveredToStableTimestamp = true;
+            _stableTimestamp = stableTimestamp;
+        };
 
     bool _recoveredFromOplog = false;
     stdx::function<void()> _onRecoverFromOplogFn = [this]() { _recoveredFromOplog = true; };
@@ -175,19 +179,19 @@ class RollbackImplTest::Listener : public RollbackImpl::Listener {
 public:
     Listener(RollbackImplTest* test) : _test(test) {}
 
-    void onTransitionToRollback() noexcept {
+    void onTransitionToRollback() noexcept override {
         _test->_onTransitionToRollbackFn();
     }
 
-    void onCommonPointFound(Timestamp commonPoint) noexcept {
+    void onCommonPointFound(Timestamp commonPoint) noexcept override {
         _test->_onCommonPointFoundFn(commonPoint);
     }
 
-    void onRecoverToStableTimestamp() noexcept {
-        _test->_onRecoverToStableTimestampFn();
+    void onRecoverToStableTimestamp(Timestamp stableTimestamp) noexcept override {
+        _test->_onRecoverToStableTimestampFn(stableTimestamp);
     }
 
-    void onRecoverFromOplog() noexcept {
+    void onRecoverFromOplog() noexcept override {
         _test->_onRecoverFromOplogFn();
     }
 
@@ -321,13 +325,20 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverToStableTimestamp) {
 
     // Check the current timestamp.
     ASSERT_EQUALS(currTimestamp, _storageInterface->getCurrentTimestamp());
+    ASSERT_EQUALS(Timestamp(), _stableTimestamp);
 
     // Run rollback.
     ASSERT_OK(_rollback->runRollback(_opCtx.get()));
 
+    // Set the stable timestamp ahead to see that the current timestamp and the stable timestamp
+    // we recovered to don't change.
+    auto newTimestamp = Timestamp(30, 0);
+    _storageInterface->setStableTimestamp(nullptr, newTimestamp);
+
     // Make sure "recover to timestamp" occurred by checking that the current timestamp was set back
     // to the stable timestamp.
     ASSERT_EQUALS(stableTimestamp, _storageInterface->getCurrentTimestamp());
+    ASSERT_EQUALS(stableTimestamp, _stableTimestamp);
 }
 
 TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfRecoverToStableTimestampFails) {
@@ -348,6 +359,7 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfRecoverToStableTimestampFails
 
     // Check the current timestamp.
     ASSERT_EQUALS(currTimestamp, _storageInterface->getCurrentTimestamp());
+    ASSERT_EQUALS(Timestamp(), _stableTimestamp);
 
     // Run rollback.
     auto rollbackStatus = _rollback->runRollback(_opCtx.get());
@@ -355,6 +367,7 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfRecoverToStableTimestampFails
     // Make sure rollback failed, and didn't execute the recover to timestamp logic.
     ASSERT_EQUALS(recoverToTimestampStatus, rollbackStatus);
     ASSERT_EQUALS(currTimestamp, _storageInterface->getCurrentTimestamp());
+    ASSERT_EQUALS(Timestamp(), _stableTimestamp);
 
     // Make sure we transitioned back to SECONDARY state.
     ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY);
@@ -400,8 +413,9 @@ TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownEarly) {
     ASSERT_OK(_insertOplogEntry(op.first));
     ASSERT_OK(_insertOplogEntry(makeOp(2)));
 
-    _onRecoverToStableTimestampFn = [this]() {
+    _onRecoverToStableTimestampFn = [this](Timestamp stableTimestamp) {
         _recoveredToStableTimestamp = true;
+        _stableTimestamp = stableTimestamp;
         _rollback->shutdown();
     };
 
@@ -415,6 +429,8 @@ TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownEarly) {
 
     // Make sure we transitioned back to SECONDARY state.
     ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY);
+
+    ASSERT(_stableTimestamp.isNull());
 }
 
 TEST_F(RollbackImplTest, RollbackSucceeds) {
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 895c6d93a6e..9f517d96971 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -348,12 +348,22 @@ public:
     * Reverts the state of all database data to the last stable timestamp.
     *
     * The "local" database is exempt and none of its state should be reverted except for
-     * "local.replset.minvalid" and "local.replset.checkpointTimestamp" which should be reverted to
-     * the last stable timestamp.
+     * "local.replset.minvalid" which should be reverted to the last stable timestamp.
     *
     * The 'stable' timestamp is set by calling StorageInterface::setStableTimestamp.
     */
-    virtual Status recoverToStableTimestamp(ServiceContext* serviceCtx) = 0;
+    virtual StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) = 0;
+
+    /**
+     * Returns whether the storage engine supports "recover to stable timestamp".
+     */
+    virtual bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const = 0;
+
+    /**
+     * Returns the stable timestamp that the storage engine recovered to on startup. If the
+     * recovery point was not stable, returns "none".
+     */
+    virtual boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const = 0;
 
     /**
    * Waits for oplog writes to be visible in the oplog.
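A minimal sketch of how a caller is expected to combine the two new methods, mirroring the startup-recovery logic earlier in this patch; the variable names are illustrative:

    // Hypothetical caller of the new StorageInterface methods.
    boost::optional<Timestamp> recoveryPoint;
    if (storageInterface->supportsRecoverToStableTimestamp(serviceCtx)) {
        // May still be boost::none if the engine started from an unstable checkpoint.
        recoveryPoint = storageInterface->getRecoveryTimestamp(serviceCtx);
    }
    // recoveryPoint == boost::none => recover from an unstable checkpoint.
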
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 7d127709294..d816ee05ea6 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -1014,10 +1014,19 @@ void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx,
     serviceCtx->getGlobalStorageEngine()->setInitialDataTimestamp(snapshotName);
 }
 
-Status StorageInterfaceImpl::recoverToStableTimestamp(ServiceContext* serviceCtx) {
+StatusWith<Timestamp> StorageInterfaceImpl::recoverToStableTimestamp(ServiceContext* serviceCtx) {
     return serviceCtx->getGlobalStorageEngine()->recoverToStableTimestamp();
 }
 
+bool StorageInterfaceImpl::supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const {
+    return serviceCtx->getGlobalStorageEngine()->supportsRecoverToStableTimestamp();
+}
+
+boost::optional<Timestamp> StorageInterfaceImpl::getRecoveryTimestamp(
+    ServiceContext* serviceCtx) const {
+    return serviceCtx->getGlobalStorageEngine()->getRecoveryTimestamp();
+}
+
 Status StorageInterfaceImpl::isAdminDbValid(OperationContext* opCtx) {
     AutoGetDb autoDB(opCtx, "admin", MODE_X);
     auto adminDb = autoDB.getDb();
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 5200b11839b..6a9b6c07ee6 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -156,7 +156,11 @@ public:
 
     void setInitialDataTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override;
 
-    Status recoverToStableTimestamp(ServiceContext* serviceCtx) override;
+    StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) override;
+
+    bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override;
+
+    boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override;
 
     /**
    * Checks that the "admin" database contains a supported version of the auth data schema.
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 76acf703819..cd02935179f 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -285,10 +285,18 @@ public:
 
     Timestamp getInitialDataTimestamp() const;
 
-    Status recoverToStableTimestamp(ServiceContext* serviceCtx) override {
+    StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) override {
         return Status{ErrorCodes::IllegalOperation, "recoverToStableTimestamp not implemented."};
     }
 
+    bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override {
+        return false;
+    }
+
+    boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override {
+        return boost::none;
+    }
+
     Status isAdminDbValid(OperationContext* opCtx) override {
         return isAdminDbValidFn(opCtx);
     };
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index dae8ce8ea9b..6962ddfc2fd 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -366,7 +366,10 @@ Status SyncTail::syncApply(OperationContext* opCtx,
         } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
             // Delete operations on non-existent namespaces can be treated as successful for
             // idempotency reasons.
-            if (opType == OpTypeEnum::kDelete) {
+            // During RECOVERING mode, we ignore NamespaceNotFound for all CRUD ops since
+            // storage does not wait for drops to be checkpointed (SERVER-33161).
+            if (opType == OpTypeEnum::kDelete ||
+                oplogApplicationMode == OplogApplication::Mode::kRecovering) {
                 return Status::OK();
             }
             ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
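The widened catch above amounts to the following predicate; this is a sketch of the policy, not actual patch code:

    // During Mode::kRecovering, a CRUD op against a dropped namespace is treated as a
    // no-op, since the drop may already be reflected in the checkpoint being recovered
    // (SERVER-33161). In all other modes only deletes are forgiven.
    bool shouldIgnoreNamespaceNotFound(OpTypeEnum opType, OplogApplication::Mode mode) {
        return opType == OpTypeEnum::kDelete || mode == OplogApplication::Mode::kRecovering;
    }
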
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index d02fc9e9955..4d5f65f3418 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -271,11 +271,18 @@ public:
     /**
     * See `StorageEngine::recoverToStableTimestamp`
     */
-    virtual Status recoverToStableTimestamp() {
+    virtual StatusWith<Timestamp> recoverToStableTimestamp() {
         fassertFailed(50664);
     }
 
     /**
+     * See `StorageEngine::getRecoveryTimestamp`
+     */
+    virtual boost::optional<Timestamp> getRecoveryTimestamp() const {
+        MONGO_UNREACHABLE;
+    }
+
+    /**
     * See `StorageEngine::supportsReadConcernSnapshot`
     */
     virtual bool supportsReadConcernSnapshot() const {
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index b7a64f30d94..19cbded2aec 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -587,10 +587,14 @@ bool KVStorageEngine::supportsRecoverToStableTimestamp() const {
     return _engine->supportsRecoverToStableTimestamp();
 }
 
-Status KVStorageEngine::recoverToStableTimestamp() {
+StatusWith<Timestamp> KVStorageEngine::recoverToStableTimestamp() {
     return _engine->recoverToStableTimestamp();
 }
 
+boost::optional<Timestamp> KVStorageEngine::getRecoveryTimestamp() const {
+    return _engine->getRecoveryTimestamp();
+}
+
 bool KVStorageEngine::supportsReadConcernSnapshot() const {
     return _engine->supportsReadConcernSnapshot();
 }
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h
index 96f064d2ce5..6d1f5b3a489 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.h
+++ b/src/mongo/db/storage/kv/kv_storage_engine.h
@@ -121,7 +121,9 @@ public:
 
     virtual bool supportsRecoverToStableTimestamp() const override;
 
-    virtual Status recoverToStableTimestamp() override;
+    virtual StatusWith<Timestamp> recoverToStableTimestamp() override;
+
+    virtual boost::optional<Timestamp> getRecoveryTimestamp() const override;
 
     bool supportsReadConcernSnapshot() const final;
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index cd881198f03..c3b02246013 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -320,8 +320,9 @@ public:
     * used should be one provided by StorageEngine::setStableTimestamp().
     *
     * The "local" database is exempt and should not roll back any state except for
-     * "local.replset.minvalid" and "local.replset.checkpointTimestamp" which must roll back to
-     * the last stable timestamp.
+     * "local.replset.minvalid" which must roll back to the last stable timestamp.
+     *
+     * If successful, returns the timestamp that the storage engine recovered to.
     *
     * fasserts if StorageEngine::supportsRecoverToStableTimestamp() would return
     * false. Returns a bad status if there is no stable timestamp to recover to.
@@ -329,11 +330,20 @@ public:
     * It is illegal to call this concurrently with `setStableTimestamp` or
     * `setInitialDataTimestamp`.
     */
-    virtual Status recoverToStableTimestamp() {
+    virtual StatusWith<Timestamp> recoverToStableTimestamp() {
         fassertFailed(40547);
     }
 
     /**
+     * Returns the stable timestamp that the storage engine recovered to on startup. If the
+     * recovery point was not stable, returns "none".
+     *
+     * fasserts if StorageEngine::supportsRecoverToStableTimestamp() would return false.
+     */
+    virtual boost::optional<Timestamp> getRecoveryTimestamp() const {
+        MONGO_UNREACHABLE;
+    }
+
+    /**
     * Sets the highest timestamp at which the storage engine is allowed to take a checkpoint.
     * This timestamp can never decrease, and thus should be a timestamp that can never roll back.
     */
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index a365149462e..5f699c1ff48 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -1123,7 +1123,7 @@ bool WiredTigerKVEngine::supportsRecoverToStableTimestamp() const {
     return true;
 }
 
-Status WiredTigerKVEngine::recoverToStableTimestamp() {
+StatusWith<Timestamp> WiredTigerKVEngine::recoverToStableTimestamp() {
     if (!supportsRecoverToStableTimestamp()) {
         severe() << "WiredTiger is configured to not support recover to a stable timestamp";
         fassertFailed(50665);
@@ -1143,6 +1143,15 @@ Status WiredTigerKVEngine::recoverToStableTimestamp() {
                   "WT does not support recover to stable timestamp yet.");
 }
 
+boost::optional<Timestamp> WiredTigerKVEngine::getRecoveryTimestamp() const {
+    if (!supportsRecoverToStableTimestamp()) {
+        severe() << "WiredTiger is configured to not support recover to a stable timestamp";
+        fassertFailed(50745);
+    }
+
+    return boost::none;
+}
+
 bool WiredTigerKVEngine::supportsReadConcernSnapshot() const {
     return true;
 }
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 39e6c00a85a..0d25901d6fc 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -176,7 +176,9 @@ public:
 
     virtual bool supportsRecoverToStableTimestamp() const override;
 
-    virtual Status recoverToStableTimestamp() override;
+    virtual StatusWith<Timestamp> recoverToStableTimestamp() override;
+
+    virtual boost::optional<Timestamp> getRecoveryTimestamp() const override;
 
     bool supportsReadConcernSnapshot() const final;
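For reference, a hypothetical KVEngine subclass showing the intended contract of the two new methods; WiredTigerKVEngine itself still returns boost::none above until its recover-to-stable-timestamp support lands, and the engine's unrelated pure virtual methods are elided here:

    // Hypothetical engine, for illustration only.
    class ExampleKVEngine : public KVEngine {
    public:
        bool supportsRecoverToStableTimestamp() const override {
            return true;
        }

        StatusWith<Timestamp> recoverToStableTimestamp() override {
            // Revert all data to the last stable checkpoint and report the exact
            // timestamp that checkpoint reflects.
            return _lastStableCheckpoint;
        }

        boost::optional<Timestamp> getRecoveryTimestamp() const override {
            // Timestamp of the stable checkpoint that startup recovered from, or
            // boost::none if startup had to use an unstable checkpoint.
            return _startupRecoveryTimestamp;
        }

    private:
        Timestamp _lastStableCheckpoint;
        boost::optional<Timestamp> _startupRecoveryTimestamp;
    };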