author      Wenbin Zhu <wenbin.zhu@mongodb.com>               2021-06-10 19:43:31 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-06-11 00:07:50 +0000
commit      24920b095706001b589de22df744d4f18d0123a5 (patch)
tree        28da648903d1e9ffc917f7078991226b8acd8fcb
parent      17691de3ce474d8d9c476d1ee99590d8c3edd291 (diff)
download    mongo-24920b095706001b589de22df744d4f18d0123a5.tar.gz
SERVER-57455 Do not keep an extra copy of the retryable write pre-fetch pipeline.
-rw-r--r--  jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js    4
-rw-r--r--  jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js                     4
-rw-r--r--  jstests/replsets/tenant_migration_retryable_write_retry.js                                                275
3 files changed, 89 insertions, 194 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js
index fdeba25b524..d700d9a1de0 100644
--- a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js
+++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js
@@ -208,10 +208,10 @@ jsTestLog(`oplog buffer ns: ${kOplogBufferNS}`);
// Number of entries fetched into oplog buffer is the majority committed count - 1 since we only
// fetch entries that occur before startFetchingDonorOpTime, which is equal to the commit point.
-const cursor = recipientOplogBuffer.find();
+const findRes = recipientOplogBuffer.find().toArray();
const expectedCount = counterMajorityCommitted - 1;
assert.eq(
- cursor.itcount(), expectedCount, `Incorrect number of oplog entries: ${cursor.toArray()}`);
+ findRes.length, expectedCount, `Incorrect number of oplog buffer entries: ${tojson(findRes)}`);
// Resume replication on all the secondaries and wait for migration to complete.
for (const fp of stopReplProducerOnDocumentFailpoints) {
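The hunk above replaces two reads of the same shell cursor with a single toArray() call. In the legacy shell, itcount() iterates the cursor to produce the count, so by the time the assertion message evaluates cursor.toArray() the cursor is already exhausted and a failing assert would print an empty array. A minimal sketch of the pitfall and of the fix, using an illustrative collection name that is not taken from the test:

// Pitfall: both calls iterate the same one-shot shell cursor.
const cursor = db.someColl.find();
const count = cursor.itcount();   // consumes the cursor while counting
const dump = cursor.toArray();    // now returns [], useless as a diagnostic

// Fix: materialize the result set once and reuse it for the count and the message.
const findRes = db.someColl.find().toArray();
assert.eq(findRes.length, expectedCount,
          `Incorrect number of documents: ${tojson(findRes)}`);
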
diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js
index c82613b8ea1..37fea5a54b0 100644
--- a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js
+++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js
@@ -164,8 +164,8 @@ jsTestLog({"oplog buffer ns": kOplogBufferNS});
// We expect to see retryableWrite2, retryableWrite3, retryableWrite3's postImage, and
// bulkRetryableWrite0 (bulk insert batch size is 1).
-let cursor = recipientOplogBuffer.find();
-assert.eq(cursor.itcount(), 4, "Incorrect number of oplog entries in buffer: " + cursor.toArray());
+const findRes = recipientOplogBuffer.find().toArray();
+assert.eq(findRes.length, 4, `Incorrect number of oplog entries in buffer: ${tojson(findRes)}`);
assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite2"}).itcount());
assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite3"}).itcount());
assert.eq(1, recipientOplogBuffer.find({"entry.o2._id": "retryableWrite3"}).itcount());
diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js
index 95bdb4a7494..0b79f1d0f10 100644
--- a/jstests/replsets/tenant_migration_retryable_write_retry.js
+++ b/jstests/replsets/tenant_migration_retryable_write_retry.js
@@ -11,29 +11,23 @@
load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
-load("jstests/libs/uuid_util.js");
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint().
+load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject().
const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
-const kGarbageCollectionParams = {
- // Set the delay before a donor state doc is garbage collected to be short to speed up
- // the test.
- tenantMigrationGarbageCollectionDelayMS: 3 * 1000,
-
- // Set the TTL monitor to run at a smaller interval to speed up the test.
- ttlMonitorSleepSecs: 1,
-};
const donorRst = new ReplSetTest({
nodes: 1,
name: "donor",
- nodeOptions: Object.assign(migrationX509Options.donor, {setParameter: kGarbageCollectionParams})
-});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: "recipient",
- nodeOptions:
- Object.assign(migrationX509Options.recipient, {setParameter: kGarbageCollectionParams})
+ nodeOptions: Object.assign(migrationX509Options.donor, {
+ setParameter: {
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
+ }
+ })
});
+const recipientRst =
+ new ReplSetTest({nodes: 1, name: "recipient", nodeOptions: migrationX509Options.recipient});
donorRst.startSet();
donorRst.initiate();
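The donor options above now enable a server failpoint for the node's whole lifetime by passing 'failpoint.<name>' through setParameter at startup; the recipient-side checks added later in this patch instead toggle failpoints at runtime with configureFailPoint() from jstests/libs/fail_point_util.js, which returns a handle with wait() and off(). A hedged sketch of the two mechanisms side by side; only the first failpoint name is taken from this patch, the rest are placeholders.

// 1) Startup-time: bake the failpoint into the node options before the set starts.
const rst = new ReplSetTest({
    nodes: 1,
    nodeOptions: {
        setParameter: {
            'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
        }
    }
});
rst.startSet();
rst.initiate();

// 2) Runtime: configure against a live connection, then step the server through it.
load("jstests/libs/fail_point_util.js");  // for configureFailPoint()
const fp = configureFailPoint(rst.getPrimary(), "someFailPointName", {action: "hang"});
// ... kick off the operation that reaches the failpoint ...
fp.wait();  // block the test until the failpoint has actually been hit
fp.off();   // disable the failpoint and let the server continue
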
@@ -50,12 +44,12 @@ if (!tenantMigrationTest.isFeatureFlagEnabled()) {
}
const kTenantId = "testTenantId";
-const kDbName = kTenantId + "_" +
- "testDb";
+const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDb");
const kCollName = "testColl";
const kNs = `${kDbName}.${kCollName}`;
const donorPrimary = donorRst.getPrimary();
+const recipientPrimary = recipientRst.getPrimary();
const configTxnColl = donorPrimary.getCollection("config.transactions");
assert.commandWorked(donorPrimary.getCollection(kNs).insert(
@@ -191,166 +185,6 @@ sessionsOnDonor.push({
numOplogEntries: 2, // one pre-image oplog entry.
});
-jsTest.log("Run a migration to completion");
-const migrationId = UUID();
-const migrationOpts = {
- migrationIdString: extractUUIDFromObject(migrationId),
- tenantId: kTenantId,
-};
-TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
-
-const donorDoc =
- donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({tenantId: kTenantId});
-
-tenantMigrationTest.waitForMigrationGarbageCollection(migrationId, kTenantId);
-
-// Test the aggregation pipeline the recipient would use for getting the oplog chain where
-// "ts" < "startFetchingOpTime" for all retryable writes entries in config.transactions. The
-// recipient would use the real "startFetchingOpTime", but this test uses the donor's commit
-// timestamp as a substitute.
-const startFetchingTimestamp = donorDoc.commitOrAbortOpTime.ts;
-
-jsTest.log("Run retryable write after the migration");
-const lsid7 = {
- id: UUID()
-};
-const sessionTag7 = "retryable insert after migration";
-// Make sure this write is in the majority snapshot.
-assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({
- insert: kCollName,
- documents: [{_id: 7, x: 7, tag: sessionTag7}],
- txnNumber: NumberLong(0),
- lsid: lsid7,
- writeConcern: {w: "majority"}
-}));
-
-const lsid8 = {
- id: UUID()
-};
-const sessionTag8 = "retryable findAndModify update after migration";
-assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({
- findAndModify: kCollName,
- query: {x: 7},
- update: {$set: {tag: sessionTag8}},
- new: true,
- txnNumber: NumberLong(0),
- lsid: lsid8,
- writeConcern: {w: "majority"}
-}));
-
-// The aggregation pipeline will return an array of retryable writes oplog entries (pre-image/
-// post-image oplog entries included) with "ts" < "startFetchingTimestamp".
-const aggRes = donorPrimary.getDB("config").runCommand({
- aggregate: "transactions",
- pipeline: [
- // Fetch the config.transactions entries that do not have a "state" field, which indicates a
- // retryable write.
- {$match: {"state": {$exists: false}}},
- // Fetch latest oplog entry for each config.transactions entry from the oplog view.
- {$lookup: {
- from: {db: "local", coll: "system.tenantMigration.oplogView"},
- let: { tenant_ts: "$lastWriteOpTime.ts"},
- pipeline: [{
- $match: {
- $expr: {
- $and: [
- {$regexMatch: {
- input: "$ns",
- regex: new RegExp(`^${kTenantId}_`)
- }},
- {$eq: [ "$ts", "$$tenant_ts"]}
- ]
- }
- }
- }],
- // This array is expected to contain exactly one element if `ns` contains
- // `kTenantId`. Otherwise, it will be empty.
- as: "lastOps"
- }},
- // Entries that don't have the correct `ns` will return an empty `lastOps` array. Filter
- // these results before the next stage.
- {$match: {"lastOps": {$ne: [] }}},
- // All remaining results should correspond to the correct `kTenantId`. Replace the
- // single-element 'lastOps' array field with a single 'lastOp' field.
- {$addFields: {lastOp: {$first: "$lastOps"}}},
- {$unset: "lastOps"},
- // Fetch the preImage oplog entry for findAndModify from the oplog view only if it occurred
- // before `startFetchingTimestamp`.
- {$lookup: {
- from: {db: "local", coll: "system.tenantMigration.oplogView"},
- let: { preimage_ts: "$lastOp.preImageOpTime.ts"},
- pipeline: [{
- $match: {
- $expr: {
- $and: [
- {$eq: [ "$ts", "$$preimage_ts"]},
- {$lt: ["$ts", startFetchingTimestamp]}
- ]
- }
- }
- }],
- // This array is expected to contain exactly one element if the 'preImageOpTime'
- // field is not null.
- as: "preImageOps"
- }},
- // Fetch the postImage oplog entry for findAndModify from the oplog view only if it occurred
- // before `startFetchingTimestamp`.
- {$lookup: {
- from: {db: "local", coll: "system.tenantMigration.oplogView"},
- let: { postimage_ts: "$lastOp.postImageOpTime.ts"},
- pipeline: [{
- $match: {
- $expr: {
- $and: [
- {$eq: [ "$ts", "$$postimage_ts"]},
- {$lt: ["$ts", startFetchingTimestamp]}
- ]
- }
- }
- }],
- // This array is expected to contain exactly one element if the 'postImageOpTime'
- // field is not null.
- as: "postImageOps"
- }},
- // Fetch oplog entries in each chain for insert, update, or delete from the oplog view.
- {$graphLookup: {
- from: {db: "local", coll: "system.tenantMigration.oplogView"},
- startWith: "$lastOp.ts",
- connectFromField: "prevOpTime.ts",
- connectToField: "ts",
- as: "history",
- depthField: "depthForTenantMigration"
- }},
- // Now that we have the whole chain, filter out entries that occurred after
- // `startFetchingTimestamp`, since these entries will be fetched during the oplog fetching
- // phase.
- {$set: {
- history: {
- $filter: {
- input: "$history",
- cond: {$lt: ["$$this.ts", startFetchingTimestamp]}
- }
- }
- }},
- // Combine the oplog entries.
- {$set: {history: {$concatArrays: ["$preImageOps", "$history", "$postImageOps"]}}},
- // Fetch the complete oplog entries and unwind oplog entries in each chain to the top-level
- // array.
- {$lookup: {
- from: {db: "local", coll: "oplog.rs"},
- localField: "history.ts",
- foreignField: "ts",
- // This array is expected to contain exactly one element.
- as: "completeOplogEntry"
- }},
- // Unwind oplog entries in each chain to the top-level array.
- {$unwind: "$completeOplogEntry"},
- {$replaceRoot: {newRoot: "$completeOplogEntry"}},
- ],
- readConcern: {level: "majority"},
- cursor: {},
-});
-
// Example oplog entries output for the retryable findAndModify in session 'lsid6' where the first
// one is its pre-image oplog entry.
// {
@@ -404,29 +238,90 @@ const aggRes = donorPrimary.getDB("config").runCommand({
// }
// }
-// Verify that the aggregation command returned the expected number of oplog entries.
+const migrationId = UUID();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ tenantId: kTenantId,
+};
+
+const fpAfterRetrievingStartOpTimesMigrationRecipientInstance = configureFailPoint(
+ recipientPrimary, "fpAfterRetrievingStartOpTimesMigrationRecipientInstance", {action: "hang"});
+const fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime = configureFailPoint(
+ recipientPrimary, "fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime", {action: "hang"});
+const fpAfterDataConsistentMigrationRecipientInstance = configureFailPoint(
+ recipientPrimary, "fpAfterDataConsistentMigrationRecipientInstance", {action: "hang"});
+
+jsTestLog(`Starting tenant migration: ${tojson(migrationOpts)}`);
+assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+
+// Wait for recipient to get the startFetchingTimestamp.
+fpAfterRetrievingStartOpTimesMigrationRecipientInstance.wait();
+
+// Do retryable writes after retrieving startFetchingTimestamp. These writes should not appear in
+// the oplog buffer during the pre-fetch stage, but should exist after the tenant migration is consistent.
+const lsid7 = {
+ id: UUID()
+};
+const sessionTag7 = "retryable insert after retrieving startFetchingTimestamp";
+assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({
+ insert: kCollName,
+ documents: [{_id: 7, x: 7, tag: sessionTag7}],
+ txnNumber: NumberLong(0),
+ lsid: lsid7
+}));
+
+// Wait for retryable writes to be fetched and inserted into oplog buffer prior to cloning.
+fpAfterRetrievingStartOpTimesMigrationRecipientInstance.off();
+fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.wait();
+
+const kOplogBufferNS = "config.repl.migration.oplog_" + migrationOpts.migrationIdString;
+const recipientOplogBuffer = recipientPrimary.getCollection(kOplogBufferNS);
+jsTestLog(`oplog buffer ns: ${kOplogBufferNS}`);
+
+// Verify that after pre-fetching retryable writes, the entries inserted into the oplog buffer
+// are equal to the entries on the donor.
+const findRes = recipientOplogBuffer.find().toArray();
+const expectedCount = sessionsOnDonor.reduce(
+ (numOplogEntries, sessionOnDonor) => sessionOnDonor.numOplogEntries + numOplogEntries, 0);
assert.eq(
- aggRes.cursor.firstBatch.length,
- sessionsOnDonor.reduce(
- (numOplogEntries, sessionOnDonor) => sessionOnDonor.numOplogEntries + numOplogEntries, 0));
+ findRes.length, expectedCount, `Incorrect number of oplog buffer entries: ${tojson(findRes)}`);
-for (let sessionOnDonor of sessionsOnDonor) {
+for (const session of sessionsOnDonor) {
// Find the returned oplog docs for the session.
- const docs = aggRes.cursor.firstBatch.filter(
- doc => bsonWoCompare(doc.lsid, sessionOnDonor.txnEntry._id) === 0);
- assert.eq(docs.length, sessionOnDonor.numOplogEntries);
+ const docs = recipientOplogBuffer.find({"entry.lsid": session.txnEntry._id}).toArray();
+ assert.eq(docs.length, session.numOplogEntries);
docs.forEach(doc => {
// Verify the doc corresponds to the right config.transactions entry.
- assert.eq(doc.txnNumber, sessionOnDonor.txnEntry.txnNum);
-
+ assert.eq(doc.entry.txnNumber, session.txnEntry.txnNum);
// Verify that doc contains the right oplog entry.
- if (sessionOnDonor.tag && doc.op != "n") {
- assert.eq(getTagFromOplog(doc), sessionOnDonor.tag);
+ if (doc.entry.op === "n") {
+ assert.eq(session.containsPreImage || session.containsPostImage, true);
+ } else if (session.tag) {
+ assert.eq(getTagFromOplog(doc.entry), session.tag);
}
});
}
+// Wait for tenant migration to be consistent.
+fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.off();
+fpAfterDataConsistentMigrationRecipientInstance.wait();
+
+// After tenant migration is consistent, the retryable writes done after startFetchingTimestamp
+// should have been fetched and inserted into the oplog buffer.
+const findRes2 = recipientOplogBuffer.find().toArray();
+const expectedCount2 = expectedCount + 1;
+assert.eq(findRes2.length, expectedCount2);
+
+const docs2 = recipientOplogBuffer.find({"entry.lsid": getTxnEntry(lsid7)._id}).toArray();
+assert.eq(docs2.length, 1);
+assert.eq(getTagFromOplog(docs2[0].entry), sessionTag7);
+
+// Wait for tenant migration to complete successfully.
+fpAfterDataConsistentMigrationRecipientInstance.off();
+TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
+
donorRst.stopSet();
recipientRst.stopSet();
})();
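
For context on what the failpoint-driven flow above is exercising: the retryable writes in this test are ordinary commands tagged with a logical session id and transaction number, and a retry of the same {lsid, txnNumber} pair is answered from config.transactions rather than applied again, which is why the recipient must pre-fetch those oplog chains before it can serve retries. Below is a standalone, hedged sketch of that retry semantics; it mirrors the command shape used in the test but is not part of the test's flow (a donor rejects tenant writes once the migration commits), and the document values are illustrative.

const primaryDB = donorPrimary.getDB(kDbName);  // any primary that still accepts writes for this db
const lsid = {id: UUID()};
const retryableCmd = {
    insert: kCollName,
    documents: [{_id: 100, x: 100}],
    lsid: lsid,
    txnNumber: NumberLong(0),
};

// First execution applies the write.
assert.commandWorked(primaryDB.runCommand(retryableCmd));
// Retrying the identical statement is acknowledged but not applied a second time.
assert.commandWorked(primaryDB.runCommand(retryableCmd));
assert.eq(primaryDB.getCollection(kCollName).find({_id: 100}).itcount(), 1);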