-rw-r--r--  src/mongo/db/repl/rollback_impl.cpp           35
-rw-r--r--  src/mongo/db/repl/rollback_impl.h             29
-rw-r--r--  src/mongo/db/repl/rollback_impl_test.cpp     207
-rw-r--r--  src/mongo/db/repl/rollback_test_fixture.cpp   15
-rw-r--r--  src/mongo/db/repl/rollback_test_fixture.h     61
5 files changed, 240 insertions(+), 107 deletions(-)
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index cb332e44e16..31e913ac3a5 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -123,21 +123,6 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) {
if (!commonPointSW.isOK()) {
return commonPointSW.getStatus();
}
-
- // During replication recovery, we truncate all oplog entries with timestamps greater than or
- // equal to the oplog truncate after point. As a result, we must find the oplog entry after
- // the common point so we do not truncate the common point itself. If we entered rollback,
- // we are guaranteed to have at least one oplog entry after the common point.
- Timestamp truncatePoint = _findTruncateTimestamp(opCtx, commonPointSW.getValue());
-
- // Persist the truncate point to the 'oplogTruncateAfterPoint' document. We save this value so
- // that the replication recovery logic knows where to truncate the oplog. Note that it must be
- // saved *durably* in case a crash occurs after the storage engine recovers to the stable
- // timestamp. Upon startup after such a crash, the standard replication recovery code will know
- // where to truncate the oplog by observing the value of the 'oplogTruncateAfterPoint' document.
- // Note that the storage engine timestamp recovery only restores the database *data* to a stable
- // timestamp, but does not revert the oplog, which must be done as part of the rollback process.
- _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, truncatePoint);
_listener->onCommonPointFound(commonPointSW.getValue().first.getTimestamp());
// Increment the Rollback ID of this node. The Rollback ID is a natural number that it is
@@ -155,6 +140,26 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) {
}
_listener->onRecoverToStableTimestamp(stableTimestampSW.getValue());
+ // During replication recovery, we truncate all oplog entries with timestamps greater than or
+ // equal to the oplog truncate after point. As a result, we must find the oplog entry after
+ // the common point so we do not truncate the common point itself. If we entered rollback,
+ // we are guaranteed to have at least one oplog entry after the common point.
+ Timestamp truncatePoint = _findTruncateTimestamp(opCtx, commonPointSW.getValue());
+
+ // Persist the truncate point to the 'oplogTruncateAfterPoint' document so that the replication
+ // recovery logic knows where to truncate the oplog. The value is saved durably to match the
+ // behavior during startup recovery. This must occur after we successfully recover to a stable
+ // timestamp: if that recovery fails but we truncate the oplog anyway, the oplog will no longer
+ // match the data files. If we crash at any earlier point, we will recover, find a new sync
+ // source, and restart rollback (if necessary on the new sync source). This is safe because a
+ // crash before this point would recover to a stable checkpoint at or earlier than the stable
+ // timestamp anyway.
+ //
+ // Note that storage engine timestamp recovery only restores the database *data* to a stable
+ // timestamp, but does not revert the oplog, which must be done as part of the rollback process.
+ _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, truncatePoint);
+ _listener->onSetOplogTruncateAfterPoint(truncatePoint);
+
// Run the oplog recovery logic.
status = _oplogRecovery(opCtx, stableTimestampSW.getValue());
if (!status.isOK()) {
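For orientation, here is a minimal stand-alone sketch of the ordering this hunk establishes; all types and names below are invented stand-ins rather than MongoDB code: recover to the stable timestamp first, and only on success persist the truncate point and proceed to oplog recovery.

```cpp
#include <iostream>
#include <optional>

// Invented stand-ins for illustration only; not MongoDB types.
struct Timestamp { unsigned secs = 0, inc = 0; };

struct StorageEngine {
    // Pretend recovery: returns the recovered-to timestamp, or nullopt on failure.
    std::optional<Timestamp> recoverToStableTimestamp() { return Timestamp{1, 1}; }
};

struct ConsistencyMarkers {
    std::optional<Timestamp> oplogTruncateAfterPoint;
    void setOplogTruncateAfterPoint(Timestamp ts) { oplogTruncateAfterPoint = ts; }
};

// Sketch of the reordered steps: recover first, persist the truncate point only on success.
bool runRollbackSketch(StorageEngine& engine, ConsistencyMarkers& markers, Timestamp truncatePoint) {
    // 1. Recover the data files to the stable timestamp. On failure we must NOT
    //    touch the oplog, so we bail out before persisting the truncate point.
    auto stable = engine.recoverToStableTimestamp();
    if (!stable) {
        return false;
    }

    // 2. Only after a successful recovery do we durably record where the oplog
    //    must be truncated; a crash after this point is handled by startup
    //    recovery, which reads the 'oplogTruncateAfterPoint' document.
    markers.setOplogTruncateAfterPoint(truncatePoint);

    // 3. Oplog recovery would then truncate at the truncate point and re-apply
    //    entries through the end of the oplog (elided here).
    std::cout << "recovered to " << stable->secs << ", truncate after " << truncatePoint.secs << "\n";
    return true;
}

int main() {
    StorageEngine engine;
    ConsistencyMarkers markers;
    return runRollbackSketch(engine, markers, Timestamp{2, 2}) ? 0 : 1;
}
```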
diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h
index aa086c1d708..d19da54cac6 100644
--- a/src/mongo/db/repl/rollback_impl.h
+++ b/src/mongo/db/repl/rollback_impl.h
@@ -64,19 +64,19 @@ class ReplicationProcess;
* machinery. This class runs synchronously on the caller's thread.
*
* Order of actions:
- * 1. Transition to ROLLBACK
- * 2. Find the common point between the local and remote oplogs.
- * a. Keep track of what is rolled back to provide a summary to the user
- * b. Write rolled back documents to 'Rollback Files'
- * 3. Increment the Rollback ID (RBID)
- * 4. Write the common point as the 'OplogTruncateAfterPoint'
- * 5. Tell the storage engine to recover to the last stable timestamp
- * 6. Call recovery code
- * a. Truncate the oplog at the common point
+ * 1. Transition to ROLLBACK.
+ * 2. Await background index completion.
+ * 3. Find the common point between the local and remote oplogs.
+ * a. Keep track of what is rolled back to provide a summary to the user.
+ * b. Write rolled back documents to 'Rollback Files'.
+ * 4. Increment the Rollback ID (RBID).
+ * 5. Tell the storage engine to recover to the last stable timestamp.
+ * 6. Write the oplog entry after the common point as the 'OplogTruncateAfterPoint'.
+ * 7. Call recovery code.
+ * a. Truncate the oplog at the common point.
* b. Apply all oplog entries to the end of oplog.
- * 7. Check the shard identity document for roll back
- * 8. Clear the in-memory transaction table
- * 9. Transition to SECONDARY
+ * 8. Trigger the on-rollback op observer.
+ * 9. Transition to SECONDARY.
*
* If the node crashes while in rollback and the storage engine has not recovered to the last
* stable timestamp yet, then rollback will simply restart against the new sync source upon restart.
@@ -118,6 +118,11 @@ public:
virtual void onRecoverToStableTimestamp(Timestamp stableTimestamp) noexcept {}
/**
+ * Function called after we set the oplog truncate after point.
+ */
+ virtual void onSetOplogTruncateAfterPoint(Timestamp truncatePoint) noexcept {}
+
+ /**
* Function called after we recover from the oplog.
*/
virtual void onRecoverFromOplog() noexcept {}
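The new onSetOplogTruncateAfterPoint hook is how the updated tests observe the persisted truncate point. As a rough illustration, here is a simplified, hypothetical mirror of the listener interface (not the real rollback_impl.h declarations) showing how a test-style listener captures the value:

```cpp
#include <cassert>

// Simplified, hypothetical mirror of the rollback listener; only the callbacks
// relevant to this change are shown.
struct Timestamp {
    unsigned secs = 0, inc = 0;
    bool isNull() const { return secs == 0 && inc == 0; }
};

class RollbackListenerSketch {
public:
    virtual ~RollbackListenerSketch() = default;
    virtual void onRecoverToStableTimestamp(Timestamp) noexcept {}
    // New hook from this change: fires right after the truncate point is persisted.
    virtual void onSetOplogTruncateAfterPoint(Timestamp) noexcept {}
    virtual void onRecoverFromOplog() noexcept {}
};

// Records the truncate point, mirroring what rollback_impl_test.cpp does with
// _onSetOplogTruncateAfterPointFn.
class RecordingListener : public RollbackListenerSketch {
public:
    void onSetOplogTruncateAfterPoint(Timestamp ts) noexcept override { _truncatePoint = ts; }
    Timestamp truncatePoint() const { return _truncatePoint; }

private:
    Timestamp _truncatePoint;
};

int main() {
    RecordingListener listener;
    // Simulate rollback invoking the hook after persisting the truncate point.
    listener.onSetOplogTruncateAfterPoint(Timestamp{2, 2});
    assert(!listener.truncatePoint().isNull());
}
```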
diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp
index bb060b1493a..bbfaf017204 100644
--- a/src/mongo/db/repl/rollback_impl_test.cpp
+++ b/src/mongo/db/repl/rollback_impl_test.cpp
@@ -47,59 +47,6 @@ using namespace mongo::repl;
NamespaceString nss("local.oplog.rs");
-class StorageInterfaceRollback : public StorageInterfaceImpl {
-public:
- void setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _stableTimestamp = snapshotName;
- }
-
- /**
- * If '_recoverToTimestampStatus' is non-empty, returns it. If '_recoverToTimestampStatus' is
- * empty, updates '_currTimestamp' to be equal to '_stableTimestamp' and returns the new value
- * of '_currTimestamp'.
- */
- StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) override {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_recoverToTimestampStatus) {
- return _recoverToTimestampStatus.get();
- } else {
- _currTimestamp = _stableTimestamp;
- return _currTimestamp;
- }
- }
-
- void setRecoverToTimestampStatus(Status status) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _recoverToTimestampStatus = status;
- }
-
- void setCurrentTimestamp(Timestamp ts) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _currTimestamp = ts;
- }
-
- Timestamp getCurrentTimestamp() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _currTimestamp;
- }
-
-private:
- mutable stdx::mutex _mutex;
-
- Timestamp _stableTimestamp;
-
- // Used to mock the behavior of 'recoverToStableTimestamp'. Upon calling
- // 'recoverToStableTimestamp', the 'currTimestamp' should be set to the current
- // '_stableTimestamp' value. Can be viewed as mock version of replication's 'lastApplied'
- // optime.
- Timestamp _currTimestamp;
-
- // A Status value which, if set, will be returned by the 'recoverToStableTimestamp' function, in
- // order to simulate the error case for that function. Defaults to boost::none.
- boost::optional<Status> _recoverToTimestampStatus = boost::none;
-};
-
/**
* Unit test for rollback implementation introduced in 3.6.
*/
@@ -117,7 +64,6 @@ private:
friend class RollbackImplTest::Listener;
protected:
- std::unique_ptr<StorageInterfaceRollback> _storageInterface;
std::unique_ptr<OplogInterfaceLocal> _localOplog;
std::unique_ptr<OplogInterfaceMock> _remoteOplog;
std::unique_ptr<RollbackImpl> _rollback;
@@ -140,6 +86,10 @@ protected:
stdx::function<void(Timestamp commonPoint)> _onCommonPointFoundFn =
[this](Timestamp commonPoint) { _commonPointFound = commonPoint; };
+ Timestamp _truncatePoint;
+ stdx::function<void(Timestamp truncatePoint)> _onSetOplogTruncateAfterPointFn =
+ [this](Timestamp truncatePoint) { _truncatePoint = truncatePoint; };
+
bool _triggeredOpObserver = false;
stdx::function<void(const OpObserver::RollbackObserverInfo& rbInfo)> _onRollbackOpObserverFn =
[this](const OpObserver::RollbackObserverInfo& rbInfo) {};
@@ -150,16 +100,13 @@ protected:
void RollbackImplTest::setUp() {
RollbackTest::setUp();
- // Set up test-specific storage interface.
- _storageInterface = stdx::make_unique<StorageInterfaceRollback>();
-
_localOplog = stdx::make_unique<OplogInterfaceLocal>(_opCtx.get(),
NamespaceString::kRsOplogNamespace.ns());
_remoteOplog = stdx::make_unique<OplogInterfaceMock>();
_listener = stdx::make_unique<Listener>(this);
_rollback = stdx::make_unique<RollbackImpl>(_localOplog.get(),
_remoteOplog.get(),
- _storageInterface.get(),
+ _storageInterface,
_replicationProcess.get(),
_coordinator,
_listener.get());
@@ -191,11 +138,15 @@ public:
_test->_onRecoverToStableTimestampFn(stableTimestamp);
}
+ void onSetOplogTruncateAfterPoint(Timestamp truncatePoint) noexcept override {
+ _test->_onSetOplogTruncateAfterPointFn(truncatePoint);
+ }
+
void onRecoverFromOplog() noexcept override {
_test->_onRecoverFromOplogFn();
}
- void onRollbackOpObserver(const OpObserver::RollbackObserverInfo& rbInfo) noexcept {
+ void onRollbackOpObserver(const OpObserver::RollbackObserverInfo& rbInfo) noexcept override {
_test->_onRollbackOpObserverFn(rbInfo);
}
@@ -207,6 +158,8 @@ private:
* Helper functions to make simple oplog entries with timestamps, terms, and hashes.
*/
BSONObj makeOp(OpTime time, long long hash) {
+ const auto kGenericUUID =
+ unittest::assertGet(UUID::parse("b4c66a44-c1ca-4d86-8d25-12e82fa2de5b"));
return BSON("ts" << time.getTimestamp() << "h" << hash << "t" << time.getTerm() << "op"
<< "i"
<< "o"
@@ -214,7 +167,7 @@ BSONObj makeOp(OpTime time, long long hash) {
<< "ns"
<< "test.coll"
<< "ui"
- << UUID::gen());
+ << kGenericUUID);
}
BSONObj makeOp(int count) {
@@ -238,6 +191,23 @@ OplogInterfaceMock::Operation makeOpAndRecordId(int count) {
return makeOpAndRecordId(makeOp(count));
}
+/**
+ * Asserts that the documents in the oplog have the given timestamps.
+ */
+void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) {
+ std::vector<BSONObj> expectedOplog(timestamps.size());
+ std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) {
+ return makeOp(ts);
+ });
+
+ OplogInterfaceLocal oplog(opCtx, NamespaceString::kRsOplogNamespace.ns());
+ auto iter = oplog.makeIterator();
+ for (auto reverseIt = expectedOplog.rbegin(); reverseIt != expectedOplog.rend(); reverseIt++) {
+ ASSERT_BSONOBJ_EQ(*reverseIt, unittest::assertGet(iter->next()).first);
+ }
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
+}
+
TEST_F(RollbackImplTest, TestFixtureSetUpInitializesStorageEngine) {
auto serviceContext = _serviceContextMongoDTest.getServiceContext();
ASSERT_TRUE(serviceContext);
@@ -282,16 +252,13 @@ TEST_F(RollbackImplTest, RollbackPersistsDocumentAfterCommonPointToOplogTruncate
auto commonPoint = makeOpAndRecordId(2);
_remoteOplog->setOperations({commonPoint});
ASSERT_OK(_insertOplogEntry(commonPoint.first));
+ _storageInterface->setStableTimestamp(nullptr, Timestamp(2, 2));
auto nextTime = 3;
ASSERT_OK(_insertOplogEntry(makeOp(nextTime)));
ASSERT_OK(_rollback->runRollback(_opCtx.get()));
-
- // Check that the common point was saved.
- auto truncateAfterPoint =
- _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
- ASSERT_EQUALS(Timestamp(nextTime, nextTime), truncateAfterPoint);
+ ASSERT_EQUALS(_truncatePoint, Timestamp(3, 3));
}
TEST_F(RollbackImplTest, RollbackIncrementsRollbackID) {
@@ -299,6 +266,7 @@ TEST_F(RollbackImplTest, RollbackIncrementsRollbackID) {
_remoteOplog->setOperations({op});
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
+ _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1));
// Get the initial rollback id.
int initRollbackId = _replicationProcess->getRollbackID();
@@ -317,8 +285,8 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverToStableTimestamp) {
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
- auto stableTimestamp = Timestamp(10, 0);
- auto currTimestamp = Timestamp(20, 0);
+ auto stableTimestamp = Timestamp(1, 1);
+ auto currTimestamp = Timestamp(2, 2);
_storageInterface->setStableTimestamp(nullptr, stableTimestamp);
_storageInterface->setCurrentTimestamp(currTimestamp);
@@ -332,7 +300,7 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverToStableTimestamp) {
// Set the stable timestamp ahead to see that the current timestamp and the stable timestamp
// we recovered to don't change.
- auto newTimestamp = Timestamp(30, 0);
+ auto newTimestamp = Timestamp(3, 3);
_storageInterface->setStableTimestamp(nullptr, newTimestamp);
// Make sure "recover to timestamp" occurred by checking that the current timestamp was set back
@@ -347,11 +315,16 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfRecoverToStableTimestampFails
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
- auto stableTimestamp = Timestamp(10, 0);
- auto currTimestamp = Timestamp(20, 0);
+ auto stableTimestamp = Timestamp(1, 1);
+ auto currTimestamp = Timestamp(2, 2);
_storageInterface->setStableTimestamp(nullptr, stableTimestamp);
_storageInterface->setCurrentTimestamp(currTimestamp);
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ auto truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+
// Make it so that the 'recoverToStableTimestamp' method will fail.
auto recoverToTimestampStatus =
Status(ErrorCodes::InternalError, "recoverToStableTimestamp failed.");
@@ -371,6 +344,13 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfRecoverToStableTimestampFails
// Make sure we transitioned back to SECONDARY state.
ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY);
+
+ // Don't set the truncate after point if we fail early.
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+ ASSERT_EQUALS(_truncatePoint, Timestamp());
}
TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfIncrementRollbackIDFails) {
@@ -383,6 +363,11 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfIncrementRollbackIDFails) {
auto rollbackIdNss = NamespaceString(_storageInterface->kDefaultRollbackIdNamespace);
ASSERT_OK(_storageInterface->dropCollection(_opCtx.get(), rollbackIdNss));
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ auto truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+
// Run rollback.
auto status = _rollback->runRollback(_opCtx.get());
@@ -392,6 +377,13 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfIncrementRollbackIDFails) {
// Make sure we transitioned back to SECONDARY state.
ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY);
+
+ // Don't set the truncate after point if we fail early.
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+ ASSERT_EQUALS(_truncatePoint, Timestamp());
}
TEST_F(RollbackImplTest, RollbackCallsRecoverFromOplog) {
@@ -399,6 +391,7 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverFromOplog) {
_remoteOplog->setOperations({op});
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
+ _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1));
// Run rollback.
ASSERT_OK(_rollback->runRollback(_opCtx.get()));
@@ -407,12 +400,17 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverFromOplog) {
ASSERT(_recoveredFromOplog);
}
-TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownEarly) {
+TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownDuringRTT) {
auto op = makeOpAndRecordId(1);
_remoteOplog->setOperations({op});
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ auto truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+
_onRecoverToStableTimestampFn = [this](Timestamp stableTimestamp) {
_recoveredToStableTimestamp = true;
_stableTimestamp = stableTimestamp;
@@ -429,18 +427,77 @@ TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownEarly) {
// Make sure we transitioned back to SECONDARY state.
ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY);
-
ASSERT(_stableTimestamp.isNull());
+
+ // This shutdown occurs between setting the oplog truncate after point and truncating the oplog.
+ // This must be safe since it is no different than an untimely crash.
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(2, 2), truncateAfterPoint);
+ ASSERT_EQUALS(_truncatePoint, Timestamp(2, 2));
}
-TEST_F(RollbackImplTest, RollbackSucceeds) {
+TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownDuringSetTruncatePoint) {
auto op = makeOpAndRecordId(1);
_remoteOplog->setOperations({op});
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ auto truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+ _onSetOplogTruncateAfterPointFn = [this](Timestamp truncatePoint) {
+ _truncatePoint = truncatePoint;
+ _rollback->shutdown();
+ };
+
+ _onRecoverToStableTimestampFn = [this](Timestamp stableTimestamp) {
+ _recoveredToStableTimestamp = true;
+ };
+
+ // Run rollback.
+ auto status = _rollback->runRollback(_opCtx.get());
+
+ // Make sure shutdown occurred before oplog recovery.
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _rollback->runRollback(_opCtx.get()));
+ ASSERT(_recoveredToStableTimestamp);
+ ASSERT_FALSE(_recoveredFromOplog);
+
+ // Make sure we transitioned back to SECONDARY state.
+ ASSERT_EQUALS(_coordinator->getMemberState(), MemberState::RS_SECONDARY);
+
+ // This shutdown occurs between setting the oplog truncate after point and truncating the oplog.
+ // This must be safe since it is no different than an untimely crash.
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+ truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(2, 2), truncateAfterPoint);
+ ASSERT_EQUALS(_truncatePoint, Timestamp(2, 2));
+}
+
+TEST_F(RollbackImplTest, RollbackSucceedsAndTruncatesOplog) {
+ auto op = makeOpAndRecordId(1);
+ _remoteOplog->setOperations({op});
+ ASSERT_OK(_insertOplogEntry(op.first));
+ ASSERT_OK(_insertOplogEntry(makeOp(2)));
+ _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1));
+
+ auto truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+ _assertDocsInOplog(_opCtx.get(), {1, 2});
+
ASSERT_OK(_rollback->runRollback(_opCtx.get()));
ASSERT_EQUALS(Timestamp(1, 1), _commonPointFound);
+
+ // Clear truncate after point after truncation.
+ truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(_opCtx.get());
+ ASSERT_EQUALS(Timestamp(), truncateAfterPoint);
+ _assertDocsInOplog(_opCtx.get(), {1});
+ ASSERT_EQUALS(_truncatePoint, Timestamp(2, 2));
}
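As a sanity check of what these tests assert, the sketch below models the truncation semantics in isolation: every entry with a timestamp greater than or equal to the truncate after point is removed, and the marker is then cleared. Timestamps are plain integers here; this is illustrative only, not the real replication recovery logic.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Illustrative-only model of oplog truncation driven by the truncate after point.
// A truncate point of 0 stands in for an unset marker.
void truncateOplogSketch(std::vector<int>& oplogTimestamps, int& truncateAfterPoint) {
    if (truncateAfterPoint == 0) {
        return;  // Unset marker: nothing to truncate.
    }
    // Remove every entry at or after the truncate point.
    oplogTimestamps.erase(
        std::remove_if(oplogTimestamps.begin(), oplogTimestamps.end(),
                       [&](int ts) { return ts >= truncateAfterPoint; }),
        oplogTimestamps.end());
    // Cleared after truncation, as RollbackSucceedsAndTruncatesOplog checks.
    truncateAfterPoint = 0;
}

int main() {
    std::vector<int> oplog{1, 2};  // Mirrors _assertDocsInOplog(_opCtx.get(), {1, 2}).
    int truncateAfterPoint = 2;    // The entry after the common point at timestamp 1.

    truncateOplogSketch(oplog, truncateAfterPoint);

    assert((oplog == std::vector<int>{1}));  // Only the common point remains.
    assert(truncateAfterPoint == 0);         // Marker cleared.
}
```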
DEATH_TEST_F(RollbackImplTest,
@@ -453,6 +510,7 @@ DEATH_TEST_F(RollbackImplTest,
_remoteOplog->setOperations({op});
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
+ _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1));
auto status = _rollback->runRollback(_opCtx.get());
unittest::log() << "Mongod did not crash. Status: " << status;
@@ -488,6 +546,7 @@ TEST_F(RollbackImplTest, RollbackSkipsTriggerOpObserverWhenShutDownEarly) {
_remoteOplog->setOperations({op});
ASSERT_OK(_insertOplogEntry(op.first));
ASSERT_OK(_insertOplogEntry(makeOp(2)));
+ _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1));
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _rollback->runRollback(_opCtx.get()));
ASSERT(_recoveredFromOplog);
@@ -506,6 +565,8 @@ public:
* process.
*/
Status rollbackOps(const OplogInterfaceMock::Operations& ops) {
+ _storageInterface->setStableTimestamp(nullptr, Timestamp(1, 1));
+
auto commonOp = makeOpAndRecordId(1);
_remoteOplog->setOperations({commonOp});
ASSERT_OK(_insertOplogEntry(commonOp.first));
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 03e2ea7300b..44993fe9b10 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -41,7 +41,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
+#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/session_catalog.h"
#include "mongo/logger/log_component.h"
@@ -69,14 +69,17 @@ ReplSettings createReplSettings() {
void RollbackTest::setUp() {
_serviceContextMongoDTest.setUp();
+ _storageInterface = new StorageInterfaceRollback();
auto serviceContext = _serviceContextMongoDTest.getServiceContext();
+ auto consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>();
+ auto recovery =
+ stdx::make_unique<ReplicationRecoveryImpl>(_storageInterface, consistencyMarkers.get());
_replicationProcess = stdx::make_unique<ReplicationProcess>(
- &_storageInterface,
- stdx::make_unique<ReplicationConsistencyMarkersMock>(),
- stdx::make_unique<ReplicationRecoveryMock>());
- _dropPendingCollectionReaper = new DropPendingCollectionReaper(&_storageInterface);
+ _storageInterface, std::move(consistencyMarkers), std::move(recovery));
+ _dropPendingCollectionReaper = new DropPendingCollectionReaper(_storageInterface);
DropPendingCollectionReaper::set(
serviceContext, std::unique_ptr<DropPendingCollectionReaper>(_dropPendingCollectionReaper));
+ StorageInterface::set(serviceContext, std::unique_ptr<StorageInterface>(_storageInterface));
_coordinator = new ReplicationCoordinatorRollbackMock(serviceContext);
ReplicationCoordinator::set(serviceContext,
std::unique_ptr<ReplicationCoordinator>(_coordinator));
@@ -199,7 +202,7 @@ Collection* RollbackTest::_createCollection(OperationContext* opCtx,
Status RollbackTest::_insertOplogEntry(const BSONObj& doc) {
TimestampedBSONObj obj;
obj.obj = doc;
- return _storageInterface.insertDocument(
+ return _storageInterface->insertDocument(
_opCtx.get(), NamespaceString::kRsOplogNamespace, obj, 0);
}
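The fixture now allocates the storage interface with new, keeps the raw pointer, and hands ownership to the ServiceContext via StorageInterface::set. The stand-alone sketch below (all names are invented stand-ins) shows that ownership pattern in miniature:

```cpp
#include <cassert>
#include <memory>

// Invented stand-in for the object whose ownership is transferred.
struct StorageInterfaceStub { int value = 0; };

// Invented stand-in for the ServiceContext decoration that owns the instance.
struct Registry {
    static std::unique_ptr<StorageInterfaceStub>& slot() {
        static std::unique_ptr<StorageInterfaceStub> instance;
        return instance;
    }
    static void set(std::unique_ptr<StorageInterfaceStub> ptr) { slot() = std::move(ptr); }
};

int main() {
    // Fixture-style setup: retain a raw pointer, transfer ownership to the registry.
    auto* storage = new StorageInterfaceStub();
    Registry::set(std::unique_ptr<StorageInterfaceStub>(storage));

    // The fixture keeps using the raw pointer for as long as the registry lives.
    storage->value = 42;
    assert(Registry::slot().get() == storage);
}
```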
diff --git a/src/mongo/db/repl/rollback_test_fixture.h b/src/mongo/db/repl/rollback_test_fixture.h
index 932f327914b..63c1fc6295f 100644
--- a/src/mongo/db/repl/rollback_test_fixture.h
+++ b/src/mongo/db/repl/rollback_test_fixture.h
@@ -112,7 +112,9 @@ protected:
class ReplicationCoordinatorRollbackMock;
ReplicationCoordinatorRollbackMock* _coordinator = nullptr;
- StorageInterfaceImpl _storageInterface;
+ class StorageInterfaceRollback;
+ StorageInterfaceRollback* _storageInterface = nullptr;
+ ReplicationRecovery* _recovery;
// ReplicationProcess used to access consistency markers.
std::unique_ptr<ReplicationProcess> _replicationProcess;
@@ -121,6 +123,63 @@ protected:
DropPendingCollectionReaper* _dropPendingCollectionReaper = nullptr;
};
+class RollbackTest::StorageInterfaceRollback : public StorageInterfaceImpl {
+public:
+ void setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _stableTimestamp = snapshotName;
+ }
+
+ /**
+ * If '_recoverToTimestampStatus' is non-empty, returns it. If '_recoverToTimestampStatus' is
+ * empty, updates '_currTimestamp' to be equal to '_stableTimestamp' and returns the new value
+ * of '_currTimestamp'.
+ */
+ StatusWith<Timestamp> recoverToStableTimestamp(ServiceContext* serviceCtx) override {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_recoverToTimestampStatus) {
+ return _recoverToTimestampStatus.get();
+ } else {
+ _currTimestamp = _stableTimestamp;
+ return _currTimestamp;
+ }
+ }
+
+ bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override {
+ return true;
+ }
+
+ void setRecoverToTimestampStatus(Status status) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _recoverToTimestampStatus = status;
+ }
+
+ void setCurrentTimestamp(Timestamp ts) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _currTimestamp = ts;
+ }
+
+ Timestamp getCurrentTimestamp() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _currTimestamp;
+ }
+
+private:
+ mutable stdx::mutex _mutex;
+
+ Timestamp _stableTimestamp;
+
+ // Used to mock the behavior of 'recoverToStableTimestamp'. Upon calling
+ // 'recoverToStableTimestamp', the 'currTimestamp' should be set to the current
+ // '_stableTimestamp' value. Can be viewed as mock version of replication's 'lastApplied'
+ // optime.
+ Timestamp _currTimestamp;
+
+ // A Status value which, if set, will be returned by the 'recoverToStableTimestamp' function, in
+ // order to simulate the error case for that function. Defaults to boost::none.
+ boost::optional<Status> _recoverToTimestampStatus = boost::none;
+};
+
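To show how the relocated mock is typically driven, here is a simplified, hypothetical mirror of its timestamp bookkeeping (no mutex or Status machinery, unlike the real class):

```cpp
#include <cassert>
#include <optional>

// Simplified, hypothetical mirror of StorageInterfaceRollback's behavior.
struct Timestamp {
    unsigned secs = 0, inc = 0;
    bool operator==(Timestamp o) const { return secs == o.secs && inc == o.inc; }
};

class StorageMockSketch {
public:
    void setStableTimestamp(Timestamp ts) { _stable = ts; }
    void setFailRecovery(bool fail) { _fail = fail; }

    // Mirrors recoverToStableTimestamp(): either report failure, or snap the
    // current timestamp back to the stable timestamp and return it.
    std::optional<Timestamp> recoverToStableTimestamp() {
        if (_fail) {
            return std::nullopt;
        }
        _current = _stable;
        return _current;
    }

    Timestamp current() const { return _current; }

private:
    bool _fail = false;
    Timestamp _stable;
    Timestamp _current;
};

int main() {
    StorageMockSketch mock;
    mock.setStableTimestamp({1, 1});

    // Successful recovery rolls the "current" timestamp back to the stable one.
    assert(mock.recoverToStableTimestamp().has_value());
    assert((mock.current() == Timestamp{1, 1}));

    // An injected failure, like setRecoverToTimestampStatus() in the real mock,
    // makes recovery report an error without touching the current timestamp.
    mock.setFailRecovery(true);
    assert(!mock.recoverToStableTimestamp().has_value());
}
```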
/**
* ReplicationCoordinator mock implementation for rollback tests.
*/