summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2017-08-07 20:18:33 -0400
committerMathias Stearn <mathias@10gen.com>2017-08-07 20:18:33 -0400
commitfd62ac35e27f83155cbe3d60bf02c49f45298e54 (patch)
tree09368db6e56db643e06c0ee97c65b07dc753f9c0
parent9255b0a684c6c9ca35da96493b91f04b832dc792 (diff)
downloadmongo-fd62ac35e27f83155cbe3d60bf02c49f45298e54.tar.gz
Revert "SERVER-29893 Rename recovery code and make it accessible to both startup and rollback"
This reverts commit 8fa770baf8fac6e71a45f84b48eeb3bae96a8dab.
-rw-r--r--src/mongo/db/db.cpp8
-rw-r--r--src/mongo/db/repl/SConscript29
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp5
-rw-r--r--src/mongo/db/repl/oplog.cpp50
-rw-r--r--src/mongo/db/repl/oplog.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp83
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp10
-rw-r--r--src/mongo/db/repl/replication_process.cpp8
-rw-r--r--src/mongo/db/repl/replication_process.h11
-rw-r--r--src/mongo/db/repl/replication_process_test.cpp28
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp200
-rw-r--r--src/mongo/db/repl/replication_recovery.h83
-rw-r--r--src/mongo/db/repl/replication_recovery_mock.h48
-rw-r--r--src/mongo/db/repl/replication_recovery_test.cpp350
-rw-r--r--src/mongo/db/repl/rollback_test_fixture.cpp5
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp7
-rw-r--r--src/mongo/dbtests/replica_set_tests.cpp8
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.cpp10
23 files changed, 177 insertions, 784 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 27eb07bdb53..7f7c49adfc4 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -93,7 +93,6 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/s/balancer/balancer.h"
@@ -916,14 +915,11 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
repl::StorageInterface::set(serviceContext, stdx::make_unique<repl::StorageInterfaceImpl>());
auto storageInterface = repl::StorageInterface::get(serviceContext);
- auto consistencyMarkers =
- stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface);
- auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(storageInterface,
- consistencyMarkers.get());
repl::ReplicationProcess::set(
serviceContext,
stdx::make_unique<repl::ReplicationProcess>(
- storageInterface, std::move(consistencyMarkers), std::move(recovery)));
+ storageInterface,
+ stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface)));
auto replicationProcess = repl::ReplicationProcess::get(serviceContext);
repl::DropPendingCollectionReaper::set(
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 33afbd008ee..0c841ed5e45 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -223,34 +223,6 @@ env.CppUnitTest(
)
env.Library(
- target='replication_recovery',
- source=[
- 'replication_recovery.cpp',
- ],
- LIBDEPS=[
- ],
- LIBDEPS_PRIVATE=[
- 'sync_tail',
- '$BUILD_DIR/mongo/base',
- ],
-)
-
-env.CppUnitTest(
- target='replication_recovery_test',
- source=[
- 'replication_recovery_test.cpp',
- ],
- LIBDEPS=[
- ],
- LIBDEPS_PRIVATE=[
- 'replmocks',
- 'replication_recovery',
- 'storage_interface_impl',
- '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
- ],
-)
-
-env.Library(
target='replication_process',
source=[
'replication_consistency_markers.cpp',
@@ -1555,7 +1527,6 @@ env.Library(
'repl_settings',
'replication_consistency_markers_impl',
'replication_process',
- 'replication_recovery',
'rollback_source_impl',
'sync_tail',
],
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 1af4ecd3264..2a402f2ff06 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
@@ -286,9 +285,7 @@ protected:
launchExecutorThread();
_replicationProcess = stdx::make_unique<ReplicationProcess>(
- _storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersMock>(),
- stdx::make_unique<ReplicationRecoveryMock>());
+ _storageInterface.get(), stdx::make_unique<ReplicationConsistencyMarkersMock>());
_executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor());
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 52e23d85477..d800549a9d9 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -364,6 +364,56 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
}
} // end anon namespace
+// Truncates the oplog after and including the "truncateTimestamp" entry.
+void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp) {
+ const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace);
+ AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX);
+ Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X);
+ Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss);
+ if (!oplogCollection) {
+ fassertFailedWithStatusNoTrace(
+ 34418,
+ Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns()));
+ }
+
+ // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
+ RecordId oldestIDToDelete; // Non-null if there is something to delete.
+ auto oplogRs = oplogCollection->getRecordStore();
+ auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false);
+ size_t count = 0;
+ while (auto next = oplogReverseCursor->next()) {
+ const BSONObj entry = next->data.releaseToBson();
+ const RecordId id = next->id;
+ count++;
+
+ const auto tsElem = entry["ts"];
+ if (count == 1) {
+ if (tsElem.eoo())
+ LOG(2) << "Oplog tail entry: " << redact(entry);
+ else
+ LOG(2) << "Oplog tail entry ts field: " << tsElem;
+ }
+
+ if (tsElem.timestamp() < truncateTimestamp) {
+ // If count == 1, that means that we have nothing to delete because everything in the
+ // oplog is < truncateTimestamp.
+ if (count != 1) {
+ invariant(!oldestIDToDelete.isNull());
+ oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true);
+ }
+ return;
+ }
+
+ oldestIDToDelete = id;
+ }
+
+ severe() << "Reached end of oplog looking for oplog entry before "
+ << truncateTimestamp.toStringPretty()
+ << " but couldn't find any after looking through " << count << " entries.";
+ fassertFailedNoTrace(40296);
+}
+
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index f119534ad7a..f3974984514 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -50,6 +50,11 @@ namespace repl {
class ReplSettings;
/**
+ * Truncates the oplog after, and including, the "truncateTimestamp" entry.
+ */
+void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp);
+
+/**
* Create a new capped collection for the oplog if it doesn't yet exist.
* If the collection already exists (and isReplSet is false),
* set the 'last' Timestamp from the last entry of the oplog collection (side effect!)
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 617c92cb7fa..81d217dac06 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -204,6 +204,13 @@ public:
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx) = 0;
/**
+ * Cleaning up the oplog, by potentially truncating:
+ * If we are recovering from a failed batch then minvalid.start though minvalid.end need
+ * to be removed from the oplog before we can start applying operations.
+ */
+ virtual void cleanUpLastApplyBatch(OperationContext* opCtx) = 0;
+
+ /**
* Returns the HostAndPort of the remote client connected to us that initiated the operation
* represented by "opCtx".
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index cc8edcc6722..5bdfcc46ca2 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -568,6 +568,89 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
setNewTimestamp(ctx, newTime);
}
+void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* opCtx) {
+ if (_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx)) {
+ return; // Initial Sync will take over so no cleanup is needed.
+ }
+
+ const auto truncateAfterPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx);
+ const auto appliedThrough =
+ _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx);
+
+ const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() &&
+ // This version should never have a non-null truncateAfterPoint with a null appliedThrough.
+ // This scenario means that we downgraded after unclean shutdown, then the downgraded node
+ // deleted the ragged end of our oplog, then did a clean shutdown.
+ !appliedThrough.isNull() &&
+ // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This
+ // means that the downgraded node deleted our ragged end then applied ahead of our
+ // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with
+ // applying these ops because older versions wrote to the oplog from a single thread so we
+ // know they are in order.
+ !(appliedThrough.getTimestamp() >= truncateAfterPoint);
+ if (needToDeleteEndOfOplog) {
+ log() << "Removing unapplied entries starting at: " << truncateAfterPoint;
+ truncateOplogTo(opCtx, truncateAfterPoint);
+ }
+ _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(
+ opCtx, {}); // clear the truncateAfterPoint
+
+ if (appliedThrough.isNull()) {
+ // No follow-up work to do.
+ return;
+ }
+
+ // Check if we have any unapplied ops in our oplog. It is important that this is done after
+ // deleting the ragged end of the oplog.
+ const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(opCtx));
+ if (appliedThrough == topOfOplog) {
+ return; // We've applied all the valid oplog we have.
+ } else if (appliedThrough > topOfOplog) {
+ severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog
+ << '.';
+ fassertFailedNoTrace(40313);
+ }
+
+ log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
+ << topOfOplog << " (inclusive).";
+
+ DBDirectClient db(opCtx);
+ auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(),
+ QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
+ /*batchSize*/ 0,
+ /*skip*/ 0,
+ /*projection*/ nullptr,
+ QueryOption_OplogReplay);
+
+ // Check that the first document matches our appliedThrough point then skip it since it's
+ // already been applied.
+ if (!cursor->more()) {
+ // This should really be impossible because we check above that the top of the oplog is
+ // strictly > appliedThrough. If this fails it represents a serious bug in either the
+ // storage engine or query's implementation of OplogReplay.
+ severe() << "Couldn't find any entries in the oplog >= " << appliedThrough
+ << " which should be impossible.";
+ fassertFailedNoTrace(40293);
+ }
+ auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe()));
+ if (firstOpTimeFound != appliedThrough) {
+ severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is "
+ << firstOpTimeFound;
+ fassertFailedNoTrace(40292);
+ }
+
+ // Apply remaining ops one at at time, but don't log them because they are already logged.
+ UnreplicatedWritesBlock uwb(opCtx);
+
+ while (cursor->more()) {
+ auto entry = cursor->nextSafe();
+ fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true));
+ _replicationProcess->getConsistencyMarkers()->setAppliedThrough(
+ opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
+ }
+}
+
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
OperationContext* opCtx) {
// TODO: handle WriteConflictExceptions below
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 0893bdc16bd..68318f01455 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -92,6 +92,7 @@ public:
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
+ virtual void cleanUpLastApplyBatch(OperationContext* opCtx);
virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 1f544138fcc..524db05c86a 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -166,6 +166,8 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument(
void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service,
const Timestamp& newTime) {}
+void ReplicationCoordinatorExternalStateMock::cleanUpLastApplyBatch(OperationContext* opCtx) {}
+
StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(
OperationContext* opCtx) {
return _lastOpTime;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index c18c8c8fc16..5f6143ce329 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -80,6 +80,7 @@ public:
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
+ virtual void cleanUpLastApplyBatch(OperationContext* opCtx);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* opCtx);
virtual void shardingOnStepDownHook();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 9c513cc73e8..5ca9f6cabf5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -447,7 +447,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
}
// Read the last op from the oplog after cleaning up any partially applied batches.
- _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx);
+ _externalState->cleanUpLastApplyBatch(opCtx);
auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
// Use a callback here, because _finishLoadLocalConfig calls isself() which requires
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index ffc5af1de2d..5e8eac78138 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/executor/network_interface_mock.h"
@@ -126,11 +125,10 @@ void ReplCoordTest::init() {
StorageInterface::set(service, std::unique_ptr<StorageInterface>(_storageInterface));
ASSERT_TRUE(_storageInterface == StorageInterface::get(service));
- ReplicationProcess::set(service,
- stdx::make_unique<ReplicationProcess>(
- _storageInterface,
- stdx::make_unique<ReplicationConsistencyMarkersMock>(),
- stdx::make_unique<ReplicationRecoveryMock>()));
+ ReplicationProcess::set(
+ service,
+ stdx::make_unique<ReplicationProcess>(
+ _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>()));
auto replicationProcess = ReplicationProcess::get(service);
// PRNG seed for tests.
diff --git a/src/mongo/db/repl/replication_process.cpp b/src/mongo/db/repl/replication_process.cpp
index cde7fcdbae9..7e7dc077b11 100644
--- a/src/mongo/db/repl/replication_process.cpp
+++ b/src/mongo/db/repl/replication_process.cpp
@@ -82,11 +82,9 @@ void ReplicationProcess::set(ServiceContext* service, std::unique_ptr<Replicatio
ReplicationProcess::ReplicationProcess(
StorageInterface* storageInterface,
- std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers,
- std::unique_ptr<ReplicationRecovery> recovery)
+ std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers)
: _storageInterface(storageInterface),
_consistencyMarkers(std::move(consistencyMarkers)),
- _recovery(std::move(recovery)),
_rbid(kUninitializedRollbackId) {}
StatusWith<int> ReplicationProcess::getRollbackID(OperationContext* opCtx) {
@@ -184,9 +182,5 @@ ReplicationConsistencyMarkers* ReplicationProcess::getConsistencyMarkers() {
return _consistencyMarkers.get();
}
-ReplicationRecovery* ReplicationProcess::getReplicationRecovery() {
- return _recovery.get();
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h
index 5b6229ac6d0..da4a2886c6a 100644
--- a/src/mongo/db/repl/replication_process.h
+++ b/src/mongo/db/repl/replication_process.h
@@ -37,7 +37,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_consistency_markers.h"
-#include "mongo/db/repl/replication_recovery.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -82,8 +81,7 @@ public:
static void set(ServiceContext* service, std::unique_ptr<ReplicationProcess> process);
ReplicationProcess(StorageInterface* storageInterface,
- std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers,
- std::unique_ptr<ReplicationRecovery> recovery);
+ std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers);
virtual ~ReplicationProcess() = default;
/**
@@ -136,11 +134,6 @@ public:
*/
ReplicationConsistencyMarkers* getConsistencyMarkers();
- /**
- * Returns an object used to recover from the oplog on startup or rollback.
- */
- ReplicationRecovery* getReplicationRecovery();
-
private:
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -158,8 +151,6 @@ private:
// Used for operations on documents that maintain replication consistency.
std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; // (S)
- std::unique_ptr<ReplicationRecovery> _recovery; // (S)
-
// Rollback ID. This is a cached copy of the persisted value in the local.system.rollback.id
// collection.
int _rbid; // (M)
diff --git a/src/mongo/db/repl/replication_process_test.cpp b/src/mongo/db/repl/replication_process_test.cpp
index 40a51312433..8464b54906f 100644
--- a/src/mongo/db/repl/replication_process_test.cpp
+++ b/src/mongo/db/repl/replication_process_test.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
@@ -79,8 +78,7 @@ TEST_F(ReplicationProcessTest, ServiceContextDecorator) {
ASSERT_FALSE(ReplicationProcess::get(serviceContext));
ReplicationProcess* replicationProcess = new ReplicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ReplicationProcess::set(serviceContext,
std::unique_ptr<ReplicationProcess>(replicationProcess));
ASSERT_TRUE(replicationProcess == ReplicationProcess::get(serviceContext));
@@ -92,8 +90,7 @@ TEST_F(ReplicationProcessTest,
GetRollbackProgressReturnsNoSuchKeyIfDocumentWithIdProgressIsNotFound) {
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
// Collection is not found.
auto opCtx = makeOpCtx();
@@ -129,8 +126,7 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFi
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get()));
}
@@ -151,8 +147,7 @@ TEST_F(ReplicationProcessTest,
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get()));
}
@@ -171,8 +166,7 @@ TEST_F(ReplicationProcessTest,
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(applyUntil,
unittest::assertGet(replicationProcess.getRollbackProgress(opCtx.get())));
@@ -187,8 +181,7 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_OK(replicationProcess.setRollbackProgress(opCtx.get(), applyUntil));
ASSERT_EQUALS(1U,
unittest::assertGet(_storageInterface->getCollectionCount(
@@ -206,8 +199,7 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(ErrorCodes::IllegalOperation,
replicationProcess.setRollbackProgress(opCtx.get(), applyUntil));
}
@@ -216,8 +208,7 @@ TEST_F(ReplicationProcessTest, ClearRollbackProgressReturnsSuccessIfCollectionDo
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_OK(replicationProcess.clearRollbackProgress(opCtx.get()));
}
@@ -229,8 +220,7 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ReplicationProcess replicationProcess(
_storageInterface.get(),
- stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()),
- stdx::make_unique<ReplicationRecoveryMock>());
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(ErrorCodes::IllegalOperation,
replicationProcess.clearRollbackProgress(opCtx.get()));
}
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
deleted file mode 100644
index 214e0c3cf7d..00000000000
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/replication_recovery.h"
-
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/replication_consistency_markers_impl.h"
-#include "mongo/db/repl/storage_interface.h"
-#include "mongo/db/repl/sync_tail.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace repl {
-
-ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface,
- ReplicationConsistencyMarkers* consistencyMarkers)
- : _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {}
-
-void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx) {
- if (_consistencyMarkers->getInitialSyncFlag(opCtx)) {
- log() << "No recovery needed. Initial sync flag set.";
- return; // Initial Sync will take over so no cleanup is needed.
- }
-
- const auto truncateAfterPoint = _consistencyMarkers->getOplogTruncateAfterPoint(opCtx);
- const auto appliedThrough = _consistencyMarkers->getAppliedThrough(opCtx);
-
- const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() &&
- // This version should never have a non-null truncateAfterPoint with a null appliedThrough.
- // This scenario means that we downgraded after unclean shutdown, then the downgraded node
- // deleted the ragged end of our oplog, then did a clean shutdown.
- !appliedThrough.isNull() &&
- // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This
- // means that the downgraded node deleted our ragged end then applied ahead of our
- // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with
- // applying these ops because older versions wrote to the oplog from a single thread so we
- // know they are in order.
- !(appliedThrough.getTimestamp() >= truncateAfterPoint);
- if (needToDeleteEndOfOplog) {
- log() << "Removing unapplied entries starting at: " << truncateAfterPoint.toBSON();
- _truncateOplogTo(opCtx, truncateAfterPoint);
- }
- _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {}); // clear the truncateAfterPoint
-
- if (appliedThrough.isNull()) {
- log() << "No oplog entries to apply for recovery. appliedThrough is null.";
- // No follow-up work to do.
- return;
- }
-
- // Check if we have any unapplied ops in our oplog. It is important that this is done after
- // deleting the ragged end of the oplog.
- const auto topOfOplog = fassertStatusOK(40290, _getLastAppliedOpTime(opCtx));
- if (appliedThrough == topOfOplog) {
- log()
- << "No oplog entries to apply for recovery. appliedThrough is at the top of the oplog.";
- return; // We've applied all the valid oplog we have.
- } else if (appliedThrough > topOfOplog) {
- severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog
- << '.';
- fassertFailedNoTrace(40313);
- }
-
- log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
- << topOfOplog << " (inclusive).";
-
- DBDirectClient db(opCtx);
- auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(),
- QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
- /*batchSize*/ 0,
- /*skip*/ 0,
- /*projection*/ nullptr,
- QueryOption_OplogReplay);
-
- // Check that the first document matches our appliedThrough point then skip it since it's
- // already been applied.
- if (!cursor->more()) {
- // This should really be impossible because we check above that the top of the oplog is
- // strictly > appliedThrough. If this fails it represents a serious bug in either the
- // storage engine or query's implementation of OplogReplay.
- severe() << "Couldn't find any entries in the oplog >= " << appliedThrough
- << " which should be impossible.";
- fassertFailedNoTrace(40293);
- }
- auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe()));
- if (firstOpTimeFound != appliedThrough) {
- severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is "
- << firstOpTimeFound;
- fassertFailedNoTrace(40292);
- }
-
- // Apply remaining ops one at at time, but don't log them because they are already logged.
- UnreplicatedWritesBlock uwb(opCtx);
-
- while (cursor->more()) {
- auto entry = cursor->nextSafe();
- fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true));
- _consistencyMarkers->setAppliedThrough(
- opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
- }
-}
-
-StatusWith<OpTime> ReplicationRecoveryImpl::_getLastAppliedOpTime(OperationContext* opCtx) const {
- const auto docsSW = _storageInterface->findDocuments(opCtx,
- NamespaceString::kRsOplogNamespace,
- boost::none, // Collection scan
- StorageInterface::ScanDirection::kBackward,
- {},
- BoundInclusion::kIncludeStartKeyOnly,
- 1U);
- if (!docsSW.isOK()) {
- return docsSW.getStatus();
- }
- const auto docs = docsSW.getValue();
- invariant(1U == docs.size());
-
- return OpTime::parseFromOplogEntry(docs.front());
-}
-
-void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx,
- Timestamp truncateTimestamp) {
- const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace);
- AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX);
- Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X);
- Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss);
- if (!oplogCollection) {
- fassertFailedWithStatusNoTrace(
- 34418,
- Status(ErrorCodes::NamespaceNotFound,
- str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns()));
- }
-
- // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
- RecordId oldestIDToDelete; // Non-null if there is something to delete.
- auto oplogRs = oplogCollection->getRecordStore();
- auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false);
- size_t count = 0;
- while (auto next = oplogReverseCursor->next()) {
- const BSONObj entry = next->data.releaseToBson();
- const RecordId id = next->id;
- count++;
-
- const auto tsElem = entry["ts"];
- if (count == 1) {
- if (tsElem.eoo())
- LOG(2) << "Oplog tail entry: " << redact(entry);
- else
- LOG(2) << "Oplog tail entry ts field: " << tsElem;
- }
-
- if (tsElem.timestamp() < truncateTimestamp) {
- // If count == 1, that means that we have nothing to delete because everything in the
- // oplog is < truncateTimestamp.
- if (count != 1) {
- invariant(!oldestIDToDelete.isNull());
- oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true);
- }
- return;
- }
-
- oldestIDToDelete = id;
- }
-
- severe() << "Reached end of oplog looking for oplog entry before " << truncateTimestamp.toBSON()
- << " but couldn't find any after looking through " << count << " entries.";
- fassertFailedNoTrace(40296);
-}
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h
deleted file mode 100644
index 77dbcf404e8..00000000000
--- a/src/mongo/db/repl/replication_recovery.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
-* Copyright (C) 2017 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
-
-#pragma once
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/base/status_with.h"
-#include "mongo/db/repl/optime.h"
-
-namespace mongo {
-
-class OperationContext;
-
-namespace repl {
-
-class StorageInterface;
-class ReplicationConsistencyMarkers;
-
-/**
- * This class is used by the replication system to recover after an unclean shutdown or a rollback.
- */
-class ReplicationRecovery {
-public:
- ReplicationRecovery() = default;
- virtual ~ReplicationRecovery() = default;
-
- /**
- * Recovers the data on disk from the oplog.
- */
- virtual void recoverFromOplog(OperationContext* opCtx) = 0;
-};
-
-class ReplicationRecoveryImpl : public ReplicationRecovery {
- MONGO_DISALLOW_COPYING(ReplicationRecoveryImpl);
-
-public:
- ReplicationRecoveryImpl(StorageInterface* storageInterface,
- ReplicationConsistencyMarkers* consistencyMarkers);
-
- void recoverFromOplog(OperationContext* opCtx) override;
-
-private:
- /**
- * Gets the last applied OpTime from the end of the oplog.
- */
- StatusWith<OpTime> _getLastAppliedOpTime(OperationContext* opCtx) const;
-
- /**
- * Truncates the oplog after and including the "truncateTimestamp" entry.
- */
- void _truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp);
-
- StorageInterface* _storageInterface;
- ReplicationConsistencyMarkers* _consistencyMarkers;
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/replication_recovery_mock.h b/src/mongo/db/repl/replication_recovery_mock.h
deleted file mode 100644
index 220030ab0b5..00000000000
--- a/src/mongo/db/repl/replication_recovery_mock.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
-* Copyright (C) 2017 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
-
-#pragma once
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/db/repl/replication_recovery.h"
-
-namespace mongo {
-class OperationContext;
-namespace repl {
-
-class ReplicationRecoveryMock : public ReplicationRecovery {
- MONGO_DISALLOW_COPYING(ReplicationRecoveryMock);
-
-public:
- ReplicationRecoveryMock() = default;
-
- void recoverFromOplog(OperationContext* opCtx) override {}
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp
deleted file mode 100644
index 5e77881bb8a..00000000000
--- a/src/mongo/db/repl/replication_recovery_test.cpp
+++ /dev/null
@@ -1,350 +0,0 @@
-/**
- * Copyright 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/repl/replication_recovery.h"
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/client.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/oplog_interface_local.h"
-#include "mongo/db/repl/replication_consistency_markers_mock.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/repl/storage_interface_impl.h"
-#include "mongo/db/service_context_d_test_fixture.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/unittest/death_test.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace {
-
-using namespace mongo;
-using namespace mongo::repl;
-
-const auto& oplogNs = NamespaceString::kRsOplogNamespace;
-const NamespaceString testNs("a.a");
-
-class ReplicationRecoveryTest : public ServiceContextMongoDTest {
-protected:
- OperationContext* getOperationContext() {
- return _opCtx.get();
- }
-
- StorageInterface* getStorageInterface() {
- return _storageInterface.get();
- }
-
- ReplicationConsistencyMarkers* getConsistencyMarkers() {
- return _consistencyMarkers.get();
- }
-
-private:
- void setUp() override {
- ServiceContextMongoDTest::setUp();
- _createOpCtx();
- _storageInterface = stdx::make_unique<StorageInterfaceImpl>();
- _consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>();
-
- auto service = getServiceContext();
- ReplicationCoordinator::set(service,
- stdx::make_unique<ReplicationCoordinatorMock>(service));
-
- ASSERT_OK(_storageInterface->createCollection(
- getOperationContext(), testNs, CollectionOptions()));
- }
-
- void tearDown() override {
- _opCtx.reset(nullptr);
- _consistencyMarkers.reset();
- _storageInterface.reset();
- ServiceContextMongoDTest::tearDown();
- }
-
- void _createOpCtx() {
- _opCtx = cc().makeOperationContext();
- }
-
- ServiceContext::UniqueOperationContext _opCtx;
- std::unique_ptr<StorageInterfaceImpl> _storageInterface;
- std::unique_ptr<ReplicationConsistencyMarkersMock> _consistencyMarkers;
-};
-
-/**
- * Generates a document to be inserted into the test collection.
- */
-BSONObj _makeInsertDocument(int t) {
- return BSON("_id" << t << "a" << t);
-}
-
-/**
- * Generates oplog entries with the given number used for the timestamp.
- */
-BSONObj _makeOplogEntry(int t) {
- return BSON("ts" << Timestamp(t, t) << "h" << t << "ns" << testNs.ns() << "v" << 2 << "op"
- << "i"
- << "o"
- << _makeInsertDocument(t));
-}
-
-/**
- * Creates collection options suitable for oplog.
- */
-CollectionOptions _createOplogCollectionOptions() {
- CollectionOptions options;
- options.capped = true;
- options.cappedSize = 64 * 1024 * 1024LL;
- options.autoIndexId = CollectionOptions::NO;
- return options;
-}
-
-/**
- * Creates an oplog with insert entries at the given timestamps.
- */
-void _setUpOplog(OperationContext* opCtx, StorageInterface* storage, std::vector<int> timestamps) {
- ASSERT_OK(storage->createCollection(opCtx, oplogNs, _createOplogCollectionOptions()));
-
- for (int ts : timestamps) {
- ASSERT_OK(storage->insertDocument(opCtx, oplogNs, _makeOplogEntry(ts)));
- }
-}
-
-/**
- * Check collection contents. OplogInterface returns documents in reverse natural order.
- */
-void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<BSONObj>& docs) {
- std::vector<BSONObj> reversedDocs(docs);
- std::reverse(reversedDocs.begin(), reversedDocs.end());
- OplogInterfaceLocal oplog(opCtx, nss.ns());
- auto iter = oplog.makeIterator();
- for (const auto& doc : reversedDocs) {
- ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first);
- }
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
-}
-
-/**
- * Asserts that the documents in the oplog have the given timestamps.
- */
-void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) {
- std::vector<BSONObj> expectedOplog(timestamps.size());
- std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) {
- return _makeOplogEntry(ts);
- });
- _assertDocumentsInCollectionEquals(opCtx, oplogNs, expectedOplog);
-}
-
-/**
- * Asserts that the documents in the test collection have the given ids.
- */
-void _assertDocsInTestCollection(OperationContext* opCtx, std::vector<int> ids) {
- std::vector<BSONObj> expectedColl(ids.size());
- std::transform(ids.begin(), ids.end(), expectedColl.begin(), [](int id) {
- return _makeInsertDocument(id);
- });
- _assertDocumentsInCollectionEquals(opCtx, testNs, expectedColl);
-}
-
-TEST_F(ReplicationRecoveryTest, RecoveryWithEmptyOplogSucceeds) {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- _setUpOplog(opCtx, getStorageInterface(), {});
-
- recovery.recoverFromOplog(opCtx);
-
- _assertDocsInOplog(opCtx, {});
- _assertDocsInTestCollection(opCtx, {});
-}
-
-DEATH_TEST_F(ReplicationRecoveryTest,
- RecoveryWithEmptyOplogAndNonNullAppliedThroughInvariants,
- "Invariant failure 1U == docs.size()") {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- _setUpOplog(opCtx, getStorageInterface(), {});
-
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
- recovery.recoverFromOplog(opCtx);
-
- _assertDocsInOplog(opCtx, {});
- _assertDocsInTestCollection(opCtx, {});
-}
-
-DEATH_TEST_F(ReplicationRecoveryTest,
- TruncateFassertsWithoutOplogCollection,
- "Fatal assertion 34418 NamespaceNotFound: Can't find local.oplog.rs") {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
-
- // Create the database.
- ASSERT_OK(getStorageInterface()->createCollection(
- opCtx, NamespaceString("local.other"), CollectionOptions()));
-
- recovery.recoverFromOplog(opCtx);
-}
-
-DEATH_TEST_F(ReplicationRecoveryTest,
- TruncateEntireOplogFasserts,
- "Reached end of oplog looking for oplog entry before { : Timestamp 4000|4 } but "
- "couldn't find any after looking through 3 entries.") {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
- _setUpOplog(opCtx, getStorageInterface(), {7, 8, 9});
-
- recovery.recoverFromOplog(opCtx);
-}
-
-TEST_F(ReplicationRecoveryTest, RecoveryTruncatesOplogAtOplogTruncateAfterPoint) {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
- _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-
- _assertDocsInOplog(opCtx, {1, 2, 3});
- _assertDocsInTestCollection(opCtx, {});
- ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
- ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
-}
-
-TEST_F(ReplicationRecoveryTest, RecoverySkipsEverythingIfInitialSyncFlagIsSet) {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setInitialSyncFlag(opCtx);
- getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
- _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-
- _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
- _assertDocsInTestCollection(opCtx, {});
- ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp(4, 4));
- ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(1, 1), 1));
-}
-
-TEST_F(ReplicationRecoveryTest, RecoveryResetsOplogTruncateAfterPointWhenAppliedThroughIsNull) {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime());
- _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-
- _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
- _assertDocsInTestCollection(opCtx, {});
- ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
- ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime());
-}
-
-TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehind) {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
- _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-
- _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5});
- _assertDocsInTestCollection(opCtx, {4, 5});
- ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
- ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1));
-}
-
-TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehindAfterTruncation) {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4));
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
- _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-
- _assertDocsInOplog(opCtx, {1, 2, 3});
- _assertDocsInTestCollection(opCtx, {2, 3});
- ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
- ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1));
-}
-
-DEATH_TEST_F(ReplicationRecoveryTest, AppliedThroughBehindOplogFasserts, "Fatal Assertion 40293") {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1));
- _setUpOplog(opCtx, getStorageInterface(), {3, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-}
-
-DEATH_TEST_F(ReplicationRecoveryTest,
- AppliedThroughAheadOfTopOfOplogCausesFassert,
- "Applied op { ts: Timestamp 9000|9, t: 1 } not found. Top of oplog is { ts: Timestamp "
- "5000|5, t: -1 }.") {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(9, 9), 1));
- _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-}
-
-DEATH_TEST_F(ReplicationRecoveryTest,
- AppliedThroughNotInOplogCausesFassert,
- "Oplog entry at { ts: Timestamp 3000|3, t: 1 } is missing; actual entry found is { "
- "ts: Timestamp 4000|4, t: -1 }") {
- ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
- auto opCtx = getOperationContext();
-
- getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1));
- _setUpOplog(opCtx, getStorageInterface(), {1, 2, 4, 5});
-
- recovery.recoverFromOplog(opCtx);
-}
-
-} // namespace
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 4bfd76b96f4..89d52be56e2 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/session_catalog.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/mongoutils/str.h"
@@ -64,9 +63,7 @@ void RollbackTest::setUp() {
_serviceContextMongoDTest.setUp();
auto serviceContext = _serviceContextMongoDTest.getServiceContext();
_replicationProcess = stdx::make_unique<ReplicationProcess>(
- &_storageInterface,
- stdx::make_unique<ReplicationConsistencyMarkersMock>(),
- stdx::make_unique<ReplicationRecoveryMock>());
+ &_storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>());
_dropPendingCollectionReaper = new DropPendingCollectionReaper(&_storageInterface);
DropPendingCollectionReaper::set(
serviceContext, std::unique_ptr<DropPendingCollectionReaper>(_dropPendingCollectionReaper));
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index c9998c89660..a67817924ba 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -36,7 +36,6 @@
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
namespace mongo {
@@ -57,10 +56,8 @@ void SyncTailTest::setUp() {
DropPendingCollectionReaper::set(
service, stdx::make_unique<DropPendingCollectionReaper>(_storageInterface));
- _replicationProcess =
- new ReplicationProcess(_storageInterface,
- stdx::make_unique<ReplicationConsistencyMarkersMock>(),
- stdx::make_unique<ReplicationRecoveryMock>());
+ _replicationProcess = new ReplicationProcess(
+ _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>());
ReplicationProcess::set(cc().getServiceContext(),
std::unique_ptr<ReplicationProcess>(_replicationProcess));
diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp
index c6453e49e2e..e70a221fa57 100644
--- a/src/mongo/dbtests/replica_set_tests.cpp
+++ b/src/mongo/dbtests/replica_set_tests.cpp
@@ -35,7 +35,6 @@
#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/replication_coordinator_external_state_impl.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/unittest/unittest.h"
@@ -54,12 +53,9 @@ protected:
_storageInterface = stdx::make_unique<repl::StorageInterfaceMock>();
_dropPendingCollectionReaper =
stdx::make_unique<repl::DropPendingCollectionReaper>(_storageInterface.get());
- auto consistencyMarkers =
- stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get());
- auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(_storageInterface.get(),
- consistencyMarkers.get());
_replicationProcess = stdx::make_unique<repl::ReplicationProcess>(
- _storageInterface.get(), std::move(consistencyMarkers), std::move(recovery));
+ _storageInterface.get(),
+ stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
_replCoordExternalState = stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(
opCtx->getServiceContext(),
_dropPendingCollectionReaper.get(),
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index 453951a1735..3a9fa023b2a 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -51,7 +51,6 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/executor/network_interface_mock.h"
@@ -131,11 +130,10 @@ void ShardingMongodTestFixture::setUp() {
repl::DropPendingCollectionReaper::set(
service, stdx::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get()));
- repl::ReplicationProcess::set(service,
- stdx::make_unique<repl::ReplicationProcess>(
- storagePtr.get(),
- stdx::make_unique<repl::ReplicationConsistencyMarkersMock>(),
- stdx::make_unique<repl::ReplicationRecoveryMock>()));
+ repl::ReplicationProcess::set(
+ service,
+ stdx::make_unique<repl::ReplicationProcess>(
+ storagePtr.get(), stdx::make_unique<repl::ReplicationConsistencyMarkersMock>()));
repl::ReplicationProcess::get(_opCtx.get())
->initializeRollbackID(_opCtx.get())
.transitional_ignore();