diff options
-rw-r--r-- | src/mongo/db/repl/rollback_impl.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_impl.h | 29 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_impl_test.cpp | 207 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_test_fixture.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_test_fixture.h | 61 |
5 files changed, 240 insertions, 107 deletions
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index cb332e44e16..31e913ac3a5 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -123,21 +123,6 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) { if (!commonPointSW.isOK()) { return commonPointSW.getStatus(); } - - // During replication recovery, we truncate all oplog entries with timestamps greater than or - // equal to the oplog truncate after point. As a result, we must find the oplog entry after - // the common point so we do not truncate the common point itself. If we entered rollback, - // we are guaranteed to have at least one oplog entry after the common point. - Timestamp truncatePoint = _findTruncateTimestamp(opCtx, commonPointSW.getValue()); - - // Persist the truncate point to the 'oplogTruncateAfterPoint' document. We save this value so - // that the replication recovery logic knows where to truncate the oplog. Note that it must be - // saved *durably* in case a crash occurs after the storage engine recovers to the stable - // timestamp. Upon startup after such a crash, the standard replication recovery code will know - // where to truncate the oplog by observing the value of the 'oplogTruncateAfterPoint' document. - // Note that the storage engine timestamp recovery only restores the database *data* to a stable - // timestamp, but does not revert the oplog, which must be done as part of the rollback process. - _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, truncatePoint); _listener->onCommonPointFound(commonPointSW.getValue().first.getTimestamp()); // Increment the Rollback ID of this node. 
The Rollback ID is a natural number that it is @@ -155,6 +140,26 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) { } _listener->onRecoverToStableTimestamp(stableTimestampSW.getValue()); + // During replication recovery, we truncate all oplog entries with timestamps greater than or + // equal to the oplog truncate after point. As a result, we must find the oplog entry after + // the common point so we do not truncate the common point itself. If we entered rollback, + // we are guaranteed to have at least one oplog entry after the common point. + Timestamp truncatePoint = _findTruncateTimestamp(opCtx, commonPointSW.getValue()); + + // Persist the truncate point to the 'oplogTruncateAfterPoint' document. We save this value so + // that the replication recovery logic knows where to truncate the oplog. We save this value + // durably to match the behavior during startup recovery. This must occur after we successfully + // recover to a stable timestamp. If recovering to a stable timestamp fails and we still + // truncate the oplog, then the oplog will not match the data files. If we crash at any earlier + // point, we will recover, find a new sync source, and restart rollback (if necessary) on the + // new sync source. This is safe because a crash before this point would recover to a stable + // checkpoint at or earlier than the stable timestamp anyway. + // + // Note that storage engine timestamp recovery only restores the database *data* to a stable + // timestamp, but does not revert the oplog, which must be done as part of the rollback process. + _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, truncatePoint); + _listener->onSetOplogTruncateAfterPoint(truncatePoint); + // Run the oplog recovery logic. 
status = _oplogRecovery(opCtx, stableTimestampSW.getValue()); if (!status.isOK()) { diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h index aa086c1d708..d19da54cac6 100644 --- a/src/mongo/db/repl/rollback_impl.h +++ b/src/mongo/db/repl/rollback_impl.h @@ -64,19 +64,19 @@ class ReplicationProcess; * machinery. This class runs synchronously on the caller's thread. * * Order of actions: - * 1. Transition to ROLLBACK - * 2. Find the common point between the local and remote oplogs. - * a. Keep track of what is rolled back to provide a summary to the user - * b. Write rolled back documents to 'Rollback Files' - * 3. Increment the Rollback ID (RBID) - * 4. Write the common point as the 'OplogTruncateAfterPoint' - * 5. Tell the storage engine to recover to the last stable timestamp - * 6. Call recovery code - * a. Truncate the oplog at the common point + * 1. Transition to ROLLBACK. + * 2. Await background index completion. + * 3. Find the common point between the local and remote oplogs. + * a. Keep track of what is rolled back to provide a summary to the user. + * b. Write rolled back documents to 'Rollback Files'. + * 4. Increment the Rollback ID (RBID). + * 5. Tell the storage engine to recover to the last stable timestamp. + * 6. Write the oplog entry after the common point as the 'OplogTruncateAfterPoint'. + * 7. Call recovery code. + * a. Truncate the oplog at the common point. * b. Apply all oplog entries to the end of oplog. - * 7. Check the shard identity document for roll back - * 8. Clear the in-memory transaction table - * 9. Transition to SECONDARY + * 8. Trigger the on-rollback op observer. + * 9. Transition to SECONDARY. * * If the node crashes while in rollback and the storage engine has not recovered to the last * stable timestamp yet, then rollback will simply restart against the new sync source upon restart. 
@@ -118,6 +118,11 @@ public: virtual void onRecoverToStableTimestamp(Timestamp stableTimestamp) noexcept {} /** + * Function called after we set the oplog truncate after point. + */ + virtual void onSetOplogTruncateAfterPoint(Timestamp truncatePoint) noexcept {} + + /** * Function called after we recover from the oplog. */ virtual void onRecoverFromOplog() noexcept {} diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp index bb060b1493a..bbfaf017204 100644 --- a/src/mongo/db/repl/rollback_impl_test.cpp +++ b/src/mongo/db/repl/rollback_impl_test.cpp @@ -47,59 +47,6 @@ using namespace mongo::repl; NamespaceString nss("local.oplog.rs"); -class StorageInterfaceRollback : public StorageInterfaceImpl { -public: - void setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _stableTimestamp = snapshotName; - } - - /** - * If '_recoverToTimestampStatus' is non-empty, returns it. If '_recoverToTimestampStatus' is - * empty, updates '_currTimestamp' to be equal to '_stableTimestamp' and returns the new value - * of '_currTimestamp'. - */ - StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) override { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_recoverToTimestampStatus) { - return _recoverToTimestampStatus.get(); - } else { - _currTimestamp = _stableTimestamp; - return _currTimestamp; - } - } - - void setRecoverToTimestampStatus(Status status) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _recoverToTimestampStatus = status; - } - - void setCurrentTimestamp(Timestamp ts) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _currTimestamp = ts; - } - - Timestamp getCurrentTimestamp() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _currTimestamp; - } - -private: - mutable stdx::mutex _mutex; - - Timestamp _stableTimestamp; - - // Used to mock the behavior of 'recoverToStableTimestamp'. 
Upon calling - // 'recoverToStableTimestamp', the 'currTimestamp' should be set to the current - // '_stableTimestamp' value. Can be viewed as mock version of replication's 'lastApplied' - // optime. - Timestamp _currTimestamp; - - // A Status value which, if set, will be returned by the 'recoverToStableTimestamp' function, in - // order to simulate the error case for that function. Defaults to boost::none. - boost::optional<Status> _recoverToTimestampStatus = boost::none; -}; - /** * Unit test for rollback implementation introduced in 3.6. */ @@ -117,7 +64,6 @@ private: friend class RollbackImplTest::Listener; protected: - std::unique_ptr<StorageInterfaceRollback> _storageInterface; std::unique_ptr<OplogInterfaceLocal> _localOplog; std::unique_ptr<OplogInterfaceMock> _remoteOplog; std::unique_ptr<RollbackImpl> _rollback; @@ -140,6 +86,10 @@ protected: stdx::function<void(Timestamp commonPoint)> _onCommonPointFoundFn = [this](Timestamp commonPoint) { _commonPointFound = commonPoint; }; + Timestamp _truncatePoint; + stdx::function<void(Timestamp truncatePoint)> _onSetOplogTruncateAfterPointFn = + [this](Timestamp truncatePoint) { _truncatePoint = truncatePoint; }; + bool _triggeredOpObserver = false; stdx::function<void(const OpObserver::RollbackObserverInfo& rbInfo)> _onRollbackOpObserverFn = [this](const OpObserver::RollbackObserverInfo& rbInfo) {}; @@ -150,16 +100,13 @@ protected: void RollbackImplTest::setUp() { RollbackTest::setUp(); - // Set up test-specific storage interface. 
- _storageInterface = stdx::make_unique<StorageInterfaceRollback>(); - _localOplog = stdx::make_unique<OplogInterfaceLocal>(_opCtx.get(), NamespaceString::kRsOplogNamespace.ns()); _remoteOplog = stdx::make_unique<OplogInterfaceMock>(); _listener = stdx::make_unique<Listener>(this); _rollback = stdx::make_unique<RollbackImpl>(_localOplog.get(), _remoteOplog.get(), - _storageInterface.get(), + _storageInterface, _replicationProcess.get(), _coordinator, _listener.get()); @@ -191,11 +138,15 @@ public: _test->_onRecoverToStableTimestampFn(stableTimestamp); } + void onSetOplogTruncateAfterPoint(Timestamp truncatePoint) noexcept override { + _test->_onSetOplogTruncateAfterPointFn(truncatePoint); + } + void onRecoverFromOplog() noexcept override { _test->_onRecoverFromOplogFn(); } - void onRollbackOpObserver(const OpObserver::RollbackObserverInfo& rbInfo) noexcept { + void onRollbackOpObserver(const OpObserver::RollbackObserverInfo& rbInfo) noexcept override { _test->_onRollbackOpObserverFn(rbInfo); } @@ -207,6 +158,8 @@ private: * Helper functions to make simple oplog entries with timestamps, terms, and hashes. */ BSONObj makeOp(OpTime time, long long hash) { + const auto kGenericUUID = + unittest::assertGet(UUID::parse("b4c66a44-c1ca-4d86-8d25-12e82fa2de5b")); return BSON("ts" << time.getTimestamp() << "h" << hash << "t" << time.getTerm() << "op" << "i" << "o" @@ -214,7 +167,7 @@ BSONObj makeOp(OpTime time, long long hash) { << "ns" << "test.coll" << "ui" - << UUID::gen()); + << kGenericUUID); } BSONObj makeOp(int count) { @@ -238,6 +191,23 @@ OplogInterfaceMock::Operation makeOpAndRecordId(int count) { return makeOpAndRecordId(makeOp(count)); } +/** + * Asserts that the documents in the oplog have the given timestamps. 
+ */ +void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) { + std::vector<BSONObj> expectedOplog(timestamps.size()); + std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) { + return makeOp(ts); + }); + + OplogInterfaceLocal oplog(opCtx, NamespaceString::kRsOplogNamespace.ns()); + auto iter = oplog.makeIterator(); + for (auto reverseIt = expectedOplog.rbegin(); reverseIt != expectedOplog.rend(); reverseIt++) { + ASSERT_BSONOBJ_EQ(*reverseIt, unittest::assertGet(iter->next()).first); + } + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); +} + TEST_F(RollbackImplTest, TestFixtureSetUpInitializesStorageEngine) { auto serviceContext = _serviceContextMongoDTest.getServiceContext(); ASSERT_TRUE(serviceContext); @@ -282,16 +252,13 @@ TEST_F(RollbackImplTest, RollbackPersistsDocumentAfterCommonPointToOplogTruncate auto commonPoint = makeOpAndRecordId(2); _remoteOplog->setOperations({commonPoint}); ASSERT_OK(_insertOplogEntry(commonPoint.first)); + _storageInterface->setStableTimestamp(nullptr, Timestamp(2, 2)); auto nextTime = 3; ASSERT_OK(_insertOplogEntry(makeOp(nextTime))); ASSERT_OK(_rollback->runRollback(_opCtx.get())); - - // Check that the common point was saved. - auto truncateAfterPoint = - _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); - ASSERT_EQUALS(Timestamp(nextTime, nextTime), truncateAfterPoint); + ASSERT_EQUALS(_truncatePoint, Timestamp(3, 3)); } TEST_F(RollbackImplTest, RollbackIncrementsRollbackID) { @@ -299,6 +266,7 @@ TEST_F(RollbackImplTest, RollbackIncrementsRollbackID) { _remoteOplog->setOperations({op}); ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); + _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1)); // Get the initial rollback id. 
int initRollbackId = _replicationProcess->getRollbackID(); @@ -317,8 +285,8 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverToStableTimestamp) { ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); - auto stableTimestamp = Timestamp(10, 0); - auto currTimestamp = Timestamp(20, 0); + auto stableTimestamp = Timestamp(1, 1); + auto currTimestamp = Timestamp(2, 2); _storageInterface->setStableTimestamp(nullptr, stableTimestamp); _storageInterface->setCurrentTimestamp(currTimestamp); @@ -332,7 +300,7 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverToStableTimestamp) { // Set the stable timestamp ahead to see that the current timestamp and the stable timestamp // we recovered to don't change. - auto newTimestamp = Timestamp(30, 0); + auto newTimestamp = Timestamp(3, 3); _storageInterface->setStableTimestamp(nullptr, newTimestamp); // Make sure "recover to timestamp" occurred by checking that the current timestamp was set back @@ -347,11 +315,16 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfRecoverToStableTimestampFails ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); - auto stableTimestamp = Timestamp(10, 0); - auto currTimestamp = Timestamp(20, 0); + auto stableTimestamp = Timestamp(1, 1); + auto currTimestamp = Timestamp(2, 2); _storageInterface->setStableTimestamp(nullptr, stableTimestamp); _storageInterface->setCurrentTimestamp(currTimestamp); + _assertDocsInOplog(_opCtx.get(), {1, 2}); + auto truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + // Make it so that the 'recoverToStableTimestamp' method will fail. auto recoverToTimestampStatus = Status(ErrorCodes::InternalError, "recoverToStableTimestamp failed."); @@ -371,6 +344,13 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfRecoverToStableTimestampFails // Make sure we transitioned back to SECONDARY state. 
ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY); + + // Don't set the truncate after point if we fail early. + _assertDocsInOplog(_opCtx.get(), {1, 2}); + truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + ASSERT_EQUALS(_truncatePoint, Timestamp()); } TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfIncrementRollbackIDFails) { @@ -383,6 +363,11 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfIncrementRollbackIDFails) { auto rollbackIdNss = NamespaceString(_storageInterface->kDefaultRollbackIdNamespace); ASSERT_OK(_storageInterface->dropCollection(_opCtx.get(), rollbackIdNss)); + _assertDocsInOplog(_opCtx.get(), {1, 2}); + auto truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + // Run rollback. auto status = _rollback->runRollback(_opCtx.get()); @@ -392,6 +377,13 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfIncrementRollbackIDFails) { // Make sure we transitioned back to SECONDARY state. ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY); + + // Don't set the truncate after point if we fail early. + _assertDocsInOplog(_opCtx.get(), {1, 2}); + truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + ASSERT_EQUALS(_truncatePoint, Timestamp()); } TEST_F(RollbackImplTest, RollbackCallsRecoverFromOplog) { @@ -399,6 +391,7 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverFromOplog) { _remoteOplog->setOperations({op}); ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); + _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1)); // Run rollback. 
ASSERT_OK(_rollback->runRollback(_opCtx.get())); @@ -407,12 +400,17 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverFromOplog) { ASSERT(_recoveredFromOplog); } -TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownEarly) { +TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownDuringRTT) { auto op = makeOpAndRecordId(1); _remoteOplog->setOperations({op}); ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); + _assertDocsInOplog(_opCtx.get(), {1, 2}); + auto truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + _onRecoverToStableTimestampFn = [this](Timestamp stableTimestamp) { _recoveredToStableTimestamp = true; _stableTimestamp = stableTimestamp; @@ -429,18 +427,77 @@ TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownEarly) { // Make sure we transitioned back to SECONDARY state. ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY); - ASSERT(_stableTimestamp.isNull()); + + // This shutdown occurs between setting the oplog truncate after point and truncating the oplog. + // This must be safe since it is no different than an untimely crash. 
+ _assertDocsInOplog(_opCtx.get(), {1, 2}); + truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(2, 2), truncateAfterPoint); + ASSERT_EQUALS(_truncatePoint, Timestamp(2, 2)); } -TEST_F(RollbackImplTest, RollbackSucceeds) { +TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownDuringSetTruncatePoint) { auto op = makeOpAndRecordId(1); _remoteOplog->setOperations({op}); ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); + _assertDocsInOplog(_opCtx.get(), {1, 2}); + auto truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + _onSetOplogTruncateAfterPointFn = [this](Timestamp truncatePoint) { + _truncatePoint = truncatePoint; + _rollback->shutdown(); + }; + + _onRecoverToStableTimestampFn = [this](Timestamp stableTimestamp) { + _recoveredToStableTimestamp = true; + }; + + // Run rollback. + auto status = _rollback->runRollback(_opCtx.get()); + + // Make sure shutdown occurred before oplog recovery. + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _rollback->runRollback(_opCtx.get())); + ASSERT(_recoveredToStableTimestamp); + ASSERT_FALSE(_recoveredFromOplog); + + // Make sure we transitioned back to SECONDARY state. + ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY); + + // This shutdown occurs between setting the oplog truncate after point and truncating the oplog. + // This must be safe since it is no different than an untimely crash. 
+ _assertDocsInOplog(_opCtx.get(), {1, 2}); + truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(2, 2), truncateAfterPoint); + ASSERT_EQUALS(_truncatePoint, Timestamp(2, 2)); +} + +TEST_F(RollbackImplTest, RollbackSucceedsAndTruncatesOplog) { + auto op = makeOpAndRecordId(1); + _remoteOplog->setOperations({op}); + ASSERT_OK(_insertOplogEntry(op.first)); + ASSERT_OK(_insertOplogEntry(makeOp(2))); + _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1)); + + auto truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + _assertDocsInOplog(_opCtx.get(), {1, 2}); + ASSERT_OK(_rollback->runRollback(_opCtx.get())); ASSERT_EQUALS(Timestamp(1, 1), _commonPointFound); + + // Clear truncate after point after truncation. + truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get()); + ASSERT_EQUALS(Timestamp(), truncateAfterPoint); + _assertDocsInOplog(_opCtx.get(), {1}); + ASSERT_EQUALS(_truncatePoint, Timestamp(2, 2)); } DEATH_TEST_F(RollbackImplTest, @@ -453,6 +510,7 @@ DEATH_TEST_F(RollbackImplTest, _remoteOplog->setOperations({op}); ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); + _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1)); auto status = _rollback->runRollback(_opCtx.get()); unittest::log() << "Mongod did not crash. Status: " << status; @@ -488,6 +546,7 @@ TEST_F(RollbackImplTest, RollbackSkipsTriggerOpObserverWhenShutDownEarly) { _remoteOplog->setOperations({op}); ASSERT_OK(_insertOplogEntry(op.first)); ASSERT_OK(_insertOplogEntry(makeOp(2))); + _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1)); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _rollback->runRollback(_opCtx.get())); ASSERT(_recoveredFromOplog); @@ -506,6 +565,8 @@ public: * process. 
*/ Status rollbackOps(const OplogInterfaceMock::Operations& ops) { + _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1)); + auto commonOp = makeOpAndRecordId(1); _remoteOplog->setOperations({commonOp}); ASSERT_OK(_insertOplogEntry(commonOp.first)); diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index 03e2ea7300b..44993fe9b10 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -41,7 +41,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" +#include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/rs_rollback.h" #include "mongo/db/session_catalog.h" #include "mongo/logger/log_component.h" @@ -69,14 +69,17 @@ ReplSettings createReplSettings() { void RollbackTest::setUp() { _serviceContextMongoDTest.setUp(); + _storageInterface = new StorageInterfaceRollback(); auto serviceContext = _serviceContextMongoDTest.getServiceContext(); + auto consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>(); + auto recovery = + stdx::make_unique<ReplicationRecoveryImpl>(_storageInterface, consistencyMarkers.get()); _replicationProcess = stdx::make_unique<ReplicationProcess>( - &_storageInterface, - stdx::make_unique<ReplicationConsistencyMarkersMock>(), - stdx::make_unique<ReplicationRecoveryMock>()); - _dropPendingCollectionReaper = new DropPendingCollectionReaper(&_storageInterface); + _storageInterface, std::move(consistencyMarkers), std::move(recovery)); + _dropPendingCollectionReaper = new DropPendingCollectionReaper(_storageInterface); DropPendingCollectionReaper::set( serviceContext, std::unique_ptr<DropPendingCollectionReaper>(_dropPendingCollectionReaper)); + StorageInterface::set(serviceContext, std::unique_ptr<StorageInterface>(_storageInterface)); _coordinator 
= new ReplicationCoordinatorRollbackMock(serviceContext); ReplicationCoordinator::set(serviceContext, std::unique_ptr<ReplicationCoordinator>(_coordinator)); @@ -199,7 +202,7 @@ Collection* RollbackTest::_createCollection(OperationContext* opCtx, Status RollbackTest::_insertOplogEntry(const BSONObj& doc) { TimestampedBSONObj obj; obj.obj = doc; - return _storageInterface.insertDocument( + return _storageInterface->insertDocument( _opCtx.get(), NamespaceString::kRsOplogNamespace, obj, 0); } diff --git a/src/mongo/db/repl/rollback_test_fixture.h b/src/mongo/db/repl/rollback_test_fixture.h index 932f327914b..63c1fc6295f 100644 --- a/src/mongo/db/repl/rollback_test_fixture.h +++ b/src/mongo/db/repl/rollback_test_fixture.h @@ -112,7 +112,9 @@ protected: class ReplicationCoordinatorRollbackMock; ReplicationCoordinatorRollbackMock* _coordinator = nullptr; - StorageInterfaceImpl _storageInterface; + class StorageInterfaceRollback; + StorageInterfaceRollback* _storageInterface = nullptr; + ReplicationRecovery* _recovery; // ReplicationProcess used to access consistency markers. std::unique_ptr<ReplicationProcess> _replicationProcess; @@ -121,6 +123,63 @@ protected: DropPendingCollectionReaper* _dropPendingCollectionReaper = nullptr; }; +class RollbackTest::StorageInterfaceRollback : public StorageInterfaceImpl { +public: + void setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _stableTimestamp = snapshotName; + } + + /** + * If '_recoverToTimestampStatus' is non-empty, returns it. If '_recoverToTimestampStatus' is + * empty, updates '_currTimestamp' to be equal to '_stableTimestamp' and returns the new value + * of '_currTimestamp'. 
+ */ + StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) override { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_recoverToTimestampStatus) { + return _recoverToTimestampStatus.get(); + } else { + _currTimestamp = _stableTimestamp; + return _currTimestamp; + } + } + + bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override { + return true; + } + + void setRecoverToTimestampStatus(Status status) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _recoverToTimestampStatus = status; + } + + void setCurrentTimestamp(Timestamp ts) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _currTimestamp = ts; + } + + Timestamp getCurrentTimestamp() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _currTimestamp; + } + +private: + mutable stdx::mutex _mutex; + + Timestamp _stableTimestamp; + + // Used to mock the behavior of 'recoverToStableTimestamp'. Upon calling + // 'recoverToStableTimestamp', '_currTimestamp' should be set to the current + // '_stableTimestamp' value. Can be viewed as a mock version of replication's 'lastApplied' + // optime. + Timestamp _currTimestamp; + + // A Status value which, if set, will be returned by the 'recoverToStableTimestamp' function, in + // order to simulate the error case for that function. Defaults to boost::none. + boost::optional<Status> _recoverToTimestampStatus = boost::none; +}; + /** * ReplicationCoordinator mock implementation for rollback tests. */ |