diff options
author | Mathias Stearn <mathias@10gen.com> | 2017-08-07 20:18:33 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2017-08-07 20:18:33 -0400 |
commit | fd62ac35e27f83155cbe3d60bf02c49f45298e54 (patch) | |
tree | 09368db6e56db643e06c0ee97c65b07dc753f9c0 | |
parent | 9255b0a684c6c9ca35da96493b91f04b832dc792 (diff) | |
download | mongo-fd62ac35e27f83155cbe3d60bf02c49f45298e54.tar.gz |
Revert "SERVER-29893 Rename recovery code and make it accessible to both startup and rollback"
This reverts commit 8fa770baf8fac6e71a45f84b48eeb3bae96a8dab.
23 files changed, 177 insertions, 784 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 27eb07bdb53..7f7c49adfc4 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -93,7 +93,6 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/s/balancer/balancer.h" @@ -916,14 +915,11 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, repl::StorageInterface::set(serviceContext, stdx::make_unique<repl::StorageInterfaceImpl>()); auto storageInterface = repl::StorageInterface::get(serviceContext); - auto consistencyMarkers = - stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface); - auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(storageInterface, - consistencyMarkers.get()); repl::ReplicationProcess::set( serviceContext, stdx::make_unique<repl::ReplicationProcess>( - storageInterface, std::move(consistencyMarkers), std::move(recovery))); + storageInterface, + stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface))); auto replicationProcess = repl::ReplicationProcess::get(serviceContext); repl::DropPendingCollectionReaper::set( diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 33afbd008ee..0c841ed5e45 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -223,34 +223,6 @@ env.CppUnitTest( ) env.Library( - target='replication_recovery', - source=[ - 'replication_recovery.cpp', - ], - LIBDEPS=[ - ], - LIBDEPS_PRIVATE=[ - 'sync_tail', - '$BUILD_DIR/mongo/base', - ], -) - -env.CppUnitTest( - target='replication_recovery_test', - source=[ - 'replication_recovery_test.cpp', - ], - LIBDEPS=[ - ], - LIBDEPS_PRIVATE=[ - 'replmocks', - 'replication_recovery', - 'storage_interface_impl', - '$BUILD_DIR/mongo/db/service_context_d_test_fixture', - ], -) - -env.Library( target='replication_process', source=[ 'replication_consistency_markers.cpp', @@ -1555,7 +1527,6 @@ env.Library( 'repl_settings', 'replication_consistency_markers_impl', 'replication_process', - 'replication_recovery', 'rollback_source_impl', 'sync_tail', ], diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 1af4ecd3264..2a402f2ff06 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -45,7 +45,6 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/reporter.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" @@ -286,9 +285,7 @@ protected: launchExecutorThread(); _replicationProcess = stdx::make_unique<ReplicationProcess>( - _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersMock>(), - stdx::make_unique<ReplicationRecoveryMock>()); + _storageInterface.get(), stdx::make_unique<ReplicationConsistencyMarkersMock>()); _executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor()); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 52e23d85477..d800549a9d9 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -364,6 +364,56 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, } } // end anon namespace +// Truncates the oplog after and including the "truncateTimestamp" entry. +void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp) { + const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace); + AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX); + Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X); + Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss); + if (!oplogCollection) { + fassertFailedWithStatusNoTrace( + 34418, + Status(ErrorCodes::NamespaceNotFound, + str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns())); + } + + // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. + RecordId oldestIDToDelete; // Non-null if there is something to delete. + auto oplogRs = oplogCollection->getRecordStore(); + auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false); + size_t count = 0; + while (auto next = oplogReverseCursor->next()) { + const BSONObj entry = next->data.releaseToBson(); + const RecordId id = next->id; + count++; + + const auto tsElem = entry["ts"]; + if (count == 1) { + if (tsElem.eoo()) + LOG(2) << "Oplog tail entry: " << redact(entry); + else + LOG(2) << "Oplog tail entry ts field: " << tsElem; + } + + if (tsElem.timestamp() < truncateTimestamp) { + // If count == 1, that means that we have nothing to delete because everything in the + // oplog is < truncateTimestamp. + if (count != 1) { + invariant(!oldestIDToDelete.isNull()); + oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true); + } + return; + } + + oldestIDToDelete = id; + } + + severe() << "Reached end of oplog looking for oplog entry before " + << truncateTimestamp.toStringPretty() + << " but couldn't find any after looking through " << count << " entries."; + fassertFailedNoTrace(40296); +} + /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index f119534ad7a..f3974984514 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -50,6 +50,11 @@ namespace repl { class ReplSettings; /** + * Truncates the oplog after, and including, the "truncateTimestamp" entry. + */ +void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp); + +/** * Create a new capped collection for the oplog if it doesn't yet exist. * If the collection already exists (and isReplSet is false), * set the 'last' Timestamp from the last entry of the oplog collection (side effect!) diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 617c92cb7fa..81d217dac06 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -204,6 +204,13 @@ public: virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx) = 0; /** + * Cleaning up the oplog, by potentially truncating: + * If we are recovering from a failed batch then minvalid.start though minvalid.end need + * to be removed from the oplog before we can start applying operations. + */ + virtual void cleanUpLastApplyBatch(OperationContext* opCtx) = 0; + + /** * Returns the HostAndPort of the remote client connected to us that initiated the operation * represented by "opCtx". */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index cc8edcc6722..5bdfcc46ca2 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -568,6 +568,89 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext* setNewTimestamp(ctx, newTime); } +void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* opCtx) { + if (_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx)) { + return; // Initial Sync will take over so no cleanup is needed. + } + + const auto truncateAfterPoint = + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx); + const auto appliedThrough = + _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx); + + const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() && + // This version should never have a non-null truncateAfterPoint with a null appliedThrough. + // This scenario means that we downgraded after unclean shutdown, then the downgraded node + // deleted the ragged end of our oplog, then did a clean shutdown. + !appliedThrough.isNull() && + // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This + // means that the downgraded node deleted our ragged end then applied ahead of our + // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with + // applying these ops because older versions wrote to the oplog from a single thread so we + // know they are in order. + !(appliedThrough.getTimestamp() >= truncateAfterPoint); + if (needToDeleteEndOfOplog) { + log() << "Removing unapplied entries starting at: " << truncateAfterPoint; + truncateOplogTo(opCtx, truncateAfterPoint); + } + _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint( + opCtx, {}); // clear the truncateAfterPoint + + if (appliedThrough.isNull()) { + // No follow-up work to do. + return; + } + + // Check if we have any unapplied ops in our oplog. It is important that this is done after + // deleting the ragged end of the oplog. + const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(opCtx)); + if (appliedThrough == topOfOplog) { + return; // We've applied all the valid oplog we have. + } else if (appliedThrough > topOfOplog) { + severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog + << '.'; + fassertFailedNoTrace(40313); + } + + log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " + << topOfOplog << " (inclusive)."; + + DBDirectClient db(opCtx); + auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(), + QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), + /*batchSize*/ 0, + /*skip*/ 0, + /*projection*/ nullptr, + QueryOption_OplogReplay); + + // Check that the first document matches our appliedThrough point then skip it since it's + // already been applied. + if (!cursor->more()) { + // This should really be impossible because we check above that the top of the oplog is + // strictly > appliedThrough. If this fails it represents a serious bug in either the + // storage engine or query's implementation of OplogReplay. + severe() << "Couldn't find any entries in the oplog >= " << appliedThrough + << " which should be impossible."; + fassertFailedNoTrace(40293); + } + auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())); + if (firstOpTimeFound != appliedThrough) { + severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is " + << firstOpTimeFound; + fassertFailedNoTrace(40292); + } + + // Apply remaining ops one at at time, but don't log them because they are already logged. + UnreplicatedWritesBlock uwb(opCtx); + + while (cursor->more()) { + auto entry = cursor->nextSafe(); + fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true)); + _replicationProcess->getConsistencyMarkers()->setAppliedThrough( + opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); + } +} + StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime( OperationContext* opCtx) { // TODO: handle WriteConflictExceptions below diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 0893bdc16bd..68318f01455 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -92,6 +92,7 @@ public: virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote); virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx); + virtual void cleanUpLastApplyBatch(OperationContext* opCtx); virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* opCtx); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 1f544138fcc..524db05c86a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -166,6 +166,8 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument( void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) {} +void ReplicationCoordinatorExternalStateMock::cleanUpLastApplyBatch(OperationContext* opCtx) {} + StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime( OperationContext* opCtx) { return _lastOpTime; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index c18c8c8fc16..5f6143ce329 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -80,6 +80,7 @@ public: virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote); virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx); + virtual void cleanUpLastApplyBatch(OperationContext* opCtx); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* opCtx); virtual void shardingOnStepDownHook(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 9c513cc73e8..5ca9f6cabf5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -447,7 +447,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) } // Read the last op from the oplog after cleaning up any partially applied batches. - _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx); + _externalState->cleanUpLastApplyBatch(opCtx); auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx); // Use a callback here, because _finishLoadLocalConfig calls isself() which requires diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index ffc5af1de2d..5e8eac78138 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -42,7 +42,6 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/executor/network_interface_mock.h" @@ -126,11 +125,10 @@ void ReplCoordTest::init() { StorageInterface::set(service, std::unique_ptr<StorageInterface>(_storageInterface)); ASSERT_TRUE(_storageInterface == StorageInterface::get(service)); - ReplicationProcess::set(service, - stdx::make_unique<ReplicationProcess>( - _storageInterface, - stdx::make_unique<ReplicationConsistencyMarkersMock>(), - stdx::make_unique<ReplicationRecoveryMock>())); + ReplicationProcess::set( + service, + stdx::make_unique<ReplicationProcess>( + _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>())); auto replicationProcess = ReplicationProcess::get(service); // PRNG seed for tests. diff --git a/src/mongo/db/repl/replication_process.cpp b/src/mongo/db/repl/replication_process.cpp index cde7fcdbae9..7e7dc077b11 100644 --- a/src/mongo/db/repl/replication_process.cpp +++ b/src/mongo/db/repl/replication_process.cpp @@ -82,11 +82,9 @@ void ReplicationProcess::set(ServiceContext* service, std::unique_ptr<Replicatio ReplicationProcess::ReplicationProcess( StorageInterface* storageInterface, - std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers, - std::unique_ptr<ReplicationRecovery> recovery) + std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers) : _storageInterface(storageInterface), _consistencyMarkers(std::move(consistencyMarkers)), - _recovery(std::move(recovery)), _rbid(kUninitializedRollbackId) {} StatusWith<int> ReplicationProcess::getRollbackID(OperationContext* opCtx) { @@ -184,9 +182,5 @@ ReplicationConsistencyMarkers* ReplicationProcess::getConsistencyMarkers() { return _consistencyMarkers.get(); } -ReplicationRecovery* ReplicationProcess::getReplicationRecovery() { - return _recovery.get(); -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h index 5b6229ac6d0..da4a2886c6a 100644 --- a/src/mongo/db/repl/replication_process.h +++ b/src/mongo/db/repl/replication_process.h @@ -37,7 +37,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_consistency_markers.h" -#include "mongo/db/repl/replication_recovery.h" #include "mongo/stdx/mutex.h" namespace mongo { @@ -82,8 +81,7 @@ public: static void set(ServiceContext* service, std::unique_ptr<ReplicationProcess> process); ReplicationProcess(StorageInterface* storageInterface, - std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers, - std::unique_ptr<ReplicationRecovery> recovery); + std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers); virtual ~ReplicationProcess() = default; /** @@ -136,11 +134,6 @@ public: */ ReplicationConsistencyMarkers* getConsistencyMarkers(); - /** - * Returns an object used to recover from the oplog on startup or rollback. - */ - ReplicationRecovery* getReplicationRecovery(); - private: // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. @@ -158,8 +151,6 @@ private: // Used for operations on documents that maintain replication consistency. std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; // (S) - std::unique_ptr<ReplicationRecovery> _recovery; // (S) - // Rollback ID. This is a cached copy of the persisted value in the local.system.rollback.id // collection. int _rbid; // (M) diff --git a/src/mongo/db/repl/replication_process_test.cpp b/src/mongo/db/repl/replication_process_test.cpp index 40a51312433..8464b54906f 100644 --- a/src/mongo/db/repl/replication_process_test.cpp +++ b/src/mongo/db/repl/replication_process_test.cpp @@ -37,7 +37,6 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context.h" @@ -79,8 +78,7 @@ TEST_F(ReplicationProcessTest, ServiceContextDecorator) { ASSERT_FALSE(ReplicationProcess::get(serviceContext)); ReplicationProcess* replicationProcess = new ReplicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ReplicationProcess::set(serviceContext, std::unique_ptr<ReplicationProcess>(replicationProcess)); ASSERT_TRUE(replicationProcess == ReplicationProcess::get(serviceContext)); @@ -92,8 +90,7 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsNoSuchKeyIfDocumentWithIdProgressIsNotFound) { ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); // Collection is not found. auto opCtx = makeOpCtx(); @@ -129,8 +126,7 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFi ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get())); } @@ -151,8 +147,7 @@ TEST_F(ReplicationProcessTest, ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get())); } @@ -171,8 +166,7 @@ TEST_F(ReplicationProcessTest, ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ASSERT_EQUALS(applyUntil, unittest::assertGet(replicationProcess.getRollbackProgress(opCtx.get()))); @@ -187,8 +181,7 @@ TEST_F(ReplicationProcessTest, auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ASSERT_OK(replicationProcess.setRollbackProgress(opCtx.get(), applyUntil)); ASSERT_EQUALS(1U, unittest::assertGet(_storageInterface->getCollectionCount( @@ -206,8 +199,7 @@ TEST_F(ReplicationProcessTest, auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ASSERT_EQUALS(ErrorCodes::IllegalOperation, replicationProcess.setRollbackProgress(opCtx.get(), applyUntil)); } @@ -216,8 +208,7 @@ TEST_F(ReplicationProcessTest, ClearRollbackProgressReturnsSuccessIfCollectionDo auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ASSERT_OK(replicationProcess.clearRollbackProgress(opCtx.get())); } @@ -229,8 +220,7 @@ TEST_F(ReplicationProcessTest, auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), - stdx::make_unique<ReplicationRecoveryMock>()); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); ASSERT_EQUALS(ErrorCodes::IllegalOperation, replicationProcess.clearRollbackProgress(opCtx.get())); } diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp deleted file mode 100644 index 214e0c3cf7d..00000000000 --- a/src/mongo/db/repl/replication_recovery.cpp +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/replication_recovery.h" - -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/repl/replication_consistency_markers_impl.h" -#include "mongo/db/repl/storage_interface.h" -#include "mongo/db/repl/sync_tail.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace repl { - -ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface, - ReplicationConsistencyMarkers* consistencyMarkers) - : _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {} - -void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx) { - if (_consistencyMarkers->getInitialSyncFlag(opCtx)) { - log() << "No recovery needed. Initial sync flag set."; - return; // Initial Sync will take over so no cleanup is needed. - } - - const auto truncateAfterPoint = _consistencyMarkers->getOplogTruncateAfterPoint(opCtx); - const auto appliedThrough = _consistencyMarkers->getAppliedThrough(opCtx); - - const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() && - // This version should never have a non-null truncateAfterPoint with a null appliedThrough. - // This scenario means that we downgraded after unclean shutdown, then the downgraded node - // deleted the ragged end of our oplog, then did a clean shutdown. - !appliedThrough.isNull() && - // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This - // means that the downgraded node deleted our ragged end then applied ahead of our - // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with - // applying these ops because older versions wrote to the oplog from a single thread so we - // know they are in order. - !(appliedThrough.getTimestamp() >= truncateAfterPoint); - if (needToDeleteEndOfOplog) { - log() << "Removing unapplied entries starting at: " << truncateAfterPoint.toBSON(); - _truncateOplogTo(opCtx, truncateAfterPoint); - } - _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {}); // clear the truncateAfterPoint - - if (appliedThrough.isNull()) { - log() << "No oplog entries to apply for recovery. appliedThrough is null."; - // No follow-up work to do. - return; - } - - // Check if we have any unapplied ops in our oplog. It is important that this is done after - // deleting the ragged end of the oplog. - const auto topOfOplog = fassertStatusOK(40290, _getLastAppliedOpTime(opCtx)); - if (appliedThrough == topOfOplog) { - log() - << "No oplog entries to apply for recovery. appliedThrough is at the top of the oplog."; - return; // We've applied all the valid oplog we have. - } else if (appliedThrough > topOfOplog) { - severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog - << '.'; - fassertFailedNoTrace(40313); - } - - log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " - << topOfOplog << " (inclusive)."; - - DBDirectClient db(opCtx); - auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(), - QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), - /*batchSize*/ 0, - /*skip*/ 0, - /*projection*/ nullptr, - QueryOption_OplogReplay); - - // Check that the first document matches our appliedThrough point then skip it since it's - // already been applied. - if (!cursor->more()) { - // This should really be impossible because we check above that the top of the oplog is - // strictly > appliedThrough. If this fails it represents a serious bug in either the - // storage engine or query's implementation of OplogReplay. - severe() << "Couldn't find any entries in the oplog >= " << appliedThrough - << " which should be impossible."; - fassertFailedNoTrace(40293); - } - auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())); - if (firstOpTimeFound != appliedThrough) { - severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is " - << firstOpTimeFound; - fassertFailedNoTrace(40292); - } - - // Apply remaining ops one at at time, but don't log them because they are already logged. - UnreplicatedWritesBlock uwb(opCtx); - - while (cursor->more()) { - auto entry = cursor->nextSafe(); - fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true)); - _consistencyMarkers->setAppliedThrough( - opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); - } -} - -StatusWith<OpTime> ReplicationRecoveryImpl::_getLastAppliedOpTime(OperationContext* opCtx) const { - const auto docsSW = _storageInterface->findDocuments(opCtx, - NamespaceString::kRsOplogNamespace, - boost::none, // Collection scan - StorageInterface::ScanDirection::kBackward, - {}, - BoundInclusion::kIncludeStartKeyOnly, - 1U); - if (!docsSW.isOK()) { - return docsSW.getStatus(); - } - const auto docs = docsSW.getValue(); - invariant(1U == docs.size()); - - return OpTime::parseFromOplogEntry(docs.front()); -} - -void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx, - Timestamp truncateTimestamp) { - const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace); - AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX); - Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X); - Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss); - if (!oplogCollection) { - fassertFailedWithStatusNoTrace( - 34418, - Status(ErrorCodes::NamespaceNotFound, - str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns())); - } - - // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. - RecordId oldestIDToDelete; // Non-null if there is something to delete. - auto oplogRs = oplogCollection->getRecordStore(); - auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false); - size_t count = 0; - while (auto next = oplogReverseCursor->next()) { - const BSONObj entry = next->data.releaseToBson(); - const RecordId id = next->id; - count++; - - const auto tsElem = entry["ts"]; - if (count == 1) { - if (tsElem.eoo()) - LOG(2) << "Oplog tail entry: " << redact(entry); - else - LOG(2) << "Oplog tail entry ts field: " << tsElem; - } - - if (tsElem.timestamp() < truncateTimestamp) { - // If count == 1, that means that we have nothing to delete because everything in the - // oplog is < truncateTimestamp. - if (count != 1) { - invariant(!oldestIDToDelete.isNull()); - oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true); - } - return; - } - - oldestIDToDelete = id; - } - - severe() << "Reached end of oplog looking for oplog entry before " << truncateTimestamp.toBSON() - << " but couldn't find any after looking through " << count << " entries."; - fassertFailedNoTrace(40296); -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h deleted file mode 100644 index 77dbcf404e8..00000000000 --- a/src/mongo/db/repl/replication_recovery.h +++ /dev/null @@ -1,83 +0,0 @@ -/** -* Copyright (C) 2017 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/base/disallow_copying.h" -#include "mongo/base/status_with.h" -#include "mongo/db/repl/optime.h" - -namespace mongo { - -class OperationContext; - -namespace repl { - -class StorageInterface; -class ReplicationConsistencyMarkers; - -/** - * This class is used by the replication system to recover after an unclean shutdown or a rollback. - */ -class ReplicationRecovery { -public: - ReplicationRecovery() = default; - virtual ~ReplicationRecovery() = default; - - /** - * Recovers the data on disk from the oplog. - */ - virtual void recoverFromOplog(OperationContext* opCtx) = 0; -}; - -class ReplicationRecoveryImpl : public ReplicationRecovery { - MONGO_DISALLOW_COPYING(ReplicationRecoveryImpl); - -public: - ReplicationRecoveryImpl(StorageInterface* storageInterface, - ReplicationConsistencyMarkers* consistencyMarkers); - - void recoverFromOplog(OperationContext* opCtx) override; - -private: - /** - * Gets the last applied OpTime from the end of the oplog. - */ - StatusWith<OpTime> _getLastAppliedOpTime(OperationContext* opCtx) const; - - /** - * Truncates the oplog after and including the "truncateTimestamp" entry. - */ - void _truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp); - - StorageInterface* _storageInterface; - ReplicationConsistencyMarkers* _consistencyMarkers; -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_recovery_mock.h b/src/mongo/db/repl/replication_recovery_mock.h deleted file mode 100644 index 220030ab0b5..00000000000 --- a/src/mongo/db/repl/replication_recovery_mock.h +++ /dev/null @@ -1,48 +0,0 @@ -/** -* Copyright (C) 2017 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/base/disallow_copying.h" -#include "mongo/db/repl/replication_recovery.h" - -namespace mongo { -class OperationContext; -namespace repl { - -class ReplicationRecoveryMock : public ReplicationRecovery { - MONGO_DISALLOW_COPYING(ReplicationRecoveryMock); - -public: - ReplicationRecoveryMock() = default; - - void recoverFromOplog(OperationContext* opCtx) override {} -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp deleted file mode 100644 index 5e77881bb8a..00000000000 --- a/src/mongo/db/repl/replication_recovery_test.cpp +++ /dev/null @@ -1,350 +0,0 @@ -/** - * Copyright 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/repl/replication_recovery.h" - -#include "mongo/platform/basic.h" - -#include "mongo/db/client.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/oplog_interface_local.h" -#include "mongo/db/repl/replication_consistency_markers_mock.h" -#include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/stdx/memory.h" -#include "mongo/unittest/death_test.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/mongoutils/str.h" - -namespace { - -using namespace mongo; -using namespace mongo::repl; - -const auto& oplogNs = NamespaceString::kRsOplogNamespace; -const NamespaceString testNs("a.a"); - -class ReplicationRecoveryTest : public ServiceContextMongoDTest { -protected: - OperationContext* getOperationContext() { - return _opCtx.get(); - } - - StorageInterface* getStorageInterface() { - return _storageInterface.get(); - } - - ReplicationConsistencyMarkers* getConsistencyMarkers() { - return _consistencyMarkers.get(); - } - -private: - void setUp() override { - ServiceContextMongoDTest::setUp(); - _createOpCtx(); - _storageInterface = stdx::make_unique<StorageInterfaceImpl>(); - _consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>(); - - auto service = getServiceContext(); - ReplicationCoordinator::set(service, - stdx::make_unique<ReplicationCoordinatorMock>(service)); - - ASSERT_OK(_storageInterface->createCollection( - getOperationContext(), testNs, CollectionOptions())); - } - - void tearDown() override { - _opCtx.reset(nullptr); - _consistencyMarkers.reset(); - _storageInterface.reset(); - ServiceContextMongoDTest::tearDown(); - } - - void _createOpCtx() { - _opCtx = cc().makeOperationContext(); - } - - ServiceContext::UniqueOperationContext _opCtx; - std::unique_ptr<StorageInterfaceImpl> _storageInterface; - std::unique_ptr<ReplicationConsistencyMarkersMock> _consistencyMarkers; -}; - -/** - * Generates a document to be inserted into the test collection. - */ -BSONObj _makeInsertDocument(int t) { - return BSON("_id" << t << "a" << t); -} - -/** - * Generates oplog entries with the given number used for the timestamp. - */ -BSONObj _makeOplogEntry(int t) { - return BSON("ts" << Timestamp(t, t) << "h" << t << "ns" << testNs.ns() << "v" << 2 << "op" - << "i" - << "o" - << _makeInsertDocument(t)); -} - -/** - * Creates collection options suitable for oplog. - */ -CollectionOptions _createOplogCollectionOptions() { - CollectionOptions options; - options.capped = true; - options.cappedSize = 64 * 1024 * 1024LL; - options.autoIndexId = CollectionOptions::NO; - return options; -} - -/** - * Creates an oplog with insert entries at the given timestamps. - */ -void _setUpOplog(OperationContext* opCtx, StorageInterface* storage, std::vector<int> timestamps) { - ASSERT_OK(storage->createCollection(opCtx, oplogNs, _createOplogCollectionOptions())); - - for (int ts : timestamps) { - ASSERT_OK(storage->insertDocument(opCtx, oplogNs, _makeOplogEntry(ts))); - } -} - -/** - * Check collection contents. OplogInterface returns documents in reverse natural order. - */ -void _assertDocumentsInCollectionEquals(OperationContext* opCtx, - const NamespaceString& nss, - const std::vector<BSONObj>& docs) { - std::vector<BSONObj> reversedDocs(docs); - std::reverse(reversedDocs.begin(), reversedDocs.end()); - OplogInterfaceLocal oplog(opCtx, nss.ns()); - auto iter = oplog.makeIterator(); - for (const auto& doc : reversedDocs) { - ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first); - } - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); -} - -/** - * Asserts that the documents in the oplog have the given timestamps. - */ -void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) { - std::vector<BSONObj> expectedOplog(timestamps.size()); - std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) { - return _makeOplogEntry(ts); - }); - _assertDocumentsInCollectionEquals(opCtx, oplogNs, expectedOplog); -} - -/** - * Asserts that the documents in the test collection have the given ids. - */ -void _assertDocsInTestCollection(OperationContext* opCtx, std::vector<int> ids) { - std::vector<BSONObj> expectedColl(ids.size()); - std::transform(ids.begin(), ids.end(), expectedColl.begin(), [](int id) { - return _makeInsertDocument(id); - }); - _assertDocumentsInCollectionEquals(opCtx, testNs, expectedColl); -} - -TEST_F(ReplicationRecoveryTest, RecoveryWithEmptyOplogSucceeds) { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - _setUpOplog(opCtx, getStorageInterface(), {}); - - recovery.recoverFromOplog(opCtx); - - _assertDocsInOplog(opCtx, {}); - _assertDocsInTestCollection(opCtx, {}); -} - -DEATH_TEST_F(ReplicationRecoveryTest, - RecoveryWithEmptyOplogAndNonNullAppliedThroughInvariants, - "Invariant failure 1U == docs.size()") { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - _setUpOplog(opCtx, getStorageInterface(), {}); - - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); - recovery.recoverFromOplog(opCtx); - - _assertDocsInOplog(opCtx, {}); - _assertDocsInTestCollection(opCtx, {}); -} - -DEATH_TEST_F(ReplicationRecoveryTest, - TruncateFassertsWithoutOplogCollection, - "Fatal assertion 34418 NamespaceNotFound: Can't find local.oplog.rs") { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); - - // Create the database. - ASSERT_OK(getStorageInterface()->createCollection( - opCtx, NamespaceString("local.other"), CollectionOptions())); - - recovery.recoverFromOplog(opCtx); -} - -DEATH_TEST_F(ReplicationRecoveryTest, - TruncateEntireOplogFasserts, - "Reached end of oplog looking for oplog entry before { : Timestamp 4000|4 } but " - "couldn't find any after looking through 3 entries.") { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); - _setUpOplog(opCtx, getStorageInterface(), {7, 8, 9}); - - recovery.recoverFromOplog(opCtx); -} - -TEST_F(ReplicationRecoveryTest, RecoveryTruncatesOplogAtOplogTruncateAfterPoint) { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); - _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); - - recovery.recoverFromOplog(opCtx); - - _assertDocsInOplog(opCtx, {1, 2, 3}); - _assertDocsInTestCollection(opCtx, {}); - ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); - ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1)); -} - -TEST_F(ReplicationRecoveryTest, RecoverySkipsEverythingIfInitialSyncFlagIsSet) { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setInitialSyncFlag(opCtx); - getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1)); - _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); - - recovery.recoverFromOplog(opCtx); - - _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5}); - _assertDocsInTestCollection(opCtx, {}); - ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp(4, 4)); - ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(1, 1), 1)); -} - -TEST_F(ReplicationRecoveryTest, RecoveryResetsOplogTruncateAfterPointWhenAppliedThroughIsNull) { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime()); - _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); - - recovery.recoverFromOplog(opCtx); - - _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5}); - _assertDocsInTestCollection(opCtx, {}); - ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); - ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime()); -} - -TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehind) { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); - _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); - - recovery.recoverFromOplog(opCtx); - - _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5}); - _assertDocsInTestCollection(opCtx, {4, 5}); - ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); - ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1)); -} - -TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehindAfterTruncation) { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1)); - _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); - - recovery.recoverFromOplog(opCtx); - - _assertDocsInOplog(opCtx, {1, 2, 3}); - _assertDocsInTestCollection(opCtx, {2, 3}); - ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); - ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1)); -} - -DEATH_TEST_F(ReplicationRecoveryTest, AppliedThroughBehindOplogFasserts, "Fatal Assertion 40293") { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1)); - _setUpOplog(opCtx, getStorageInterface(), {3, 4, 5}); - - recovery.recoverFromOplog(opCtx); -} - -DEATH_TEST_F(ReplicationRecoveryTest, - AppliedThroughAheadOfTopOfOplogCausesFassert, - "Applied op { ts: Timestamp 9000|9, t: 1 } not found. Top of oplog is { ts: Timestamp " - "5000|5, t: -1 }.") { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(9, 9), 1)); - _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); - - recovery.recoverFromOplog(opCtx); -} - -DEATH_TEST_F(ReplicationRecoveryTest, - AppliedThroughNotInOplogCausesFassert, - "Oplog entry at { ts: Timestamp 3000|3, t: 1 } is missing; actual entry found is { " - "ts: Timestamp 4000|4, t: -1 }") { - ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); - auto opCtx = getOperationContext(); - - getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); - _setUpOplog(opCtx, getStorageInterface(), {1, 2, 4, 5}); - - recovery.recoverFromOplog(opCtx); -} - -} // namespace diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index 4bfd76b96f4..89d52be56e2 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -38,7 +38,6 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/session_catalog.h" #include "mongo/stdx/memory.h" #include "mongo/util/mongoutils/str.h" @@ -64,9 +63,7 @@ void RollbackTest::setUp() { _serviceContextMongoDTest.setUp(); auto serviceContext = _serviceContextMongoDTest.getServiceContext(); _replicationProcess = stdx::make_unique<ReplicationProcess>( - &_storageInterface, - stdx::make_unique<ReplicationConsistencyMarkersMock>(), - stdx::make_unique<ReplicationRecoveryMock>()); + &_storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>()); _dropPendingCollectionReaper = new DropPendingCollectionReaper(&_storageInterface); DropPendingCollectionReaper::set( serviceContext, std::unique_ptr<DropPendingCollectionReaper>(_dropPendingCollectionReaper)); diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index c9998c89660..a67817924ba 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -36,7 +36,6 @@ #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_mock.h" namespace mongo { @@ -57,10 +56,8 @@ void SyncTailTest::setUp() { DropPendingCollectionReaper::set( service, stdx::make_unique<DropPendingCollectionReaper>(_storageInterface)); - _replicationProcess = - new ReplicationProcess(_storageInterface, - stdx::make_unique<ReplicationConsistencyMarkersMock>(), - stdx::make_unique<ReplicationRecoveryMock>()); + _replicationProcess = new ReplicationProcess( + _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>()); ReplicationProcess::set(cc().getServiceContext(), std::unique_ptr<ReplicationProcess>(_replicationProcess)); diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp index c6453e49e2e..e70a221fa57 100644 --- a/src/mongo/dbtests/replica_set_tests.cpp +++ b/src/mongo/dbtests/replica_set_tests.cpp @@ -35,7 +35,6 @@ #include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/replication_coordinator_external_state_impl.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context.h" #include "mongo/unittest/unittest.h" @@ -54,12 +53,9 @@ protected: _storageInterface = stdx::make_unique<repl::StorageInterfaceMock>(); _dropPendingCollectionReaper = stdx::make_unique<repl::DropPendingCollectionReaper>(_storageInterface.get()); - auto consistencyMarkers = - stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get()); - auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(_storageInterface.get(), - consistencyMarkers.get()); _replicationProcess = stdx::make_unique<repl::ReplicationProcess>( - _storageInterface.get(), std::move(consistencyMarkers), std::move(recovery)); + _storageInterface.get(), + stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get())); _replCoordExternalState = stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>( opCtx->getServiceContext(), _dropPendingCollectionReaper.get(), diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp index 453951a1735..3a9fa023b2a 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/s/sharding_mongod_test_fixture.cpp @@ -51,7 +51,6 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" @@ -131,11 +130,10 @@ void ShardingMongodTestFixture::setUp() { repl::DropPendingCollectionReaper::set( service, stdx::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get())); - repl::ReplicationProcess::set(service, - stdx::make_unique<repl::ReplicationProcess>( - storagePtr.get(), - stdx::make_unique<repl::ReplicationConsistencyMarkersMock>(), - stdx::make_unique<repl::ReplicationRecoveryMock>())); + repl::ReplicationProcess::set( + service, + stdx::make_unique<repl::ReplicationProcess>( + storagePtr.get(), stdx::make_unique<repl::ReplicationConsistencyMarkersMock>())); repl::ReplicationProcess::get(_opCtx.get()) ->initializeRollbackID(_opCtx.get()) .transitional_ignore(); |