author | Jason Chan <jason.chan@mongodb.com> | 2021-01-08 00:35:31 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-26 21:37:22 +0000
commit | 3b87ecca61a77614d03f01a36a6ea4e155917ff0 (patch)
tree | f77f3c04a0df2ffe50b9564573bfbfadd522ef24
parent | 66f0361649bc5239460a6764cacf6c448fe190e9 (diff)
download | mongo-3b87ecca61a77614d03f01a36a6ea4e155917ff0.tar.gz
SERVER-52723 Handle oplog application restart in TenantMigrationRecipientService
-rw-r--r-- | jstests/replsets/tenant_migration_resume_oplog_application.js | 108
-rw-r--r-- | src/mongo/db/repl/SConscript | 1
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 5
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 5
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 113
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 10
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 664
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 31
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.h | 9
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_batcher.cpp | 27
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_batcher.h | 4
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_batcher_test.cpp | 82
12 files changed, 1011 insertions, 48 deletions
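
The core of this patch is the restart path in TenantMigrationRecipientService: when a new recipient primary picks up an in-progress migration after the collection clone has already finished, it derives the point to resume tenant oplog application from by walking its own local oplog backwards, looking for the newest no-op entry tagged with `fromTenantMigration` for this migration, and resuming from the donor optime embedded in that entry's `o` field (falling back to `startApplyingDonorOpTime` if no such entry is found above `cloneFinishedRecipientOpTime`). The sketch below is a condensed paraphrase of the `_getOplogResumeApplyingDonorOptime` logic added in this diff, not the verbatim code; the free-function form and the name `findResumeOpTime` are illustrative only.

```cpp
#include "mongo/bson/bsonobj.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_interface.h"
#include "mongo/db/repl/optime.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/uuid.h"

namespace mongo {
namespace repl {

// Walk the recipient's local oplog from newest to oldest (e.g. via OplogInterfaceLocal) and
// return the donor optime that tenant oplog application should resume from.
OpTime findResumeOpTime(OplogInterface& localOplog,
                        const UUID& migrationUuid,
                        const OpTime& startApplyingDonorOpTime,
                        const OpTime& cloneFinishedRecipientOpTime) {
    auto iter = localOplog.makeIterator();
    for (auto next = iter->next(); next.isOK(); next = iter->next()) {
        const BSONObj entry = next.getValue().first;
        // Entries at or before the optime recorded when cloning finished cannot have been
        // written by this migration's oplog applier, so stop scanning.
        if (uassertStatusOK(OpTime::parseFromOplogEntry(entry)) <= cloneFinishedRecipientOpTime) {
            break;
        }
        const bool fromThisMigration = entry.hasField("fromTenantMigration") &&
            uassertStatusOK(UUID::parse(entry["fromTenantMigration"])) == migrationUuid;
        // Only no-op entries written by this migration carry an embedded donor entry.
        if (!fromThisMigration ||
            entry.getStringField("op") != OpType_serializer(OpTypeEnum::kNoop)) {
            continue;
        }
        // The donor oplog entry is nested in the 'o' field; its optime is the resume point.
        auto donorOpTime =
            uassertStatusOK(OpTime::parseFromOplogEntry(entry.getObjectField("o")));
        if (donorOpTime < startApplyingDonorOpTime) {
            break;  // Nothing at or past the original apply point has been applied yet.
        }
        return donorOpTime;
    }
    // No migration no-op found: resume from the optime persisted in the state document.
    return startApplyingDonorOpTime;
}

}  // namespace repl
}  // namespace mongo
```

The fetcher side resumes analogously: if the donor oplog buffer already contains fetched entries, the oplog fetcher restarts from the last buffered entry and uses `OplogFetcher::StartingPoint::kSkipFirstDoc` so that entry is not enqueued a second time.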
diff --git a/jstests/replsets/tenant_migration_resume_oplog_application.js b/jstests/replsets/tenant_migration_resume_oplog_application.js
new file mode 100644
index 00000000000..bff439365bd
--- /dev/null
+++ b/jstests/replsets/tenant_migration_resume_oplog_application.js
@@ -0,0 +1,108 @@
+/**
+ * Tests that in a tenant migration, the recipient primary will resume oplog application on
+ * failover.
+ * @tags: [requires_majority_read_concern, requires_fcv_49]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js");            // for 'extractUUIDFromObject'
+load("jstests/libs/parallelTester.js");       // for 'Thread'
+load("jstests/libs/write_concern_util.js");   // for 'stopReplicationOnSecondaries'
+load("jstests/aggregation/extras/utils.js");  // For assertArrayEq.
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+
+const recipientRst = new ReplSetTest({
+    nodes: 3,
+    name: jsTestName() + "_recipient",
+    // Use a batch size of 2 so that we can hang in the middle of tenant oplog application.
+    nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().recipient,
+                               {setParameter: {tenantApplierBatchSizeOps: 2}})
+});
+
+recipientRst.startSet();
+recipientRst.initiate();
+if (!TenantMigrationUtil.isFeatureFlagEnabled(recipientRst.getPrimary())) {
+    jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+    recipientRst.stopSet();
+    return;
+}
+const tenantMigrationTest =
+    new TenantMigrationTest({name: jsTestName(), recipientRst: recipientRst});
+
+const tenantId = "testTenantId";
+const dbName = tenantMigrationTest.tenantDB(tenantId, "testDB");
+const collName = "testColl";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+const donorRst = tenantMigrationTest.getDonorRst();
+const donorTestColl = donorPrimary.getDB(dbName).getCollection(collName);
+
+// Populate the donor replica set with some initial data and make sure it is majority committed.
+const majorityCommittedDocs = [{_id: 0, x: 0}, {_id: 1, x: 1}];
+assert.commandWorked(donorTestColl.insert(majorityCommittedDocs, {writeConcern: {w: "majority"}}));
+assert.eq(2, donorTestColl.find().readConcern("majority").itcount());
+
+const migrationId = UUID();
+const migrationOpts = {
+    migrationIdString: extractUUIDFromObject(migrationId),
+    recipientConnString: tenantMigrationTest.getRecipientConnString(),
+    tenantId: tenantId,
+};
+
+// Configure fail point to have the recipient primary hang after the cloner completes and the oplog
+// applier has started.
+let waitAfterDatabaseClone = configureFailPoint(
+    recipientPrimary, "fpAfterStartingOplogApplierMigrationRecipientInstance", {action: "hang"});
+// Configure fail point to hang the tenant oplog applier after it applies the first batch.
+let waitInOplogApplier = configureFailPoint(recipientPrimary, "hangInTenantOplogApplication");
+
+// Start a migration and wait for the recipient to hang after starting the oplog applier.
+const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+const migrationThread =
+    new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
+migrationThread.start();
+waitAfterDatabaseClone.wait();
+
+// Insert some writes that will eventually be picked up by the tenant oplog applier on the
+// recipient.
+const docsToApply = [{_id: 2, x: 2}, {_id: 3, x: 3}, {_id: 4, x: 4}];
+tenantMigrationTest.insertDonorDB(dbName, collName, docsToApply);
+
+// Wait for the applied oplog batch to be replicated.
+waitInOplogApplier.wait();
+recipientRst.awaitReplication();
+let local = recipientPrimary.getDB("local");
+let appliedNoOps = local.oplog.rs.find({fromTenantMigration: migrationId, op: "n"});
+let resultsArr = appliedNoOps.toArray();
+// We should have applied the no-op oplog entries for the first batch of documents (size 2).
+assert.eq(2, appliedNoOps.count(), appliedNoOps);
+// No-op entries will be in the same order.
+assert.eq(docsToApply[0], resultsArr[0].o.o, resultsArr);
+assert.eq(docsToApply[1], resultsArr[1].o.o, resultsArr);
+
+// Step up a new node in the recipient set and trigger a failover. The new primary should resume
+// fetching and applying from the first unapplied document.
+const newRecipientPrimary = recipientRst.getSecondaries()[0];
+assert.commandWorked(newRecipientPrimary.adminCommand({replSetStepUp: 1}));
+waitAfterDatabaseClone.off();
+waitInOplogApplier.off();
+recipientRst.getPrimary();
+
+// The migration should go through after recipient failover.
+assert.commandWorked(migrationThread.returnData());
+// Validate that the last no-op entry is applied.
+local = newRecipientPrimary.getDB("local");
+appliedNoOps = local.oplog.rs.find({fromTenantMigration: migrationId, op: "n"});
+resultsArr = appliedNoOps.toArray();
+assert.eq(3, appliedNoOps.count(), appliedNoOps);
+assert.eq(docsToApply[2], resultsArr[2].o.o, resultsArr);
+
+tenantMigrationTest.checkTenantDBHashes(tenantId);
+tenantMigrationTest.stop();
+recipientRst.stopSet();
+})();
\ No newline at end of file diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d92e3262a38..ca18216794f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1318,6 +1318,7 @@ env.Library( 'oplog_buffer_collection', 'oplog_entry', 'oplog_fetcher', + 'oplog_interface_local', 'repl_server_parameters', 'replication_auth', 'tenant_migration_cloners', diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 8f82f9be8ec..d5948177dbb 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -129,7 +129,6 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( "least 1 document matching ts: " << lastTS.toString()); } - DocumentsInfo info; // The count of the bytes of the documents read off the network. info.networkDocumentBytes = 0; @@ -252,6 +251,10 @@ std::string OplogFetcher::toString() { return output; } +OplogFetcher::StartingPoint OplogFetcher::getStartingPoint_forTest() const { + return _config.startingPoint; +} + OpTime OplogFetcher::getLastOpTimeFetched_forTest() const { return _getLastOpTimeFetched(); } diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index acd43fbb18b..58a0f80f9a7 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -261,6 +261,11 @@ public: // ================== Test support API =================== /** + * Returns the StartingPoint defined in the OplogFetcher::Config. + */ + StartingPoint getStartingPoint_forTest() const; + + /** * Returns the `find` query run on the sync source's oplog. */ BSONObj getFindQuery_forTest(long long findTimeout) const; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index a63f2094bd7..725bf40ffc4 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_buffer_collection.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_server_parameters_gen.h" @@ -56,6 +57,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/assert_util.h" #include "mongo/util/future_util.h" namespace mongo { @@ -69,6 +71,7 @@ MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth); MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration); +MONGO_FAIL_POINT_DEFINE(pauseAfterCreatingOplogBuffer); // Fails before waiting for the state doc to be majority replicated. 
MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc); @@ -484,7 +487,12 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc( .semi(); } -void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock) { +void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) { + if (_isCloneCompletedMarkerSet(lk)) { + invariant(_stateDoc.getStartApplyingDonorOpTime().has_value()); + invariant(_stateDoc.getStartFetchingDonorOpTime().has_value()); + return; + } // Get the last oplog entry at the read concern majority optime in the remote oplog. It // does not matter which tenant it is for. auto oplogOpTimeFields = @@ -572,9 +580,21 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { _donorOplogBuffer = std::make_unique<OplogBufferCollection>( StorageInterface::get(opCtx.get()), oplogBufferNs, options); _donorOplogBuffer->startup(opCtx.get()); + + pauseAfterCreatingOplogBuffer.pauseWhileSet(); + _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>(); + auto startFetchOpTime = *_stateDoc.getStartFetchingDonorOpTime(); + auto resumingFromOplogBuffer = false; + if (_isCloneCompletedMarkerSet(lk)) { + auto topOfOplogBuffer = _donorOplogBuffer->lastObjectPushed(opCtx.get()); + if (topOfOplogBuffer) { + startFetchOpTime = uassertStatusOK(OpTime::parseFromOplogEntry(topOfOplogBuffer.get())); + resumingFromOplogBuffer = true; + } + } OplogFetcher::Config oplogFetcherConfig( - *_stateDoc.getStartFetchingDonorOpTime(), + startFetchOpTime, _oplogFetcherClient->getServerHostAndPort(), // The config is only used for setting the awaitData timeout; the defaults are fine. ReplSetConfig::parse(BSON("_id" @@ -590,7 +610,9 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { oplogFetcherConfig.requestResumeToken = true; oplogFetcherConfig.name = "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString(); - oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc; + oplogFetcherConfig.startingPoint = resumingFromOplogBuffer + ? 
OplogFetcher::StartingPoint::kSkipFirstDoc + : OplogFetcher::StartingPoint::kEnqueueFirstDoc; _donorOplogFetcher = (*_createOplogFetcherFn)( (**_scopedExecutor).get(), @@ -705,6 +727,54 @@ bool TenantMigrationRecipientService::Instance::_isCloneCompletedMarkerSet(WithL return _stateDoc.getCloneFinishedRecipientOpTime().has_value(); } +OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOptime( + const OpTime startApplyingDonorOpTime, const OpTime cloneFinishedRecipientOpTime) const { + invariant(_stateDoc.getCloneFinishedRecipientOpTime().has_value()); + auto opCtx = cc().makeOperationContext(); + OplogInterfaceLocal oplog(opCtx.get()); + auto oplogIter = oplog.makeIterator(); + auto result = oplogIter->next(); + + while (result.isOK()) { + const auto oplogObj = result.getValue().first; + auto swRecipientOpTime = repl::OpTime::parseFromOplogEntry(oplogObj); + uassert(5272311, + str::stream() << "Unable to parse opTime from oplog entry: " << redact(oplogObj) + << ", error: " << swRecipientOpTime.getStatus(), + swRecipientOpTime.isOK()); + if (swRecipientOpTime.getValue() <= cloneFinishedRecipientOpTime) { + break; + } + const bool isFromCurrentMigration = oplogObj.hasField("fromTenantMigration") && + (uassertStatusOK(UUID::parse(oplogObj.getField("fromTenantMigration"))) == + getMigrationUUID()); + // Find the most recent no-op oplog entry from the current migration. + if (isFromCurrentMigration && + (oplogObj.getStringField("op") == OpType_serializer(repl::OpTypeEnum::kNoop))) { + const auto migratedEntryObj = oplogObj.getObjectField("o"); + const auto swDonorOpTime = repl::OpTime::parseFromOplogEntry(migratedEntryObj); + uassert(5272305, + str::stream() << "Unable to parse opTime from tenant migration oplog entry: " + << redact(oplogObj) << ", error: " << swDonorOpTime.getStatus(), + swDonorOpTime.isOK()); + if (swDonorOpTime.getValue() < startApplyingDonorOpTime) { + break; + } + LOGV2_DEBUG(5272302, + 1, + "Found an optime to resume oplog application from", + "opTime"_attr = swDonorOpTime.getValue()); + return swDonorOpTime.getValue(); + } + result = oplogIter->next(); + } + LOGV2_DEBUG(5272304, + 1, + "Resuming oplog application from startApplyingDonorOpTime", + "opTime"_attr = startApplyingDonorOpTime); + return startApplyingDonorOpTime; +} + Future<void> TenantMigrationRecipientService::Instance::_startTenantAllDatabaseCloner(WithLock lk) { // If the state is data consistent, do not start the cloner. if (_isCloneCompletedMarkerSet(lk)) { @@ -1096,7 +1166,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance); - stdx::lock_guard lk(_mutex); + stdx::unique_lock lk(_mutex); { // Throwing error when cloner is canceled externally via interrupt(), makes the @@ -1110,20 +1180,37 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( // Create the oplog applier but do not start it yet. invariant(_stateDoc.getStartApplyingDonorOpTime()); + + OpTime beginApplyingAfterOpTime; + bool isResuming = false; + if (_isCloneCompletedMarkerSet(lk)) { + const auto startApplyingDonorOpTime = *_stateDoc.getStartApplyingDonorOpTime(); + const auto cloneFinishedRecipientOptime = + *_stateDoc.getCloneFinishedRecipientOpTime(); + lk.unlock(); + // We avoid holding the mutex while scanning the local oplog which acquires the RSTL + // in IX mode. 
This is to allow us to be interruptable via a concurrent stepDown + // which acquires the RSTL in X mode. + beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime( + startApplyingDonorOpTime, cloneFinishedRecipientOptime); + isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime; + lk.lock(); + } else { + beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime(); + } LOGV2_DEBUG(4881202, 1, "Recipient migration service creating oplog applier", "tenantId"_attr = getTenantId(), "migrationId"_attr = getMigrationUUID(), - "startApplyingDonorOpTime"_attr = *_stateDoc.getStartApplyingDonorOpTime()); - - _tenantOplogApplier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - _tenantId, - *_stateDoc.getStartApplyingDonorOpTime(), - _donorOplogBuffer.get(), - **_scopedExecutor, - _writerPool.get()); + "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime); + _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid, + _tenantId, + beginApplyingAfterOpTime, + _donorOplogBuffer.get(), + **_scopedExecutor, + _writerPool.get(), + isResuming); // Start the cloner. auto clonerFuture = _startTenantAllDatabaseCloner(lk); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 5b8cf10bf47..ddcb0b16b2e 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -306,7 +306,7 @@ public: /** * Retrieves the start optimes from the donor and updates the in-memory state accordingly. */ - void _getStartOpTimesFromDonor(WithLock); + void _getStartOpTimesFromDonor(WithLock lk); /** * Pushes documents from oplog fetcher to oplog buffer. @@ -340,6 +340,14 @@ public: bool _isCloneCompletedMarkerSet(WithLock) const; /* + * Traverse backwards through the oplog to find the optime which tenant oplog application + * should resume from. The oplog applier should resume applying entries that have a greater + * optime than the returned value. + */ + OpTime _getOplogResumeApplyingDonorOptime(const OpTime startApplyingDonorOpTime, + const OpTime cloneFinishedRecipientOpTime) const; + + /* * Starts the tenant cloner. * Returns future that will be fulfilled when the cloner completes. */ diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index fd04b139e13..26cb2aad961 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -101,6 +101,23 @@ OplogEntry makeOplogEntry(OpTime opTime, boost::none)}; // _id } +MutableOplogEntry makeNoOpOplogEntry(OpTime opTime, + NamespaceString nss, + OptionalCollectionUUID uuid, + BSONObj o, + boost::optional<UUID> migrationUUID) { + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setOpTime(opTime); + oplogEntry.setNss(nss); + oplogEntry.setObject(o); + oplogEntry.setWallClockTime(Date_t::now()); + if (migrationUUID) { + oplogEntry.setFromTenantMigration(migrationUUID.get()); + } + return oplogEntry; +} + /** * Generates a listDatabases response for an TenantAllDatabaseCloner to consume. 
*/ @@ -324,6 +341,11 @@ protected: return instance->_donorOplogBuffer.get(); } + TenantOplogApplier* getTenantOplogApplier( + const TenantMigrationRecipientService::Instance* instance) const { + return instance->_tenantOplogApplier.get(); + } + const TenantMigrationRecipientDocument& getStateDoc( const TenantMigrationRecipientService::Instance* instance) const { return instance->_stateDoc; @@ -341,6 +363,23 @@ protected: */ Date_t now() { return _clkSource->now(); + }; + + /* + * Populates the migration state document to simulate a recipient service restart where cloning + * has already finished. This requires the oplog buffer to contain an oplog entry with the + * optime to resume from. Otherwise, oplog application will fail when the OplogBatcher seeks + * to the resume timestamp. + */ + void updateStateDocToCloningFinished(TenantMigrationRecipientDocument& initialStateDoc, + OpTime cloneFinishedRecipientOpTime, + OpTime dataConsistentStopDonorOpTime, + OpTime startApplyingDonorOpTime, + OpTime startFetchingDonorOptime) { + initialStateDoc.setCloneFinishedRecipientOpTime(cloneFinishedRecipientOpTime); + initialStateDoc.setDataConsistentStopDonorOpTime(dataConsistentStopDonorOpTime); + initialStateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime); + initialStateDoc.setStartFetchingDonorOpTime(startFetchingDonorOptime); } private: @@ -1259,9 +1298,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat ReadPreferenceSetting(ReadPreference::PrimaryOnly), kRecipientPEMPayload); - // Setting these causes us to skip cloning. - initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); - initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; @@ -1296,6 +1337,591 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } +TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromTopOfOplogBuffer) { + const UUID migrationUUID = UUID::gen(); + const OpTime initialOpTime(Timestamp(1, 1), 1); + const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, initialOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + kRecipientPEMPayload); + + // We skip cloning here as a way to simulate that the recipient service has detected an existing + // migration on startup and will resume oplog fetching from the appropriate optime. + updateStateDocToCloningFinished( + initialStateDocument, initialOpTime, dataConsistentOpTime, initialOpTime, initialOpTime); + + // Hang after creating the oplog buffer collection but before starting the oplog fetcher. 
+ const auto hangBeforeFetcherFp = + globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer"); + auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + hangBeforeFetcherFp->waitForTimesEntered(initialTimesEntered + 1); + + const auto oplogBuffer = getDonorOplogBuffer(instance.get()); + OplogBuffer::Batch batch1; + const OpTime resumeOpTime(Timestamp(2, 1), initialOpTime.getTerm()); + auto resumeOplogBson = makeOplogEntry(resumeOpTime, + OpTypeEnum::kInsert, + NamespaceString("tenantA_foo.bar"), + UUID::gen(), + BSON("doc" << 2), + boost::none /* o2 */) + .getEntry() + .toBSON(); + batch1.push_back(resumeOplogBson); + oplogBuffer->push(opCtx.get(), batch1.cbegin(), batch1.cend()); + ASSERT_EQUALS(oplogBuffer->getCount(), 1); + + // Continue the recipient service to hang after starting the oplog applier. + const auto hangAfterStartingOplogApplier = + globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); + initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + hangBeforeFetcherFp->setMode(FailPoint::off); + hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); + + // The oplog fetcher should exist and be running. + auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get())); + ASSERT_TRUE(oplogFetcher != nullptr); + ASSERT_TRUE(oplogFetcher->isActive()); + // The oplog fetcher should have started fetching from resumeOpTime. + ASSERT_EQUALS(oplogFetcher->getLastOpTimeFetched_forTest(), resumeOpTime); + ASSERT(oplogFetcher->getStartingPoint_forTest() == OplogFetcher::StartingPoint::kSkipFirstDoc); + + hangAfterStartingOplogApplier->setMode(FailPoint::off); + + // Feed the oplog fetcher the last doc required for us to be considered consistent. + auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime, + OpTypeEnum::kInsert, + NamespaceString("tenantA_foo.bar"), + UUID::gen(), + BSON("doc" << 3), + boost::none /* o2 */); + oplogFetcher->receiveBatch( + 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp()); + + LOGV2(5272308, + "Waiting for recipient service to reach consistent state", + "suite"_attr = _agent.getSuiteName(), + "test"_attr = _agent.getTestName()); + instance->waitUntilMigrationReachesConsistentState(opCtx.get()); + + // Stop the oplog applier. + instance->stopOplogApplier_forTest(); + // Wait for task completion. Since we're using a test function to cancel the applier, + // the actual result is not critical. 
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFrom) { + const UUID migrationUUID = UUID::gen(); + const OpTime initialOpTime(Timestamp(1, 1), 1); + const OpTime startFetchingOpTime(Timestamp(2, 1), 1); + const OpTime clonerFinishedOpTime(Timestamp(3, 1), 1); + const OpTime resumeFetchingOpTime(Timestamp(4, 1), 1); + const OpTime dataConsistentOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, initialOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + kRecipientPEMPayload); + + // We skip cloning here as a way to simulate that the recipient service has detected an existing + // migration on startup and will attempt to resume oplog fetching from the appropriate optime. + updateStateDocToCloningFinished(initialStateDocument, + clonerFinishedOpTime /* clonerFinishedRecipientOpTime */, + dataConsistentOpTime /* dataConsistentStopDonorOpTime */, + startFetchingOpTime /* startApplyingDonorOpTime */, + startFetchingOpTime /* startFetchingDonorOpTime */); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + + // Hang after creating the oplog buffer collection but before starting the oplog fetcher. + const auto hangBeforeFetcherFp = + globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer"); + auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + hangBeforeFetcherFp->waitForTimesEntered(initialTimesEntered + 1); + + // There are no documents in the oplog buffer to resume fetching from. + const auto oplogBuffer = getDonorOplogBuffer(instance.get()); + ASSERT_EQUALS(oplogBuffer->getCount(), 0); + + // Continue and hang before starting the oplog applier. + const auto hangAfterStartingOplogApplier = + globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); + hangBeforeFetcherFp->setMode(FailPoint::off); + initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); + + // The oplog fetcher should exist and be running. + auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get())); + ASSERT_TRUE(oplogFetcher != nullptr); + ASSERT_TRUE(oplogFetcher->isActive()); + // The oplog fetcher should have started fetching from 'startFetchingOpTime'. Since no document + // was found in the oplog buffer, we should have set the 'StartingPoint' to 'kEnqueueFirstDoc'. 
+ ASSERT_EQUALS(oplogFetcher->getLastOpTimeFetched_forTest(), startFetchingOpTime); + ASSERT(oplogFetcher->getStartingPoint_forTest() == + OplogFetcher::StartingPoint::kEnqueueFirstDoc); + + // Feed the oplog fetcher the last doc required for the recipient to be considered consistent. + const auto tenantNss = NamespaceString("tenantA_foo.bar"); + auto resumeFetchingOplogEntry = makeOplogEntry(resumeFetchingOpTime, + OpTypeEnum::kInsert, + tenantNss, + UUID::gen(), + BSON("doc" << 1), + boost::none /* o2 */); + auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime, + OpTypeEnum::kInsert, + tenantNss, + UUID::gen(), + BSON("doc" << 3), + boost::none /* o2 */); + oplogFetcher->receiveBatch(1, + {resumeFetchingOplogEntry.getEntry().toBSON(), + dataConsistentOplogEntry.getEntry().toBSON()}, + dataConsistentOpTime.getTimestamp()); + + // Allow the service to continue. + hangAfterStartingOplogApplier->setMode(FailPoint::off); + LOGV2(5272310, + "Waiting for recipient service to reach consistent state", + "suite"_attr = _agent.getSuiteName(), + "test"_attr = _agent.getTestName()); + instance->waitUntilMigrationReachesConsistentState(opCtx.get()); + + // Stop the oplog applier. + instance->stopOplogApplier_forTest(); + // Wait for task completion. Since we're using a test function to cancel the applier, + // the actual result is not critical. + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplogEntry) { + const UUID migrationUUID = UUID::gen(); + const OpTime initialOpTime(Timestamp(1, 1), 1); + const OpTime clonerFinishedOpTime(Timestamp(2, 1), 1); + const OpTime resumeOpTime(Timestamp(3, 1), 1); + const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, initialOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + kRecipientPEMPayload); + + // We skip cloning here as a way to simulate that the recipient service has detected an existing + // migration on startup and will attempt to resume oplog fetching from the appropriate optime. + updateStateDocToCloningFinished(initialStateDocument, + clonerFinishedOpTime /* cloneFinishedRecipientOpTime */, + dataConsistentOpTime /* dataConsistentStopDonorOpTime */, + clonerFinishedOpTime /* startApplyingDonorOpTime */, + initialOpTime /* startFetchingDonorOpTime */); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + // Create and insert two tenant migration no-op entries into the oplog. The oplog applier should + // resume from the no-op entry with the most recent donor opTime. 
+ const auto insertNss = NamespaceString("tenantA_foo.bar"); + const auto earlierOplogBson = makeOplogEntry(clonerFinishedOpTime, + OpTypeEnum::kInsert, + insertNss, + UUID::gen(), + BSON("doc" << 1), + boost::none /* o2 */) + .getEntry() + .toBSON(); + const auto resumeOplogBson = makeOplogEntry(resumeOpTime, + OpTypeEnum::kInsert, + insertNss, + UUID::gen(), + BSON("doc" << 2), + boost::none /* o2 */) + .getEntry() + .toBSON(); + auto storage = StorageInterface::get(opCtx->getServiceContext()); + const auto oplogNss = NamespaceString::kRsOplogNamespace; + const OpTime earlierRecipientOpTime(Timestamp(9, 1), 1); + const OpTime resumeRecipientOpTime(Timestamp(10, 1), 1); + auto earlierNoOpEntry = makeNoOpOplogEntry(earlierRecipientOpTime, + insertNss, + UUID::gen(), + earlierOplogBson, + instance->getMigrationUUID()); + auto resumeNoOpEntry = makeNoOpOplogEntry(resumeRecipientOpTime, + insertNss, + UUID::gen(), + resumeOplogBson, + instance->getMigrationUUID()); + ASSERT_OK( + storage->insertDocument(opCtx.get(), + oplogNss, + {earlierNoOpEntry.toBSON(), earlierRecipientOpTime.getTimestamp()}, + earlierRecipientOpTime.getTerm())); + ASSERT_OK( + storage->insertDocument(opCtx.get(), + oplogNss, + {resumeNoOpEntry.toBSON(), resumeRecipientOpTime.getTimestamp()}, + resumeRecipientOpTime.getTerm())); + + // Hang before starting the oplog applier. + const auto hangAfterStartingOplogApplier = + globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); + auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); + + auto oplogFetcher = getDonorOplogFetcher(instance.get()); + auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime, + OpTypeEnum::kInsert, + insertNss, + UUID::gen(), + BSON("doc" << 3), + boost::none /* o2 */); + // Feed the oplog fetcher the last doc required for the recipient to be considered consistent. + oplogFetcher->receiveBatch( + 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp()); + + // Allow the service to continue. + hangAfterStartingOplogApplier->setMode(FailPoint::off); + LOGV2(5272350, + "Waiting for recipient service to reach consistent state", + "suite"_attr = _agent.getSuiteName(), + "test"_attr = _agent.getTestName()); + instance->waitUntilMigrationReachesConsistentState(opCtx.get()); + + // The oplog applier should have started applying at the 'resumeOpTime'. + const auto oplogApplier = getTenantOplogApplier(instance.get()); + ASSERT_EQUALS(resumeOpTime, oplogApplier->getBeginApplyingOpTime_forTest()); + + // Stop the oplog applier. + instance->stopOplogApplier_forTest(); + // Wait for task completion. Since we're using a test function to cancel the applier, + // the actual result is not critical. 
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApplyingOpTime) { + const UUID migrationUUID = UUID::gen(); + const OpTime initialOpTime(Timestamp(1, 1), 1); + const OpTime startApplyingOpTime(Timestamp(2, 1), 1); + const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, initialOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + kRecipientPEMPayload); + + // We skip cloning here as a way to simulate that the recipient service has detected an existing + // migration on startup and will attempt to resume oplog fetching from the appropriate optime. + updateStateDocToCloningFinished(initialStateDocument, + OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime */, + dataConsistentOpTime /* dataConsistentStopDonorOpTime */, + startApplyingOpTime /* startApplyingDonorOpTime */, + initialOpTime /* startFetchingDonorOpTime */); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + // Create and insert the following into the oplog: + // - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'. + // - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'. + // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o' field. The donor opTime + // is less than the 'startApplyingDonorOpTime'. + // - (4) A no-op oplog entry with an inner oplog entry as the 'o' field but no 'fromMigrate' + // field. These oplog entries do not satisfy the conditions for the oplog applier to resume from + // so we default to resuming from 'startDonorApplyingOpTime'. 
+ const auto insertNss = NamespaceString("tenantA_foo.bar"); + const auto entryBeforeStartApplyingOpTime = makeOplogEntry( + initialOpTime, + OpTypeEnum::kInsert, + insertNss, + UUID::gen(), + BSON("doc" + << "before startApplyingDonorOpTime"), + boost::none /* o2 */) + .getEntry() + .toBSON(); + const auto afterStartApplyingOpTime = OpTime(Timestamp(3, 1), 1); + const auto entryAfterStartApplyingOpTime = makeOplogEntry( + afterStartApplyingOpTime, + OpTypeEnum::kInsert, + insertNss, + UUID::gen(), + BSON("doc" + << "after startApplyingDonorOpTime"), + boost::none /* o2 */) + .getEntry() + .toBSON(); + auto storage = StorageInterface::get(opCtx->getServiceContext()); + const auto oplogNss = NamespaceString::kRsOplogNamespace; + const auto collUuid = UUID::gen(); + std::vector<DurableOplogEntry> oplogEntries; + std::vector<MutableOplogEntry> noOpEntries; + // (1) + oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(9, 1), 1), + OpTypeEnum::kInsert, + insertNss, + collUuid, + BSON("doc" + << "before clonerFinishedOpTime"), + boost::none /* o2 */) + .getEntry()); + // (2) + oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(11, 1), 1), + OpTypeEnum::kInsert, + insertNss, + collUuid, + BSON("doc" + << "after clonerFinishedOpTime"), + boost::none /* o2 */) + .getEntry()); + // (3) + noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(12, 1), 1), + insertNss, + collUuid, + entryBeforeStartApplyingOpTime, + instance->getMigrationUUID())); + // (4) + noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(13, 1), 1), + insertNss, + collUuid, + entryAfterStartApplyingOpTime, + boost::none /* o2 */)); + for (auto entry : oplogEntries) { + auto opTime = entry.getOpTime(); + ASSERT_OK(storage->insertDocument( + opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm())); + } + for (auto entry : noOpEntries) { + auto opTime = entry.getOpTime(); + ASSERT_OK(storage->insertDocument( + opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm())); + } + + // Hang before starting the oplog applier. + const auto hangAfterStartingOplogApplier = + globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); + auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); + + auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime, + OpTypeEnum::kInsert, + NamespaceString("tenantA_foo.bar"), + UUID::gen(), + BSON("doc" << 3), + boost::none /* o2 */); + + auto oplogFetcher = getDonorOplogFetcher(instance.get()); + // Feed the oplog fetcher the last doc required for the recipient to be considered consistent. + oplogFetcher->receiveBatch( + 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp()); + + // Allow the service to continue. + hangAfterStartingOplogApplier->setMode(FailPoint::off); + LOGV2(5272340, + "Waiting for recipient service to reach consistent state", + "suite"_attr = _agent.getSuiteName(), + "test"_attr = _agent.getTestName()); + instance->waitUntilMigrationReachesConsistentState(opCtx.get()); + + // The oplog applier starts applying from the first opTime after the 'beginApplyingOpTime'. + const auto oplogApplier = getTenantOplogApplier(instance.get()); + ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getBeginApplyingOpTime_forTest()); + + // Stop the oplog applier. 
+ instance->stopOplogApplier_forTest(); + // Wait for task completion. Since we're using a test function to cancel the applier, + // the actual result is not critical. + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, + OplogFetcherResumesFromStartFetchingOpTimeWithDocInBuffer) { + const UUID migrationUUID = UUID::gen(); + const OpTime initialOpTime(Timestamp(1, 1), 1); + const OpTime startFetchingOpTime(Timestamp(2, 1), 1); + const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, initialOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + kRecipientPEMPayload); + + // We skip cloning here as a way to simulate that the recipient service has detected an existing + // migration on startup and will resume oplog fetching from the appropriate optime. + updateStateDocToCloningFinished(initialStateDocument, + startFetchingOpTime, + dataConsistentOpTime, + startFetchingOpTime, + startFetchingOpTime); + + // Hang after creating the oplog buffer collection but before starting the oplog fetcher. + const auto hangBeforeFetcherFp = + globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer"); + auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + hangBeforeFetcherFp->waitForTimesEntered(initialTimesEntered + 1); + + // Insert the first document with 'startFetchingOpTime' into the oplog buffer. The fetcher + // should know to skip this document on service restart. + const auto oplogBuffer = getDonorOplogBuffer(instance.get()); + OplogBuffer::Batch batch1; + batch1.push_back(makeOplogEntry(startFetchingOpTime, + OpTypeEnum::kInsert, + NamespaceString("tenantA_foo.bar"), + UUID::gen(), + BSON("doc" << 2), + boost::none /* o2 */) + .getEntry() + .toBSON()); + oplogBuffer->push(opCtx.get(), batch1.cbegin(), batch1.cend()); + ASSERT_EQUALS(oplogBuffer->getCount(), 1); + + auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime, + OpTypeEnum::kInsert, + NamespaceString("tenantA_foo.bar"), + UUID::gen(), + BSON("doc" << 3), + boost::none /* o2 */); + // Continue the recipient service to hang before starting the oplog applier. + const auto hangAfterStartingOplogApplier = + globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); + initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + hangBeforeFetcherFp->setMode(FailPoint::off); + hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); + + // The oplog fetcher should exist and be running. 
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get())); + ASSERT_TRUE(oplogFetcher != nullptr); + ASSERT_TRUE(oplogFetcher->isActive()); + // The oplog fetcher should have started fetching from 'startFetchingOpTime'. However, the + // fetcher should skip the first doc from being fetched since it already exists in the buffer. + ASSERT_EQUALS(oplogFetcher->getLastOpTimeFetched_forTest(), startFetchingOpTime); + ASSERT(oplogFetcher->getStartingPoint_forTest() == OplogFetcher::StartingPoint::kSkipFirstDoc); + + // Feed the oplog fetcher the last doc required for us to be considered consistent. + oplogFetcher->receiveBatch( + 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp()); + + // Allow the service to continue. + hangAfterStartingOplogApplier->setMode(FailPoint::off); + LOGV2(5272317, + "Waiting for recipient service to reach consistent state", + "suite"_attr = _agent.getSuiteName(), + "test"_attr = _agent.getTestName()); + instance->waitUntilMigrationReachesConsistentState(opCtx.get()); + + // Stop the oplog applier. + instance->stopOplogApplier_forTest(); + // Wait for task completion. Since we're using a test function to cancel the applier, + // the actual result is not critical. + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) { const UUID migrationUUID = UUID::gen(); const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); @@ -1311,9 +1937,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) { ReadPreferenceSetting(ReadPreference::PrimaryOnly), kRecipientPEMPayload); - // Setting these causes us to skip cloning. - initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); - initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; @@ -1373,9 +2001,11 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) { ReadPreferenceSetting(ReadPreference::PrimaryOnly), kRecipientPEMPayload); - // Setting these causes us to skip cloning. - initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); - initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; @@ -1732,9 +2362,11 @@ TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterConsis ReadPreferenceSetting(ReadPreference::PrimaryOnly), kRecipientPEMPayload); - // Setting these causes us to skip cloning. 
- initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); - initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; @@ -1816,9 +2448,11 @@ TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterFail) ReadPreferenceSetting(ReadPreference::PrimaryOnly), kRecipientPEMPayload); - // Setting these causes us to skip cloning. - initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); - initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 54698b77f00..c36d5c73d64 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -55,19 +55,23 @@ namespace mongo { namespace repl { +MONGO_FAIL_POINT_DEFINE(hangInTenantOplogApplication); + TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, const std::string& tenantId, OpTime applyFromOpTime, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, - ThreadPool* writerPool) + ThreadPool* writerPool, + const bool isResuming) : AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId), _migrationUuid(migrationUuid), _tenantId(tenantId), _beginApplyingAfterOpTime(applyFromOpTime), _oplogBuffer(oplogBuffer), _executor(std::move(executor)), - _writerPool(writerPool) {} + _writerPool(writerPool), + _isResuming(isResuming) {} TenantOplogApplier::~TenantOplogApplier() { shutdown(); @@ -93,8 +97,17 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo return iter->second.getFuture().semi(); } +OpTime TenantOplogApplier::getBeginApplyingOpTime_forTest() const { + return _beginApplyingAfterOpTime; +} + Status TenantOplogApplier::_doStartup_inlock() noexcept { - _oplogBatcher = std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor); + Timestamp resumeTs; + if (_isResuming) { + resumeTs = _beginApplyingAfterOpTime.getTimestamp(); + } + _oplogBatcher = + std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor, resumeTs); auto status = _oplogBatcher->startup(); if (!status.isOK()) return status; @@ -288,6 +301,18 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) { iter->second.emplaceValue(_lastAppliedOpTimesUpToLastBatch); } _opTimeNotificationList.erase(_opTimeNotificationList.begin(), firstUnexpiredIter); + + hangInTenantOplogApplication.executeIf( + [&](const BSONObj& data) { + LOGV2( + 5272315, + "hangInTenantOplogApplication failpoint enabled -- blocking until it is disabled.", + "tenant"_attr = 
_tenantId, + "migrationUuid"_attr = _migrationUuid, + "lastBatchCompletedOpTimes"_attr = lastBatchCompletedOpTimes); + hangInTenantOplogApplication.pauseWhileSet(opCtx.get()); + }, + [&](const BSONObj& data) { return !lastBatchCompletedOpTimes.recipientOpTime.isNull(); }); } void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx, diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index ce9bd29136d..bae6772f296 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -76,7 +76,8 @@ public: OpTime applyFromOpTime, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, - ThreadPool* writerPool); + ThreadPool* writerPool, + const bool isResuming = false); virtual ~TenantOplogApplier(); @@ -89,6 +90,11 @@ public: */ SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime); + /** + * Returns the optime the applier will start applying from. Used for testing. + */ + OpTime getBeginApplyingOpTime_forTest() const; + private: Status _doStartup_inlock() noexcept final; void _doShutdown_inlock() noexcept final; @@ -153,6 +159,7 @@ private: Status _finalStatus = Status::OK(); // (M) stdx::unordered_set<UUID, UUID::Hash> _knownGoodUuids; // (X) bool _applyLoopApplyingBatch = false; // (M) + const bool _isResuming; // (R) }; /** diff --git a/src/mongo/db/repl/tenant_oplog_batcher.cpp b/src/mongo/db/repl/tenant_oplog_batcher.cpp index c564d4b0119..47cbbef1c5d 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher.cpp +++ b/src/mongo/db/repl/tenant_oplog_batcher.cpp @@ -41,10 +41,12 @@ namespace mongo { namespace repl { TenantOplogBatcher::TenantOplogBatcher(const std::string& tenantId, RandomAccessOplogBuffer* oplogBuffer, - std::shared_ptr<executor::TaskExecutor> executor) + std::shared_ptr<executor::TaskExecutor> executor, + Timestamp resumeBatchingTs) : AbstractAsyncComponent(executor.get(), std::string("TenantOplogBatcher_") + tenantId), _oplogBuffer(oplogBuffer), - _executor(executor) {} + _executor(executor), + _resumeBatchingTs(resumeBatchingTs) {} TenantOplogBatcher::~TenantOplogBatcher() { shutdown(); @@ -208,6 +210,27 @@ SemiFuture<TenantOplogBatch> TenantOplogBatcher::getNextBatch(BatchLimits limits Status TenantOplogBatcher::_doStartup_inlock() noexcept { LOGV2_DEBUG( 4885604, 1, "Tenant Oplog Batcher starting up", "component"_attr = _getComponentName()); + if (!_resumeBatchingTs.isNull()) { + auto opCtx = cc().makeOperationContext(); + uassert(5272303, + str::stream() << "Error resuming oplog batcher", + _oplogBuffer + ->seekToTimestamp(opCtx.get(), + _resumeBatchingTs, + RandomAccessOplogBuffer::SeekStrategy::kInexact) + .isOK()); + // Doing a 'seekToTimestamp' will not set the '_lastPoppedKey' on its own if a document + // with '_resumeBatchingTs' exists in the buffer collection. We do a 'tryPop' here to set + // '_lastPoppedKey' to equal '_resumeBatchingTs'. 
+ if (_oplogBuffer->findByTimestamp(opCtx.get(), _resumeBatchingTs).isOK()) { + BSONObj opToPopAndDiscard; + _oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard); + } + LOGV2_DEBUG(5272306, + 1, + "Tenant Oplog Batcher will resume batching from after timestamp", + "timestamp"_attr = _resumeBatchingTs); + } return Status::OK(); } diff --git a/src/mongo/db/repl/tenant_oplog_batcher.h b/src/mongo/db/repl/tenant_oplog_batcher.h index ddb8f769111..c8b51f0cbd9 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher.h +++ b/src/mongo/db/repl/tenant_oplog_batcher.h @@ -75,7 +75,8 @@ public: TenantOplogBatcher(const std::string& tenantId, RandomAccessOplogBuffer* oplogBuffer, - std::shared_ptr<executor::TaskExecutor> executor); + std::shared_ptr<executor::TaskExecutor> executor, + const Timestamp resumeBatchingTs); virtual ~TenantOplogBatcher(); @@ -114,6 +115,7 @@ private: RandomAccessOplogBuffer* _oplogBuffer; // (S) bool _batchRequested = false; // (M) std::shared_ptr<executor::TaskExecutor> _executor; // (R) + const Timestamp _resumeBatchingTs; // (R) }; } // namespace repl diff --git a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp index 18ca8741d6a..7bb3d2273f2 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp @@ -49,6 +49,7 @@ class TenantOplogBatcherTest : public unittest::Test, public ScopedGlobalService public: void setUp() override { unittest::Test::setUp(); + Client::initThread("TenantOplogBatcherTest"); auto network = std::make_unique<executor::NetworkInterfaceMock>(); _net = network.get(); executor::ThreadPoolMock::Options thread_pool_options; @@ -58,6 +59,10 @@ public: _oplogBuffer.startup(nullptr); } + void tearDown() override { + Client::releaseCurrent(); + } + protected: TenantOplogBatcher::BatchLimits bigBatchLimits = TenantOplogBatcher::BatchLimits(1ULL << 32, 1ULL << 32); @@ -94,7 +99,8 @@ std::string toString(TenantOplogBatch& batch) { constexpr auto dbName = "tenant_test"_sd; TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) { - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); // We just started, no batch should be available. @@ -107,7 +113,8 @@ TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) { } TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) { - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); // We just started, no batch should be available. 
@@ -131,7 +138,8 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) { } TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) { - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); @@ -145,7 +153,8 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) { } TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedCommit) { - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); @@ -176,7 +185,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWith srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps).getEntry().toBSON()); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON()); - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); @@ -209,7 +219,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) { srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps1).getEntry().toBSON()); srcOps.push_back(makeApplyOpsOplogEntry(2, false, innerOps2).getEntry().toBSON()); - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); @@ -252,7 +263,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOp // Set batch limits so that each batch contains a maximum of 'BatchLimit::ops'. auto limits = bigBatchLimits; limits.ops = 3U; - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(limits); @@ -283,7 +295,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOper // Set batch limits so that only the first two operations can fit into the first batch. 
auto limits = bigBatchLimits; limits.bytes = std::size_t(srcOps[0].objsize() + srcOps[1].objsize()); - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(limits); @@ -328,7 +341,8 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); @@ -380,7 +394,8 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) } TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) { - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); // bigBatchLimits is a legal batch limit. auto limits = bigBatchLimits; @@ -392,7 +407,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) { } TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchSizeLimits) { - auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); ASSERT_OK(batcher->startup()); // bigBatchLimits is a legal batch limit. 
auto limits = bigBatchLimits; @@ -403,5 +419,49 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchSizeLimits) { batcher->join(); } +TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromTimestamp) { + std::vector<BSONObj> srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON()); + _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); + + auto batcher = + std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(4, 1)); + ASSERT_OK(batcher->startup()); + + auto batchFuture = batcher->getNextBatch(bigBatchLimits); + + auto batch = batchFuture.get(); + ASSERT_EQUALS(1U, batch.ops.size()) << toString(batch); + ASSERT_BSONOBJ_EQ(srcOps[4], batch.ops[0].entry.getEntry().toBSON()); + + batcher->shutdown(); + batcher->join(); +} + +TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromNonExistentTimestamp) { + std::vector<BSONObj> srcOps; + srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON()); + _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); + + auto batcher = + std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(3, 1)); + ASSERT_OK(batcher->startup()); + + auto batchFuture = batcher->getNextBatch(bigBatchLimits); + + auto batch = batchFuture.get(); + ASSERT_EQUALS(2U, batch.ops.size()) << toString(batch); + ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON()); + + batcher->shutdown(); + batcher->join(); +} + } // namespace repl } // namespace mongo |
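
On the applier side, the resume point computed above reaches `TenantOplogBatcher` as `resumeBatchingTs`: at startup the batcher seeks the random-access oplog buffer to that timestamp and, if an entry with exactly that timestamp exists, pops and discards it (it was already applied before the restart) so batching continues with the first unapplied entry. Below is a minimal sketch of that startup step, assuming only the `RandomAccessOplogBuffer` calls shown in the diff (`seekToTimestamp`, `findByTimestamp`, `tryPop`); the helper name `resumeBatchingFrom` is an illustration, not part of the patch.

```cpp
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog_buffer.h"

namespace mongo {
namespace repl {

// Position 'buffer' so that the next batch starts with the first entry that still needs to
// be applied, given that everything up to and including 'resumeTs' was already applied.
Status resumeBatchingFrom(OperationContext* opCtx,
                          RandomAccessOplogBuffer* buffer,
                          Timestamp resumeTs) {
    if (resumeTs.isNull()) {
        return Status::OK();  // Fresh migration: nothing to skip.
    }
    // An inexact seek tolerates the resume timestamp not being present in the buffer
    // collection; it positions the buffer at the closest preceding entry instead.
    auto status = buffer->seekToTimestamp(
        opCtx, resumeTs, RandomAccessOplogBuffer::SeekStrategy::kInexact);
    if (!status.isOK()) {
        return status;
    }
    // seekToTimestamp() alone does not advance the last-popped key when a document with
    // 'resumeTs' exists, so pop and discard that already-applied document explicitly.
    if (buffer->findByTimestamp(opCtx, resumeTs).isOK()) {
        BSONObj alreadyApplied;
        buffer->tryPop(opCtx, &alreadyApplied);
    }
    return Status::OK();
}

}  // namespace repl
}  // namespace mongo
```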