author     mathisbessamdb <mathis.bessa@mongodb.com>         2022-07-07 14:32:13 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-07-07 15:24:49 +0000
commit     26fd2600a781a7561fc216e38f38932c23bce3f3
tree       948774862829bddf4b3ab2c4a7a55636a1375502
parent     9c471282110ba73ba40d3782fd858135e16b5cac
SERVER-65819 Update existing tenant migration retryable writes oplog for shard merge
-rw-r--r--  jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js   63
-rw-r--r--  jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js     65
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp                                18
-rw-r--r--  src/mongo/db/repl/tenant_migration_util.cpp                                            165
-rw-r--r--  src/mongo/db/repl/tenant_migration_util.h                                               12
5 files changed, 215 insertions(+), 108 deletions(-)
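Background for the diffs below: a retryable write leaves a config.transactions entry with no `state` field (transaction entries carry one), and both fetching pipelines key off that entry's `lastWriteOpTime.ts`. A minimal shell sketch of the shapes involved (session, database, and collection names are illustrative):

    // Run a retryable write on a tenant database; the session's last write is
    // recorded in config.transactions.
    const session = db.getMongo().startSession({retryWrites: true});
    const coll = session.getDatabase("testTenantId_testDb")["testColl"];
    assert.commandWorked(coll.insert({_id: "retryableWrite1"}));

    // The resulting config.transactions entry looks roughly like:
    //   {_id: {id: <lsid>}, txnNum: NumberLong(0),
    //    lastWriteOpTime: {ts: Timestamp(...), t: NumberLong(1)}}
    // Retryable writes carry no `state` field, which is why both pipelines
    // below start with {$match: {state: {$exists: false}}}.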
diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js
index c9016fc6b2e..518aada649b 100644
--- a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js
+++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js
@@ -2,14 +2,9 @@
* Tests that the tenant migration recipient correctly fetches retryable writes oplog entries
* and adds them to its oplog buffer.
*
- * TODO SERVER-63517: incompatible_with_shard_merge, this tests specific implementation
- * details related to MT Migrations. Retryable write behavior is tested in various other
- * tests for shard merge.
- *
* @tags: [
* incompatible_with_macos,
* incompatible_with_windows_tls,
- * incompatible_with_shard_merge,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
@@ -24,26 +19,25 @@ load("jstests/libs/fail_point_util.js"); // For configureFailPoint().
load("jstests/libs/parallelTester.js"); // For Thread.
const kMaxBatchSize = 1;
-const kParams = {
- // Set the delay before a donor state doc is garbage collected to be short to speed up
- // the test.
- tenantMigrationGarbageCollectionDelayMS: 3 * 1000,
-
- // Set the TTL monitor to run at a smaller interval to speed up the test.
- ttlMonitorSleepSecs: 1,
-
- // Decrease internal max batch size so we can still show writes are batched without inserting
- // hundreds of documents.
- internalInsertMaxBatchSize: kMaxBatchSize,
-};
function runTest({storeFindAndModifyImagesInSideCollection = false}) {
- const tenantMigrationTest = new TenantMigrationTest(
- {name: jsTestName(), sharedOptions: {nodes: 1, setParameter: kParams}});
+ const tenantMigrationTest = new TenantMigrationTest({
+ name: jsTestName(),
+ quickGarbageCollection: true,
+ sharedOptions: {
+ nodes: 1,
+ setParameter: {
+ // Decrease internal max batch size so we can still show writes are batched without
+ // inserting hundreds of documents.
+ internalInsertMaxBatchSize: kMaxBatchSize,
+ }
+ }
+ });
const kTenantId = "testTenantId";
- const kDbName = kTenantId + "_" +
- "testDb";
+ const kTenantId2 = "testTenantId2";
+ const kDbName = `${kTenantId}_testDb`;
+ const kDbName2 = `${kTenantId2}_testDb`;
const kCollName = "testColl";
const donorRst = tenantMigrationTest.getDonorRst();
@@ -57,10 +51,6 @@ function runTest({storeFindAndModifyImagesInSideCollection = false}) {
recipientPrimary.adminCommand(setParam);
const rsConn = new Mongo(donorRst.getURL());
- // Create a collection on a database that isn't prefixed with `kTenantId`.
- const session = rsConn.startSession({retryWrites: true});
- const collection = session.getDatabase("test")["collection"];
-
const tenantSession = rsConn.startSession({retryWrites: true});
const tenantCollection = tenantSession.getDatabase(kDbName)[kCollName];
@@ -70,11 +60,18 @@ function runTest({storeFindAndModifyImagesInSideCollection = false}) {
const tenantSession3 = rsConn.startSession({retryWrites: true});
const tenantCollection3 = tenantSession3.getDatabase(kDbName)[kCollName];
+ // Create a collection on a second tenant database, one that isn't prefixed with `kTenantId`.
+ const secondTenantSession = rsConn.startSession({retryWrites: true});
+ const secondTenantCollection = secondTenantSession.getDatabase(kDbName2)[kCollName];
+
+ const isShardMergeEnabled =
+ TenantMigrationUtil.isShardMergeEnabled(donorRst.getPrimary().getDB("adminDB"));
+
jsTestLog("Run retryable writes prior to the migration");
// Retryable insert, but not on correct tenant database. This write should not show up in the
- // oplog buffer.
- assert.commandWorked(collection.insert({_id: "retryableWrite1"}));
+ // oplog buffer under the tenant migration protocol, but it will under the shard merge protocol.
+ assert.commandWorked(secondTenantCollection.insert({_id: "retryableWrite1"}));
// The following retryable writes should occur on the correct tenant database, so they should
// all be retrieved by the pipeline.
@@ -166,18 +163,20 @@ function runTest({storeFindAndModifyImagesInSideCollection = false}) {
const recipientOplogBuffer = recipientPrimary.getDB("config")[kOplogBufferNS];
jsTestLog({"oplog buffer ns": kOplogBufferNS});
- // We expect to see retryableWrite2, retryableWrite3, retryableWrite3's postImage, and
- // bulkRetryableWrite0 (bulk insert batch size is 1).
- const findRes = recipientOplogBuffer.find().toArray();
- assert.eq(findRes.length, 4, `Incorrect number of oplog entries in buffer: ${tojson(findRes)}`);
+ // We expect to see retryableWrite2, retryableWrite3, retryableWrite3's postImage,
+ // and bulkRetryableWrite0 (bulk insert batch size is 1).
assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite2"}).itcount());
assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite3"}).itcount());
assert.eq(1, recipientOplogBuffer.find({"entry.o2._id": "retryableWrite3"}).itcount());
assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "bulkRetryableWrite0"}).itcount());
+ // Only for shard merge do we expect to fetch entries for the other tenantId; otherwise we
+ // fetch entries only for the provided tenantId.
+ assert.eq(isShardMergeEnabled ? 1 : 0,
+ recipientOplogBuffer.find({"entry.o._id": "retryableWrite1"}).itcount());
+
// Ensure the retryable write oplog entries that should not be in `kOplogBufferNS` are in fact
// not.
- assert.eq(0, recipientOplogBuffer.find({"entry.o._id": "retryableWrite1"}).itcount());
assert.eq(0, recipientOplogBuffer.find({"entry.o._id": "retryableWrite4"}).itcount());
assert.eq(0, recipientOplogBuffer.find({"entry.o2._id": "retryableWrite4"}).itcount());
assert.eq(0, recipientOplogBuffer.find({"entry.o._id": "bulkRetryableWrite1"}).itcount());
diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js b/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js
deleted file mode 100644
index 0c6712f7fc2..00000000000
--- a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Tests that the shard merge recipient correctly fetches retryable writes.
- *
- * @tags: [
- * incompatible_with_macos,
- * incompatible_with_windows_tls,
- * featureFlagShardMerge,
- * requires_majority_read_concern,
- * requires_persistence,
- * serverless,
- * ]
- */
-(function() {
-"use strict";
-
-load("jstests/replsets/libs/tenant_migration_test.js");
-load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject().
-
-const kParams = {
- ttlMonitorSleepSecs: 1,
-};
-
-const tenantMigrationTest = new TenantMigrationTest({
- name: jsTestName(),
- sharedOptions: {nodes: 1, setParameter: kParams},
- quickGarbageCollection: true
-});
-
-const kTenantId = "testTenantId";
-const tenantDB = tenantMigrationTest.tenantDB(kTenantId, "database");
-
-const donorRst = tenantMigrationTest.getDonorRst();
-const donorPrimary = tenantMigrationTest.getDonorPrimary();
-const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
-
-jsTestLog("Run retryable write prior to the migration");
-
-const lsid = UUID();
-const cmd = {
- insert: "collection",
- documents: [{_id: 1}, {_id: 2}],
- ordered: false,
- lsid: {id: lsid},
- txnNumber: NumberLong(123),
-};
-
-assert.commandWorked(donorPrimary.getDB(tenantDB).runCommand(cmd));
-assert.eq(2, donorPrimary.getDB(tenantDB).collection.find().itcount());
-
-const migrationId = UUID();
-const migrationOpts = {
- migrationIdString: extractUUIDFromObject(migrationId),
- tenantId: kTenantId,
-};
-
-jsTestLog(`Starting migration: ${tojson(migrationOpts)}`);
-TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
-
-const {ok, n} = assert.commandWorked(recipientPrimary.getDB(tenantDB).runCommand(cmd));
-assert.eq(1, ok);
-assert.eq(2, n);
-assert.eq(2, recipientPrimary.getDB(tenantDB).collection.find().itcount());
-
-tenantMigrationTest.stop();
-})();
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 3e58a86373f..48d514339eb 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -1557,12 +1557,18 @@ TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStart
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID());
- // Fetch the oplog chains of all retryable writes that occurred before startFetchingTimestamp
- // on this tenant.
- auto serializedPipeline =
- tenant_migration_util::createRetryableWritesOplogFetchingPipelineForTenantMigrations(
- expCtx, startFetchingTimestamp, getTenantId())
- ->serializeToBson();
+ // Fetch the oplog chains of all retryable writes that occurred before startFetchingTimestamp.
+ std::vector<BSONObj> serializedPipeline;
+ if (MigrationProtocolEnum::kShardMerge == getProtocol()) {
+ serializedPipeline =
+ tenant_migration_util::createRetryableWritesOplogFetchingPipelineForAllTenants(
+ expCtx, startFetchingTimestamp)
+ ->serializeToBson();
+ } else {
+ serializedPipeline = tenant_migration_util::createRetryableWritesOplogFetchingPipeline(
+ expCtx, startFetchingTimestamp, getTenantId())
+ ->serializeToBson();
+ }
AggregateCommandRequest aggRequest(NamespaceString::kSessionTransactionsTableNamespace,
std::move(serializedPipeline));
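The serialized pipeline is run against config.transactions (kSessionTransactionsTableNamespace). A hedged shell approximation of the shard merge branch, which omits any tenantId filter (timestamp value illustrative):

    // Sketch only: the server drives this internally via AggregateCommandRequest;
    // shard merge fetches chains for every tenant, so no tenantId filter appears.
    const startFetchingTimestamp = Timestamp(100, 1);  // illustrative
    const chains = db.getSiblingDB("config").transactions.aggregate([
        {$match: {state: {$exists: false}}},
        // ... the $lookup/$graphLookup stages built in tenant_migration_util.cpp
        // (shown in the next diff) follow here ...
    ]).toArray();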
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
index 39b1bf2fdff..51d635d72d9 100644
--- a/src/mongo/db/repl/tenant_migration_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -203,8 +203,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineFo
return Pipeline::create(std::move(stages), expCtx);
}
-std::unique_ptr<Pipeline, PipelineDeleter>
-createRetryableWritesOplogFetchingPipelineForTenantMigrations(
+std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const Timestamp& startFetchingTimestamp,
const std::string& tenantId) {
@@ -376,6 +375,168 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations(
return Pipeline::create(std::move(stages), expCtx);
}
+std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipelineForAllTenants(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const Timestamp& startFetchingTimestamp) {
+
+ using Doc = Document;
+ const Value DNE = Value{Doc{{"$exists", false}}};
+
+ Pipeline::SourceContainer stages;
+
+ // 1. Match config.transactions entries that do not have a `state` field, which indicates that
+ // the last write on the session was a retryable write and not a transaction.
+ stages.emplace_back(DocumentSourceMatch::create(Doc{{"state", DNE}}.toBson(), expCtx));
+
+ // 2. Fetch the latest oplog entry for each config.transactions entry from the oplog view.
+ // `lastOps` is expected to contain the matching entries from `oplogView` across all tenants.
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\
+ $lookup: {\
+ from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\
+ localField: 'lastWriteOpTime.ts',\
+ foreignField: 'ts',\
+ as: 'lastOps'\
+ }}")
+ .firstElement(),
+ expCtx));
+
+ // 3. Replace the single-element 'lastOps' array field with a single 'lastOp' field.
+ stages.emplace_back(
+ DocumentSourceAddFields::create(fromjson("{lastOp: {$first: '$lastOps'}}"), expCtx));
+
+ // 4. Remove `lastOps` in favor of `lastOp`.
+ stages.emplace_back(DocumentSourceProject::createUnset(FieldPath("lastOps"), expCtx));
+
+ // 5. If `lastOp` does not have a `preImageOpTime` or `postImageOpTime` field, assign a dummy
+ // timestamp so that the next two $lookup stages do not need to do a collection scan on
+ // the oplog collection; otherwise $lookup treats the missing field as having a value of
+ // null, preventing it from seeking directly to the entry with the matching timestamp.
+ stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
+ 'lastOp.preImageOpTime': {\
+ $ifNull: ['$lastOp.preImageOpTime', {ts: Timestamp(0, 0), t: -1}]\
+ },\
+ 'lastOp.postImageOpTime': {\
+ $ifNull: ['$lastOp.postImageOpTime', {ts: Timestamp(0, 0), t: -1}]\
+ }\
+ }"),
+ expCtx));
+
+ // 6. Fetch preImage oplog entry for `findAndModify` from the oplog view. `preImageOps` is
+ // expected to contain exactly one element if the `preImageOpTime` field is not null and
+ // is earlier than `startFetchingTimestamp`.
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\
+ $lookup: {\
+ from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\
+ localField: 'lastOp.preImageOpTime.ts',\
+ foreignField: 'ts',\
+ pipeline: [{\
+ $match: {\
+ 'ts': {$lt: " + startFetchingTimestamp.toString() +
+ "}\
+ }\
+ }],\
+ as: 'preImageOps'\
+ }}")
+ .firstElement(),
+ expCtx));
+
+ // 7. Fetch postImage oplog entry for `findAndModify` from the oplog view. `postImageOps` is
+ // expected to contain exactly one element if the `postImageOpTime` field is not null and is
+ // earlier than `startFetchingTimestamp`.
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\
+ $lookup: {\
+ from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\
+ localField: 'lastOp.postImageOpTime.ts',\
+ foreignField: 'ts',\
+ pipeline: [{\
+ $match: {\
+ 'ts': {$lt: " + startFetchingTimestamp.toString() +
+ "}\
+ }\
+ }],\
+ as: 'postImageOps'\
+ }}")
+ .firstElement(),
+ expCtx));
+
+ // 8. Fetch oplog entries in each chain from the oplog view.
+ stages.emplace_back(DocumentSourceGraphLookUp::createFromBson(
+ Doc{{"$graphLookup",
+ Doc{{"from", Doc{{"db", "local"_sd}, {"coll", "system.tenantMigration.oplogView"_sd}}},
+ {"startWith", "$lastOp.ts"_sd},
+ {"connectFromField", "prevOpTime.ts"_sd},
+ {"connectToField", "ts"_sd},
+ {"as", "history"_sd},
+ {"depthField", "depthForTenantMigration"_sd}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ // 9. Filter out all oplog entries from the `history` array that occur after
+ // `startFetchingTimestamp`. We keep the entry at the `startFetchingTimestamp` here so that
+ // we can capture any synthetic oplog entries that need to be created in the
+ // `FindAndModifyImageLookup` stage later. We do not need to sort the history after this
+ // since we will put the fetched entries into the oplog buffer collection, where entries are
+ // read in timestamp order.
+ stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
+ history: {$filter: {\
+ input: '$history',\
+ cond: {$lte: ['$$this.ts', " + startFetchingTimestamp.toString() +
+ "]}}}}"),
+ expCtx));
+
+ // 10. Combine the oplog entries.
+ stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
+ 'history': {$concatArrays: [\
+ '$preImageOps', '$postImageOps', '$history']}}"),
+ expCtx));
+
+ // 11. Keep only the `history` field to minimize the unwind result in the next stage.
+ stages.emplace_back(DocumentSourceProject::createFromBson(
+ BSON("$project" << BSON("_id" << 0 << "history" << 1)).firstElement(), expCtx));
+
+ // 12. Unwind oplog entries in each `history` chain. This serves as an optimization for the
+ // next $lookup stage. Without unwinding, `history` is an array and the next $lookup will
+ // do a collection scan on the oplog collection to find all entries that match any element
+ // in the array, which is not efficient. After unwinding, the $lookup can utilize the fact
+ // that the oplog collection is ordered by timestamp to seek directly to an entry that matches
+ // a timestamp without scanning the entire oplog collection.
+ stages.emplace_back(DocumentSourceUnwind::create(expCtx, "history", false, boost::none));
+
+ // 13. Fetch the complete oplog entries. `completeOplogEntry` is expected to contain exactly one
+ // element.
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(
+ Doc{{"$lookup",
+ Doc{{"from", Doc{{"db", "local"_sd}, {"coll", "oplog.rs"_sd}}},
+ {"localField", "history.ts"_sd},
+ {"foreignField", "ts"_sd},
+ {"as", "completeOplogEntry"_sd}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ // 14. Unwind oplog entries in each chain to the top-level array.
+ stages.emplace_back(
+ DocumentSourceUnwind::create(expCtx, "completeOplogEntry", false, boost::none));
+
+ // 15. Replace root.
+ stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
+ fromjson("{$replaceRoot: {newRoot: '$completeOplogEntry'}}").firstElement(), expCtx));
+
+ // 16. Downconvert any 'findAndModify' oplog entries to store pre- and post-images in the
+ // oplog rather than in a side collection.
+ stages.emplace_back(DocumentSourceFindAndModifyImageLookup::create(expCtx));
+
+ // 17. Since the oplog fetching and application stages will already capture entries after
+ // `startFetchingTimestamp`, we only need the earlier part of the oplog chain.
+ stages.emplace_back(DocumentSourceMatch::createFromBson(
+ BSON("$match" << BSON("ts" << BSON("$lt" << startFetchingTimestamp))).firstElement(),
+ expCtx));
+
+ return Pipeline::create(std::move(stages), expCtx);
+}
+
+
bool shouldStopUpdatingExternalKeys(Status status, const CancellationToken& token) {
return status.isOK() || token.isCanceled();
}
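Read end to end, stages 1-17 above serialize to roughly the following pipeline (a condensed, hand-written sketch; `startFetchingTimestamp` stands in for the literal timestamp spliced into the match expressions):

    [
      {$match: {state: {$exists: false}}},
      {$lookup: {from: {db: "local", coll: "system.tenantMigration.oplogView"},
                 localField: "lastWriteOpTime.ts", foreignField: "ts", as: "lastOps"}},
      {$addFields: {lastOp: {$first: "$lastOps"}}},
      {$project: {lastOps: 0}},
      {$addFields: {"lastOp.preImageOpTime":
                        {$ifNull: ["$lastOp.preImageOpTime", {ts: Timestamp(0, 0), t: -1}]},
                    "lastOp.postImageOpTime":
                        {$ifNull: ["$lastOp.postImageOpTime", {ts: Timestamp(0, 0), t: -1}]}}},
      // stages 6-7: $lookup pre/post-image entries with ts < startFetchingTimestamp
      // into "preImageOps" / "postImageOps"
      {$graphLookup: {from: {db: "local", coll: "system.tenantMigration.oplogView"},
                      startWith: "$lastOp.ts", connectFromField: "prevOpTime.ts",
                      connectToField: "ts", as: "history",
                      depthField: "depthForTenantMigration"}},
      {$addFields: {history: {$filter: {input: "$history",
                                        cond: {$lte: ["$$this.ts", startFetchingTimestamp]}}}}},
      {$addFields: {history: {$concatArrays: ["$preImageOps", "$postImageOps", "$history"]}}},
      {$project: {_id: 0, history: 1}},
      {$unwind: "$history"},
      {$lookup: {from: {db: "local", coll: "oplog.rs"},
                 localField: "history.ts", foreignField: "ts", as: "completeOplogEntry"}},
      {$unwind: "$completeOplogEntry"},
      {$replaceRoot: {newRoot: "$completeOplogEntry"}},
      // stage 16 (DocumentSourceFindAndModifyImageLookup) has no plain shell spelling
      {$match: {ts: {$lt: startFetchingTimestamp}}}
    ]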
diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h
index ebf4db08dce..c2b75e59724 100644
--- a/src/mongo/db/repl/tenant_migration_util.h
+++ b/src/mongo/db/repl/tenant_migration_util.h
@@ -245,15 +245,21 @@ std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineFo
/**
* Creates a pipeline that can be serialized into a query for fetching retryable writes oplog
* entries before `startFetchingTimestamp`. We use `tenantId` to fetch entries specific to a
- * particular set of tenant databases.
+ * particular set of tenant databases. This is for the multi-tenant migration protocol.
*/
-std::unique_ptr<Pipeline, PipelineDeleter>
-createRetryableWritesOplogFetchingPipelineForTenantMigrations(
+std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const Timestamp& startFetchingTimestamp,
const std::string& tenantId);
/**
+ * Creates a pipeline that can be serialized into a query for fetching retryable writes oplog
+ * entries before `startFetchingTimestamp` for all tenants. This is for the shard merge protocol.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> createRetryableWritesOplogFetchingPipelineForAllTenants(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, const Timestamp& startFetchingTimestamp);
+
+/**
* Returns a new BSONObj created from 'stateDoc' with sensitive fields redacted.
*/
BSONObj redactStateDoc(BSONObj stateDoc);
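For context on what the fetched chains enable: once a migration commits, retrying the original {lsid, txnNumber} command on the recipient returns the original result rather than re-executing it, as the deleted shard merge test above verified:

    // Condensed from the removed test: `cmd` is the same retryable insert,
    // with the same lsid and txnNumber, previously run on the donor.
    const {ok, n} = assert.commandWorked(recipientPrimary.getDB(tenantDB).runCommand(cmd));
    assert.eq(1, ok);
    assert.eq(2, n);  // same result as the original execution
    assert.eq(2, recipientPrimary.getDB(tenantDB).collection.find().itcount());  // no duplicates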