diff options
author | Wenbin Zhu <wenbin.zhu@mongodb.com> | 2021-06-10 19:43:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-11 00:07:50 +0000 |
commit | 24920b095706001b589de22df744d4f18d0123a5 (patch) | |
tree | 28da648903d1e9ffc917f7078991226b8acd8fcb | |
parent | 17691de3ce474d8d9c476d1ee99590d8c3edd291 (diff) | |
download | mongo-24920b095706001b589de22df744d4f18d0123a5.tar.gz |
SERVER-57455 Do not keep an extra copy of the retryable write pre-fetch pipeline.
3 files changed, 89 insertions, 194 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js index fdeba25b524..d700d9a1de0 100644 --- a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js +++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js @@ -208,10 +208,10 @@ jsTestLog(`oplog buffer ns: ${kOplogBufferNS}`); // Number of entries fetched into oplog buffer is the majority committed count - 1 since we only // fetch entries that occur before startFetchingDonorOpTime, which is equal to the commit point. -const cursor = recipientOplogBuffer.find(); +const findRes = recipientOplogBuffer.find().toArray(); const expectedCount = counterMajorityCommitted - 1; assert.eq( - cursor.itcount(), expectedCount, `Incorrect number of oplog entries: ${cursor.toArray()}`); + findRes.length, expectedCount, `Incorrect number of oplog buffer entries: ${tojson(findRes)}`); // Resume replication on all the secondaries and wait for migration to complete. for (const fp of stopReplProducerOnDocumentFailpoints) { diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js index c82613b8ea1..37fea5a54b0 100644 --- a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js +++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_oplog_entries.js @@ -164,8 +164,8 @@ jsTestLog({"oplog buffer ns": kOplogBufferNS}); // We expect to see retryableWrite2, retryableWrite3, retryableWrite3's postImage, and // bulkRetryableWrite0 (bulk insert batch size is 1). 
-let cursor = recipientOplogBuffer.find(); -assert.eq(cursor.itcount(), 4, "Incorrect number of oplog entries in buffer: " + cursor.toArray()); +const findRes = recipientOplogBuffer.find().toArray(); +assert.eq(findRes.length, 4, `Incorrect number of oplog entries in buffer: ${tojson(findRes)}`); assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite2"}).itcount()); assert.eq(1, recipientOplogBuffer.find({"entry.o._id": "retryableWrite3"}).itcount()); assert.eq(1, recipientOplogBuffer.find({"entry.o2._id": "retryableWrite3"}).itcount()); diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js index 95bdb4a7494..0b79f1d0f10 100644 --- a/jstests/replsets/tenant_migration_retryable_write_retry.js +++ b/jstests/replsets/tenant_migration_retryable_write_retry.js @@ -11,29 +11,23 @@ load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); -load("jstests/libs/uuid_util.js"); +load("jstests/libs/fail_point_util.js"); // For configureFailPoint(). +load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject(). const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); -const kGarbageCollectionParams = { - // Set the delay before a donor state doc is garbage collected to be short to speed up - // the test. - tenantMigrationGarbageCollectionDelayMS: 3 * 1000, - - // Set the TTL monitor to run at a smaller interval to speed up the test. 
- ttlMonitorSleepSecs: 1, -}; const donorRst = new ReplSetTest({ nodes: 1, name: "donor", - nodeOptions: Object.assign(migrationX509Options.donor, {setParameter: kGarbageCollectionParams}) -}); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: "recipient", - nodeOptions: - Object.assign(migrationX509Options.recipient, {setParameter: kGarbageCollectionParams}) + nodeOptions: Object.assign(migrationX509Options.donor, { + setParameter: { + // Allow non-timestamped reads on donor after migration completes for testing. + 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}), + } + }) }); +const recipientRst = + new ReplSetTest({nodes: 1, name: "recipient", nodeOptions: migrationX509Options.recipient}); donorRst.startSet(); donorRst.initiate(); @@ -50,12 +44,12 @@ if (!tenantMigrationTest.isFeatureFlagEnabled()) { } const kTenantId = "testTenantId"; -const kDbName = kTenantId + "_" + - "testDb"; +const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDb"); const kCollName = "testColl"; const kNs = `${kDbName}.${kCollName}`; const donorPrimary = donorRst.getPrimary(); +const recipientPrimary = recipientRst.getPrimary(); const configTxnColl = donorPrimary.getCollection("config.transactions"); assert.commandWorked(donorPrimary.getCollection(kNs).insert( @@ -191,166 +185,6 @@ sessionsOnDonor.push({ numOplogEntries: 2, // one pre-image oplog entry. 
}); -jsTest.log("Run a migration to completion"); -const migrationId = UUID(); -const migrationOpts = { - migrationIdString: extractUUIDFromObject(migrationId), - tenantId: kTenantId, -}; -TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts)); - -const donorDoc = - donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({tenantId: kTenantId}); - -tenantMigrationTest.waitForMigrationGarbageCollection(migrationId, kTenantId); - -// Test the aggregation pipeline the recipient would use for getting the oplog chain where -// "ts" < "startFetchingOpTime" for all retryable writes entries in config.transactions. The -// recipient would use the real "startFetchingOpTime", but this test uses the donor's commit -// timestamp as a substitute. -const startFetchingTimestamp = donorDoc.commitOrAbortOpTime.ts; - -jsTest.log("Run retryable write after the migration"); -const lsid7 = { - id: UUID() -}; -const sessionTag7 = "retryable insert after migration"; -// Make sure this write is in the majority snapshot. -assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({ - insert: kCollName, - documents: [{_id: 7, x: 7, tag: sessionTag7}], - txnNumber: NumberLong(0), - lsid: lsid7, - writeConcern: {w: "majority"} -})); - -const lsid8 = { - id: UUID() -}; -const sessionTag8 = "retryable findAndModify update after migration"; -assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({ - findAndModify: kCollName, - query: {x: 7}, - update: {$set: {tag: sessionTag8}}, - new: true, - txnNumber: NumberLong(0), - lsid: lsid8, - writeConcern: {w: "majority"} -})); - -// The aggregation pipeline will return an array of retryable writes oplog entries (pre-image/ -// post-image oplog entries included) with "ts" < "startFetchingTimestamp". 
-const aggRes = donorPrimary.getDB("config").runCommand({ - aggregate: "transactions", - pipeline: [ - // Fetch the config.transactions entries that do not have a "state" field, which indicates a - // retryable write. - {$match: {"state": {$exists: false}}}, - // Fetch latest oplog entry for each config.transactions entry from the oplog view. - {$lookup: { - from: {db: "local", coll: "system.tenantMigration.oplogView"}, - let: { tenant_ts: "$lastWriteOpTime.ts"}, - pipeline: [{ - $match: { - $expr: { - $and: [ - {$regexMatch: { - input: "$ns", - regex: new RegExp(`^${kTenantId}_`) - }}, - {$eq: [ "$ts", "$$tenant_ts"]} - ] - } - } - }], - // This array is expected to contain exactly one element if `ns` contains - // `kTenantId`. Otherwise, it will be empty. - as: "lastOps" - }}, - // Entries that don't have the correct `ns` will return an empty `lastOps` array. Filter - // these results before the next stage. - {$match: {"lastOps": {$ne: [] }}}, - // All remaining results should correspond to the correct `kTenantId`. Replace the - // single-element 'lastOps' array field with a single 'lastOp' field. - {$addFields: {lastOp: {$first: "$lastOps"}}}, - {$unset: "lastOps"}, - // Fetch the preImage oplog entry for findAndModify from the oplog view only if it occurred - // before `startFetchingTimestamp`. - {$lookup: { - from: {db: "local", coll: "system.tenantMigration.oplogView"}, - let: { preimage_ts: "$lastOp.preImageOpTime.ts"}, - pipeline: [{ - $match: { - $expr: { - $and: [ - {$eq: [ "$ts", "$$preimage_ts"]}, - {$lt: ["$ts", startFetchingTimestamp]} - ] - } - } - }], - // This array is expected to contain exactly one element if the 'preImageOpTime' - // field is not null. - as: "preImageOps" - }}, - // Fetch the postImage oplog entry for findAndModify from the oplog view only if it occurred - // before `startFetchingTimestamp`. 
- {$lookup: { - from: {db: "local", coll: "system.tenantMigration.oplogView"}, - let: { postimage_ts: "$lastOp.postImageOpTime.ts"}, - pipeline: [{ - $match: { - $expr: { - $and: [ - {$eq: [ "$ts", "$$postimage_ts"]}, - {$lt: ["$ts", startFetchingTimestamp]} - ] - } - } - }], - // This array is expected to contain exactly one element if the 'postImageOpTime' - // field is not null. - as: "postImageOps" - }}, - // Fetch oplog entries in each chain for insert, update, or delete from the oplog view. - {$graphLookup: { - from: {db: "local", coll: "system.tenantMigration.oplogView"}, - startWith: "$lastOp.ts", - connectFromField: "prevOpTime.ts", - connectToField: "ts", - as: "history", - depthField: "depthForTenantMigration" - }}, - // Now that we have the whole chain, filter out entries that occurred after - // `startFetchingTimestamp`, since these entries will be fetched during the oplog fetching - // phase. - {$set: { - history: { - $filter: { - input: "$history", - cond: {$lt: ["$$this.ts", startFetchingTimestamp]} - } - } - }}, - // Combine the oplog entries. - {$set: {history: {$concatArrays: ["$preImageOps", "$history", "$postImageOps"]}}}, - // Fetch the complete oplog entries and unwind oplog entries in each chain to the top-level - // array. - {$lookup: { - from: {db: "local", coll: "oplog.rs"}, - localField: "history.ts", - foreignField: "ts", - // This array is expected to contain exactly one element. - as: "completeOplogEntry" - }}, - // Unwind oplog entries in each chain to the top-level array. - {$unwind: "$completeOplogEntry"}, - {$replaceRoot: {newRoot: "$completeOplogEntry"}}, - ], - readConcern: {level: "majority"}, - cursor: {}, -}); - // Example oplog entries output for the retryable findAndModify in session 'lsid6' where the first // one is its pre-image oplog entry. // { @@ -404,29 +238,90 @@ const aggRes = donorPrimary.getDB("config").runCommand({ // } // } -// Verify that the aggregation command returned the expected number of oplog entries. 
+const migrationId = UUID(); +const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + tenantId: kTenantId, +}; + +const fpAfterRetrievingStartOpTimesMigrationRecipientInstance = configureFailPoint( + recipientPrimary, "fpAfterRetrievingStartOpTimesMigrationRecipientInstance", {action: "hang"}); +const fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime = configureFailPoint( + recipientPrimary, "fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime", {action: "hang"}); +const fpAfterDataConsistentMigrationRecipientInstance = configureFailPoint( + recipientPrimary, "fpAfterDataConsistentMigrationRecipientInstance", {action: "hang"}); + +jsTestLog(`Starting tenant migration: ${tojson(migrationOpts)}`); +assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); + +// Wait for recipient to get the startFetchingTimestamp. +fpAfterRetrievingStartOpTimesMigrationRecipientInstance.wait(); + +// Do retryable writes after retrieving startFetchingTimestamp, these writes should not appear in +// the oplog buffer in the pre-fetch stage, but should exist after tenant migration is consistent. +const lsid7 = { + id: UUID() +}; +const sessionTag7 = "retryable insert after retrieving startFetchingTimestamp"; +assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({ + insert: kCollName, + documents: [{_id: 7, x: 7, tag: sessionTag7}], + txnNumber: NumberLong(0), + lsid: lsid7 +})); + +// Wait for retryable writes to be fetched and inserted into oplog buffer prior to cloning. 
+fpAfterRetrievingStartOpTimesMigrationRecipientInstance.off(); +fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.wait(); + +const kOplogBufferNS = "config.repl.migration.oplog_" + migrationOpts.migrationIdString; +const recipientOplogBuffer = recipientPrimary.getCollection(kOplogBufferNS); +jsTestLog(`oplog buffer ns: ${kOplogBufferNS}`); + +// Verify that after pre-fetching retryable writes, the entries inserted into the oplog buffer +// are equal to the entries on the donor. +const findRes = recipientOplogBuffer.find().toArray(); +const expectedCount = sessionsOnDonor.reduce( + (numOplogEntries, sessionOnDonor) => sessionOnDonor.numOplogEntries + numOplogEntries, 0); assert.eq( - aggRes.cursor.firstBatch.length, - sessionsOnDonor.reduce( - (numOplogEntries, sessionOnDonor) => sessionOnDonor.numOplogEntries + numOplogEntries, 0)); + findRes.length, expectedCount, `Incorrect number of oplog buffer entries: ${tojson(findRes)}`); -for (let sessionOnDonor of sessionsOnDonor) { +for (const session of sessionsOnDonor) { // Find the returned oplog docs for the session. - const docs = aggRes.cursor.firstBatch.filter( - doc => bsonWoCompare(doc.lsid, sessionOnDonor.txnEntry._id) === 0); - assert.eq(docs.length, sessionOnDonor.numOplogEntries); + const docs = recipientOplogBuffer.find({"entry.lsid": session.txnEntry._id}).toArray(); + assert.eq(docs.length, session.numOplogEntries); docs.forEach(doc => { // Verify the doc corresponds to the right config.transactions entry. - assert.eq(doc.txnNumber, sessionOnDonor.txnEntry.txnNum); - + assert.eq(doc.entry.txnNumber, session.txnEntry.txnNum); // Verify that doc contains the right oplog entry. 
- if (sessionOnDonor.tag && doc.op != "n") { - assert.eq(getTagFromOplog(doc), sessionOnDonor.tag); + if (doc.entry.op === "n") { + assert.eq(session.containsPreImage || session.containsPostImage, true); + } else if (session.tag) { + assert.eq(getTagFromOplog(doc.entry), session.tag); } }); } +// Wait for tenant migration to be consistent. +fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.off(); +fpAfterDataConsistentMigrationRecipientInstance.wait(); + +// After tenant migration is consistent, the retryable writes done after startFetchingTimestamp +// should have been fetched and inserted into the oplog buffer. +const findRes2 = recipientOplogBuffer.find().toArray(); +const expectedCount2 = expectedCount + 1; +assert.eq(findRes2.length, expectedCount2); + +const docs2 = recipientOplogBuffer.find({"entry.lsid": getTxnEntry(lsid7)._id}).toArray(); +assert.eq(docs2.length, 1); +assert.eq(getTagFromOplog(docs2[0].entry), sessionTag7); + +// Wait for tenant migration to complete successfully. +fpAfterDataConsistentMigrationRecipientInstance.off(); +TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); +assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); + donorRst.stopSet(); recipientRst.stopSet(); })(); |