summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-08-10 10:36:27 -0400
committerJudah Schvimer <judah@mongodb.com>2017-08-10 10:44:22 -0400
commitd8eda8b631928bd78e71d06f20f39b6611b908a4 (patch)
treeb637cc2fedc89e38cb2067799b34688c19738661
parent9c246267785dba72660fc03f5767b9d663e8b94a (diff)
downloadmongo-d8eda8b631928bd78e71d06f20f39b6611b908a4.tar.gz
SERVER-29893 Rename recovery code and make it accessible to both startup and rollback
-rw-r--r--src/mongo/db/db.cpp8
-rw-r--r--src/mongo/db/repl/SConscript29
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp5
-rw-r--r--src/mongo/db/repl/oplog.cpp50
-rw-r--r--src/mongo/db/repl/oplog.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp83
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp10
-rw-r--r--src/mongo/db/repl/replication_process.cpp8
-rw-r--r--src/mongo/db/repl/replication_process.h11
-rw-r--r--src/mongo/db/repl/replication_process_test.cpp28
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp200
-rw-r--r--src/mongo/db/repl/replication_recovery.h83
-rw-r--r--src/mongo/db/repl/replication_recovery_mock.h48
-rw-r--r--src/mongo/db/repl/replication_recovery_test.cpp350
-rw-r--r--src/mongo/db/repl/rollback_test_fixture.cpp5
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp7
-rw-r--r--src/mongo/dbtests/replica_set_tests.cpp8
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.cpp10
23 files changed, 784 insertions, 177 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index c5497433a2f..27e599f0510 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -94,6 +94,7 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/s/balancer/balancer.h"
@@ -919,11 +920,14 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
repl::StorageInterface::set(serviceContext, stdx::make_unique<repl::StorageInterfaceImpl>());
auto storageInterface = repl::StorageInterface::get(serviceContext);
+ auto consistencyMarkers =
+ stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface);
+ auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(storageInterface,
+ consistencyMarkers.get());
repl::ReplicationProcess::set(
serviceContext,
stdx::make_unique<repl::ReplicationProcess>(
- storageInterface,
- stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface)));
+ storageInterface, std::move(consistencyMarkers), std::move(recovery)));
auto replicationProcess = repl::ReplicationProcess::get(serviceContext);
repl::DropPendingCollectionReaper::set(
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 2653b607ab4..0877b12cf65 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -223,6 +223,34 @@ env.CppUnitTest(
)
env.Library(
+ target='replication_recovery',
+ source=[
+ 'replication_recovery.cpp',
+ ],
+ LIBDEPS=[
+ ],
+ LIBDEPS_PRIVATE=[
+ 'sync_tail',
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.CppUnitTest(
+ target='replication_recovery_test',
+ source=[
+ 'replication_recovery_test.cpp',
+ ],
+ LIBDEPS=[
+ ],
+ LIBDEPS_PRIVATE=[
+ 'replmocks',
+ 'replication_recovery',
+ 'storage_interface_impl',
+ '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ ],
+)
+
+env.Library(
target='replication_process',
source=[
'replication_consistency_markers.cpp',
@@ -1534,6 +1562,7 @@ env.Library(
'repl_settings',
'replication_consistency_markers_impl',
'replication_process',
+ 'replication_recovery',
'rollback_source_impl',
'sync_tail',
],
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index ddce6686acb..033af4a4ad6 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
@@ -285,7 +286,9 @@ protected:
launchExecutorThread();
_replicationProcess = stdx::make_unique<ReplicationProcess>(
- _storageInterface.get(), stdx::make_unique<ReplicationConsistencyMarkersMock>());
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersMock>(),
+ stdx::make_unique<ReplicationRecoveryMock>());
_executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor());
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ffb85a0bdf0..0e7f873fcfd 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -364,56 +364,6 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
}
} // end anon namespace
-// Truncates the oplog after and including the "truncateTimestamp" entry.
-void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp) {
- const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace);
- AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX);
- Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X);
- Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss);
- if (!oplogCollection) {
- fassertFailedWithStatusNoTrace(
- 34418,
- Status(ErrorCodes::NamespaceNotFound,
- str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns()));
- }
-
- // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
- RecordId oldestIDToDelete; // Non-null if there is something to delete.
- auto oplogRs = oplogCollection->getRecordStore();
- auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false);
- size_t count = 0;
- while (auto next = oplogReverseCursor->next()) {
- const BSONObj entry = next->data.releaseToBson();
- const RecordId id = next->id;
- count++;
-
- const auto tsElem = entry["ts"];
- if (count == 1) {
- if (tsElem.eoo())
- LOG(2) << "Oplog tail entry: " << redact(entry);
- else
- LOG(2) << "Oplog tail entry ts field: " << tsElem;
- }
-
- if (tsElem.timestamp() < truncateTimestamp) {
- // If count == 1, that means that we have nothing to delete because everything in the
- // oplog is < truncateTimestamp.
- if (count != 1) {
- invariant(!oldestIDToDelete.isNull());
- oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true);
- }
- return;
- }
-
- oldestIDToDelete = id;
- }
-
- severe() << "Reached end of oplog looking for oplog entry before "
- << truncateTimestamp.toStringPretty()
- << " but couldn't find any after looking through " << count << " entries.";
- fassertFailedNoTrace(40296);
-}
-
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index f3974984514..f119534ad7a 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -50,11 +50,6 @@ namespace repl {
class ReplSettings;
/**
- * Truncates the oplog after, and including, the "truncateTimestamp" entry.
- */
-void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp);
-
-/**
* Create a new capped collection for the oplog if it doesn't yet exist.
* If the collection already exists (and isReplSet is false),
* set the 'last' Timestamp from the last entry of the oplog collection (side effect!)
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 81d217dac06..617c92cb7fa 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -204,13 +204,6 @@ public:
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx) = 0;
/**
- * Cleaning up the oplog, by potentially truncating:
- * If we are recovering from a failed batch then minvalid.start though minvalid.end need
- * to be removed from the oplog before we can start applying operations.
- */
- virtual void cleanUpLastApplyBatch(OperationContext* opCtx) = 0;
-
- /**
* Returns the HostAndPort of the remote client connected to us that initiated the operation
* represented by "opCtx".
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 93dd61ad3ef..f396126c7de 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -569,89 +569,6 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
setNewTimestamp(ctx, newTime);
}
-void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* opCtx) {
- if (_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx)) {
- return; // Initial Sync will take over so no cleanup is needed.
- }
-
- const auto truncateAfterPoint =
- _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx);
- const auto appliedThrough =
- _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx);
-
- const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() &&
- // This version should never have a non-null truncateAfterPoint with a null appliedThrough.
- // This scenario means that we downgraded after unclean shutdown, then the downgraded node
- // deleted the ragged end of our oplog, then did a clean shutdown.
- !appliedThrough.isNull() &&
- // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This
- // means that the downgraded node deleted our ragged end then applied ahead of our
- // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with
- // applying these ops because older versions wrote to the oplog from a single thread so we
- // know they are in order.
- !(appliedThrough.getTimestamp() >= truncateAfterPoint);
- if (needToDeleteEndOfOplog) {
- log() << "Removing unapplied entries starting at: " << truncateAfterPoint;
- truncateOplogTo(opCtx, truncateAfterPoint);
- }
- _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(
- opCtx, {}); // clear the truncateAfterPoint
-
- if (appliedThrough.isNull()) {
- // No follow-up work to do.
- return;
- }
-
- // Check if we have any unapplied ops in our oplog. It is important that this is done after
- // deleting the ragged end of the oplog.
- const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(opCtx));
- if (appliedThrough == topOfOplog) {
- return; // We've applied all the valid oplog we have.
- } else if (appliedThrough > topOfOplog) {
- severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog
- << '.';
- fassertFailedNoTrace(40313);
- }
-
- log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
- << topOfOplog << " (inclusive).";
-
- DBDirectClient db(opCtx);
- auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(),
- QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
- /*batchSize*/ 0,
- /*skip*/ 0,
- /*projection*/ nullptr,
- QueryOption_OplogReplay);
-
- // Check that the first document matches our appliedThrough point then skip it since it's
- // already been applied.
- if (!cursor->more()) {
- // This should really be impossible because we check above that the top of the oplog is
- // strictly > appliedThrough. If this fails it represents a serious bug in either the
- // storage engine or query's implementation of OplogReplay.
- severe() << "Couldn't find any entries in the oplog >= " << appliedThrough
- << " which should be impossible.";
- fassertFailedNoTrace(40293);
- }
- auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe()));
- if (firstOpTimeFound != appliedThrough) {
- severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is "
- << firstOpTimeFound;
- fassertFailedNoTrace(40292);
- }
-
- // Apply remaining ops one at at time, but don't log them because they are already logged.
- UnreplicatedWritesBlock uwb(opCtx);
-
- while (cursor->more()) {
- auto entry = cursor->nextSafe();
- fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true));
- _replicationProcess->getConsistencyMarkers()->setAppliedThrough(
- opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
- }
-}
-
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
OperationContext* opCtx) {
// TODO: handle WriteConflictExceptions below
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 68318f01455..0893bdc16bd 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -92,7 +92,6 @@ public:
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
- virtual void cleanUpLastApplyBatch(OperationContext* opCtx);
virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 524db05c86a..1f544138fcc 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -166,8 +166,6 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument(
void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service,
const Timestamp& newTime) {}
-void ReplicationCoordinatorExternalStateMock::cleanUpLastApplyBatch(OperationContext* opCtx) {}
-
StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(
OperationContext* opCtx) {
return _lastOpTime;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 5f6143ce329..c18c8c8fc16 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -80,7 +80,6 @@ public:
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
- virtual void cleanUpLastApplyBatch(OperationContext* opCtx);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* opCtx);
virtual void shardingOnStepDownHook();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a3fd2fe8ddd..2ea7ee7b3fb 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -447,7 +447,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
}
// Read the last op from the oplog after cleaning up any partially applied batches.
- _externalState->cleanUpLastApplyBatch(opCtx);
+ _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx);
auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
// Use a callback here, because _finishLoadLocalConfig calls isself() which requires
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 5e8eac78138..ffc5af1de2d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/executor/network_interface_mock.h"
@@ -125,10 +126,11 @@ void ReplCoordTest::init() {
StorageInterface::set(service, std::unique_ptr<StorageInterface>(_storageInterface));
ASSERT_TRUE(_storageInterface == StorageInterface::get(service));
- ReplicationProcess::set(
- service,
- stdx::make_unique<ReplicationProcess>(
- _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>()));
+ ReplicationProcess::set(service,
+ stdx::make_unique<ReplicationProcess>(
+ _storageInterface,
+ stdx::make_unique<ReplicationConsistencyMarkersMock>(),
+ stdx::make_unique<ReplicationRecoveryMock>()));
auto replicationProcess = ReplicationProcess::get(service);
// PRNG seed for tests.
diff --git a/src/mongo/db/repl/replication_process.cpp b/src/mongo/db/repl/replication_process.cpp
index 7e7dc077b11..cde7fcdbae9 100644
--- a/src/mongo/db/repl/replication_process.cpp
+++ b/src/mongo/db/repl/replication_process.cpp
@@ -82,9 +82,11 @@ void ReplicationProcess::set(ServiceContext* service, std::unique_ptr<Replicatio
ReplicationProcess::ReplicationProcess(
StorageInterface* storageInterface,
- std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers)
+ std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers,
+ std::unique_ptr<ReplicationRecovery> recovery)
: _storageInterface(storageInterface),
_consistencyMarkers(std::move(consistencyMarkers)),
+ _recovery(std::move(recovery)),
_rbid(kUninitializedRollbackId) {}
StatusWith<int> ReplicationProcess::getRollbackID(OperationContext* opCtx) {
@@ -182,5 +184,9 @@ ReplicationConsistencyMarkers* ReplicationProcess::getConsistencyMarkers() {
return _consistencyMarkers.get();
}
+ReplicationRecovery* ReplicationProcess::getReplicationRecovery() {
+ return _recovery.get();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h
index da4a2886c6a..5b6229ac6d0 100644
--- a/src/mongo/db/repl/replication_process.h
+++ b/src/mongo/db/repl/replication_process.h
@@ -37,6 +37,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_consistency_markers.h"
+#include "mongo/db/repl/replication_recovery.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -81,7 +82,8 @@ public:
static void set(ServiceContext* service, std::unique_ptr<ReplicationProcess> process);
ReplicationProcess(StorageInterface* storageInterface,
- std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers);
+ std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers,
+ std::unique_ptr<ReplicationRecovery> recovery);
virtual ~ReplicationProcess() = default;
/**
@@ -134,6 +136,11 @@ public:
*/
ReplicationConsistencyMarkers* getConsistencyMarkers();
+ /**
+ * Returns an object used to recover from the oplog on startup or rollback.
+ */
+ ReplicationRecovery* getReplicationRecovery();
+
private:
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -151,6 +158,8 @@ private:
// Used for operations on documents that maintain replication consistency.
std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; // (S)
+ std::unique_ptr<ReplicationRecovery> _recovery; // (S)
+
// Rollback ID. This is a cached copy of the persisted value in the local.system.rollback.id
// collection.
int _rbid; // (M)
diff --git a/src/mongo/db/repl/replication_process_test.cpp b/src/mongo/db/repl/replication_process_test.cpp
index 8464b54906f..40a51312433 100644
--- a/src/mongo/db/repl/replication_process_test.cpp
+++ b/src/mongo/db/repl/replication_process_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
@@ -78,7 +79,8 @@ TEST_F(ReplicationProcessTest, ServiceContextDecorator) {
ASSERT_FALSE(ReplicationProcess::get(serviceContext));
ReplicationProcess* replicationProcess = new ReplicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ReplicationProcess::set(serviceContext,
std::unique_ptr<ReplicationProcess>(replicationProcess));
ASSERT_TRUE(replicationProcess == ReplicationProcess::get(serviceContext));
@@ -90,7 +92,8 @@ TEST_F(ReplicationProcessTest,
GetRollbackProgressReturnsNoSuchKeyIfDocumentWithIdProgressIsNotFound) {
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
// Collection is not found.
auto opCtx = makeOpCtx();
@@ -126,7 +129,8 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFi
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get()));
}
@@ -147,7 +151,8 @@ TEST_F(ReplicationProcessTest,
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get()));
}
@@ -166,7 +171,8 @@ TEST_F(ReplicationProcessTest,
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ASSERT_EQUALS(applyUntil,
unittest::assertGet(replicationProcess.getRollbackProgress(opCtx.get())));
@@ -181,7 +187,8 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ASSERT_OK(replicationProcess.setRollbackProgress(opCtx.get(), applyUntil));
ASSERT_EQUALS(1U,
unittest::assertGet(_storageInterface->getCollectionCount(
@@ -199,7 +206,8 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ASSERT_EQUALS(ErrorCodes::IllegalOperation,
replicationProcess.setRollbackProgress(opCtx.get(), applyUntil));
}
@@ -208,7 +216,8 @@ TEST_F(ReplicationProcessTest, ClearRollbackProgressReturnsSuccessIfCollectionDo
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ASSERT_OK(replicationProcess.clearRollbackProgress(opCtx.get()));
}
@@ -220,7 +229,8 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
+ stdx::make_unique<ReplicationRecoveryMock>());
ASSERT_EQUALS(ErrorCodes::IllegalOperation,
replicationProcess.clearRollbackProgress(opCtx.get()));
}
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
new file mode 100644
index 00000000000..214e0c3cf7d
--- /dev/null
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -0,0 +1,200 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/replication_recovery.h"
+
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/sync_tail.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace repl {
+
+ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface,
+ ReplicationConsistencyMarkers* consistencyMarkers)
+ : _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {}
+
+void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx) {
+ if (_consistencyMarkers->getInitialSyncFlag(opCtx)) {
+ log() << "No recovery needed. Initial sync flag set.";
+ return; // Initial Sync will take over so no cleanup is needed.
+ }
+
+ const auto truncateAfterPoint = _consistencyMarkers->getOplogTruncateAfterPoint(opCtx);
+ const auto appliedThrough = _consistencyMarkers->getAppliedThrough(opCtx);
+
+ const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() &&
+ // This version should never have a non-null truncateAfterPoint with a null appliedThrough.
+ // This scenario means that we downgraded after unclean shutdown, then the downgraded node
+ // deleted the ragged end of our oplog, then did a clean shutdown.
+ !appliedThrough.isNull() &&
+ // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This
+ // means that the downgraded node deleted our ragged end then applied ahead of our
+ // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with
+ // applying these ops because older versions wrote to the oplog from a single thread so we
+ // know they are in order.
+ !(appliedThrough.getTimestamp() >= truncateAfterPoint);
+ if (needToDeleteEndOfOplog) {
+ log() << "Removing unapplied entries starting at: " << truncateAfterPoint.toBSON();
+ _truncateOplogTo(opCtx, truncateAfterPoint);
+ }
+ _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {}); // clear the truncateAfterPoint
+
+ if (appliedThrough.isNull()) {
+ log() << "No oplog entries to apply for recovery. appliedThrough is null.";
+ // No follow-up work to do.
+ return;
+ }
+
+ // Check if we have any unapplied ops in our oplog. It is important that this is done after
+ // deleting the ragged end of the oplog.
+ const auto topOfOplog = fassertStatusOK(40290, _getLastAppliedOpTime(opCtx));
+ if (appliedThrough == topOfOplog) {
+ log()
+ << "No oplog entries to apply for recovery. appliedThrough is at the top of the oplog.";
+ return; // We've applied all the valid oplog we have.
+ } else if (appliedThrough > topOfOplog) {
+ severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog
+ << '.';
+ fassertFailedNoTrace(40313);
+ }
+
+ log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
+ << topOfOplog << " (inclusive).";
+
+ DBDirectClient db(opCtx);
+ auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(),
+ QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
+ /*batchSize*/ 0,
+ /*skip*/ 0,
+ /*projection*/ nullptr,
+ QueryOption_OplogReplay);
+
+ // Check that the first document matches our appliedThrough point then skip it since it's
+ // already been applied.
+ if (!cursor->more()) {
+ // This should really be impossible because we check above that the top of the oplog is
+ // strictly > appliedThrough. If this fails it represents a serious bug in either the
+ // storage engine or query's implementation of OplogReplay.
+ severe() << "Couldn't find any entries in the oplog >= " << appliedThrough
+ << " which should be impossible.";
+ fassertFailedNoTrace(40293);
+ }
+ auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe()));
+ if (firstOpTimeFound != appliedThrough) {
+ severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is "
+ << firstOpTimeFound;
+ fassertFailedNoTrace(40292);
+ }
+
+ // Apply remaining ops one at at time, but don't log them because they are already logged.
+ UnreplicatedWritesBlock uwb(opCtx);
+
+ while (cursor->more()) {
+ auto entry = cursor->nextSafe();
+ fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true));
+ _consistencyMarkers->setAppliedThrough(
+ opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
+ }
+}
+
+StatusWith<OpTime> ReplicationRecoveryImpl::_getLastAppliedOpTime(OperationContext* opCtx) const {
+ const auto docsSW = _storageInterface->findDocuments(opCtx,
+ NamespaceString::kRsOplogNamespace,
+ boost::none, // Collection scan
+ StorageInterface::ScanDirection::kBackward,
+ {},
+ BoundInclusion::kIncludeStartKeyOnly,
+ 1U);
+ if (!docsSW.isOK()) {
+ return docsSW.getStatus();
+ }
+ const auto docs = docsSW.getValue();
+ invariant(1U == docs.size());
+
+ return OpTime::parseFromOplogEntry(docs.front());
+}
+
+void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx,
+ Timestamp truncateTimestamp) {
+ const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace);
+ AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX);
+ Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X);
+ Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss);
+ if (!oplogCollection) {
+ fassertFailedWithStatusNoTrace(
+ 34418,
+ Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns()));
+ }
+
+ // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
+ RecordId oldestIDToDelete; // Non-null if there is something to delete.
+ auto oplogRs = oplogCollection->getRecordStore();
+ auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false);
+ size_t count = 0;
+ while (auto next = oplogReverseCursor->next()) {
+ const BSONObj entry = next->data.releaseToBson();
+ const RecordId id = next->id;
+ count++;
+
+ const auto tsElem = entry["ts"];
+ if (count == 1) {
+ if (tsElem.eoo())
+ LOG(2) << "Oplog tail entry: " << redact(entry);
+ else
+ LOG(2) << "Oplog tail entry ts field: " << tsElem;
+ }
+
+ if (tsElem.timestamp() < truncateTimestamp) {
+ // If count == 1, that means that we have nothing to delete because everything in the
+ // oplog is < truncateTimestamp.
+ if (count != 1) {
+ invariant(!oldestIDToDelete.isNull());
+ oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true);
+ }
+ return;
+ }
+
+ oldestIDToDelete = id;
+ }
+
+ severe() << "Reached end of oplog looking for oplog entry before " << truncateTimestamp.toBSON()
+ << " but couldn't find any after looking through " << count << " entries.";
+ fassertFailedNoTrace(40296);
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h
new file mode 100644
index 00000000000..77dbcf404e8
--- /dev/null
+++ b/src/mongo/db/repl/replication_recovery.h
@@ -0,0 +1,83 @@
+/**
+* Copyright (C) 2017 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/repl/optime.h"
+
+namespace mongo {
+
+class OperationContext;
+
+namespace repl {
+
+class StorageInterface;
+class ReplicationConsistencyMarkers;
+
+/**
+ * This class is used by the replication system to recover after an unclean shutdown or a rollback.
+ */
+class ReplicationRecovery {
+public:
+ ReplicationRecovery() = default;
+ virtual ~ReplicationRecovery() = default;
+
+ /**
+ * Recovers the data on disk from the oplog.
+ */
+ virtual void recoverFromOplog(OperationContext* opCtx) = 0;
+};
+
+class ReplicationRecoveryImpl : public ReplicationRecovery {
+ MONGO_DISALLOW_COPYING(ReplicationRecoveryImpl);
+
+public:
+ ReplicationRecoveryImpl(StorageInterface* storageInterface,
+ ReplicationConsistencyMarkers* consistencyMarkers);
+
+ void recoverFromOplog(OperationContext* opCtx) override;
+
+private:
+ /**
+ * Gets the last applied OpTime from the end of the oplog.
+ */
+ StatusWith<OpTime> _getLastAppliedOpTime(OperationContext* opCtx) const;
+
+ /**
+ * Truncates the oplog after and including the "truncateTimestamp" entry.
+ */
+ void _truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp);
+
+ StorageInterface* _storageInterface;
+ ReplicationConsistencyMarkers* _consistencyMarkers;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_recovery_mock.h b/src/mongo/db/repl/replication_recovery_mock.h
new file mode 100644
index 00000000000..220030ab0b5
--- /dev/null
+++ b/src/mongo/db/repl/replication_recovery_mock.h
@@ -0,0 +1,48 @@
+/**
+* Copyright (C) 2017 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/repl/replication_recovery.h"
+
+namespace mongo {
+class OperationContext;
+namespace repl {
+
+class ReplicationRecoveryMock : public ReplicationRecovery {
+ MONGO_DISALLOW_COPYING(ReplicationRecoveryMock);
+
+public:
+ ReplicationRecoveryMock() = default;
+
+ void recoverFromOplog(OperationContext* opCtx) override {}
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp
new file mode 100644
index 00000000000..e03a615287b
--- /dev/null
+++ b/src/mongo/db/repl/replication_recovery_test.cpp
@@ -0,0 +1,350 @@
+/**
+ * Copyright 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/replication_recovery.h"
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog_interface_local.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace {
+
+using namespace mongo;
+using namespace mongo::repl;
+
+const auto& oplogNs = NamespaceString::kRsOplogNamespace;
+const NamespaceString testNs("a.a");
+
+class ReplicationRecoveryTest : public ServiceContextMongoDTest {
+protected:
+ OperationContext* getOperationContext() {
+ return _opCtx.get();
+ }
+
+ StorageInterface* getStorageInterface() {
+ return _storageInterface.get();
+ }
+
+ ReplicationConsistencyMarkers* getConsistencyMarkers() {
+ return _consistencyMarkers.get();
+ }
+
+private:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+ _createOpCtx();
+ _storageInterface = stdx::make_unique<StorageInterfaceImpl>();
+ _consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>();
+
+ auto service = getServiceContext();
+ ReplicationCoordinator::set(service,
+ stdx::make_unique<ReplicationCoordinatorMock>(service));
+
+ ASSERT_OK(_storageInterface->createCollection(
+ getOperationContext(), testNs, CollectionOptions()));
+ }
+
+ void tearDown() override {
+ _opCtx.reset(nullptr);
+ _consistencyMarkers.reset();
+ _storageInterface.reset();
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ void _createOpCtx() {
+ _opCtx = cc().makeOperationContext();
+ }
+
+ ServiceContext::UniqueOperationContext _opCtx;
+ std::unique_ptr<StorageInterfaceImpl> _storageInterface;
+ std::unique_ptr<ReplicationConsistencyMarkersMock> _consistencyMarkers;
+};
+
+/**
+ * Generates a document to be inserted into the test collection.
+ */
+BSONObj _makeInsertDocument(int t) {
+ return BSON("_id" << t << "a" << t);
+}
+
+/**
+ * Generates oplog entries with the given number used for the timestamp.
+ */
+BSONObj _makeOplogEntry(int t) {
+ return BSON("ts" << Timestamp(t, t) << "h" << t << "ns" << testNs.ns() << "v" << 2 << "op"
+ << "i"
+ << "o"
+ << _makeInsertDocument(t));
+}
+
+/**
+ * Creates collection options suitable for oplog.
+ */
+CollectionOptions _createOplogCollectionOptions() {
+ CollectionOptions options;
+ options.capped = true;
+ options.cappedSize = 64 * 1024 * 1024LL;
+ options.autoIndexId = CollectionOptions::NO;
+ return options;
+}
+
+/**
+ * Creates an oplog with insert entries at the given timestamps.
+ */
+void _setUpOplog(OperationContext* opCtx, StorageInterface* storage, std::vector<int> timestamps) {
+ ASSERT_OK(storage->createCollection(opCtx, oplogNs, _createOplogCollectionOptions()));
+
+ for (int ts : timestamps) {
+ ASSERT_OK(storage->insertDocument(opCtx, oplogNs, _makeOplogEntry(ts)));
+ }
+}
+
+/**
+ * Check collection contents. OplogInterface returns documents in reverse natural order.
+ */
+void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) {
+ std::vector<BSONObj> reversedDocs(docs);
+ std::reverse(reversedDocs.begin(), reversedDocs.end());
+ OplogInterfaceLocal oplog(opCtx, nss.ns());
+ auto iter = oplog.makeIterator();
+ for (const auto& doc : reversedDocs) {
+ ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first);
+ }
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
+}
+
+/**
+ * Asserts that the documents in the oplog have the given timestamps.
+ */
+void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) {
+ std::vector<BSONObj> expectedOplog(timestamps.size());
+ std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) {
+ return _makeOplogEntry(ts);
+ });
+ _assertDocumentsInCollectionEquals(opCtx, oplogNs, expectedOplog);
+}
+
+/**
+ * Asserts that the documents in the test collection have the given ids.
+ */
+void _assertDocsInTestCollection(OperationContext* opCtx, std::vector<int> ids) {
+ std::vector<BSONObj> expectedColl(ids.size());
+ std::transform(ids.begin(), ids.end(), expectedColl.begin(), [](int id) {
+ return _makeInsertDocument(id);
+ });
+ _assertDocumentsInCollectionEquals(opCtx, testNs, expectedColl);
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryWithEmptyOplogSucceeds) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ _setUpOplog(opCtx, getStorageInterface(), {});
+
+ recovery.recoverFromOplog(opCtx);
+
+ _assertDocsInOplog(opCtx, {});
+ _assertDocsInTestCollection(opCtx, {});
+}
+
+DEATH_TEST_F(ReplicationRecoveryTest,
+ RecoveryWithEmptyOplogAndNonNullAppliedThroughInvariants,
+ "Invariant failure 1U == docs.size()") {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ _setUpOplog(opCtx, getStorageInterface(), {});
+
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
+ recovery.recoverFromOplog(opCtx);
+
+ _assertDocsInOplog(opCtx, {});
+ _assertDocsInTestCollection(opCtx, {});
+}
+
+DEATH_TEST_F(ReplicationRecoveryTest,
+ TruncateFassertsWithoutOplogCollection,
+ "Fatal assertion 34418 NamespaceNotFound: Can't find local.oplog.rs") {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
+
+ // Create the database.
+ ASSERT_OK(getStorageInterface()->createCollection(
+ opCtx, NamespaceString("local.other"), CollectionOptions()));
+
+ recovery.recoverFromOplog(opCtx);
+}
+
+DEATH_TEST_F(ReplicationRecoveryTest,
+ TruncateEntireOplogFasserts,
+ "Reached end of oplog looking for oplog entry before { : Timestamp 4000|4 } but "
+ "couldn't find any after looking through 3 entries.") {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {7, 8, 9});
+
+ recovery.recoverFromOplog(opCtx);
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryTruncatesOplogAtOplogTruncateAfterPoint) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+
+ _assertDocsInOplog(opCtx, {1, 2, 3});
+ _assertDocsInTestCollection(opCtx, {});
+ ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+ ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
+}
+
+TEST_F(ReplicationRecoveryTest, RecoverySkipsEverythingIfInitialSyncFlagIsSet) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setInitialSyncFlag(opCtx);
+ getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+
+ _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
+ _assertDocsInTestCollection(opCtx, {});
+ ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp(4, 4));
+ ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(1, 1), 1));
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryResetsOplogTruncateAfterPointWhenAppliedThroughIsNull) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime());
+ _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+
+ _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
+ _assertDocsInTestCollection(opCtx, {});
+ ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+ ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime());
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehind) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+
+ _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
+ _assertDocsInTestCollection(opCtx, {4, 5});
+ ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+ ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1));
+}
+
+TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehindAfterTruncation) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+
+ _assertDocsInOplog(opCtx, {1, 2, 3});
+ _assertDocsInTestCollection(opCtx, {2, 3});
+ ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+ ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
+}
+
+DEATH_TEST_F(ReplicationRecoveryTest, AppliedThroughBehindOplogFasserts, "Fatal Assertion 40292") {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {3, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+}
+
+DEATH_TEST_F(ReplicationRecoveryTest,
+ AppliedThroughAheadOfTopOfOplogCausesFassert,
+ "Applied op { ts: Timestamp 9000|9, t: 1 } not found. Top of oplog is { ts: Timestamp "
+ "5000|5, t: -1 }.") {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(9, 9), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+}
+
+DEATH_TEST_F(ReplicationRecoveryTest,
+ AppliedThroughNotInOplogCausesFassert,
+ "Oplog entry at { ts: Timestamp 3000|3, t: 1 } is missing; actual entry found is { "
+ "ts: Timestamp 4000|4, t: -1 }") {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
+ _setUpOplog(opCtx, getStorageInterface(), {1, 2, 4, 5});
+
+ recovery.recoverFromOplog(opCtx);
+}
+
+} // namespace
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 89d52be56e2..4bfd76b96f4 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/session_catalog.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/mongoutils/str.h"
@@ -63,7 +64,9 @@ void RollbackTest::setUp() {
_serviceContextMongoDTest.setUp();
auto serviceContext = _serviceContextMongoDTest.getServiceContext();
_replicationProcess = stdx::make_unique<ReplicationProcess>(
- &_storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>());
+ &_storageInterface,
+ stdx::make_unique<ReplicationConsistencyMarkersMock>(),
+ stdx::make_unique<ReplicationRecoveryMock>());
_dropPendingCollectionReaper = new DropPendingCollectionReaper(&_storageInterface);
DropPendingCollectionReaper::set(
serviceContext, std::unique_ptr<DropPendingCollectionReaper>(_dropPendingCollectionReaper));
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index a67817924ba..c9998c89660 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
namespace mongo {
@@ -56,8 +57,10 @@ void SyncTailTest::setUp() {
DropPendingCollectionReaper::set(
service, stdx::make_unique<DropPendingCollectionReaper>(_storageInterface));
- _replicationProcess = new ReplicationProcess(
- _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>());
+ _replicationProcess =
+ new ReplicationProcess(_storageInterface,
+ stdx::make_unique<ReplicationConsistencyMarkersMock>(),
+ stdx::make_unique<ReplicationRecoveryMock>());
ReplicationProcess::set(cc().getServiceContext(),
std::unique_ptr<ReplicationProcess>(_replicationProcess));
diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp
index e70a221fa57..c6453e49e2e 100644
--- a/src/mongo/dbtests/replica_set_tests.cpp
+++ b/src/mongo/dbtests/replica_set_tests.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/replication_coordinator_external_state_impl.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/unittest/unittest.h"
@@ -53,9 +54,12 @@ protected:
_storageInterface = stdx::make_unique<repl::StorageInterfaceMock>();
_dropPendingCollectionReaper =
stdx::make_unique<repl::DropPendingCollectionReaper>(_storageInterface.get());
+ auto consistencyMarkers =
+ stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get());
+ auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(_storageInterface.get(),
+ consistencyMarkers.get());
_replicationProcess = stdx::make_unique<repl::ReplicationProcess>(
- _storageInterface.get(),
- stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
+ _storageInterface.get(), std::move(consistencyMarkers), std::move(recovery));
_replCoordExternalState = stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(
opCtx->getServiceContext(),
_dropPendingCollectionReaper.get(),
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index 3a9fa023b2a..453951a1735 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/executor/network_interface_mock.h"
@@ -130,10 +131,11 @@ void ShardingMongodTestFixture::setUp() {
repl::DropPendingCollectionReaper::set(
service, stdx::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get()));
- repl::ReplicationProcess::set(
- service,
- stdx::make_unique<repl::ReplicationProcess>(
- storagePtr.get(), stdx::make_unique<repl::ReplicationConsistencyMarkersMock>()));
+ repl::ReplicationProcess::set(service,
+ stdx::make_unique<repl::ReplicationProcess>(
+ storagePtr.get(),
+ stdx::make_unique<repl::ReplicationConsistencyMarkersMock>(),
+ stdx::make_unique<repl::ReplicationRecoveryMock>()));
repl::ReplicationProcess::get(_opCtx.get())
->initializeRollbackID(_opCtx.get())
.transitional_ignore();