diff options
author | Wenbin Zhu <wenbin.zhu@mongodb.com> | 2021-06-09 18:10:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-09 18:43:54 +0000 |
commit | 577a79b14113874d64294a81ba97ca65522f019b (patch) | |
tree | cd726ff540644c60b2c6988292507bfbaf43124b | |
parent | 5a1ea618307b6f362202e283c3e180d8a71df3d5 (diff) | |
download | mongo-577a79b14113874d64294a81ba97ca65522f019b.tar.gz |
SERVER-57452 Remove the sort stage in retryable write pre-fetch as oplog buffer now allows unordered insert.
-rw-r--r-- | jstests/replsets/tenant_migration_retryable_write_retry.js | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_util.cpp | 30 |
2 files changed, 9 insertions, 49 deletions
diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js index 52a392dc7ae..95bdb4a7494 100644 --- a/jstests/replsets/tenant_migration_retryable_write_retry.js +++ b/jstests/replsets/tenant_migration_retryable_write_retry.js @@ -239,8 +239,7 @@ assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({ })); // The aggregation pipeline will return an array of retryable writes oplog entries (pre-image/ -// post-image oplog entries included) with "ts" < "startFetchingTimestamp" and sorted in ascending -// order of "ts". +// post-image oplog entries included) with "ts" < "startFetchingTimestamp". const aggRes = donorPrimary.getDB("config").runCommand({ aggregate: "transactions", pipeline: [ @@ -322,25 +321,7 @@ const aggRes = donorPrimary.getDB("config").runCommand({ as: "history", depthField: "depthForTenantMigration" }}, - // Sort the oplog entries in each oplog chain. - {$set: { - history: {$reverseArray: {$reduce: { - input: "$history", - initialValue: {$range: [0, {$size: "$history"}]}, - in: {$concatArrays: [ - {$slice: ["$$value", "$$this.depthForTenantMigration"]}, - ["$$this"], - {$slice: [ - "$$value", - {$subtract: [ - {$add: ["$$this.depthForTenantMigration", 1]}, - {$size: "$history"}, - ]}, - ]}, - ]}, - }}}, - }}, - // Now that we have the whole sorted chain, filter out entries that occurred after + // Now that we have the whole chain, filter out entries that occurred after // `startFetchingTimestamp`, since these entries will be fetched during the oplog fetching // phase. {$set: { @@ -429,11 +410,6 @@ assert.eq( sessionsOnDonor.reduce( (numOplogEntries, sessionOnDonor) => sessionOnDonor.numOplogEntries + numOplogEntries, 0)); -// Verify that the oplog docs are sorted in ascending order of "ts". -for (let i = 1; i < aggRes.cursor.firstBatch.length; i++) { - assert.lt(0, bsonWoCompare(aggRes.cursor.firstBatch[i].ts, aggRes.cursor.firstBatch[i - 1].ts)); -} - for (let sessionOnDonor of sessionsOnDonor) { // Find the returned oplog docs for the session. const docs = aggRes.cursor.firstBatch.filter( diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 3abf2fb95ac..dd9934c68d8 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -312,27 +312,11 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations( .firstElement(), expCtx)); - // 9. Sort the oplog entries in each oplog chain. The $reduce expression sorts the `history` - // array in ascending `depthForTenantMigration` order. The $reverseArray expression will - // give an array in ascending timestamp order. - stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\ - history: {$reverseArray: {$reduce: {\ - input: '$history',\ - initialValue: {$range: [0, {$size: '$history'}]},\ - in: {$concatArrays: [\ - {$slice: ['$$value', '$$this.depthForTenantMigration']},\ - ['$$this'],\ - {$slice: [\ - '$$value',\ - {$subtract: [\ - {$add: ['$$this.depthForTenantMigration', 1]},\ - {$size: '$history'}]}]}]}}}}}"), - expCtx)); - - // 10. Filter out all oplog entries from the `history` array that occur after + // 9. Filter out all oplog entries from the `history` array that occur after // `startFetchingTimestamp`. Since the oplog fetching and application stages will already // capture entries after `startFetchingTimestamp`, we only need the earlier part of the oplog - // chain. + // chain. We do not need to sort the history after this since we will put the fetched entries + // into the oplog buffer collection, where entries are read in timestamp order. stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\ history: {$filter: {\ input: '$history',\ @@ -340,13 +324,13 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations( "]}}}}"), expCtx)); - // 11. Combine the oplog entries. + // 10. Combine the oplog entries. stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\ 'history': {$concatArrays: [\ '$preImageOps', '$postImageOps', '$history']}}"), expCtx)); - // 12. Fetch the complete oplog entries. `completeOplogEntry` is expected to contain exactly one + // 11. Fetch the complete oplog entries. `completeOplogEntry` is expected to contain exactly one // element. stages.emplace_back(DocumentSourceLookUp::createFromBson( Doc{{"$lookup", @@ -358,11 +342,11 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations( .firstElement(), expCtx)); - // 13. Unwind oplog entries in each chain to the top-level array. + // 12. Unwind oplog entries in each chain to the top-level array. stages.emplace_back( DocumentSourceUnwind::create(expCtx, "completeOplogEntry", false, boost::none)); - // 14. Replace root. + // 13. Replace root. stages.emplace_back(DocumentSourceReplaceRoot::createFromBson( fromjson("{$replaceRoot: {newRoot: '$completeOplogEntry'}}").firstElement(), expCtx)); |