author | Christopher Caplinger <christopher.caplinger@mongodb.com> | 2023-03-21 17:49:58 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-21 21:03:45 +0000
commit | 3c130a69eaddc7cb44895f57af4da6e39556dbb4
tree | bdcf8be7bde8dac12106d578f69dff21331dccfd
parent | 2dbc2a40b841eef00e2ad1b79e3f938bad889c58
SERVER-72622: Track TenantOplogApplier progress in replicated collection
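
Resuming tenant oplog application on recipient failover previously required scanning the local oplog backwards for the last migration no-op entry. With this change the TenantOplogApplier instead checkpoints its progress (the last applied donor opTime) in a replicated collection, config.repl.migration.progress_<migrationUUID>, after each applied batch, and resumes batching and applying from that document. When the recipient FCV is below 7.0 it falls back to the old oplog-scanning path. Because a failover can happen after a batch's no-op entries are written but before the progress document is updated, previously executed retryable-write statements are now skipped on resume instead of failing the migration.

A progress document looks roughly like this (illustrative shape, per tenant_oplog_applier_progress.idl):

    // config.repl.migration.progress_<migrationUUID>
    { _id: <migration UUID>, donorOpTime: { ts: <Timestamp>, t: <NumberLong term> } }
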
-rw-r--r-- | jstests/multiVersion/targetedTestsLastContinuousFeatures/tenant_oplog_applier_progress.js | 166
-rw-r--r-- | jstests/replsets/libs/tenant_migration_util.js | 1
-rw-r--r-- | jstests/replsets/tenant_migration_resume_oplog_application.js | 4
-rw-r--r-- | src/mongo/db/basic_types.idl | 2
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 5
-rw-r--r-- | src/mongo/db/namespace_string.h | 8
-rw-r--r-- | src/mongo/db/repl/SConscript | 1
-rw-r--r-- | src/mongo/db/repl/shard_merge_recipient_service.cpp | 7
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 70
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 6
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 462
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 112
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.h | 41
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier_progress.idl | 50
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier_test.cpp | 1145
15 files changed, 1005 insertions, 1075 deletions
diff --git a/jstests/multiVersion/targetedTestsLastContinuousFeatures/tenant_oplog_applier_progress.js b/jstests/multiVersion/targetedTestsLastContinuousFeatures/tenant_oplog_applier_progress.js new file mode 100644 index 00000000000..c4d5a63e6a1 --- /dev/null +++ b/jstests/multiVersion/targetedTestsLastContinuousFeatures/tenant_oplog_applier_progress.js @@ -0,0 +1,166 @@ +/** + * Tests that in a tenant migration, the recipient primary will resume oplog application on + * failover with the same behavior regardless of the binary version running on the primary + * vs. secondary. + * + * @tags: [ + * incompatible_with_macos, + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * serverless, + * ] + */ + +import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js"; +import { + checkTenantDBHashes, + makeX509OptionsForTest, + runMigrationAsync, +} from "jstests/replsets/libs/tenant_migration_util.js"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/uuid_util.js"); // for 'extractUUIDFromObject' +load("jstests/libs/parallelTester.js"); // for 'Thread' +load('jstests/multiVersion/libs/multi_rs.js'); +load('jstests/replsets/rslib.js'); // For 'createRstArgs' + +const docsToApply = [{_id: 2, x: 2}, {_id: 3, x: 3}, {_id: 4, x: 4}]; + +function runTest({initialPrimaryVersion, initialSecondaryVersion, featureCompatibilityVersion}) { + jsTestLog(`Test TenantOplogApplier failover with Primary version '${ + initialPrimaryVersion}' Secondary version: ${initialSecondaryVersion} and FCV '${ + featureCompatibilityVersion}'`); + + const recipientRst = new ReplSetTest({ + name: jsTestName(), + nodes: [{binVersion: initialPrimaryVersion}, {binVersion: initialSecondaryVersion}], + // Use a batch size of 2 so that we can hang in the middle of tenant oplog application. + nodeOptions: Object.assign(makeX509OptionsForTest().recipient, + {setParameter: {tenantApplierBatchSizeOps: 2}}) + }); + + recipientRst.startSet(); + recipientRst.initiate(); + + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + + jsTestLog(`Set Donor FCV to ${featureCompatibilityVersion}`); + assert.commandWorked( + donorPrimary.adminCommand({setFeatureCompatibilityVersion: featureCompatibilityVersion})); + + jsTestLog(`Set Recipient FCV to ${featureCompatibilityVersion}`); + assert.commandWorked(recipientPrimary.adminCommand( + {setFeatureCompatibilityVersion: featureCompatibilityVersion})); + + const tenantId = ObjectId().str; + const dbName = tenantMigrationTest.tenantDB(tenantId, "testDB"); + const collName = "testColl"; + const donorTestColl = donorPrimary.getDB(dbName).getCollection(collName); + + // Populate the donor replica set with some initial data and make sure it is majority committed. 
+ const majorityCommittedDocs = [{_id: 0, x: 0}, {_id: 1, x: 1}]; + assert.commandWorked( + donorTestColl.insert(majorityCommittedDocs, {writeConcern: {w: "majority"}})); + assert.eq(2, donorTestColl.find().readConcern("majority").itcount()); + + const migrationId = UUID(); + + const fetchNoOps = () => { + const recipientPrimary = recipientRst.getPrimary(); + const local = recipientPrimary.getDB("local"); + return local.oplog.rs.find({fromTenantMigration: migrationId, op: "n"}).toArray(); + }; + + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: tenantMigrationTest.getRecipientConnString(), + protocol: "multitenant migrations", + tenantId, + }; + + // Configure fail point to have the recipient primary hang after the cloner completes and the + // oplog applier has started. + const waitAfterDatabaseClone = + configureFailPoint(recipientPrimary, + "fpAfterStartingOplogApplierMigrationRecipientInstance", + {action: "hang"}); + // Configure fail point to hang the tenant oplog applier after it applies the first batch. + const waitInOplogApplier = configureFailPoint(recipientPrimary, "hangInTenantOplogApplication"); + + // Start a migration and wait for recipient to hang in the tenant database cloner. + const migrationThread = new Thread( + runMigrationAsync, migrationOpts, createRstArgs(tenantMigrationTest.getDonorRst())); + migrationThread.start(); + waitAfterDatabaseClone.wait(); + + // Insert some writes that will eventually be picked up by the tenant oplog applier on the + // recipient. + tenantMigrationTest.insertDonorDB(dbName, collName, docsToApply); + + // Wait for the applied oplog batch to be replicated. + waitInOplogApplier.wait(); + recipientRst.awaitReplication(); + + // It is possible that the first batch applied includes a resume no-op token. We do not write + // no-op entries for resume token entries in tenant migrations. + const results = fetchNoOps(); + assert.gt(results.length, 0, results); + assert.lte(results.length, 2, results); + assert.eq(docsToApply[0], results[0].o2.o, results); + if (results.length === 2) { + assert.eq(docsToApply[1], results[1].o2.o, results); + } + + jsTestLog(`Step up a new Primary with version ${initialSecondaryVersion}`); + // Step up a new node in the recipient set and trigger a failover. The new primary should resume + // fetching starting from the unapplied documents. + recipientRst.stepUp(recipientRst.getSecondaries()[0]); + waitAfterDatabaseClone.off(); + waitInOplogApplier.off(); + + // The migration should go through after recipient failover. + TenantMigrationTest.assertCommitted(migrationThread.returnData()); + + const appliedNoOps = fetchNoOps(); + assert.eq(3, appliedNoOps.length, appliedNoOps); + docsToApply.forEach((docToApply, idx) => { + assert.eq(docToApply, appliedNoOps[idx].o2.o, appliedNoOps); + }); + + checkTenantDBHashes({ + donorRst: tenantMigrationTest.getDonorRst(), + recipientRst: tenantMigrationTest.getRecipientRst(), + tenantId + }); + + tenantMigrationTest.stop(); + recipientRst.stopSet(); +} + +// Initial primary is running 6.3, use FCV 6.3 and fall back to oplog scanning for tenant oplog +// applier resumption. +runTest({ + initialPrimaryVersion: "last-continuous", + initialSecondaryVersion: "latest", + featureCompatibilityVersion: "6.3" +}); + +// Initial secondary is running 6.3, use FCV 6.3 and fall back to oplog scanning for tenant oplog +// applier resumption. 
+runTest({ + initialPrimaryVersion: "latest", + initialSecondaryVersion: "last-continuous", + featureCompatibilityVersion: "6.3" +}); + +// All nodes running latest version, use latest FCV to utilize progress collection for tenant oplog +// applier resumption. +runTest({ + initialPrimaryVersion: "latest", + initialSecondaryVersion: "latest", + featureCompatibilityVersion: "7.0" +}); diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js index 611ca24fb66..633fb3c536d 100644 --- a/jstests/replsets/libs/tenant_migration_util.js +++ b/jstests/replsets/libs/tenant_migration_util.js @@ -154,6 +154,7 @@ export async function runMigrationAsync(migrationOpts, donorRstArgs, opts = {}) migrationId: UUID(migrationOpts.migrationIdString), tenantId: migrationOpts.tenantId, tenantIds: eval(migrationOpts.tenantIds), + protocol: migrationOpts.protocol, recipientConnectionString: migrationOpts.recipientConnString, readPreference: migrationOpts.readPreference || {mode: "primary"}, donorCertificateForRecipient: migrationOpts.donorCertificateForRecipient || diff --git a/jstests/replsets/tenant_migration_resume_oplog_application.js b/jstests/replsets/tenant_migration_resume_oplog_application.js index 70849750914..50a9bedbbc5 100644 --- a/jstests/replsets/tenant_migration_resume_oplog_application.js +++ b/jstests/replsets/tenant_migration_resume_oplog_application.js @@ -63,10 +63,10 @@ const migrationOpts = { // Configure fail point to have the recipient primary hang after the cloner completes and the oplog // applier has started. -let waitAfterDatabaseClone = configureFailPoint( +const waitAfterDatabaseClone = configureFailPoint( recipientPrimary, "fpAfterStartingOplogApplierMigrationRecipientInstance", {action: "hang"}); // Configure fail point to hang the tenant oplog applier after it applies the first batch. -let waitInOplogApplier = configureFailPoint(recipientPrimary, "hangInTenantOplogApplication"); +const waitInOplogApplier = configureFailPoint(recipientPrimary, "hangInTenantOplogApplication"); // Start a migration and wait for recipient to hang in the tenant database cloner. 
const donorRstArgs = createRstArgs(donorRst); diff --git a/src/mongo/db/basic_types.idl b/src/mongo/db/basic_types.idl index 70f52ceea6b..e1c1e25ec84 100644 --- a/src/mongo/db/basic_types.idl +++ b/src/mongo/db/basic_types.idl @@ -196,7 +196,7 @@ types: array: bson_serialization_type: array - description: "An unowned BSONArray without custom deserialization or seialization" + description: "An unowned BSONArray without custom deserialization or serialization" cpp_type: "mongo::BSONArray" array_owned: diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 8965f0bcc98..6bd83eaba2c 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -240,6 +240,11 @@ NamespaceString NamespaceString::makeDummyNamespace(const boost::optional<Tenant return NamespaceString(tenantId, DatabaseName::kConfig.db(), "dummy.namespace"); } +NamespaceString NamespaceString::makeTenantOplogApplierProgressNSS(const UUID& migrationUUID) { + return NamespaceString(DatabaseName::kConfig, + NamespaceString::kTenantOplogApplierProgressPrefix + + migrationUUID.toString()); +} std::string NamespaceString::getSisterNS(StringData local) const { verify(local.size() && local[0] != '.'); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 0669d777caa..a19f446f62e 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -188,6 +188,8 @@ public: static constexpr StringData kAnalyzeShardKeySplitPointsCollectionPrefix = "analyzeShardKey.splitPoints."_sd; + // Prefix for tracking TenantOplogApplier progress during a tenant migration. + static constexpr StringData kTenantOplogApplierProgressPrefix = "repl.migration.progress_"_sd; // Maintainers Note: The large set of `NamespaceString`-typed static data // members of the `NamespaceString` class representing system-reserved @@ -383,6 +385,12 @@ public: static NamespaceString makeDummyNamespace(const boost::optional<TenantId>& tenantId); /** + * Constructs a NamespaceString used for tracking TenantOplogApplier progress during a tenant + * migration. + */ + static NamespaceString makeTenantOplogApplierProgressNSS(const UUID& migrationUUID); + + /** * NOTE: DollarInDbNameBehavior::allow is deprecated. 
* * Please use DollarInDbNameBehavior::disallow and check explicitly for any DB names that must diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 4f7c9df477c..637097b7f65 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -2060,6 +2060,7 @@ env.Library( source=[ 'tenant_oplog_batcher.cpp', 'tenant_oplog_applier.cpp', + 'tenant_oplog_applier_progress.idl', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/repl/shard_merge_recipient_service.cpp b/src/mongo/db/repl/shard_merge_recipient_service.cpp index 8e362697ee3..1392b3cd6f6 100644 --- a/src/mongo/db/repl/shard_merge_recipient_service.cpp +++ b/src/mongo/db/repl/shard_merge_recipient_service.cpp @@ -2112,15 +2112,18 @@ void ShardMergeRecipientService::Instance::_startOplogApplier() { invariant(startApplyingDonorOpTime); const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime(); invariant(cloneFinishedRecipientOpTime); + invariant(!cloneFinishedRecipientOpTime->isNull()); _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid, MigrationProtocolEnum::kShardMerge, boost::none, + boost::none, *startApplyingDonorOpTime, _donorOplogBuffer.get(), **_scopedExecutor, - _writerPool.get()); - _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime); + _writerPool.get(), + *cloneFinishedRecipientOpTime, + false); LOGV2_DEBUG(7339750, 1, diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 1e01d46a9be..d64e9af6433 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -95,6 +95,8 @@ using namespace fmt; const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex"; const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd; + +constexpr StringData kTenantOplogApplierProgressPrefix = "repl.migration.progress_"_sd; constexpr int kBackupCursorFileFetcherRetryAttempts = 10; constexpr int kCheckpointTsBackupCursorErrorCode = 6929900; constexpr int kCloseCursorBeforeOpenErrorCode = 50886; @@ -2672,16 +2674,7 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() { stdx::unique_lock lk(_mutex); const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime(); invariant(cloneFinishedRecipientOpTime); - - OpTime resumeOpTime; - if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) { - lk.unlock(); - // We avoid holding the mutex while scanning the local oplog which - // acquires the RSTL in IX mode. This is to allow us to be interruptable - // via a concurrent stepDown which acquires the RSTL in X mode. - resumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime); - lk.lock(); - } + invariant(!cloneFinishedRecipientOpTime->isNull()); // Throwing error when cloner is canceled externally via interrupt(), // makes the instance to skip the remaining task (i.e., starting oplog @@ -2696,18 +2689,37 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() { const auto& startApplyingDonorOpTime = _stateDoc.getStartApplyingDonorOpTime(); invariant(startApplyingDonorOpTime); - _tenantOplogApplier = std::make_shared<TenantOplogApplier>( - _migrationUuid, - _protocol, - (_protocol != MigrationProtocolEnum::kShardMerge) ? 
boost::make_optional(_tenantId) - : boost::none, - (!resumeOpTime.isNull()) ? std::max(resumeOpTime, *startApplyingDonorOpTime) - : *startApplyingDonorOpTime, - _donorOplogBuffer.get(), - **_scopedExecutor, - _writerPool.get(), - resumeOpTime.getTimestamp()); - _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime); + OpTime deprecatedResumeOpTime; + boost::optional<NamespaceString> nss = boost::none; + // If we are running < 7.0, fall back to oplog scanning for TenantOplogApplier resumption. Only + // versions 7.0 and later will store oplog applier progress data in a replicated collection. + if (serverGlobalParams.featureCompatibility.isLessThan( + multiversion::FeatureCompatibilityVersion::kVersion_7_0)) { + // We avoid holding the mutex while scanning the local oplog which + // acquires the RSTL in IX mode. This is to allow us to be interruptable + // via a concurrent stepDown which acquires the RSTL in X mode. + lk.unlock(); + // This node is not aware of the tenant oplog applier progress collection, fall back to + // scanning the oplog to ensure that we don't throw away our previous progress. + deprecatedResumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime); + lk.lock(); + } else { + nss = NamespaceString::makeTenantOplogApplierProgressNSS(_migrationUuid); + } + + _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid, + _protocol, + _tenantId, + nss, + *startApplyingDonorOpTime, + _donorOplogBuffer.get(), + **_scopedExecutor, + _writerPool.get(), + *cloneFinishedRecipientOpTime, + _sharedData->getResumePhase() == + ResumePhase::kOplogCatchup); + + _tenantOplogApplier->setDeprecatedResumeOpTime(deprecatedResumeOpTime); LOGV2_DEBUG(4881202, 1, @@ -2715,8 +2727,7 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() { "tenantId"_attr = getTenantId(), "migrationId"_attr = getMigrationUUID(), "startApplyingAfterDonorOpTime"_attr = - _tenantOplogApplier->getStartApplyingAfterOpTime(), - "resumeBatchingTs"_attr = _tenantOplogApplier->getResumeBatchingTs()); + _tenantOplogApplier->getStartApplyingAfterOpTime()); uassertStatusOK(_tenantOplogApplier->startup()); _oplogApplierReady = true; @@ -2856,15 +2867,18 @@ void TenantMigrationRecipientService::Instance::_dropTempCollections() { auto opCtx = cc().makeOperationContext(); auto storageInterface = StorageInterface::get(opCtx.get()); - // The donated files and oplog buffer collections can be safely dropped at this - // point. In case either collection does not exist, dropping will be a no-op. - // It isn't necessary that a given drop is majority-committed. A new primary will - // attempt to drop the collection anyway. + // The donated files, oplog buffer, and tenant oplog applier progress collections + // can be safely dropped at this point. In case either collection does not exist, + // dropping will be a no-op. It isn't necessary that a given drop is + // majority-committed. A new primary will attempt to drop the collection anyway. 
uassertStatusOK(storageInterface->dropCollection( opCtx.get(), shard_merge_utils::getDonatedFilesNs(getMigrationUUID()))); uassertStatusOK( storageInterface->dropCollection(opCtx.get(), getOplogBufferNs(getMigrationUUID()))); + + uassertStatusOK(storageInterface->dropCollection( + opCtx.get(), NamespaceString::makeTenantOplogApplierProgressNSS(getMigrationUUID()))); } SemiFuture<void> TenantMigrationRecipientService::Instance::run( diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 69e2020b6a0..254553a626a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -39,6 +39,7 @@ #include "mongo/db/repl/tenant_all_database_cloner.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/tenant_oplog_applier.h" +#include "mongo/db/repl/tenant_oplog_applier_progress_gen.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/time_support.h" @@ -489,7 +490,10 @@ public: /* * Traverse backwards through the oplog to find the optime which tenant oplog application * should resume from. The oplog applier should resume applying entries that have a greater - * optime than the returned value. + * optime than the returned value. Note, this is currently only used as a fallback for if + * _getStoredTenantOplogApplierProgress returns no results in order to preserve backwards + * compatibility and avoid the reapplication of noop entries. This can be removed after + * all Serverless nodes are running the new progress collection-aware version of the code. */ OpTime _getOplogResumeApplyingDonorOptime(const OpTime& cloneFinishedRecipientOpTime) const; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index 36c74cdacbe..c883e976300 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -1853,468 +1853,6 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFro ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow()); } -TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplogEntry) { - const UUID migrationUUID = UUID::gen(); - // Recipient opTimes - const OpTime clonerFinishedOpTime(Timestamp(1, 1), 1); - // Donor opTimes - const OpTime earlierThanResumeOpTime(Timestamp(2, 1), 1); - const OpTime resumeOpTime(Timestamp(3, 1), 1); - const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); - - MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock())); - insertTopOfOplog(&replSet, clonerFinishedOpTime); - - const auto tenantId = OID::gen().toString(); - TenantMigrationRecipientDocument initialStateDocument( - migrationUUID, - replSet.getConnectionString(), - tenantId, - kDefaultStartMigrationTimestamp, - ReadPreferenceSetting(ReadPreference::PrimaryOnly)); - initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations); - initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); - - // We skip cloning here as a way to simulate that the recipient service has detected an existing - // migration on startup and will attempt to resume oplog fetching from the appropriate optime. 
- updateStateDocToCloningFinished(initialStateDocument, - clonerFinishedOpTime /* cloneFinishedRecipientOpTime */, - dataConsistentOpTime /* dataConsistentStopDonorOpTime */, - clonerFinishedOpTime /* startApplyingDonorOpTime */, - clonerFinishedOpTime /* startFetchingDonorOpTime */); - - auto opCtx = makeOperationContext(); - std::shared_ptr<TenantMigrationRecipientService::Instance> instance; - - // Hang before reading oplog. - const auto hangAfterStartingOplogFetcher = - globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance"); - hangAfterStartingOplogFetcher->setMode(FailPoint::alwaysOn, - 0, - BSON("action" - << "hang")); - - // Hang before starting the oplog applier. - const auto hangAfterStartingOplogApplier = - globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); - auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, - 0, - BSON("action" - << "hang")); - - { - FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); - // Create and start the instance. - instance = TenantMigrationRecipientService::Instance::getOrCreate( - opCtx.get(), _service, initialStateDocument.toBSON()); - ASSERT(instance.get()); - instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); - } - // Create and insert two tenant migration no-op entries into the oplog. The oplog applier should - // resume from the no-op entry with the most recent donor opTime. - const auto insertNss = NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar"); - const auto earlierOplogBson = makeOplogEntry(earlierThanResumeOpTime, - OpTypeEnum::kInsert, - insertNss, - UUID::gen(), - BSON("doc" << 1), - boost::none /* o2 */) - .getEntry() - .toBSON(); - const auto resumeOplogBson = makeOplogEntry(resumeOpTime, - OpTypeEnum::kInsert, - insertNss, - UUID::gen(), - BSON("doc" << 2), - boost::none /* o2 */) - .getEntry() - .toBSON(); - auto storage = StorageInterface::get(opCtx->getServiceContext()); - const auto oplogNss = NamespaceString::kRsOplogNamespace; - const OpTime earlierRecipientOpTime(Timestamp(9, 1), 1); - const OpTime resumeRecipientOpTime(Timestamp(10, 1), 1); - auto earlierNoOpEntry = makeNoOpOplogEntry(earlierRecipientOpTime, - insertNss, - UUID::gen(), - earlierOplogBson, - instance->getMigrationUUID()); - auto resumeNoOpEntry = makeNoOpOplogEntry(resumeRecipientOpTime, - insertNss, - UUID::gen(), - resumeOplogBson, - instance->getMigrationUUID()); - ASSERT_OK( - storage->insertDocument(opCtx.get(), - oplogNss, - {earlierNoOpEntry.toBSON(), earlierRecipientOpTime.getTimestamp()}, - earlierRecipientOpTime.getTerm())); - ASSERT_OK( - storage->insertDocument(opCtx.get(), - oplogNss, - {resumeNoOpEntry.toBSON(), resumeRecipientOpTime.getTimestamp()}, - resumeRecipientOpTime.getTerm())); - - hangAfterStartingOplogFetcher->setMode(FailPoint::off); - hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); - - auto oplogFetcher = getDonorOplogFetcher(instance.get()); - auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime, - OpTypeEnum::kInsert, - insertNss, - UUID::gen(), - BSON("doc" << 3), - boost::none /* o2 */); - // Feed the oplog fetcher the last doc required for the recipient to be considered consistent. - oplogFetcher->receiveBatch( - 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp()); - - // Allow the service to continue. 
- hangAfterStartingOplogApplier->setMode(FailPoint::off); - LOGV2(5272350, - "Waiting for recipient service to reach consistent state", - "suite"_attr = _agent.getSuiteName(), - "test"_attr = _agent.getTestName()); - instance->waitUntilMigrationReachesConsistentState(opCtx.get()); - - // The oplog applier should have started batching and applying at the donor opTime equal to - // 'resumeOpTime'. - const auto oplogApplier = getTenantOplogApplier(instance.get()); - ASSERT_EQUALS(resumeOpTime, oplogApplier->getStartApplyingAfterOpTime()); - ASSERT_EQUALS(resumeOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs()); - - // Stop the oplog applier. - instance->stopOplogApplier_forTest(); - // Wait for task completion. Since we're using a test function to cancel the applier, - // the actual result is not critical. - ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); - ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow()); -} - -TEST_F(TenantMigrationRecipientServiceTest, - OplogApplierResumesBatchingAndApplyingAtDifferentTimestamps) { - const UUID migrationUUID = UUID::gen(); - // Donor opTimes - const OpTime startApplyingOpTime(Timestamp(2, 1), 1); - const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); - - MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock())); - insertTopOfOplog(&replSet, startApplyingOpTime); - - const auto tenantId = OID::gen().toString(); - TenantMigrationRecipientDocument initialStateDocument( - migrationUUID, - replSet.getConnectionString(), - tenantId, - kDefaultStartMigrationTimestamp, - ReadPreferenceSetting(ReadPreference::PrimaryOnly)); - initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations); - initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); - - // We skip cloning here as a way to simulate that the recipient service has detected an existing - // migration on startup and will attempt to resume oplog fetching from the appropriate optime. - updateStateDocToCloningFinished(initialStateDocument, - OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime - */ - , - dataConsistentOpTime /* dataConsistentStopDonorOpTime */, - startApplyingOpTime /* startApplyingDonorOpTime */, - startApplyingOpTime /* startFetchingDonorOpTime */); - - auto opCtx = makeOperationContext(); - std::shared_ptr<TenantMigrationRecipientService::Instance> instance; - - // Hang before creating the oplog applier. - const auto hangBeforeCreatingOplogApplier = - globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance"); - hangBeforeCreatingOplogApplier->setMode(FailPoint::alwaysOn, - 0, - BSON("action" - << "hang")); - // Hang after starting the oplog applier. - const auto hangAfterStartingOplogApplier = - globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); - auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, - 0, - BSON("action" - << "hang")); - - { - FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); - // Create and start the instance. 
- instance = TenantMigrationRecipientService::Instance::getOrCreate( - opCtx.get(), _service, initialStateDocument.toBSON()); - ASSERT(instance.get()); - instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); - } - - // Create and insert the following into the oplog: - // - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'. - // - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'. - // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o2' field. The donor opTime - // is less than the 'startApplyingDonorOpTime'. We will resume batching from this - // timestamp. - // - (4) A no-op oplog entry with an inner oplog entry as the 'o2' field but no - // 'fromTenantMigrate' field. This oplog entry does not satisfy the conditions - // for the oplog applier to resume applying from so we default to apply from - // 'startDonorApplyingOpTime'. - const auto insertNss = NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar"); - const auto beforeStartApplyingOpTime = OpTime(Timestamp(1, 1), 1); - const auto entryBeforeStartApplyingOpTime = makeOplogEntry( - beforeStartApplyingOpTime, - OpTypeEnum::kInsert, - insertNss, - UUID::gen(), - BSON("doc" - << "before startApplyingDonorOpTime"), - boost::none /* o2 */) - .getEntry() - .toBSON(); - const auto afterStartApplyingOpTime = OpTime(Timestamp(3, 1), 1); - const auto entryAfterStartApplyingOpTime = makeOplogEntry( - afterStartApplyingOpTime, - OpTypeEnum::kInsert, - insertNss, - UUID::gen(), - BSON("doc" - << "after startApplyingDonorOpTime"), - boost::none /* o2 */) - .getEntry() - .toBSON(); - auto storage = StorageInterface::get(opCtx->getServiceContext()); - const auto oplogNss = NamespaceString::kRsOplogNamespace; - const auto collUuid = UUID::gen(); - std::vector<DurableOplogEntry> oplogEntries; - std::vector<MutableOplogEntry> noOpEntries; - // (1) - oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(9, 1), 1), - OpTypeEnum::kInsert, - insertNss, - collUuid, - BSON("doc" - << "before clonerFinishedOpTime"), - boost::none /* o2 */) - .getEntry()); - // (2) - oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(11, 1), 1), - OpTypeEnum::kInsert, - insertNss, - collUuid, - BSON("doc" - << "after clonerFinishedOpTime"), - boost::none /* o2 */) - .getEntry()); - // (3) - noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(12, 1), 1), - insertNss, - collUuid, - entryBeforeStartApplyingOpTime, - instance->getMigrationUUID())); - // (4) - noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(13, 1), 1), - insertNss, - collUuid, - entryAfterStartApplyingOpTime, - boost::none /* migrationUUID */)); - for (const auto& entry : oplogEntries) { - auto opTime = entry.getOpTime(); - ASSERT_OK(storage->insertDocument( - opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm())); - } - for (const auto& entry : noOpEntries) { - auto opTime = entry.getOpTime(); - ASSERT_OK(storage->insertDocument( - opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm())); - } - // Move on to the next failpoint to hang after starting the oplog applier. 
- hangBeforeCreatingOplogApplier->setMode(FailPoint::off); - hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); - - auto dataConsistentOplogEntry = - makeOplogEntry(dataConsistentOpTime, - OpTypeEnum::kInsert, - NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar"), - UUID::gen(), - BSON("doc" << 3), - boost::none /* o2 */); - - auto oplogFetcher = getDonorOplogFetcher(instance.get()); - // Feed the oplog fetcher the last doc required for the recipient to be considered consistent. - oplogFetcher->receiveBatch( - 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp()); - - // Allow the service to continue. - hangAfterStartingOplogApplier->setMode(FailPoint::off); - LOGV2(5272340, - "Waiting for recipient service to reach consistent state", - "suite"_attr = _agent.getSuiteName(), - "test"_attr = _agent.getTestName()); - instance->waitUntilMigrationReachesConsistentState(opCtx.get()); - - const auto oplogApplier = getTenantOplogApplier(instance.get()); - // Resume batching from the first migration no-op oplog entry. In this test, this is before - // the 'startApplyingDonorOpTime'. - ASSERT_EQUALS(beforeStartApplyingOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs()); - // The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'. - ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime()); - - // Stop the oplog applier. - instance->stopOplogApplier_forTest(); - // Wait for task completion. Since we're using a test function to cancel the applier, - // the actual result is not critical. - ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); - ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow()); -} - -TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApplyingOpTime) { - const UUID migrationUUID = UUID::gen(); - // Donor opTimes - const OpTime startApplyingOpTime(Timestamp(2, 1), 1); - const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); - - MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock())); - insertTopOfOplog(&replSet, startApplyingOpTime); - - const auto tenantId = OID::gen().toString(); - TenantMigrationRecipientDocument initialStateDocument( - migrationUUID, - replSet.getConnectionString(), - tenantId, - kDefaultStartMigrationTimestamp, - ReadPreferenceSetting(ReadPreference::PrimaryOnly)); - initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations); - initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); - - // We skip cloning here as a way to simulate that the recipient service has detected an existing - // migration on startup and will attempt to resume oplog fetching from the appropriate optime. - updateStateDocToCloningFinished(initialStateDocument, - OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime - */ - , - dataConsistentOpTime /* dataConsistentStopDonorOpTime */, - startApplyingOpTime /* startApplyingDonorOpTime */, - startApplyingOpTime /* startFetchingDonorOpTime */); - - auto opCtx = makeOperationContext(); - std::shared_ptr<TenantMigrationRecipientService::Instance> instance; - - // Hang before starting the oplog applier. 
- const auto hangAfterStartingOplogApplier = - globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance"); - auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn, - 0, - BSON("action" - << "hang")); - - { - FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); - // Create and start the instance. - instance = TenantMigrationRecipientService::Instance::getOrCreate( - opCtx.get(), _service, initialStateDocument.toBSON()); - ASSERT(instance.get()); - instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); - } - - // Create and insert the following into the oplog: - // - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'. - // - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'. - // - (3) A no-op oplog entry with an inner oplog entry as the 'o2' field but no - // 'fromTenantMigrate' field. This oplog entry does not satisfy the conditions - // for the oplog applier to resume applying from so we default to applying and - // batching from the start of the buffer collection. - const auto insertNss = NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar"); - const auto afterStartApplyingOpTime = OpTime(Timestamp(3, 1), 1); - const auto entryAfterStartApplyingOpTime = makeOplogEntry( - afterStartApplyingOpTime, - OpTypeEnum::kInsert, - insertNss, - UUID::gen(), - BSON("doc" - << "after startApplyingDonorOpTime"), - boost::none /* o2 */) - .getEntry() - .toBSON(); - auto storage = StorageInterface::get(opCtx->getServiceContext()); - const auto oplogNss = NamespaceString::kRsOplogNamespace; - const auto collUuid = UUID::gen(); - std::vector<DurableOplogEntry> oplogEntries; - std::vector<MutableOplogEntry> noOpEntries; - // (1) - oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(9, 1), 1), - OpTypeEnum::kInsert, - insertNss, - collUuid, - BSON("doc" - << "before clonerFinishedOpTime"), - boost::none /* o2 */) - .getEntry()); - // (2) - oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(11, 1), 1), - OpTypeEnum::kInsert, - insertNss, - collUuid, - BSON("doc" - << "after clonerFinishedOpTime"), - boost::none /* o2 */) - .getEntry()); - // (3) - const auto laterOpTime = OpTime(Timestamp(13, 1), 1); - const auto noOpEntry = makeNoOpOplogEntry(laterOpTime, - insertNss, - collUuid, - entryAfterStartApplyingOpTime, - boost::none /* migrationUUID */); - - for (const auto& entry : oplogEntries) { - auto opTime = entry.getOpTime(); - ASSERT_OK(storage->insertDocument( - opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm())); - } - ASSERT_OK(storage->insertDocument(opCtx.get(), - oplogNss, - {noOpEntry.toBSON(), laterOpTime.getTimestamp()}, - laterOpTime.getTerm())); - - hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1); - - auto dataConsistentOplogEntry = - makeOplogEntry(dataConsistentOpTime, - OpTypeEnum::kInsert, - NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar"), - UUID::gen(), - BSON("doc" << 3), - boost::none /* o2 */); - - auto oplogFetcher = getDonorOplogFetcher(instance.get()); - // Feed the oplog fetcher the last doc required for the recipient to be considered consistent. - oplogFetcher->receiveBatch( - 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp()); - - // Allow the service to continue. 
- hangAfterStartingOplogApplier->setMode(FailPoint::off); - LOGV2(5394602, - "Waiting for recipient service to reach consistent state", - "suite"_attr = _agent.getSuiteName(), - "test"_attr = _agent.getTestName()); - instance->waitUntilMigrationReachesConsistentState(opCtx.get()); - - const auto oplogApplier = getTenantOplogApplier(instance.get()); - // There is no oplog entry to resume batching from, so we treat it as if we are resuming - // oplog application from the start. The 'resumeBatchingTs' will be a null timestamp. - ASSERT_EQUALS(Timestamp(), oplogApplier->getResumeBatchingTs()); - // The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'. - ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime()); - - // Stop the oplog applier. - instance->stopOplogApplier_forTest(); - // Wait for task completion. Since we're using a test function to cancel the applier, - // the actual result is not critical. - ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); - ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow()); -} - TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromStartFetchingOpTimeWithDocInBuffer) { const UUID migrationUUID = UUID::gen(); diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index aa92bd63a88..95c2643513b 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -41,6 +41,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/op_observer/op_observer.h" +#include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/apply_ops.h" #include "mongo/db/repl/cloner_utils.h" #include "mongo/db/repl/insert_group.h" @@ -70,21 +71,25 @@ MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch); TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, const MigrationProtocolEnum& protocol, boost::optional<std::string> tenantId, + boost::optional<NamespaceString> progressNss, OpTime startApplyingAfterOpTime, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, ThreadPool* writerPool, - Timestamp resumeBatchingTs) + OpTime cloneFinishedRecipientOpTime, + const bool isResuming) : AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + migrationUuid.toString()), _migrationUuid(migrationUuid), + _progressNamespaceString(progressNss), _protocol(protocol), _tenantId(tenantId), _startApplyingAfterOpTime(startApplyingAfterOpTime), _oplogBuffer(oplogBuffer), _executor(std::move(executor)), + _cloneFinishedRecipientOpTime(cloneFinishedRecipientOpTime), _writerPool(writerPool), - _resumeBatchingTs(resumeBatchingTs) { + _isResuming(isResuming) { if (_protocol != MigrationProtocolEnum::kShardMerge) { invariant(_tenantId); } else { @@ -120,21 +125,58 @@ OpTime TenantOplogApplier::getStartApplyingAfterOpTime() const { return _startApplyingAfterOpTime; } -Timestamp TenantOplogApplier::getResumeBatchingTs() const { - return _resumeBatchingTs; +boost::optional<TenantOplogApplierProgress> TenantOplogApplier::getStoredProgress( + OperationContext* opCtx) { + if (!_progressNamespaceString) { + return boost::none; + } + + DBDirectClient client(opCtx); + const auto tenantOplogApplierProgress = + client.findOne(_progressNamespaceString.get(), + BSON(TenantOplogApplierProgress::kMigrationUuidFieldName << _migrationUuid)); + if (tenantOplogApplierProgress.isEmpty()) { + return boost::none; + } + + 
IDLParserContext ctx("TenantOplogApplierProgress"); + return TenantOplogApplierProgress::parse(ctx, tenantOplogApplierProgress); } -void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime) { - stdx::lock_guard lk(_mutex); - invariant(!_isActive_inlock()); - invariant(!cloneFinishedRecipientOpTime.isNull()); - invariant(_cloneFinishedRecipientOpTime.isNull()); - _cloneFinishedRecipientOpTime = cloneFinishedRecipientOpTime; +void TenantOplogApplier::_storeProgress(OperationContext* opCtx, OpTime donorOpTime) { + if (!_progressNamespaceString) { + return; + } + + PersistentTaskStore<TenantOplogApplierProgress> store(_progressNamespaceString.get()); + + BSONObjBuilder builder; + builder.append("$set", BSON(TenantOplogApplierProgress::kDonorOpTimeFieldName << donorOpTime)); + + store.upsert(opCtx, + BSON(TenantOplogApplierProgress::kMigrationUuidFieldName << _migrationUuid), + builder.obj()); } void TenantOplogApplier::_doStartup_inlock() { + Timestamp resumeBatchingTs = Timestamp(); + if (_isResuming) { + if (_deprecatedResumeOpTime.isNull()) { + auto opCtx = cc().makeOperationContext(); + if (const auto storedProgress = getStoredProgress(opCtx.get())) { + auto donorOpTime = storedProgress->getDonorOpTime(); + _startApplyingAfterOpTime = std::max(donorOpTime, _startApplyingAfterOpTime); + resumeBatchingTs = donorOpTime.getTimestamp(); + } + } else { + _startApplyingAfterOpTime = + std::max(_deprecatedResumeOpTime, _startApplyingAfterOpTime); + resumeBatchingTs = _deprecatedResumeOpTime.getTimestamp(); + } + } + _oplogBatcher = std::make_shared<TenantOplogBatcher>( - _migrationUuid, _oplogBuffer, _executor, _resumeBatchingTs, _startApplyingAfterOpTime); + _migrationUuid, _oplogBuffer, _executor, resumeBatchingTs, _startApplyingAfterOpTime); uassertStatusOK(_oplogBatcher->startup()); auto fut = _oplogBatcher->getNextBatch( TenantOplogBatcher::BatchLimits(std::size_t(tenantApplierBatchSizeBytes.load()), @@ -322,6 +364,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) { "Tenant Oplog Applier starting to write no-ops", "protocol"_attr = _protocol, "migrationId"_attr = _migrationUuid); + auto lastBatchCompletedOpTimes = _writeNoOpEntries(opCtx.get(), *batch); stdx::lock_guard lk(_mutex); _lastAppliedOpTimesUpToLastBatch.donorOpTime = lastBatchCompletedOpTimes.donorOpTime; @@ -334,6 +377,8 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) { _numOpsApplied += batch->ops.size(); + _storeProgress(opCtx.get(), lastBatchCompletedOpTimes.donorOpTime); + LOGV2_DEBUG(4886002, 1, "Tenant Oplog Applier finished applying batch", @@ -630,7 +675,12 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( // Check out the session. 
if (!scopedSession) { auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx.get()); - scopedSession = mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get()); + if (_isResuming) { + scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get()); + } else { + scopedSession = + mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get()); + } } auto txnParticipant = TransactionParticipant::get(opCtx.get()); @@ -736,6 +786,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( noopEntry.setObject2(o2Entry.toBSON()); } } + stmtIds.insert(stmtIds.end(), entryStmtIds.begin(), entryStmtIds.end()); if (!prePostImageEntry && (entry.getPreImageOpTime() || entry.getPostImageOpTime())) { @@ -779,9 +830,15 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( opCtx->setLogicalSessionId(sessionId); opCtx->setTxnNumber(txnNumber); } + if (!scopedSession) { auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx.get()); - scopedSession = mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get()); + if (_isResuming) { + scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get()); + } else { + scopedSession = + mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get()); + } } auto txnParticipant = TransactionParticipant::get(opCtx.get()); @@ -790,6 +847,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( "for transaction " << txnNumber << " on session " << sessionId, txnParticipant); + // beginOrContinue throws on failure, which will abort the migration. Failure should // only result from out-of-order processing, which should not happen. TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber}; @@ -819,7 +877,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( "_cloneFinishedRecipientOpTime"_attr = _cloneFinishedRecipientOpTime, "sessionId"_attr = sessionId, "txnNumber"_attr = txnNumber, - "statementIds"_attr = entryStmtIds, + "statementIds"_attr = stmtIds, "protocol"_attr = _protocol, "migrationId"_attr = _migrationUuid); txnParticipant.invalidate(opCtx.get()); @@ -832,19 +890,26 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( } // We should never process the same donor statement twice, except in failover - // cases where we'll also have "forgotten" the statement was executed. - uassert(5350902, - str::stream() << "Tenant oplog application processed same retryable write " - "twice for transaction " - << txnNumber << " statement " << entryStmtIds.front() - << " on session " << sessionId, - !txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx.get(), - entryStmtIds.front())); + // cases. In the event of a failover, it is possible that we were able to successfully + // log the noop but failed to persist progress checkpoint data. As a result, we can end + // up re-applying noop entries. We can safely skip the entry in this case. + if (txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx.get(), + stmtIds.front())) { + LOGV2_DEBUG(7262200, + 1, + "Tenant Oplog Applier skipping previously processed retryable write", + "protocol"_attr = _protocol, + "migrationId"_attr = _migrationUuid, + "txnNumber"_attr = txnNumber, + "statement"_attr = entryStmtIds.front(), + "sessionId"_attr = sessionId); + continue; + } // Set sessionId, txnNumber, and statementId for all ops in a retryable write. noopEntry.setSessionId(sessionId); noopEntry.setTxnNumber(txnNumber); - noopEntry.setStatementIds(entryStmtIds); + noopEntry.setStatementIds(stmtIds); // set fromMigrate on the no-op so the session update tracker recognizes it. 
noopEntry.setFromMigrate(true); @@ -884,7 +949,6 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( TransactionParticipant::get(opCtx.get()) .onWriteOpCompletedOnPrimary(opCtx.get(), {stmtIds}, *sessionTxnRecord); } - wuow.commit(); }); prePostImageEntry = boost::none; diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index c4c218b7289..c9d79ca0ec4 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -36,6 +36,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/tenant_oplog_applier_progress_gen.h" #include "mongo/db/repl/tenant_oplog_batcher.h" #include "mongo/db/serverless/serverless_types_gen.h" #include "mongo/util/future.h" @@ -75,11 +76,13 @@ public: TenantOplogApplier(const UUID& migrationUuid, const MigrationProtocolEnum& protocol, boost::optional<std::string> tenantId, + boost::optional<NamespaceString> progressNss, OpTime StartApplyingAfterOpTime, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, ThreadPool* writerPool, - Timestamp resumeBatchingTs = Timestamp()); + OpTime cloneFinishedRecipientOpTime, + bool isResuming); virtual ~TenantOplogApplier(); @@ -98,19 +101,20 @@ public: } /** - * This should only be called once before the applier starts. + * Returns information describing TenantOplogApplier progress for the + * given migration UUID. */ - void setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime); + boost::optional<TenantOplogApplierProgress> getStoredProgress(OperationContext* opCtx); /** * Returns the optime the applier will start applying from. */ OpTime getStartApplyingAfterOpTime() const; - /** - * Returns the timestamp the applier will resume batching from. - */ - Timestamp getResumeBatchingTs() const; + void setDeprecatedResumeOpTime(OpTime deprecatedResumeOpTime) { + stdx::lock_guard lk(_mutex); + _deprecatedResumeOpTime = deprecatedResumeOpTime; + } private: void _doStartup_inlock() final; @@ -143,6 +147,11 @@ private: TenantOplogBatch* batch); /** + * Stores information describing TenantOplogApplier progress in a replicated collection. + */ + void _storeProgress(OperationContext* opCtx, OpTime donorOpTime); + + /** * Sets the _finalStatus to the new status if and only if the old status is "OK". */ void _setFinalStatusIfOk(WithLock, Status newStatus); @@ -161,13 +170,14 @@ private: // (X) Access only allowed from the main flow of control called from run() or constructor. // Handles consuming oplog entries from the OplogBuffer for oplog application. - std::shared_ptr<TenantOplogBatcher> _oplogBatcher; // (R) - const UUID _migrationUuid; // (R) - const MigrationProtocolEnum _protocol; // (R) + std::shared_ptr<TenantOplogBatcher> _oplogBatcher; // (R) + const UUID _migrationUuid; // (R) + const boost::optional<NamespaceString> _progressNamespaceString; // (R) + const MigrationProtocolEnum _protocol; // (R) // For multi-tenant migration protocol, _tenantId is set. // But, for shard merge protcol, _tenantId is empty. 
const boost::optional<std::string> _tenantId; // (R) - const OpTime _startApplyingAfterOpTime; // (R) + OpTime _startApplyingAfterOpTime; // (R) RandomAccessOplogBuffer* _oplogBuffer; // (R) std::shared_ptr<executor::TaskExecutor> _executor; // (R) // All no-op entries written by this tenant migration should have OpTime greater than this @@ -179,10 +189,11 @@ private: // Pool of worker threads for writing ops to the databases. // Not owned by us. ThreadPool* const _writerPool; // (S) - // The timestamp to resume batching from. A null timestamp indicates that the oplog applier - // is starting fresh (not a retry), and will start batching from the beginning of the oplog - // buffer. - const Timestamp _resumeBatchingTs; // (R) + // Used when resuming oplog applier state via oplog scanning for nodes running FCV <= 6.3. If + // set, we will use this as our _startApplyingAfterOpTime, as well as the resume batching + // timestamp passed to the TenantOplogBatcher. + OpTime _deprecatedResumeOpTime; // (M) + const bool _isResuming; // (R) std::map<OpTime, SharedPromise<OpTimePair>> _opTimeNotificationList; // (M) Status _finalStatus = Status::OK(); // (M) stdx::unordered_set<UUID, UUID::Hash> _knownGoodUuids; // (X) diff --git a/src/mongo/db/repl/tenant_oplog_applier_progress.idl b/src/mongo/db/repl/tenant_oplog_applier_progress.idl new file mode 100644 index 00000000000..a8cf8c98b03 --- /dev/null +++ b/src/mongo/db/repl/tenant_oplog_applier_progress.idl @@ -0,0 +1,50 @@ +# Copyright (C) 2023-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file defines the document used for storing progress by the tenant oplog applier. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/db/basic_types.idl" + - "mongo/db/repl/replication_types.idl" + +structs: + TenantOplogApplierProgress: + description: "Used for storing the progress made by the tenant oplog applier." + strict: true + fields: + _id: + type: uuid + description: "The UUID of the associated tenant migration." + cpp_name: migrationUuid + donorOpTime: + type: optime + description: "The last applied donor optime." 
+ cpp_name: donorOpTime diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 7e2b2bc2862..83ddd04502b 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/oplog_applier_impl_test_fixture.h" #include "mongo/db/repl/oplog_batcher_test_fixture.h" #include "mongo/db/repl/oplog_entry_test_helpers.h" +#include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" @@ -155,8 +156,8 @@ public: // Set up oplog collection. If the WT storage engine is used, the oplog collection is // expected to exist when fetching the next opTime (LocalOplogInfo::getNextOpTimes) to use // for a write. - _opCtx = cc().makeOperationContext(); - repl::createOplog(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + repl::createOplog(opCtx.get()); MongoDSessionCatalog::set( service, @@ -164,7 +165,7 @@ public: std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>())); // Ensure that we are primary. - auto replCoord = ReplicationCoordinator::get(_opCtx.get()); + auto replCoord = ReplicationCoordinator::get(opCtx.get()); ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY)); } @@ -188,8 +189,42 @@ public: _oplogBuffer.push(nullptr, bsonOps.begin(), bsonOps.end()); } + std::shared_ptr<TenantOplogApplier> makeTenantMigrationOplogApplier( + ThreadPool* writerPool, + OpTime startApplyingAfterOpTime = OpTime(), + OpTime cloneFinishedOpTime = OpTime(), + boost::optional<NamespaceString> progressNss = boost::none, + const bool isResuming = false) { + return std::make_shared<TenantOplogApplier>(_migrationUuid, + MigrationProtocolEnum::kMultitenantMigrations, + _tenantId, + progressNss, + startApplyingAfterOpTime, + &_oplogBuffer, + _executor, + writerPool, + cloneFinishedOpTime, + isResuming); + }; + + std::shared_ptr<TenantOplogApplier> makeShardMergeOplogApplier( + ThreadPool* writerPool, + OpTime startApplyingAfterOpTime = OpTime(), + OpTime cloneFinishedOpTime = OpTime()) { + return std::make_shared<TenantOplogApplier>(_migrationUuid, + MigrationProtocolEnum::kShardMerge, + boost::none, + boost::none, + startApplyingAfterOpTime, + &_oplogBuffer, + _executor, + writerPool, + cloneFinishedOpTime, + false); + }; + StorageInterface* getStorageInterface() { - return StorageInterface::get(_opCtx->getServiceContext()); + return StorageInterface::get(getServiceContext()); } protected: @@ -199,7 +234,6 @@ protected: std::string _tenantId = OID::gen().toString(); DatabaseName _dbName = DatabaseName(TenantId(OID(_tenantId)), "test"); UUID _migrationUuid = UUID::gen(); - ServiceContext::UniqueOperationContext _opCtx; TenantOplogApplierTestOpObserver* _opObserver; // Owned by service context opObserverRegistry private: @@ -225,14 +259,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) { auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); // Even if we wait for the first op in a batch, it is the last op we should be notified on. 
auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get(); @@ -243,7 +270,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) { assertNoOpMatches(srcOps[1], entries[1]); ASSERT_EQ(srcOps.size(), applier->getNumOpsApplied()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -260,14 +288,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); // Even if we wait for the first op in a batch, it is the last op we should be notified on. auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get(); @@ -279,7 +300,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { } ASSERT_EQ(srcOps.size(), applier->getNumOpsApplied()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -304,14 +326,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); tenantApplierBatchSizeOps.store(2 /* ops */); @@ -330,7 +345,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { assertNoOpMatches(srcOps[2], entries[2]); assertNoOpMatches(srcOps[3], entries[3]); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -370,14 +386,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); // The first two ops should come in the first batch. 
auto firstBatchFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); @@ -391,21 +400,23 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { assertNoOpMatches(srcOps[i], entries[i]); } applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto opCtx = cc().makeOperationContext(); + createCollectionWithUuid(opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); { - DBDirectClient client(_opCtx.get()); + DBDirectClient client(opCtx.get()); client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace, {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); } NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); + auto lsid = makeLogicalSessionId(opCtx.get()); TxnNumber txnNum(0); const BSONObj doc1 = BSON("_id" << 1 << "data" << 1); @@ -430,216 +441,195 @@ TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) {StmtId(1)}, partialOp.getOpTime()); - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(), nss, {doc1, commitOp.getOpTime().getTimestamp()}, commitOp.getOpTime().getTerm())); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc1)); - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc2)); + ASSERT_TRUE(docExists(opCtx.get(), nss, doc1)); + ASSERT_FALSE(docExists(opCtx.get(), nss, doc2)); pushOps({partialOp, commitOp}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(commitOp.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc1)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); + ASSERT_TRUE(docExists(opCtx.get(), nss, doc1)); + ASSERT_TRUE(docExists(opCtx.get(), nss, doc2)); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) { - auto entry = makeInsertOplogEntry( - 1, - NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"), - UUID::gen()); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { - onInsertsCalled = true; - }; + auto nss = + NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); + auto entry = makeInsertOplogEntry(1, nss, UUID::gen()); + bool onInsertsCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>&) { + if (onInsertsNss == nss) { + onInsertsCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = 
- std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no database was available, the insert shouldn't actually happen. - ASSERT_FALSE(onInsertsCalled); + ASSERT_FALSE(onInsertsCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) { - createDatabase(_opCtx.get(), _dbName.toString()); - auto entry = makeInsertOplogEntry( - 1, - NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"), - UUID::gen()); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { - onInsertsCalled = true; - }; + auto opCtx = cc().makeOperationContext(); + createDatabase(opCtx.get(), _dbName.toString()); + auto nss = + NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); + auto entry = makeInsertOplogEntry(1, nss, UUID::gen()); + bool onInsertsCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>&) { + if (onInsertsNss == nss) { + onInsertsCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no collection was available, the insert shouldn't actually happen. 
- ASSERT_FALSE(onInsertsCalled); + ASSERT_FALSE(onInsertsCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) { + auto opCtx = cc().makeOperationContext(); NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + auto uuid = createCollectionWithUuid(opCtx.get(), nss); + ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(), nss, {BSON("_id" << 1 << "data" << "1")}, 0)); auto entry = makeInsertOplogEntry(1, nss, uuid); - bool onInsertsCalled = false; - bool onUpdateCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { - onInsertsCalled = true; - }; - _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs&) { - onUpdateCalled = true; + bool onInsertsCalledForNss = false; + bool onUpdateCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>&) { + if (onInsertsNss == nss) { + onInsertsCalledForNss = true; + } + }; + _opObserver->onUpdateFn = [&](OperationContext* opCtx, + const OplogUpdateEntryArgs& onUpdateArgs) { + if (onUpdateArgs.coll->ns() == nss) { + onUpdateCalledForNss = true; + } }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // This insert gets converted to an upsert. - ASSERT_FALSE(onInsertsCalled); - ASSERT_TRUE(onUpdateCalled); + ASSERT_FALSE(onInsertsCalledForNss); + ASSERT_TRUE(onUpdateCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_UniqueKey_InsertExisting) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); // Create unique key index on the collection. auto indexKey = BSON("data" << 1); auto spec = BSON("v" << int(IndexDescriptor::kLatestIndexVersion) << "key" << indexKey << "name" << (indexKey.firstElementFieldNameStringData() + "_1") << "unique" << true); - createIndex(_opCtx.get(), nss, uuid, spec); + createIndex(opCtx.get(), nss, uuid, spec); ASSERT_OK(getStorageInterface()->insertDocument( - _opCtx.get(), nss, {BSON("_id" << 0 << "data" << 2)}, 0)); + opCtx.get(), nss, {BSON("_id" << 0 << "data" << 2)}, 0)); // Insert an entry that conflicts with the existing document on the indexed field. 
auto entry = makeOplogEntry(repl::OpTypeEnum::kInsert, nss, uuid, BSON("_id" << 1 << "data" << 2)); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { - onInsertsCalled = true; - }; + bool onInsertsCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>&) { + if (onInsertsNss == nss) { + onInsertsCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // The DuplicateKey error should be ignored and insert should succeed. - ASSERT_TRUE(onInsertsCalled); + ASSERT_TRUE(onInsertsCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_Success) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto entry = makeInsertOplogEntry(1, nss, uuid); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - ASSERT_FALSE(onInsertsCalled); - onInsertsCalled = true; - // TODO Check that (nss.dbName() == _dbName) once the OplogEntry deserializer passes - // "tid" to the NamespaceString constructor - ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId()); - ASSERT_EQUALS(nss.coll(), "bar"); + bool onInsertsCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>& docs) { + if (onInsertsNss == nss) { + ASSERT_FALSE(onInsertsCalledForNss); + onInsertsCalledForNss = true; + // TODO Check that (onInsertsNss.dbName() == _dbName) once the OplogEntry deserializer + // passes "tid" to the NamespaceString constructor + ASSERT_EQUALS(onInsertsNss.dbName().db(), _dbName.toStringWithTenantId()); + ASSERT_EQUALS(onInsertsNss.coll(), "bar"); ASSERT_EQUALS(1, docs.size()); ASSERT_BSONOBJ_EQ(docs[0], entry.getObject()); - }; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_TRUE(onInsertsCalled); + ASSERT_TRUE(onInsertsCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -651,8 +641,9 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { 
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); NamespaceString nss2 = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "baz"); - auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); - auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + auto opCtx = cc().makeOperationContext(); + auto uuid1 = createCollectionWithUuid(opCtx.get(), nss1); + auto uuid2 = createCollectionWithUuid(opCtx.get(), nss2); std::vector<OplogEntry> entries; bool onInsertsCalledNss1 = false; bool onInsertsCalledNss2 = false; @@ -677,7 +668,7 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { for (int i = 3; i < 6; i++) { ASSERT_BSONOBJ_EQ(docs[i], entries[i + 1].getObject()); } - } else { + } else if (nss == nss2) { ASSERT_EQUALS(nss2, nss); ASSERT_FALSE(onInsertsCalledNss2); onInsertsCalledNss2 = true; @@ -689,274 +680,248 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { // Make sure all ops end up in a single thread so they can be batched. auto writerPool = makeTenantMigrationWriterPool(1); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entries.back().getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_TRUE(onInsertsCalledNss1); ASSERT_TRUE(onInsertsCalledNss2); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto entry = makeOplogEntry(repl::OpTypeEnum::kUpdate, nss, uuid, update_oplog_entry::makeDeltaOplogEntry( BSON(doc_diff::kUpdateSectionFieldName << fromjson("{a: 1}"))), BSON("_id" << 0)); - bool onInsertsCalled = false; - bool onUpdateCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - onInsertsCalled = true; - }; - _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs&) { - onUpdateCalled = true; + bool onInsertsCalledForNss = false; + bool onUpdateCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>& docs) { + if (onInsertsNss == nss) { + onInsertsCalledForNss = true; + } + }; + _opObserver->onUpdateFn = [&](OperationContext* opCtx, + const OplogUpdateEntryArgs& onUpdateArgs) { + if (onUpdateArgs.coll->ns() == nss) { + onUpdateCalledForNss = true; + } }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Updates to missing documents should just be 
dropped, neither inserted nor updated. - ASSERT_FALSE(onInsertsCalled); - ASSERT_FALSE(onUpdateCalled); + ASSERT_FALSE(onInsertsCalledForNss); + ASSERT_FALSE(onUpdateCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0)); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); + ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(), nss, {BSON("_id" << 0)}, 0)); auto entry = makeOplogEntry(repl::OpTypeEnum::kUpdate, nss, uuid, update_oplog_entry::makeDeltaOplogEntry( BSON(doc_diff::kUpdateSectionFieldName << fromjson("{a: 1}"))), BSON("_id" << 0)); - bool onUpdateCalled = false; - _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - onUpdateCalled = true; - ASSERT_EQUALS(nss, args.coll->ns()); - ASSERT_EQUALS(uuid, args.coll->uuid()); + bool onUpdateCalledForNss = false; + _opObserver->onUpdateFn = [&](OperationContext* opCtx, + const OplogUpdateEntryArgs& onUpdateArgs) { + if (onUpdateArgs.coll->ns() == nss) { + onUpdateCalledForNss = true; + ASSERT_EQUALS(nss, onUpdateArgs.coll->ns()); + ASSERT_EQUALS(uuid, onUpdateArgs.coll->uuid()); + } }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_TRUE(onUpdateCalled); + ASSERT_TRUE(onUpdateCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) { - auto entry = makeOplogEntry( - OpTypeEnum::kDelete, - NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"), - UUID::gen()); - bool onDeleteCalled = false; - _opObserver->onDeleteFn = - [&](OperationContext* opCtx, const CollectionPtr&, StmtId, const OplogDeleteEntryArgs&) { - onDeleteCalled = true; - }; + NamespaceString nss = + NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); + auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, UUID::gen()); + bool onDeleteCalledForNss = false; + _opObserver->onDeleteFn = [&](OperationContext* opCtx, + const CollectionPtr& coll, + StmtId, + const OplogDeleteEntryArgs&) { + if (coll->ns() == nss) { + onDeleteCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no 
database was available, the delete shouldn't actually happen. - ASSERT_FALSE(onDeleteCalled); + ASSERT_FALSE(onDeleteCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) { - createDatabase(_opCtx.get(), _dbName.toString()); - auto entry = makeOplogEntry( - OpTypeEnum::kDelete, - NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"), - UUID::gen()); - bool onDeleteCalled = false; - _opObserver->onDeleteFn = - [&](OperationContext* opCtx, const CollectionPtr&, StmtId, const OplogDeleteEntryArgs&) { - onDeleteCalled = true; - }; + auto opCtx = cc().makeOperationContext(); + createDatabase(opCtx.get(), _dbName.toString()); + NamespaceString nss = + NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); + auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, UUID::gen()); + bool onDeleteCalledForNss = false; + _opObserver->onDeleteFn = [&](OperationContext* opCtx, + const CollectionPtr& coll, + StmtId, + const OplogDeleteEntryArgs&) { + if (coll->ns() == nss) { + onDeleteCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no collection was available, the delete shouldn't actually happen. - ASSERT_FALSE(onDeleteCalled); + ASSERT_FALSE(onDeleteCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0)); - bool onDeleteCalled = false; - _opObserver->onDeleteFn = - [&](OperationContext* opCtx, const CollectionPtr&, StmtId, const OplogDeleteEntryArgs&) { - onDeleteCalled = true; - }; + bool onDeleteCalledForNss = false; + _opObserver->onDeleteFn = [&](OperationContext* opCtx, + const CollectionPtr& coll, + StmtId, + const OplogDeleteEntryArgs&) { + if (coll->ns() == nss) { + onDeleteCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since the document wasn't available, onDelete should not be called. 
- ASSERT_FALSE(onDeleteCalled); + ASSERT_FALSE(onDeleteCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_Success) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0)); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); + ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(), nss, {BSON("_id" << 0)}, 0)); auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0)); - bool onDeleteCalled = false; + bool onDeleteCalledForNss = false; _opObserver->onDeleteFn = [&](OperationContext* opCtx, const CollectionPtr& coll, StmtId, const OplogDeleteEntryArgs& args) { - onDeleteCalled = true; - ASSERT_TRUE(opCtx); - ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX)); - ASSERT_TRUE(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX)); - ASSERT_TRUE(opCtx->writesAreReplicated()); - ASSERT_FALSE(args.fromMigrate); - // TODO SERVER-70007 Check that (nss.dbName() == _dbName) once the OplogEntry deserializer - // passes "tid" to the NamespaceString constructor - ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId()); - ASSERT_EQUALS(nss.coll(), "bar"); - ASSERT_EQUALS(uuid, coll->uuid()); + if (coll->ns() == nss) { + onDeleteCalledForNss = true; + ASSERT_TRUE(opCtx); + ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX)); + ASSERT_TRUE(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX)); + ASSERT_TRUE(opCtx->writesAreReplicated()); + ASSERT_FALSE(args.fromMigrate); + // TODO SERVER-70007 Check that (nss.dbName() == _dbName) once the OplogEntry + // deserializer passes "tid" to the NamespaceString constructor + ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId()); + ASSERT_EQUALS(nss.coll(), "bar"); + ASSERT_EQUALS(uuid, coll->uuid()); + } }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_TRUE(onDeleteCalled); + ASSERT_TRUE(onDeleteCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_CollExisting) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto op = BSON("op" << "c" << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" << BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << uuid); - bool applyCmdCalled = false; + bool onCreateCollectionCalledForNss = false; _opObserver->onCreateCollectionFn = [&](OperationContext* opCtx, const CollectionPtr&, const NamespaceString& collNss, const CollectionOptions&, 
const BSONObj&) { - applyCmdCalled = true; + if (collNss == nss) { + onCreateCollectionCalledForNss = true; + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since the collection already exists, onCreateCollection should not happen. - ASSERT_FALSE(applyCmdCalled); + ASSERT_FALSE(onCreateCollectionCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -965,14 +930,15 @@ TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) { NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "foo"); NamespaceString nss2 = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss2); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss2); auto op = BSON("op" << "c" << "ns" << nss1.getCommandNS().ns() << "wall" << Date_t() << "o" << BSON("renameCollection" << nss1.ns() << "to" << nss2.ns() << "stayTemp" << false) << "ts" << Timestamp(1, 1) << "ui" << uuid); - bool applyCmdCalled = false; + bool onRenameCollectionCalledForNss = false; _opObserver->onRenameCollectionFn = [&](OperationContext* opCtx, const NamespaceString& fromColl, const NamespaceString& toColl, @@ -980,27 +946,22 @@ TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) { const boost::optional<UUID>& dropTargetUUID, std::uint64_t numRecords, bool stayTemp) { - applyCmdCalled = true; + if (nss1 == fromColl) { + onRenameCollectionCalledForNss = true; + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since the collection already has the target name, onRenameCollection should not happen. 
- ASSERT_FALSE(applyCmdCalled); + ASSERT_FALSE(onRenameCollectionCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1012,43 +973,40 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_Success) { << "c" << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" << BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen()); - bool applyCmdCalled = false; + bool onCreateCollectionCalledForNss = false; _opObserver->onCreateCollectionFn = [&](OperationContext* opCtx, const CollectionPtr&, const NamespaceString& collNss, const CollectionOptions&, const BSONObj&) { - applyCmdCalled = true; - ASSERT_TRUE(opCtx); - ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX)); - ASSERT_TRUE(opCtx->writesAreReplicated()); - ASSERT_EQUALS(nss, collNss); + if (collNss == nss) { + onCreateCollectionCalledForNss = true; + ASSERT_TRUE(opCtx); + ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX)); + ASSERT_TRUE(opCtx->writesAreReplicated()); + ASSERT_EQUALS(nss, collNss); + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_TRUE(applyCmdCalled); + ASSERT_TRUE(onCreateCollectionCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "t"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto op = BSON("op" << "c" @@ -1056,47 +1014,43 @@ TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) { << BSON("createIndexes" << nss.coll() << "v" << 2 << "key" << BSON("a" << 1) << "name" << "a_1") << "ts" << Timestamp(1, 1) << "ui" << uuid); - bool applyCmdCalled = false; + bool onCreateIndexCalledForNss = false; _opObserver->onCreateIndexFn = [&](OperationContext* opCtx, const NamespaceString& collNss, const UUID& collUuid, BSONObj indexDoc, bool fromMigrate) { - ASSERT_FALSE(applyCmdCalled); - applyCmdCalled = true; - ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX)); - ASSERT_TRUE(opCtx->writesAreReplicated()); - ASSERT_BSONOBJ_EQ(indexDoc, - BSON("v" << 2 << "key" << BSON("a" << 1) << "name" - << "a_1")); - ASSERT_EQUALS(nss, collNss); - ASSERT_EQUALS(uuid, collUuid); + if (collNss == nss) { + ASSERT_FALSE(onCreateIndexCalledForNss); + onCreateIndexCalledForNss = true; + ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX)); + ASSERT_TRUE(opCtx->writesAreReplicated()); + ASSERT_BSONOBJ_EQ(indexDoc, + BSON("v" << 2 << "key" << BSON("a" << 1) << "name" + << "a_1")); + ASSERT_EQUALS(nss, collNss); + ASSERT_EQUALS(uuid, collUuid); + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto 
applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_TRUE(applyCmdCalled); + ASSERT_TRUE(onCreateIndexCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyStartIndexBuildCommand_Failure) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "t"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto op = BSON("op" << "c" << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" @@ -1108,19 +1062,12 @@ TEST_F(TenantOplogApplierTest, ApplyStartIndexBuildCommand_Failure) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_EQUALS(opAppliedFuture.getNoThrow().getStatus().code(), 5434700); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1132,32 +1079,28 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_WrongNSS) { << "c" << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" << BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen()); - bool applyCmdCalled = false; + bool onCreateCollectionCalledForNss = false; _opObserver->onCreateCollectionFn = [&](OperationContext* opCtx, const CollectionPtr&, const NamespaceString& collNss, const CollectionOptions&, const BSONObj&) { - applyCmdCalled = true; + if (collNss == nss) { + onCreateCollectionCalledForNss = true; + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_FALSE(applyCmdCalled); + ASSERT_FALSE(onCreateCollectionCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1169,79 +1112,71 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_WrongNSS_Merge) { << "c" << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" << BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen()); - bool applyCmdCalled = false; + bool onCreateCollectionCalledForNss = false; _opObserver->onCreateCollectionFn = [&](OperationContext* opCtx, const CollectionPtr&, const NamespaceString& 
collNss, const CollectionOptions&, const BSONObj&) { - applyCmdCalled = true; + if (collNss == nss) { + onCreateCollectionCalledForNss = true; + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kShardMerge, - boost::none, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeShardMergeOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_EQ(opAppliedFuture.getNoThrow().getStatus().code(), ErrorCodes::InvalidTenantId); - ASSERT_FALSE(applyCmdCalled); + ASSERT_FALSE(onCreateCollectionCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDropIndexesCommand_IndexNotFound) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto op = BSON("op" << "c" << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" << BSON("dropIndexes" << nss.coll() << "index" << "a_1") << "ts" << Timestamp(1, 1) << "ui" << uuid); - bool applyCmdCalled = false; + bool onDropIndexCalledForNss = false; _opObserver->onDropIndexFn = [&](OperationContext* opCtx, - const NamespaceString& nss, + const NamespaceString& onDropIndexNss, const boost::optional<UUID>& uuid, const std::string& indexName, const BSONObj& idxDescriptor) { - applyCmdCalled = true; + onDropIndexCalledForNss = true; }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // The IndexNotFound error should be ignored and drop index should not happen. 
- ASSERT_FALSE(applyCmdCalled); + ASSERT_FALSE(onDropIndexCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) { NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto op = BSON("op" << "c" << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" @@ -1250,40 +1185,36 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) { << "data_1" << "hidden" << true)) << "ts" << Timestamp(1, 1) << "ui" << uuid); - bool applyCmdCalled = false; + bool onCollModCalledForNss = false; _opObserver->onCollModFn = [&](OperationContext* opCtx, - const NamespaceString& nss, + const NamespaceString& onCollModNss, const UUID& uuid, const BSONObj& collModCmd, const CollectionOptions& oldCollOptions, boost::optional<IndexCollModInfo> indexInfo) { - applyCmdCalled = true; + if (onCollModNss == nss) { + onCollModCalledForNss = true; + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // The IndexNotFound error should be ignored and collMod should not happen. - ASSERT_FALSE(applyCmdCalled); + ASSERT_FALSE(onCollModCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCollModCommand_CollectionMissing) { - createDatabase(_opCtx.get(), _dbName.toString()); + auto opCtx = cc().makeOperationContext(); + createDatabase(opCtx.get(), _dbName.toString()); NamespaceString nss = NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); UUID uuid(UUID::gen()); @@ -1295,65 +1226,57 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_CollectionMissing) { << "data_1" << "hidden" << true)) << "ts" << Timestamp(1, 1) << "ui" << uuid); - bool applyCmdCalled = false; + bool onCollModCalledForNss = false; _opObserver->onCollModFn = [&](OperationContext* opCtx, - const NamespaceString& nss, + const NamespaceString& onCollModNss, const UUID& uuid, const BSONObj& collModCmd, const CollectionOptions& oldCollOptions, boost::optional<IndexCollModInfo> indexInfo) { - applyCmdCalled = true; + if (onCollModNss == nss) { + onCollModCalledForNss = true; + } }; auto entry = OplogEntry(op); pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // The NamespaceNotFound error should be ignored and collMod should not happen. 
- ASSERT_FALSE(applyCmdCalled); + ASSERT_FALSE(onCollModCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS) { // Should not be able to apply a CRUD operation to a namespace not belonging to us. NamespaceString nss = NamespaceString::createNamespaceString_forTest("notmytenant", "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto entry = makeInsertOplogEntry(1, nss, uuid); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - onInsertsCalled = true; - }; + bool onInsertsCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>& docs) { + if (onInsertsNss == nss) { + onInsertsCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_FALSE(onInsertsCalled); + ASSERT_FALSE(onInsertsCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1363,29 +1286,27 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS_Merge) { // Should not be able to apply a CRUD operation to a namespace not belonging to us. NamespaceString nss = NamespaceString::createNamespaceString_forTest(DatabaseName(invalidTenant, "test"), "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto entry = makeInsertOplogEntry(1, nss, uuid); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - onInsertsCalled = true; - }; + bool onInsertsCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>& docs) { + if (onInsertsNss == nss) { + onInsertsCalledForNss = true; + } + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kShardMerge, - boost::none, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeShardMergeOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_EQ(opAppliedFuture.getNoThrow().getStatus().code(), ErrorCodes::InvalidTenantId); - ASSERT_FALSE(onInsertsCalled); + ASSERT_FALSE(onInsertsCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1394,30 +1315,25 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) { // we claim it does in the nss field. 
NamespaceString nss = NamespaceString::createNamespaceString_forTest("notmytenant", "bar"); NamespaceString nss_to_apply = NamespaceString::createNamespaceString_forTest(_dbName, "bar"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto opCtx = cc().makeOperationContext(); + auto uuid = createCollectionWithUuid(opCtx.get(), nss); auto entry = makeInsertOplogEntry(1, nss_to_apply, uuid); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - onInsertsCalled = true; - }; + bool onInsertsCalledForNss = false; + _opObserver->onInsertsFn = [&](OperationContext* opCtx, + const NamespaceString& onInsertsNss, + const std::vector<BSONObj>& docs) { + onInsertsCalledForNss = true; + }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_FALSE(onInsertsCalled); + ASSERT_FALSE(onInsertsCalledForNss); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1427,14 +1343,7 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) { pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1447,7 +1356,8 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) { ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1457,14 +1367,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) { pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1477,7 +1380,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) { ASSERT_EQUALS(futureRes.getValue().recipientOpTime, OpTime()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1491,14 +1395,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - 
_tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); tenantApplierBatchSizeOps.store(1 /* ops */); @@ -1516,7 +1413,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1530,14 +1428,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1551,7 +1442,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1566,14 +1458,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime()); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1587,7 +1472,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } @@ -1601,14 +1487,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1622,22 +1501,24 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); applier->shutdown(); - _oplogBuffer.shutdown(_opCtx.get()); + auto opCtx = cc().makeOperationContext(); + _oplogBuffer.shutdown(opCtx.get()); applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_MultiKeyIndex) { - createCollectionWithUuid(_opCtx.get(), 
NamespaceString::kSessionTransactionsTableNamespace); + auto opCtx = cc().makeOperationContext(); + createCollectionWithUuid(opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); NamespaceString indexedNss(_dbName.toStringWithTenantId(), "indexedColl"); NamespaceString nonIndexedNss(_dbName.toStringWithTenantId(), "nonIndexedColl"); - auto indexedCollUUID = createCollectionWithUuid(_opCtx.get(), indexedNss); - createCollection(_opCtx.get(), nonIndexedNss, CollectionOptions()); + auto indexedCollUUID = createCollectionWithUuid(opCtx.get(), indexedNss); + createCollection(opCtx.get(), nonIndexedNss, CollectionOptions()); // Create index on the collection. auto indexKey = BSON("val" << 1); auto spec = BSON("v" << int(IndexDescriptor::kLatestIndexVersion) << "key" << indexKey << "name" << "val_1"); - createIndex(_opCtx.get(), indexedNss, indexedCollUUID, spec); + createIndex(opCtx.get(), indexedNss, indexedCollUUID, spec); const BSONObj multiKeyDoc = BSON("_id" << 1 << "val" << BSON_ARRAY(1 << 2)); const BSONObj singleKeyDoc = BSON("_id" << 2 << "val" << 1); @@ -1653,25 +1534,209 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_MultiKeyIndex) { // writer worker thread to ensure that the same opCtx is used. auto writerPool = makeTenantMigrationWriterPool(1); - auto applier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - MigrationProtocolEnum::kMultitenantMigrations, - _tenantId, - OpTime(), - &_oplogBuffer, - _executor, - writerPool.get()); + auto applier = makeTenantMigrationOplogApplier(writerPool.get()); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(unindexedOp.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); - ASSERT_TRUE(docExists(_opCtx.get(), indexedNss, multiKeyDoc)); - ASSERT_TRUE(docExists(_opCtx.get(), nonIndexedNss, singleKeyDoc)); + ASSERT_TRUE(docExists(opCtx.get(), indexedNss, multiKeyDoc)); + ASSERT_TRUE(docExists(opCtx.get(), nonIndexedNss, singleKeyDoc)); + + applier->shutdown(); + _oplogBuffer.shutdown(opCtx.get()); + applier->join(); +} + +TEST_F(TenantOplogApplierTest, StoresOplogApplierProgress) { + auto nss = + NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"); + auto entry1 = makeInsertOplogEntry(1, nss, UUID::gen()); + pushOps({entry1}); + auto writerPool = makeTenantMigrationWriterPool(); + + auto progressNss = + NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "progress"); + + auto applier = + makeTenantMigrationOplogApplier(writerPool.get(), OpTime(), OpTime(), progressNss); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry1.getOpTime()); + ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); + + auto opCtx = cc().makeOperationContext(); + auto progress = applier->getStoredProgress(opCtx.get()); + ASSERT_TRUE(progress.has_value()); + ASSERT_EQ(progress->getDonorOpTime(), entry1.getOpTime()); + + auto entry2 = makeInsertOplogEntry(2, nss, UUID::gen()); + pushOps({entry2}); + + opAppliedFuture = applier->getNotificationForOpTime(entry2.getOpTime()); + ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); + + progress = applier->getStoredProgress(opCtx.get()); + ASSERT_TRUE(progress.has_value()); + ASSERT_EQ(progress->getDonorOpTime(), entry2.getOpTime()); + + ASSERT_EQ(applier->getNumOpsApplied(), 2); + + applier->shutdown(); + _oplogBuffer.shutdown(opCtx.get()); + applier->join(); +} + +TEST_F(TenantOplogApplierTest, ResumesOplogApplierProgress) { + auto nss = + 
+        NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+    auto entry1 = makeInsertOplogEntry(1, nss, UUID::gen());
+    auto entry2 = makeInsertOplogEntry(2, nss, UUID::gen());
+    pushOps({entry1, entry2});
+    auto writerPool = makeTenantMigrationWriterPool();
+
+    auto progressNss =
+        NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "progress");
+
+    auto applier = makeTenantMigrationOplogApplier(
+        writerPool.get(), OpTime(Timestamp(1, 0), 0), OpTime(Timestamp(1, 0), 0), progressNss);
+    ASSERT_OK(applier->startup());
+    auto opAppliedFuture = applier->getNotificationForOpTime(entry1.getOpTime());
+    ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+    opAppliedFuture = applier->getNotificationForOpTime(entry2.getOpTime());
+    ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+    ASSERT_EQ(applier->getNumOpsApplied(), 2);
+
+    {
+        auto opCtx = cc().makeOperationContext();
+        _oplogBuffer.clear(opCtx.get());
+    }
+
+    applier->shutdown();
+    applier->join();
+
+    auto entry3 = makeInsertOplogEntry(3, nss, UUID::gen());
+    pushOps({entry1, entry2, entry3});
+
+    applier = makeTenantMigrationOplogApplier(writerPool.get(),
+                                              OpTime(Timestamp(1, 0), 0),
+                                              OpTime(Timestamp(1, 0), 0),
+                                              progressNss,
+                                              true);
+    ASSERT_OK(applier->startup());
+
+    opAppliedFuture = applier->getNotificationForOpTime(entry3.getOpTime());
+    ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+    ASSERT_EQ(applier->getNumOpsApplied(), 1);
+
+    applier->shutdown();
+    applier->join();
+
+    auto opCtx = cc().makeOperationContext();
+    _oplogBuffer.shutdown(opCtx.get());
+}
+
+TEST_F(TenantOplogApplierTest, ResumeOplogApplierDoesNotReApplyPreviouslyAppliedRetryableWrites) {
+    {
+        auto opCtx = cc().makeOperationContext();
+        createCollectionWithUuid(opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+        DBDirectClient client(opCtx.get());
+        client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace,
+                             {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
+    }
+
+    auto getOplogEntryCount = [&]() {
+        auto opCtx = cc().makeOperationContext();
+        OplogInterfaceLocal oplog(opCtx.get());
+        auto oplogIter = oplog.makeIterator();
+        auto result = oplogIter->next();
+        auto oplogEntryCount = 0;
+        while (result.isOK()) {
+            oplogEntryCount++;
+            result = oplogIter->next();
+        }
+        return oplogEntryCount;
+    };
+
+    auto nss =
+        NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+
+    const auto sessionId = makeLogicalSessionIdForTest();
+    OperationSessionInfo sessionInfo;
+    sessionInfo.setSessionId(sessionId);
+    sessionInfo.setTxnNumber(0);
+    auto retryableWrite = makeOplogEntry({Timestamp(2, 0), 1},
+                                         OpTypeEnum::kInsert,
+                                         nss,
+                                         BSON("_id" << 1),
+                                         boost::none,
+                                         sessionInfo,
+                                         Date_t::now(),
+                                         {0});
+
+    pushOps({retryableWrite});
+
+    auto writerPool = makeTenantMigrationWriterPool();
+
+    auto progressNss =
+        NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "progress");
+
+    auto applier = makeTenantMigrationOplogApplier(
+        writerPool.get(), OpTime(Timestamp(1, 0), 0), OpTime(Timestamp(1, 0), 0), progressNss);
+    ASSERT_OK(applier->startup());
+
+    auto opAppliedFuture = applier->getNotificationForOpTime(retryableWrite.getOpTime());
+    ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+    ASSERT_EQ(applier->getNumOpsApplied(), 1);
+
+    // The retryable write noop entry should have been logged.
+    ASSERT_EQ(getOplogEntryCount(), 1);
+
+    applier->shutdown();
+    applier->join();
+
+    // Delete progress collection documents to simulate the loss or absence of progress data.
+    {
+        auto opCtx = cc().makeOperationContext();
+        auto storageInterface = StorageInterface::get(opCtx.get());
+        ASSERT_OK(storageInterface->deleteDocuments(opCtx.get(),
+                                                    progressNss,
+                                                    boost::none,
+                                                    StorageInterface::ScanDirection::kForward,
+                                                    {},
+                                                    BoundInclusion::kIncludeStartKeyOnly,
+                                                    1U));
+        _oplogBuffer.clear(opCtx.get());
+    }
+
+    pushOps({retryableWrite});
+
+    // Create a new TenantOplogApplier with resume enabled. The first retryable write will be
+    // re-applied.
+    applier = makeTenantMigrationOplogApplier(writerPool.get(),
+                                              OpTime(Timestamp(1, 0), 0),
+                                              OpTime(Timestamp(1, 0), 0),
+                                              progressNss,
+                                              true);
+    ASSERT_OK(applier->startup());
+
+    opAppliedFuture = applier->getNotificationForOpTime(retryableWrite.getOpTime());
+    ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+    ASSERT_EQ(applier->getNumOpsApplied(), 1);
+
+    // The retryable write noop entry should not have been logged again, so the count should be the
+    // same.
+    ASSERT_EQ(getOplogEntryCount(), 1);
     applier->shutdown();
-    _oplogBuffer.shutdown(_opCtx.get());
     applier->join();
+
+    auto opCtx = cc().makeOperationContext();
+    _oplogBuffer.shutdown(opCtx.get());
 }
 }  // namespace repl
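Every refactored test above now builds its applier through a makeTenantMigrationOplogApplier() fixture helper instead of invoking the TenantOplogApplier constructor at each call site. The helper's definition is not part of this hunk; the sketch below is inferred purely from the call sites in this diff (a writer pool, two optional OpTime arguments, an optional progress namespace, and a resume flag), so the parameter names, defaults, and the exact constructor forwarding are assumptions rather than the actual fixture code.

    // Sketch only: signature and forwarding inferred from the call sites in this diff, not the
    // real fixture helper. Fixture members (_migrationUuid, _tenantId, _oplogBuffer, _executor)
    // are assumed to be provided by TenantOplogApplierTest.
    std::shared_ptr<TenantOplogApplier> makeTenantMigrationOplogApplier(
        ThreadPool* writerPool,
        OpTime startApplyingAfterOpTime = OpTime(),
        OpTime cloneFinishedRecipientOpTime = OpTime(),
        boost::optional<NamespaceString> progressNss = boost::none,
        bool resumeFromProgress = false) {
        // Forward the fixture's shared state so individual tests only spell out what varies.
        // The argument order of the updated TenantOplogApplier constructor is assumed here.
        return std::make_shared<TenantOplogApplier>(_migrationUuid,
                                                    MigrationProtocolEnum::kMultitenantMigrations,
                                                    _tenantId,
                                                    startApplyingAfterOpTime,
                                                    cloneFinishedRecipientOpTime,
                                                    &_oplogBuffer,
                                                    _executor,
                                                    writerPool,
                                                    progressNss,
                                                    resumeFromProgress);
    }

With defaults like these, the pre-existing tests keep their one-argument call, makeTenantMigrationOplogApplier(writerPool.get()), while the new progress tests pass a progress namespace and, on restart, the resume flag.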
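The StoresOplogApplierProgress and ResumesOplogApplierProgress tests observe persistence only through the applier's getStoredProgress() accessor. For readers who want a picture of what that accessor presumably reads, here is a rough sketch of an equivalent direct lookup against the progress namespace. DBDirectClient, findOne(), and IDLParserContext are existing server utilities, but the TenantOplogApplierProgress type name and its parse() call are assumptions based only on the getDonorOpTime() getter used above.

    // Sketch only: reads the single progress document the applier is expected to maintain in
    // 'progressNss'. The parser type and field names are assumed, not taken from the IDL file.
    boost::optional<OpTime> readStoredDonorOpTime(OperationContext* opCtx,
                                                  const NamespaceString& progressNss) {
        DBDirectClient client(opCtx);
        // Each applier is expected to keep exactly one progress document, so an empty filter
        // returns it if it exists.
        auto doc = client.findOne(progressNss, BSONObj{});
        if (doc.isEmpty()) {
            return boost::none;  // No progress has been persisted yet.
        }
        auto progress =
            TenantOplogApplierProgress::parse(IDLParserContext("tenantOplogApplierProgress"), doc);
        return progress.getDonorOpTime();
    }

A test could assert ASSERT_EQ(*readStoredDonorOpTime(opCtx.get(), progressNss), entry1.getOpTime()) alongside the getStoredProgress() checks, although the accessor used in the diff already covers that verification.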