author    Wenbin Zhu <wenbin.zhu@mongodb.com>    2021-06-09 18:10:29 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-06-09 18:43:54 +0000
commit    577a79b14113874d64294a81ba97ca65522f019b (patch)
tree      cd726ff540644c60b2c6988292507bfbaf43124b
parent    5a1ea618307b6f362202e283c3e180d8a71df3d5 (diff)
download  mongo-577a79b14113874d64294a81ba97ca65522f019b.tar.gz
SERVER-57452 Remove the sort stage in retryable write pre-fetch, as the oplog buffer now allows unordered inserts.
-rw-r--r--  jstests/replsets/tenant_migration_retryable_write_retry.js  28
-rw-r--r--  src/mongo/db/repl/tenant_migration_util.cpp  30
2 files changed, 9 insertions, 49 deletions
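
For context before the diff: the stage this commit deletes sorted each oplog chain inside the aggregation itself, by writing every history element into the placeholder slot matching its depthForTenantMigration and then reversing the array. A minimal standalone mongo shell sketch of that trick, using the exact $set expression removed below; the collection name "chains" and the sample documents are hypothetical:

// Each history element carries the depthForTenantMigration that the
// pipeline's earlier $graphLookup stage assigns (0 = newest link).
db.chains.insertOne({
    _id: 1,
    history: [
        {ts: Timestamp(3, 0), depthForTenantMigration: 0},
        {ts: Timestamp(1, 0), depthForTenantMigration: 2},
        {ts: Timestamp(2, 0), depthForTenantMigration: 1},
    ],
});
db.chains.aggregate([{$set: {
    history: {$reverseArray: {$reduce: {
        input: "$history",
        // Placeholder array [0 .. n-1]; each pass overwrites the slot at
        // index depthForTenantMigration with the element itself, yielding
        // ascending-depth (newest-to-oldest) order.
        initialValue: {$range: [0, {$size: "$history"}]},
        in: {$concatArrays: [
            {$slice: ["$$value", "$$this.depthForTenantMigration"]},
            ["$$this"],
            {$slice: [
                "$$value",
                {$subtract: [
                    {$add: ["$$this.depthForTenantMigration", 1]},
                    {$size: "$history"},
                ]},
            ]},
        ]},
    }}},
}}]);
// $reverseArray then flips newest-to-oldest into ascending "ts" order:
// the returned history is [ts 1, ts 2, ts 3].

With the oplog buffer accepting unordered inserts, that whole stage is dead weight, which is exactly what the diff removes.
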
diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js
index 52a392dc7ae..95bdb4a7494 100644
--- a/jstests/replsets/tenant_migration_retryable_write_retry.js
+++ b/jstests/replsets/tenant_migration_retryable_write_retry.js
@@ -239,8 +239,7 @@ assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({
}));
// The aggregation pipeline will return an array of retryable writes oplog entries (pre-image/
-// post-image oplog entries included) with "ts" < "startFetchingTimestamp" and sorted in ascending
-// order of "ts".
+// post-image oplog entries included) with "ts" < "startFetchingTimestamp".
const aggRes = donorPrimary.getDB("config").runCommand({
aggregate: "transactions",
pipeline: [
@@ -322,25 +321,7 @@ const aggRes = donorPrimary.getDB("config").runCommand({
as: "history",
depthField: "depthForTenantMigration"
}},
- // Sort the oplog entries in each oplog chain.
- {$set: {
- history: {$reverseArray: {$reduce: {
- input: "$history",
- initialValue: {$range: [0, {$size: "$history"}]},
- in: {$concatArrays: [
- {$slice: ["$$value", "$$this.depthForTenantMigration"]},
- ["$$this"],
- {$slice: [
- "$$value",
- {$subtract: [
- {$add: ["$$this.depthForTenantMigration", 1]},
- {$size: "$history"},
- ]},
- ]},
- ]},
- }}},
- }},
- // Now that we have the whole sorted chain, filter out entries that occurred after
+ // Now that we have the whole chain, filter out entries that occurred after
// `startFetchingTimestamp`, since these entries will be fetched during the oplog fetching
// phase.
{$set: {
@@ -429,11 +410,6 @@ assert.eq(
sessionsOnDonor.reduce(
(numOplogEntries, sessionOnDonor) => sessionOnDonor.numOplogEntries + numOplogEntries, 0));
-// Verify that the oplog docs are sorted in ascending order of "ts".
-for (let i = 1; i < aggRes.cursor.firstBatch.length; i++) {
- assert.lt(0, bsonWoCompare(aggRes.cursor.firstBatch[i].ts, aggRes.cursor.firstBatch[i - 1].ts));
-}
-
for (let sessionOnDonor of sessionsOnDonor) {
// Find the returned oplog docs for the session.
const docs = aggRes.cursor.firstBatch.filter(
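
The hunk above also drops the test's sorted-order assertion, because the pipeline output no longer carries an ordering guarantee. A consumer that needs ascending "ts" can recover it at read time, which is effectively what the oplog buffer collection does. A jstest-style sketch under that assumption, reusing aggRes from the test; the "oplogBuffer" collection name is hypothetical, as the real buffer is managed internally by the server:

// Unordered inserts into a buffer collection are now acceptable; order is
// restored by sorting on "ts" when reading the buffer back.
const buffer = donorPrimary.getDB("test").oplogBuffer;
for (let doc of aggRes.cursor.firstBatch) {
    assert.commandWorked(buffer.insert(doc));
}
const ordered = buffer.find().sort({ts: 1}).toArray();
for (let i = 1; i < ordered.length; i++) {
    // Equivalent to the deleted assertion, but applied post-sort.
    assert.lte(bsonWoCompare(ordered[i - 1].ts, ordered[i].ts), 0);
}
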
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
index 3abf2fb95ac..dd9934c68d8 100644
--- a/src/mongo/db/repl/tenant_migration_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -312,27 +312,11 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations(
.firstElement(),
expCtx));
- // 9. Sort the oplog entries in each oplog chain. The $reduce expression sorts the `history`
- // array in ascending `depthForTenantMigration` order. The $reverseArray expression will
- // give an array in ascending timestamp order.
- stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
- history: {$reverseArray: {$reduce: {\
- input: '$history',\
- initialValue: {$range: [0, {$size: '$history'}]},\
- in: {$concatArrays: [\
- {$slice: ['$$value', '$$this.depthForTenantMigration']},\
- ['$$this'],\
- {$slice: [\
- '$$value',\
- {$subtract: [\
- {$add: ['$$this.depthForTenantMigration', 1]},\
- {$size: '$history'}]}]}]}}}}}"),
- expCtx));
-
- // 10. Filter out all oplog entries from the `history` array that occur after
+ // 9. Filter out all oplog entries from the `history` array that occur after
// `startFetchingTimestamp`. Since the oplog fetching and application stages will already
// capture entries after `startFetchingTimestamp`, we only need the earlier part of the oplog
- // chain.
+ // chain. We do not need to sort the history after this since we will put the fetched entries
+ // into the oplog buffer collection, where entries are read in timestamp order.
stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
history: {$filter: {\
input: '$history',\
@@ -340,13 +324,13 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations(
"]}}}}"),
expCtx));
- // 11. Combine the oplog entries.
+ // 10. Combine the oplog entries.
stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
'history': {$concatArrays: [\
'$preImageOps', '$postImageOps', '$history']}}"),
expCtx));
- // 12. Fetch the complete oplog entries. `completeOplogEntry` is expected to contain exactly one
+ // 11. Fetch the complete oplog entries. `completeOplogEntry` is expected to contain exactly one
// element.
stages.emplace_back(DocumentSourceLookUp::createFromBson(
Doc{{"$lookup",
@@ -358,11 +342,11 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations(
.firstElement(),
expCtx));
- // 13. Unwind oplog entries in each chain to the top-level array.
+ // 12. Unwind oplog entries in each chain to the top-level array.
stages.emplace_back(
DocumentSourceUnwind::create(expCtx, "completeOplogEntry", false, boost::none));
- // 14. Replace root.
+ // 13. Replace root.
stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
fromjson("{$replaceRoot: {newRoot: '$completeOplogEntry'}}").firstElement(), expCtx));