diff options
5 files changed, 215 insertions, 108 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js index c9016fc6b2e..518aada649b 100644 --- a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js +++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js @@ -2,14 +2,9 @@ * Tests that the tenant migration recipient correctly fetches retryable writes oplog entries * and adds them to its oplog buffer. * - * TODO SERVER-63517: incompatible_with_shard_merge, this tests specific implementation - * details related to MT Migrations. Retryable write behavior is tested in various other - * tests for shard merge. - * * @tags: [ * incompatible_with_macos, * incompatible_with_windows_tls, - * incompatible_with_shard_merge, * requires_majority_read_concern, * requires_persistence, * serverless, @@ -24,26 +19,25 @@ load("jstests/libs/fail_point_util.js"); // For configureFailPoint(). load("jstests/libs/parallelTester.js"); // For Thread. const kMaxBatchSize = 1; -const kParams = { - // Set the delay before a donor state doc is garbage collected to be short to speed up - // the test. - tenantMigrationGarbageCollectionDelayMS: 3 * 1000, - - // Set the TTL monitor to run at a smaller interval to speed up the test. - ttlMonitorSleepSecs: 1, - - // Decrease internal max batch size so we can still show writes are batched without inserting - // hundreds of documents. 
- internalInsertMaxBatchSize: kMaxBatchSize, -}; function runTest({storeFindAndModifyImagesInSideCollection = false}) { - const tenantMigrationTest = new TenantMigrationTest( - {name: jsTestName(), sharedOptions: {nodes: 1, setParameter: kParams}}); + const tenantMigrationTest = new TenantMigrationTest({ + name: jsTestName(), + quickGarbageCollection: true, + sharedOptions: { + nodes: 1, + setParameter: { + // Decrease internal max batch size so we can still show writes are batched without + // inserting hundreds of documents. + internalInsertMaxBatchSize: kMaxBatchSize, + } + } + }); const kTenantId = "testTenantId"; - const kDbName = kTenantId + "_" + - "testDb"; + const kTenantId2 = "testTenantId2"; + const kDbName = `${kTenantId}_testDb`; + const kDbName2 = `${kTenantId2}_testDb`; const kCollName = "testColl"; const donorRst = tenantMigrationTest.getDonorRst(); @@ -57,10 +51,6 @@ function runTest({storeFindAndModifyImagesInSideCollection = false}) { recipientPrimary.adminCommand(setParam); const rsConn = new Mongo(donorRst.getURL()); - // Create a collection on a database that isn't prefixed with `kTenantId`. - const session = rsConn.startSession({retryWrites: true}); - const collection = session.getDatabase("test")["collection"]; - const tenantSession = rsConn.startSession({retryWrites: true}); const tenantCollection = tenantSession.getDatabase(kDbName)[kCollName]; @@ -70,11 +60,18 @@ function runTest({storeFindAndModifyImagesInSideCollection = false}) { const tenantSession3 = rsConn.startSession({retryWrites: true}); const tenantCollection3 = tenantSession3.getDatabase(kDbName)[kCollName]; + // Create a collection on a database that isn't prefixed with `kTenantId`. 
+ const secondTenantSession = rsConn.startSession({retryWrites: true}); + const secondTenantCollection = secondTenantSession.getDatabase(kDbName2)[kCollName]; + + const isShardMergeEnabled = + TenantMigrationUtil.isShardMergeEnabled(donorRst.getPrimary().getDB("adminDB")); + jsTestLog("Run retryable writes prior to the migration"); // Retryable insert, but not on correct tenant database. This write should not show up in the - // oplog buffer. - assert.commandWorked(collection.insert({_id: "retryableWrite1"})); + // oplog buffer for the tenant migration protocol. It will however for the shard merge protocol. + assert.commandWorked(secondTenantCollection.insert({_id: "retryableWrite1"})); // The following retryable writes should occur on the correct tenant database, so they should // all be retrieved by the pipeline. @@ -166,18 +163,20 @@ function runTest({storeFindAndModifyImagesInSideCollection = false}) { const recipientOplogBuffer = recipientPrimary.getDB("config")[kOplogBufferNS]; jsTestLog({"oplog buffer ns": kOplogBufferNS}); - // We expect to see retryableWrite2, retryableWrite3, retryableWrite3's postImage, and - // bulkRetryableWrite0 (bulk insert batch size is 1). - const findRes = recipientOplogBuffer.find().toArray(); - assert.eq(findRes.length, 4, `Incorrect number of oplog entries in buffer: ${tojson(findRes)}`); + // We expect to see retryableWrite2, retryableWrite3, retryableWrite3's postImage, + // and bulkRetryableWrite0 (bulk insert batch size is 1). assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite2"}).itcount()); assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite3"}).itcount()); assert.eq(1, recipientOplogBuffer.find({"entry.o2._id": "retryableWrite3"}).itcount()); assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "bulkRetryableWrite0"}).itcount()); + // Only for shardMerge we expect to have the other tenantId. Otherwise only for the provided + // tenantId. + assert.eq(isShardMergeEnabled ? 
1 : 0, + recipientOplogBuffer.find({"entry.o._id": "retryableWrite1"}).itcount()); + // Ensure the retryable write oplog entries that should not be in `kOplogBufferNS` are in fact // not. - assert.eq(0, recipientOplogBuffer.find({"entry.o._id": "retryableWrite1"}).itcount()); assert.eq(0, recipientOplogBuffer.find({"entry.o._id": "retryableWrite4"}).itcount()); assert.eq(0, recipientOplogBuffer.find({"entry.o2._id": "retryableWrite4"}).itcount()); assert.eq(0, recipientOplogBuffer.find({"entry.o._id": "bulkRetryableWrite1"}).itcount()); diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js b/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js deleted file mode 100644 index 0c6712f7fc2..00000000000 --- a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Tests that the shard merge recipient correctly fetches retryable writes. - * - * @tags: [ - * incompatible_with_macos, - * incompatible_with_windows_tls, - * featureFlagShardMerge, - * requires_majority_read_concern, - * requires_persistence, - * serverless, - * ] - */ -(function() { -"use strict"; - -load("jstests/replsets/libs/tenant_migration_test.js"); -load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject(). 
- -const kParams = { - ttlMonitorSleepSecs: 1, -}; - -const tenantMigrationTest = new TenantMigrationTest({ - name: jsTestName(), - sharedOptions: {nodes: 1, setParameter: kParams}, - quickGarbageCollection: true -}); - -const kTenantId = "testTenantId"; -const tenantDB = tenantMigrationTest.tenantDB(kTenantId, "database"); - -const donorRst = tenantMigrationTest.getDonorRst(); -const donorPrimary = tenantMigrationTest.getDonorPrimary(); -const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); - -jsTestLog("Run retryable write prior to the migration"); - -const lsid = UUID(); -const cmd = { - insert: "collection", - documents: [{_id: 1}, {_id: 2}], - ordered: false, - lsid: {id: lsid}, - txnNumber: NumberLong(123), -}; - -assert.commandWorked(donorPrimary.getDB(tenantDB).runCommand(cmd)); -assert.eq(2, donorPrimary.getDB(tenantDB).collection.find().itcount()); - -const migrationId = UUID(); -const migrationOpts = { - migrationIdString: extractUUIDFromObject(migrationId), - tenantId: kTenantId, -}; - -jsTestLog(`Starting migration: ${tojson(migrationOpts)}`); -TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts)); - -const {ok, n} = assert.commandWorked(recipientPrimary.getDB(tenantDB).runCommand(cmd)); -assert.eq(1, ok); -assert.eq(2, n); -assert.eq(2, recipientPrimary.getDB(tenantDB).collection.find().itcount()); - -tenantMigrationTest.stop(); -})(); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 3e58a86373f..48d514339eb 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -1557,12 +1557,18 @@ TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStart "tenantId"_attr = getTenantId(), "migrationId"_attr = getMigrationUUID()); - // Fetch the oplog chains of all retryable writes that occurred before startFetchingTimestamp - // on this 
tenant. - auto serializedPipeline = - tenant_migration_util::createRetryableWritesOplogFetchingPipelineForTenantMigrations( - expCtx, startFetchingTimestamp, getTenantId()) - ->serializeToBson(); + // Fetch the oplog chains of all retryable writes that occurred before startFetchingTimestamp. + std::vector<BSONObj> serializedPipeline; + if (MigrationProtocolEnum::kShardMerge == getProtocol()) { + serializedPipeline = + tenant_migration_util::createRetryableWritesOplogFetchingPipelineForAllTenants( + expCtx, startFetchingTimestamp) + ->serializeToBson(); + } else { + serializedPipeline = tenant_migration_util::createRetryableWritesOplogFetchingPipeline( + expCtx, startFetchingTimestamp, getTenantId()) + ->serializeToBson(); + } AggregateCommandRequest aggRequest(NamespaceString::kSessionTransactionsTableNamespace, std::move(serializedPipeline)); diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 39b1bf2fdff..51d635d72d9 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -203,8 +203,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineFo return Pipeline::create(std::move(stages), expCtx); } -std::unique_ptr<Pipeline, PipelineDeleter> -createRetryableWritesOplogFetchingPipelineForTenantMigrations( +std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const Timestamp& startFetchingTimestamp, const std::string& tenantId) { @@ -376,6 +375,168 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations( return Pipeline::create(std::move(stages), expCtx); } +std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipelineForAllTenants( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const Timestamp& startFetchingTimestamp) { + + using Doc = Document; + const Value DNE = Value{Doc{{"$exists", false}}}; + + 
Pipeline::SourceContainer stages; + + // 1. Match config.transactions entries that do not have a `state` field, which indicates that + // the last write on the session was a retryable write and not a transaction. + stages.emplace_back(DocumentSourceMatch::create(Doc{{"state", DNE}}.toBson(), expCtx)); + + // 2. Fetch latest oplog entry for each config.transactions entry from the oplog view. `lastOps` + // is expected to contain every element from `oplogView` for all the tenants. + stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\ + $lookup: {\ + from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\ + localField: 'lastWriteOpTime.ts',\ + foreignField: 'ts',\ + as: 'lastOps'\ + }}") + .firstElement(), + expCtx)); + + // 3. Replace the single-element 'lastOps' array field with a single 'lastOp' field. + stages.emplace_back( + DocumentSourceAddFields::create(fromjson("{lastOp: {$first: '$lastOps'}}"), expCtx)); + + // 4. Remove `lastOps` in favor of `lastOp`. + stages.emplace_back(DocumentSourceProject::createUnset(FieldPath("lastOps"), expCtx)); + + // 5. If `lastOp` does not have `preImageOpTime` or `postImageOpTime` field, assign a dummy + // timestamp so that the next two $lookup stages do not need to do collection scan on the + // oplog collection, because otherwise $lookup treats the field as having a value of + // null, preventing it from seeking directly to the entry with the matching timestamp. + stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\ + 'lastOp.preImageOpTime': {\ + $ifNull: ['$lastOp.preImageOpTime', {ts: Timestamp(0, 0), t: -1}]\ + },\ + 'lastOp.postImageOpTime': {\ + $ifNull: ['$lastOp.postImageOpTime', {ts: Timestamp(0, 0), t: -1}]\ + }\ + }"), + expCtx)); + + // 6. Fetch preImage oplog entry for `findAndModify` from the oplog view. `preImageOps` is + // expected to contain exactly one element if the `preImageOpTime` field is not null and + // is earlier than `startFetchingTimestamp`. 
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\ + $lookup: {\ + from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\ + localField: 'lastOp.preImageOpTime.ts',\ + foreignField: 'ts',\ + pipeline: [{\ + $match: {\ + 'ts': {$lt: " + startFetchingTimestamp.toString() + + "}\ + }\ + }],\ + as: 'preImageOps'\ + }}") + .firstElement(), + expCtx)); + + // 7. Fetch postImage oplog entry for `findAndModify` from the oplog view. `postImageOps` is + // expected to contain exactly one element if the `postImageOpTime` field is not null and is + // earlier than `startFetchingTimestamp`. + stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\ + $lookup: {\ + from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\ + localField: 'lastOp.postImageOpTime.ts',\ + foreignField: 'ts',\ + pipeline: [{\ + $match: {\ + 'ts': {$lt: " + startFetchingTimestamp.toString() + + "}\ + }\ + }],\ + as: 'postImageOps'\ + }}") + .firstElement(), + expCtx)); + + // 8. Fetch oplog entries in each chain from the oplog view. + stages.emplace_back(DocumentSourceGraphLookUp::createFromBson( + Doc{{"$graphLookup", + Doc{{"from", Doc{{"db", "local"_sd}, {"coll", "system.tenantMigration.oplogView"_sd}}}, + {"startWith", "$lastOp.ts"_sd}, + {"connectFromField", "prevOpTime.ts"_sd}, + {"connectToField", "ts"_sd}, + {"as", "history"_sd}, + {"depthField", "depthForTenantMigration"_sd}}}} + .toBson() + .firstElement(), + expCtx)); + + // 9. Filter out all oplog entries from the `history` array that occur after + // `startFetchingTimestamp`. We keep the entry at the `startFetchingTimestamp` here so that + // we can capture any synthetic oplog entries that need to be created in the + // `FindAndModifyImageLookup` stage later. We do not need to sort the history after this + // since we will put the fetched entries into the oplog buffer collection, where entries are + // read in timestamp order. 
+ stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\ + history: {$filter: {\ + input: '$history',\ + cond: {$lte: ['$$this.ts', " + startFetchingTimestamp.toString() + + "]}}}}"), + expCtx)); + + // 10. Combine the oplog entries. + stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\ + 'history': {$concatArrays: [\ + '$preImageOps', '$postImageOps', '$history']}}"), + expCtx)); + + // 11. Keep only the `history` field to minimize the unwind result in the next stage. + stages.emplace_back(DocumentSourceProject::createFromBson( + BSON("$project" << BSON("_id" << 0 << "history" << 1)).firstElement(), expCtx)); + + // 12. Unwind oplog entries in each `history` chain. This serves as an optimization for the + // next $lookup stage. Without unwinding, `history` is an array and the next $lookup will + // do a collection scan on the oplog collection to find all entries that match any element + // in the array, which is not efficient. After unwinding, the $lookup can utilize the fact + // that the oplog collection is ordered by timestamp to seek directly to an entry that matches + // a timestamp without scanning the entire oplog collection. + stages.emplace_back(DocumentSourceUnwind::create(expCtx, "history", false, boost::none)); + + // 13. Fetch the complete oplog entries. `completeOplogEntry` is expected to contain exactly one + // element. + stages.emplace_back(DocumentSourceLookUp::createFromBson( + Doc{{"$lookup", + Doc{{"from", Doc{{"db", "local"_sd}, {"coll", "oplog.rs"_sd}}}, + {"localField", "history.ts"_sd}, + {"foreignField", "ts"_sd}, + {"as", "completeOplogEntry"_sd}}}} + .toBson() + .firstElement(), + expCtx)); + + // 14. Unwind oplog entries in each chain to the top-level array. + stages.emplace_back( + DocumentSourceUnwind::create(expCtx, "completeOplogEntry", false, boost::none)); + + // 15. Replace root. 
+ stages.emplace_back(DocumentSourceReplaceRoot::createFromBson( + fromjson("{$replaceRoot: {newRoot: '$completeOplogEntry'}}").firstElement(), expCtx)); + + // 16. Downconvert any 'findAndModify' oplog entries to store pre- and post-images in the + // oplog rather than in a side collection. + stages.emplace_back(DocumentSourceFindAndModifyImageLookup::create(expCtx)); + + // 17. Since the oplog fetching and application stages will already capture entries after + // `startFetchingTimestamp`, we only need the earlier part of the oplog chain. + stages.emplace_back(DocumentSourceMatch::createFromBson( + BSON("$match" << BSON("ts" << BSON("$lt" << startFetchingTimestamp))).firstElement(), + expCtx)); + + return Pipeline::create(std::move(stages), expCtx); +} + + bool shouldStopUpdatingExternalKeys(Status status, const CancellationToken& token) { return status.isOK() || token.isCanceled(); } diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h index ebf4db08dce..c2b75e59724 100644 --- a/src/mongo/db/repl/tenant_migration_util.h +++ b/src/mongo/db/repl/tenant_migration_util.h @@ -245,15 +245,21 @@ std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineFo /** * Creates a pipeline that can be serialized into a query for fetching retryable writes oplog * entries before `startFetchingTimestamp`. We use `tenantId` to fetch entries specific to a - * particular set of tenant databases. + * particular set of tenant databases. This is for the multi-tenant migration protocol. 
*/ -std::unique_ptr<Pipeline, PipelineDeleter> -createRetryableWritesOplogFetchingPipelineForTenantMigrations( +std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const Timestamp& startFetchingTimestamp, const std::string& tenantId); /** + * Creates a pipeline that can be serialized into a query for fetching retryable writes oplog + * entries before `startFetchingTimestamp` for all tenants. This is for the shard merge protocol. + */ +std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipelineForAllTenants( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const Timestamp& startFetchingTimestamp); + +/** * Returns a new BSONObj created from 'stateDoc' with sensitive fields redacted. */ BSONObj redactStateDoc(BSONObj stateDoc); |