diff options
23 files changed, 784 insertions, 177 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index c5497433a2f..27e599f0510 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -94,6 +94,7 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/s/balancer/balancer.h" @@ -919,11 +920,14 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, repl::StorageInterface::set(serviceContext, stdx::make_unique<repl::StorageInterfaceImpl>()); auto storageInterface = repl::StorageInterface::get(serviceContext); + auto consistencyMarkers = + stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface); + auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(storageInterface, + consistencyMarkers.get()); repl::ReplicationProcess::set( serviceContext, stdx::make_unique<repl::ReplicationProcess>( - storageInterface, - stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface))); + storageInterface, std::move(consistencyMarkers), std::move(recovery))); auto replicationProcess = repl::ReplicationProcess::get(serviceContext); repl::DropPendingCollectionReaper::set( diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 2653b607ab4..0877b12cf65 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -223,6 +223,34 @@ env.CppUnitTest( ) env.Library( + target='replication_recovery', + source=[ + 'replication_recovery.cpp', + ], + LIBDEPS=[ + ], + LIBDEPS_PRIVATE=[ + 'sync_tail', + '$BUILD_DIR/mongo/base', + ], +) + +env.CppUnitTest( + target='replication_recovery_test', + source=[ + 'replication_recovery_test.cpp', + ], + LIBDEPS=[ + ], + LIBDEPS_PRIVATE=[ + 'replmocks', + 'replication_recovery', + 'storage_interface_impl', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + ], +) + +env.Library( target='replication_process', source=[ 'replication_consistency_markers.cpp', @@ -1534,6 +1562,7 @@ env.Library( 'repl_settings', 'replication_consistency_markers_impl', 'replication_process', + 'replication_recovery', 'rollback_source_impl', 'sync_tail', ], diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index ddce6686acb..033af4a4ad6 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -45,6 +45,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/reporter.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" @@ -285,7 +286,9 @@ protected: launchExecutorThread(); _replicationProcess = stdx::make_unique<ReplicationProcess>( - _storageInterface.get(), stdx::make_unique<ReplicationConsistencyMarkersMock>()); + _storageInterface.get(), + stdx::make_unique<ReplicationConsistencyMarkersMock>(), + stdx::make_unique<ReplicationRecoveryMock>()); _executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor()); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index ffb85a0bdf0..0e7f873fcfd 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -364,56 +364,6 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, } } // end anon namespace -// Truncates the oplog after and including the "truncateTimestamp" entry. -void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp) { - const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace); - AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX); - Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X); - Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss); - if (!oplogCollection) { - fassertFailedWithStatusNoTrace( - 34418, - Status(ErrorCodes::NamespaceNotFound, - str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns())); - } - - // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. - RecordId oldestIDToDelete; // Non-null if there is something to delete. - auto oplogRs = oplogCollection->getRecordStore(); - auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false); - size_t count = 0; - while (auto next = oplogReverseCursor->next()) { - const BSONObj entry = next->data.releaseToBson(); - const RecordId id = next->id; - count++; - - const auto tsElem = entry["ts"]; - if (count == 1) { - if (tsElem.eoo()) - LOG(2) << "Oplog tail entry: " << redact(entry); - else - LOG(2) << "Oplog tail entry ts field: " << tsElem; - } - - if (tsElem.timestamp() < truncateTimestamp) { - // If count == 1, that means that we have nothing to delete because everything in the - // oplog is < truncateTimestamp. - if (count != 1) { - invariant(!oldestIDToDelete.isNull()); - oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true); - } - return; - } - - oldestIDToDelete = id; - } - - severe() << "Reached end of oplog looking for oplog entry before " - << truncateTimestamp.toStringPretty() - << " but couldn't find any after looking through " << count << " entries."; - fassertFailedNoTrace(40296); -} - /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index f3974984514..f119534ad7a 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -50,11 +50,6 @@ namespace repl { class ReplSettings; /** - * Truncates the oplog after, and including, the "truncateTimestamp" entry. - */ -void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp); - -/** * Create a new capped collection for the oplog if it doesn't yet exist. * If the collection already exists (and isReplSet is false), * set the 'last' Timestamp from the last entry of the oplog collection (side effect!) diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 81d217dac06..617c92cb7fa 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -204,13 +204,6 @@ public: virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx) = 0; /** - * Cleaning up the oplog, by potentially truncating: - * If we are recovering from a failed batch then minvalid.start though minvalid.end need - * to be removed from the oplog before we can start applying operations. - */ - virtual void cleanUpLastApplyBatch(OperationContext* opCtx) = 0; - - /** * Returns the HostAndPort of the remote client connected to us that initiated the operation * represented by "opCtx". */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 93dd61ad3ef..f396126c7de 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -569,89 +569,6 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext* setNewTimestamp(ctx, newTime); } -void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* opCtx) { - if (_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx)) { - return; // Initial Sync will take over so no cleanup is needed. - } - - const auto truncateAfterPoint = - _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx); - const auto appliedThrough = - _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx); - - const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() && - // This version should never have a non-null truncateAfterPoint with a null appliedThrough. - // This scenario means that we downgraded after unclean shutdown, then the downgraded node - // deleted the ragged end of our oplog, then did a clean shutdown. - !appliedThrough.isNull() && - // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This - // means that the downgraded node deleted our ragged end then applied ahead of our - // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with - // applying these ops because older versions wrote to the oplog from a single thread so we - // know they are in order. - !(appliedThrough.getTimestamp() >= truncateAfterPoint); - if (needToDeleteEndOfOplog) { - log() << "Removing unapplied entries starting at: " << truncateAfterPoint; - truncateOplogTo(opCtx, truncateAfterPoint); - } - _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint( - opCtx, {}); // clear the truncateAfterPoint - - if (appliedThrough.isNull()) { - // No follow-up work to do. - return; - } - - // Check if we have any unapplied ops in our oplog. It is important that this is done after - // deleting the ragged end of the oplog. - const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(opCtx)); - if (appliedThrough == topOfOplog) { - return; // We've applied all the valid oplog we have. - } else if (appliedThrough > topOfOplog) { - severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog - << '.'; - fassertFailedNoTrace(40313); - } - - log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " - << topOfOplog << " (inclusive)."; - - DBDirectClient db(opCtx); - auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(), - QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), - /*batchSize*/ 0, - /*skip*/ 0, - /*projection*/ nullptr, - QueryOption_OplogReplay); - - // Check that the first document matches our appliedThrough point then skip it since it's - // already been applied. - if (!cursor->more()) { - // This should really be impossible because we check above that the top of the oplog is - // strictly > appliedThrough. If this fails it represents a serious bug in either the - // storage engine or query's implementation of OplogReplay. - severe() << "Couldn't find any entries in the oplog >= " << appliedThrough - << " which should be impossible."; - fassertFailedNoTrace(40293); - } - auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())); - if (firstOpTimeFound != appliedThrough) { - severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is " - << firstOpTimeFound; - fassertFailedNoTrace(40292); - } - - // Apply remaining ops one at at time, but don't log them because they are already logged. - UnreplicatedWritesBlock uwb(opCtx); - - while (cursor->more()) { - auto entry = cursor->nextSafe(); - fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true)); - _replicationProcess->getConsistencyMarkers()->setAppliedThrough( - opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); - } -} - StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime( OperationContext* opCtx) { // TODO: handle WriteConflictExceptions below diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 68318f01455..0893bdc16bd 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -92,7 +92,6 @@ public: virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote); virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx); - virtual void cleanUpLastApplyBatch(OperationContext* opCtx); virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* opCtx); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 524db05c86a..1f544138fcc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -166,8 +166,6 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument( void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) {} -void ReplicationCoordinatorExternalStateMock::cleanUpLastApplyBatch(OperationContext* opCtx) {} - StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime( OperationContext* opCtx) { return _lastOpTime; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 5f6143ce329..c18c8c8fc16 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -80,7 +80,6 @@ public: virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote); virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx); - virtual void cleanUpLastApplyBatch(OperationContext* opCtx); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* opCtx); virtual void shardingOnStepDownHook(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a3fd2fe8ddd..2ea7ee7b3fb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -447,7 +447,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) } // Read the last op from the oplog after cleaning up any partially applied batches. - _externalState->cleanUpLastApplyBatch(opCtx); + _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx); auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx); // Use a callback here, because _finishLoadLocalConfig calls isself() which requires diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 5e8eac78138..ffc5af1de2d 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -42,6 +42,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/executor/network_interface_mock.h" @@ -125,10 +126,11 @@ void ReplCoordTest::init() { StorageInterface::set(service, std::unique_ptr<StorageInterface>(_storageInterface)); ASSERT_TRUE(_storageInterface == StorageInterface::get(service)); - ReplicationProcess::set( - service, - stdx::make_unique<ReplicationProcess>( - _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>())); + ReplicationProcess::set(service, + stdx::make_unique<ReplicationProcess>( + _storageInterface, + stdx::make_unique<ReplicationConsistencyMarkersMock>(), + stdx::make_unique<ReplicationRecoveryMock>())); auto replicationProcess = ReplicationProcess::get(service); // PRNG seed for tests. diff --git a/src/mongo/db/repl/replication_process.cpp b/src/mongo/db/repl/replication_process.cpp index 7e7dc077b11..cde7fcdbae9 100644 --- a/src/mongo/db/repl/replication_process.cpp +++ b/src/mongo/db/repl/replication_process.cpp @@ -82,9 +82,11 @@ void ReplicationProcess::set(ServiceContext* service, std::unique_ptr<Replicatio ReplicationProcess::ReplicationProcess( StorageInterface* storageInterface, - std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers) + std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers, + std::unique_ptr<ReplicationRecovery> recovery) : _storageInterface(storageInterface), _consistencyMarkers(std::move(consistencyMarkers)), + _recovery(std::move(recovery)), _rbid(kUninitializedRollbackId) {} StatusWith<int> ReplicationProcess::getRollbackID(OperationContext* opCtx) { @@ -182,5 +184,9 @@ ReplicationConsistencyMarkers* ReplicationProcess::getConsistencyMarkers() { return _consistencyMarkers.get(); } +ReplicationRecovery* ReplicationProcess::getReplicationRecovery() { + return _recovery.get(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h index da4a2886c6a..5b6229ac6d0 100644 --- a/src/mongo/db/repl/replication_process.h +++ b/src/mongo/db/repl/replication_process.h @@ -37,6 +37,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_consistency_markers.h" +#include "mongo/db/repl/replication_recovery.h" #include "mongo/stdx/mutex.h" namespace mongo { @@ -81,7 +82,8 @@ public: static void set(ServiceContext* service, std::unique_ptr<ReplicationProcess> process); ReplicationProcess(StorageInterface* storageInterface, - std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers); + std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers, + std::unique_ptr<ReplicationRecovery> recovery); virtual ~ReplicationProcess() = default; /** @@ -134,6 +136,11 @@ public: */ ReplicationConsistencyMarkers* getConsistencyMarkers(); + /** + * Returns an object used to recover from the oplog on startup or rollback. + */ + ReplicationRecovery* getReplicationRecovery(); + private: // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. @@ -151,6 +158,8 @@ private: // Used for operations on documents that maintain replication consistency. std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; // (S) + std::unique_ptr<ReplicationRecovery> _recovery; // (S) + // Rollback ID. This is a cached copy of the persisted value in the local.system.rollback.id // collection. int _rbid; // (M) diff --git a/src/mongo/db/repl/replication_process_test.cpp b/src/mongo/db/repl/replication_process_test.cpp index 8464b54906f..40a51312433 100644 --- a/src/mongo/db/repl/replication_process_test.cpp +++ b/src/mongo/db/repl/replication_process_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context.h" @@ -78,7 +79,8 @@ TEST_F(ReplicationProcessTest, ServiceContextDecorator) { ASSERT_FALSE(ReplicationProcess::get(serviceContext)); ReplicationProcess* replicationProcess = new ReplicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ReplicationProcess::set(serviceContext, std::unique_ptr<ReplicationProcess>(replicationProcess)); ASSERT_TRUE(replicationProcess == ReplicationProcess::get(serviceContext)); @@ -90,7 +92,8 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsNoSuchKeyIfDocumentWithIdProgressIsNotFound) { ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); // Collection is not found. auto opCtx = makeOpCtx(); @@ -126,7 +129,8 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFi ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get())); } @@ -147,7 +151,8 @@ TEST_F(ReplicationProcessTest, ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get())); } @@ -166,7 +171,8 @@ TEST_F(ReplicationProcessTest, ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ASSERT_EQUALS(applyUntil, unittest::assertGet(replicationProcess.getRollbackProgress(opCtx.get()))); @@ -181,7 +187,8 @@ TEST_F(ReplicationProcessTest, auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ASSERT_OK(replicationProcess.setRollbackProgress(opCtx.get(), applyUntil)); ASSERT_EQUALS(1U, unittest::assertGet(_storageInterface->getCollectionCount( @@ -199,7 +206,8 @@ TEST_F(ReplicationProcessTest, auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ASSERT_EQUALS(ErrorCodes::IllegalOperation, replicationProcess.setRollbackProgress(opCtx.get(), applyUntil)); } @@ -208,7 +216,8 @@ TEST_F(ReplicationProcessTest, ClearRollbackProgressReturnsSuccessIfCollectionDo auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ASSERT_OK(replicationProcess.clearRollbackProgress(opCtx.get())); } @@ -220,7 +229,8 @@ TEST_F(ReplicationProcessTest, auto opCtx = makeOpCtx(); ReplicationProcess replicationProcess( _storageInterface.get(), - stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()), + stdx::make_unique<ReplicationRecoveryMock>()); ASSERT_EQUALS(ErrorCodes::IllegalOperation, replicationProcess.clearRollbackProgress(opCtx.get())); } diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp new file mode 100644 index 00000000000..214e0c3cf7d --- /dev/null +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/replication_recovery.h" + +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/sync_tail.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { + +ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface, + ReplicationConsistencyMarkers* consistencyMarkers) + : _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {} + +void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx) { + if (_consistencyMarkers->getInitialSyncFlag(opCtx)) { + log() << "No recovery needed. Initial sync flag set."; + return; // Initial Sync will take over so no cleanup is needed. + } + + const auto truncateAfterPoint = _consistencyMarkers->getOplogTruncateAfterPoint(opCtx); + const auto appliedThrough = _consistencyMarkers->getAppliedThrough(opCtx); + + const bool needToDeleteEndOfOplog = !truncateAfterPoint.isNull() && + // This version should never have a non-null truncateAfterPoint with a null appliedThrough. + // This scenario means that we downgraded after unclean shutdown, then the downgraded node + // deleted the ragged end of our oplog, then did a clean shutdown. + !appliedThrough.isNull() && + // Similarly we should never have an appliedThrough higher than the truncateAfterPoint. This + // means that the downgraded node deleted our ragged end then applied ahead of our + // truncateAfterPoint and then had an unclean shutdown before upgrading. We are ok with + // applying these ops because older versions wrote to the oplog from a single thread so we + // know they are in order. + !(appliedThrough.getTimestamp() >= truncateAfterPoint); + if (needToDeleteEndOfOplog) { + log() << "Removing unapplied entries starting at: " << truncateAfterPoint.toBSON(); + _truncateOplogTo(opCtx, truncateAfterPoint); + } + _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {}); // clear the truncateAfterPoint + + if (appliedThrough.isNull()) { + log() << "No oplog entries to apply for recovery. appliedThrough is null."; + // No follow-up work to do. + return; + } + + // Check if we have any unapplied ops in our oplog. It is important that this is done after + // deleting the ragged end of the oplog. + const auto topOfOplog = fassertStatusOK(40290, _getLastAppliedOpTime(opCtx)); + if (appliedThrough == topOfOplog) { + log() + << "No oplog entries to apply for recovery. appliedThrough is at the top of the oplog."; + return; // We've applied all the valid oplog we have. + } else if (appliedThrough > topOfOplog) { + severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog + << '.'; + fassertFailedNoTrace(40313); + } + + log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " + << topOfOplog << " (inclusive)."; + + DBDirectClient db(opCtx); + auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(), + QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), + /*batchSize*/ 0, + /*skip*/ 0, + /*projection*/ nullptr, + QueryOption_OplogReplay); + + // Check that the first document matches our appliedThrough point then skip it since it's + // already been applied. + if (!cursor->more()) { + // This should really be impossible because we check above that the top of the oplog is + // strictly > appliedThrough. If this fails it represents a serious bug in either the + // storage engine or query's implementation of OplogReplay. + severe() << "Couldn't find any entries in the oplog >= " << appliedThrough + << " which should be impossible."; + fassertFailedNoTrace(40293); + } + auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())); + if (firstOpTimeFound != appliedThrough) { + severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is " + << firstOpTimeFound; + fassertFailedNoTrace(40292); + } + + // Apply remaining ops one at at time, but don't log them because they are already logged. + UnreplicatedWritesBlock uwb(opCtx); + + while (cursor->more()) { + auto entry = cursor->nextSafe(); + fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true)); + _consistencyMarkers->setAppliedThrough( + opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); + } +} + +StatusWith<OpTime> ReplicationRecoveryImpl::_getLastAppliedOpTime(OperationContext* opCtx) const { + const auto docsSW = _storageInterface->findDocuments(opCtx, + NamespaceString::kRsOplogNamespace, + boost::none, // Collection scan + StorageInterface::ScanDirection::kBackward, + {}, + BoundInclusion::kIncludeStartKeyOnly, + 1U); + if (!docsSW.isOK()) { + return docsSW.getStatus(); + } + const auto docs = docsSW.getValue(); + invariant(1U == docs.size()); + + return OpTime::parseFromOplogEntry(docs.front()); +} + +void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx, + Timestamp truncateTimestamp) { + const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace); + AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX); + Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X); + Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss); + if (!oplogCollection) { + fassertFailedWithStatusNoTrace( + 34418, + Status(ErrorCodes::NamespaceNotFound, + str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns())); + } + + // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. + RecordId oldestIDToDelete; // Non-null if there is something to delete. + auto oplogRs = oplogCollection->getRecordStore(); + auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false); + size_t count = 0; + while (auto next = oplogReverseCursor->next()) { + const BSONObj entry = next->data.releaseToBson(); + const RecordId id = next->id; + count++; + + const auto tsElem = entry["ts"]; + if (count == 1) { + if (tsElem.eoo()) + LOG(2) << "Oplog tail entry: " << redact(entry); + else + LOG(2) << "Oplog tail entry ts field: " << tsElem; + } + + if (tsElem.timestamp() < truncateTimestamp) { + // If count == 1, that means that we have nothing to delete because everything in the + // oplog is < truncateTimestamp. + if (count != 1) { + invariant(!oldestIDToDelete.isNull()); + oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true); + } + return; + } + + oldestIDToDelete = id; + } + + severe() << "Reached end of oplog looking for oplog entry before " << truncateTimestamp.toBSON() + << " but couldn't find any after looking through " << count << " entries."; + fassertFailedNoTrace(40296); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h new file mode 100644 index 00000000000..77dbcf404e8 --- /dev/null +++ b/src/mongo/db/repl/replication_recovery.h @@ -0,0 +1,83 @@ +/** +* Copyright (C) 2017 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/db/repl/optime.h" + +namespace mongo { + +class OperationContext; + +namespace repl { + +class StorageInterface; +class ReplicationConsistencyMarkers; + +/** + * This class is used by the replication system to recover after an unclean shutdown or a rollback. + */ +class ReplicationRecovery { +public: + ReplicationRecovery() = default; + virtual ~ReplicationRecovery() = default; + + /** + * Recovers the data on disk from the oplog. + */ + virtual void recoverFromOplog(OperationContext* opCtx) = 0; +}; + +class ReplicationRecoveryImpl : public ReplicationRecovery { + MONGO_DISALLOW_COPYING(ReplicationRecoveryImpl); + +public: + ReplicationRecoveryImpl(StorageInterface* storageInterface, + ReplicationConsistencyMarkers* consistencyMarkers); + + void recoverFromOplog(OperationContext* opCtx) override; + +private: + /** + * Gets the last applied OpTime from the end of the oplog. + */ + StatusWith<OpTime> _getLastAppliedOpTime(OperationContext* opCtx) const; + + /** + * Truncates the oplog after and including the "truncateTimestamp" entry. + */ + void _truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp); + + StorageInterface* _storageInterface; + ReplicationConsistencyMarkers* _consistencyMarkers; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_recovery_mock.h b/src/mongo/db/repl/replication_recovery_mock.h new file mode 100644 index 00000000000..220030ab0b5 --- /dev/null +++ b/src/mongo/db/repl/replication_recovery_mock.h @@ -0,0 +1,48 @@ +/** +* Copyright (C) 2017 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/replication_recovery.h" + +namespace mongo { +class OperationContext; +namespace repl { + +class ReplicationRecoveryMock : public ReplicationRecovery { + MONGO_DISALLOW_COPYING(ReplicationRecoveryMock); + +public: + ReplicationRecoveryMock() = default; + + void recoverFromOplog(OperationContext* opCtx) override {} +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp new file mode 100644 index 00000000000..e03a615287b --- /dev/null +++ b/src/mongo/db/repl/replication_recovery_test.cpp @@ -0,0 +1,350 @@ +/** + * Copyright 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/replication_recovery.h" + +#include "mongo/platform/basic.h" + +#include "mongo/db/client.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog_interface_local.h" +#include "mongo/db/repl/replication_consistency_markers_mock.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" + +namespace { + +using namespace mongo; +using namespace mongo::repl; + +const auto& oplogNs = NamespaceString::kRsOplogNamespace; +const NamespaceString testNs("a.a"); + +class ReplicationRecoveryTest : public ServiceContextMongoDTest { +protected: + OperationContext* getOperationContext() { + return _opCtx.get(); + } + + StorageInterface* getStorageInterface() { + return _storageInterface.get(); + } + + ReplicationConsistencyMarkers* getConsistencyMarkers() { + return _consistencyMarkers.get(); + } + +private: + void setUp() override { + ServiceContextMongoDTest::setUp(); + _createOpCtx(); + _storageInterface = stdx::make_unique<StorageInterfaceImpl>(); + _consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>(); + + auto service = getServiceContext(); + ReplicationCoordinator::set(service, + stdx::make_unique<ReplicationCoordinatorMock>(service)); + + ASSERT_OK(_storageInterface->createCollection( + getOperationContext(), testNs, CollectionOptions())); + } + + void tearDown() override { + _opCtx.reset(nullptr); + _consistencyMarkers.reset(); + _storageInterface.reset(); + ServiceContextMongoDTest::tearDown(); + } + + void _createOpCtx() { + _opCtx = cc().makeOperationContext(); + } + + ServiceContext::UniqueOperationContext _opCtx; + std::unique_ptr<StorageInterfaceImpl> _storageInterface; + std::unique_ptr<ReplicationConsistencyMarkersMock> _consistencyMarkers; +}; + +/** + * Generates a document to be inserted into the test collection. + */ +BSONObj _makeInsertDocument(int t) { + return BSON("_id" << t << "a" << t); +} + +/** + * Generates oplog entries with the given number used for the timestamp. + */ +BSONObj _makeOplogEntry(int t) { + return BSON("ts" << Timestamp(t, t) << "h" << t << "ns" << testNs.ns() << "v" << 2 << "op" + << "i" + << "o" + << _makeInsertDocument(t)); +} + +/** + * Creates collection options suitable for oplog. + */ +CollectionOptions _createOplogCollectionOptions() { + CollectionOptions options; + options.capped = true; + options.cappedSize = 64 * 1024 * 1024LL; + options.autoIndexId = CollectionOptions::NO; + return options; +} + +/** + * Creates an oplog with insert entries at the given timestamps. + */ +void _setUpOplog(OperationContext* opCtx, StorageInterface* storage, std::vector<int> timestamps) { + ASSERT_OK(storage->createCollection(opCtx, oplogNs, _createOplogCollectionOptions())); + + for (int ts : timestamps) { + ASSERT_OK(storage->insertDocument(opCtx, oplogNs, _makeOplogEntry(ts))); + } +} + +/** + * Check collection contents. OplogInterface returns documents in reverse natural order. + */ +void _assertDocumentsInCollectionEquals(OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) { + std::vector<BSONObj> reversedDocs(docs); + std::reverse(reversedDocs.begin(), reversedDocs.end()); + OplogInterfaceLocal oplog(opCtx, nss.ns()); + auto iter = oplog.makeIterator(); + for (const auto& doc : reversedDocs) { + ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first); + } + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); +} + +/** + * Asserts that the documents in the oplog have the given timestamps. + */ +void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) { + std::vector<BSONObj> expectedOplog(timestamps.size()); + std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) { + return _makeOplogEntry(ts); + }); + _assertDocumentsInCollectionEquals(opCtx, oplogNs, expectedOplog); +} + +/** + * Asserts that the documents in the test collection have the given ids. + */ +void _assertDocsInTestCollection(OperationContext* opCtx, std::vector<int> ids) { + std::vector<BSONObj> expectedColl(ids.size()); + std::transform(ids.begin(), ids.end(), expectedColl.begin(), [](int id) { + return _makeInsertDocument(id); + }); + _assertDocumentsInCollectionEquals(opCtx, testNs, expectedColl); +} + +TEST_F(ReplicationRecoveryTest, RecoveryWithEmptyOplogSucceeds) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + _setUpOplog(opCtx, getStorageInterface(), {}); + + recovery.recoverFromOplog(opCtx); + + _assertDocsInOplog(opCtx, {}); + _assertDocsInTestCollection(opCtx, {}); +} + +DEATH_TEST_F(ReplicationRecoveryTest, + RecoveryWithEmptyOplogAndNonNullAppliedThroughInvariants, + "Invariant failure 1U == docs.size()") { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + _setUpOplog(opCtx, getStorageInterface(), {}); + + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); + recovery.recoverFromOplog(opCtx); + + _assertDocsInOplog(opCtx, {}); + _assertDocsInTestCollection(opCtx, {}); +} + +DEATH_TEST_F(ReplicationRecoveryTest, + TruncateFassertsWithoutOplogCollection, + "Fatal assertion 34418 NamespaceNotFound: Can't find local.oplog.rs") { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); + + // Create the database. + ASSERT_OK(getStorageInterface()->createCollection( + opCtx, NamespaceString("local.other"), CollectionOptions())); + + recovery.recoverFromOplog(opCtx); +} + +DEATH_TEST_F(ReplicationRecoveryTest, + TruncateEntireOplogFasserts, + "Reached end of oplog looking for oplog entry before { : Timestamp 4000|4 } but " + "couldn't find any after looking through 3 entries.") { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); + _setUpOplog(opCtx, getStorageInterface(), {7, 8, 9}); + + recovery.recoverFromOplog(opCtx); +} + +TEST_F(ReplicationRecoveryTest, RecoveryTruncatesOplogAtOplogTruncateAfterPoint) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); + _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); + + recovery.recoverFromOplog(opCtx); + + _assertDocsInOplog(opCtx, {1, 2, 3}); + _assertDocsInTestCollection(opCtx, {}); + ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); + ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1)); +} + +TEST_F(ReplicationRecoveryTest, RecoverySkipsEverythingIfInitialSyncFlagIsSet) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setInitialSyncFlag(opCtx); + getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1)); + _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); + + recovery.recoverFromOplog(opCtx); + + _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5}); + _assertDocsInTestCollection(opCtx, {}); + ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp(4, 4)); + ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(1, 1), 1)); +} + +TEST_F(ReplicationRecoveryTest, RecoveryResetsOplogTruncateAfterPointWhenAppliedThroughIsNull) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime()); + _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); + + recovery.recoverFromOplog(opCtx); + + _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5}); + _assertDocsInTestCollection(opCtx, {}); + ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); + ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime()); +} + +TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehind) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); + _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); + + recovery.recoverFromOplog(opCtx); + + _assertDocsInOplog(opCtx, {1, 2, 3, 4, 5}); + _assertDocsInTestCollection(opCtx, {4, 5}); + ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); + ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(5, 5), 1)); +} + +TEST_F(ReplicationRecoveryTest, RecoveryAppliesDocumentsWhenAppliedThroughIsBehindAfterTruncation) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp(4, 4)); + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1)); + _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); + + recovery.recoverFromOplog(opCtx); + + _assertDocsInOplog(opCtx, {1, 2, 3}); + _assertDocsInTestCollection(opCtx, {2, 3}); + ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); + ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 3), 1)); +} + +DEATH_TEST_F(ReplicationRecoveryTest, AppliedThroughBehindOplogFasserts, "Fatal Assertion 40292") { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(1, 1), 1)); + _setUpOplog(opCtx, getStorageInterface(), {3, 4, 5}); + + recovery.recoverFromOplog(opCtx); +} + +DEATH_TEST_F(ReplicationRecoveryTest, + AppliedThroughAheadOfTopOfOplogCausesFassert, + "Applied op { ts: Timestamp 9000|9, t: 1 } not found. Top of oplog is { ts: Timestamp " + "5000|5, t: -1 }.") { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(9, 9), 1)); + _setUpOplog(opCtx, getStorageInterface(), {1, 2, 3, 4, 5}); + + recovery.recoverFromOplog(opCtx); +} + +DEATH_TEST_F(ReplicationRecoveryTest, + AppliedThroughNotInOplogCausesFassert, + "Oplog entry at { ts: Timestamp 3000|3, t: 1 } is missing; actual entry found is { " + "ts: Timestamp 4000|4, t: -1 }") { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + getConsistencyMarkers()->setAppliedThrough(opCtx, OpTime(Timestamp(3, 3), 1)); + _setUpOplog(opCtx, getStorageInterface(), {1, 2, 4, 5}); + + recovery.recoverFromOplog(opCtx); +} + +} // namespace diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index 89d52be56e2..4bfd76b96f4 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/session_catalog.h" #include "mongo/stdx/memory.h" #include "mongo/util/mongoutils/str.h" @@ -63,7 +64,9 @@ void RollbackTest::setUp() { _serviceContextMongoDTest.setUp(); auto serviceContext = _serviceContextMongoDTest.getServiceContext(); _replicationProcess = stdx::make_unique<ReplicationProcess>( - &_storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>()); + &_storageInterface, + stdx::make_unique<ReplicationConsistencyMarkersMock>(), + stdx::make_unique<ReplicationRecoveryMock>()); _dropPendingCollectionReaper = new DropPendingCollectionReaper(&_storageInterface); DropPendingCollectionReaper::set( serviceContext, std::unique_ptr<DropPendingCollectionReaper>(_dropPendingCollectionReaper)); diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index a67817924ba..c9998c89660 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -36,6 +36,7 @@ #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_mock.h" namespace mongo { @@ -56,8 +57,10 @@ void SyncTailTest::setUp() { DropPendingCollectionReaper::set( service, stdx::make_unique<DropPendingCollectionReaper>(_storageInterface)); - _replicationProcess = new ReplicationProcess( - _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>()); + _replicationProcess = + new ReplicationProcess(_storageInterface, + stdx::make_unique<ReplicationConsistencyMarkersMock>(), + stdx::make_unique<ReplicationRecoveryMock>()); ReplicationProcess::set(cc().getServiceContext(), std::unique_ptr<ReplicationProcess>(_replicationProcess)); diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp index e70a221fa57..c6453e49e2e 100644 --- a/src/mongo/dbtests/replica_set_tests.cpp +++ b/src/mongo/dbtests/replica_set_tests.cpp @@ -35,6 +35,7 @@ #include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/replication_coordinator_external_state_impl.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context.h" #include "mongo/unittest/unittest.h" @@ -53,9 +54,12 @@ protected: _storageInterface = stdx::make_unique<repl::StorageInterfaceMock>(); _dropPendingCollectionReaper = stdx::make_unique<repl::DropPendingCollectionReaper>(_storageInterface.get()); + auto consistencyMarkers = + stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get()); + auto recovery = stdx::make_unique<repl::ReplicationRecoveryImpl>(_storageInterface.get(), + consistencyMarkers.get()); _replicationProcess = stdx::make_unique<repl::ReplicationProcess>( - _storageInterface.get(), - stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get())); + _storageInterface.get(), std::move(consistencyMarkers), std::move(recovery)); _replCoordExternalState = stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>( opCtx->getServiceContext(), _dropPendingCollectionReaper.get(), diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp index 3a9fa023b2a..453951a1735 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/s/sharding_mongod_test_fixture.cpp @@ -51,6 +51,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" @@ -130,10 +131,11 @@ void ShardingMongodTestFixture::setUp() { repl::DropPendingCollectionReaper::set( service, stdx::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get())); - repl::ReplicationProcess::set( - service, - stdx::make_unique<repl::ReplicationProcess>( - storagePtr.get(), stdx::make_unique<repl::ReplicationConsistencyMarkersMock>())); + repl::ReplicationProcess::set(service, + stdx::make_unique<repl::ReplicationProcess>( + storagePtr.get(), + stdx::make_unique<repl::ReplicationConsistencyMarkersMock>(), + stdx::make_unique<repl::ReplicationRecoveryMock>())); repl::ReplicationProcess::get(_opCtx.get()) ->initializeRollbackID(_opCtx.get()) .transitional_ignore(); |