author    Wenbin Zhu <wenbin.zhu@mongodb.com>    2021-06-08 21:00:45 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-06-09 02:28:16 +0000
commit    047e62422aeb20f6a5bd476ed970d4deb8235237 (patch)
tree      6dab9e869824b8e660248cd4fc3822e2a668dbd7
parent    0f6540f8c28cfff085e413ab96a19f849191ea3c (diff)
SERVER-56631 Make sure retryable write pre-fetch phase can see the config.transactions record when committed snapshot is not at batch boundary.
(cherry picked from commit 18f91c4304086b0334d90c1d94a7d3c7225439bf)
 buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml                                     |   1 +
 jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js | 244 ++++++++
 jstests/replsets/tenant_migration_retryable_write_retry.js                                             |  22 +-
 src/mongo/client/dbclient_cursor.cpp                                                                   |  23 +-
 src/mongo/client/dbclient_cursor.h                                                                     |   6 +-
 src/mongo/db/repl/tenant_migration_recipient_service.cpp                                               |  36 +-
 src/mongo/db/repl/tenant_migration_util.cpp                                                            |  24 +-
 src/mongo/shell/replsettest.js                                                                         |  25 +-
 8 files changed, 348 insertions(+), 33 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml b/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml
index b22e43d140f..ac1c8b24ec2 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml
@@ -7,6 +7,7 @@ selector:
# Expects oplog entries to be in $v:2 format.
- jstests/replsets/v2_delta_oplog_entries.js
- jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js
+ - jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js
executor:
config:
diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js
new file mode 100644
index 00000000000..fdeba25b524
--- /dev/null
+++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js
@@ -0,0 +1,244 @@
+/**
+ * Tests that the tenant migration recipient correctly reads 'config.transactions' entries from a
+ * donor secondary. During secondary oplog application, updates to the same 'config.transactions'
+ * entry are coalesced into a single update of the most recent retryable write statement. If the
+ * majority committed snapshot of a secondary falls in the middle of a completed batch, then a
+ * recipient's majority read on 'config.transactions' can miss committed retryable writes at that
+ * majority commit point.
+ *
+ * @tags: [
+ * requires_fcv_50,
+ * requires_majority_read_concern,
+ * incompatible_with_eft,
+ * incompatible_with_windows_tls,
+ *   incompatible_with_macos,
+ *   requires_persistence
+ * ]
+ */
+
+(function() {
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint().
+load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject().
+load("jstests/libs/write_concern_util.js");
+
+const getRecipientCurrOp = function(conn, migrationId) {
+ const res = conn.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+ assert.eq(res.inprog.length, 1);
+ const currOp = res.inprog[0];
+ assert.eq(bsonWoCompare(currOp.instanceID, migrationId), 0);
+
+ return currOp;
+};
+
+const getDonorSyncSource = function(conn, migrationId) {
+ const currOp = getRecipientCurrOp(conn, migrationId);
+ return currOp.donorSyncSource;
+};
+
+const getStartFetchingDonorOpTime = function(conn, migrationId) {
+ const currOp = getRecipientCurrOp(conn, migrationId);
+ return currOp.startFetchingDonorOpTime;
+};
+
+const oplogApplierBatchSize = 50;
+
+const donorRst = new ReplSetTest({
+ nodes: {
+ n0: {},
+        // Set 'syncdelay' to 1s to speed up checkpointing. Also explicitly set the oplog
+        // application batch size to ensure that the number of retryable write statements
+        // being made majority committed isn't a multiple of it.
+ n1: {syncdelay: 1, setParameter: {replBatchLimitOperations: oplogApplierBatchSize}},
+ // Set the bgSyncOplogFetcherBatchSize to 1 oplog entry to guarantee replication
+ // progress with the stopReplProducerOnDocument failpoint.
+ n2: {rsConfig: {priority: 0, hidden: true}, setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ n3: {rsConfig: {priority: 0, hidden: true}, setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ n4: {rsConfig: {priority: 0, hidden: true}, setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ },
+ // Force secondaries to sync from the primary to guarantee replication progress with the
+ // stopReplProducerOnDocument failpoint. Also disable primary catchup because some replicated
+ // retryable write statements are intentionally not being made majority committed.
+ settings: {chainingAllowed: false, catchUpTimeoutMillis: 0},
+ nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, {
+ setParameter: {
+ tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000,
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
+ }
+ }),
+});
+donorRst.startSet();
+donorRst.initiateWithHighElectionTimeout();
+const donorPrimary = donorRst.getPrimary();
+
+if (!TenantMigrationUtil.isFeatureFlagEnabled(donorPrimary)) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ donorRst.stopSet();
+ return;
+}
+
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst: donorRst});
+
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+const kTenantId = "testTenantId";
+const migrationId = UUID();
+const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB");
+const kCollName = "retryable_write_secondary_oplog_application";
+const kNs = `${kDbName}.${kCollName}`;
+
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ tenantId: kTenantId,
+ // The recipient needs to choose a donor secondary as sync source.
+ readPreference: {mode: 'secondary'},
+};
+
+const fpAfterConnectingTenantMigrationRecipientInstance = configureFailPoint(
+ recipientPrimary, "fpAfterConnectingTenantMigrationRecipientInstance", {action: "hang"});
+
+const fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted = configureFailPoint(
+ recipientPrimary, "fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted");
+
+// Start tenant migration and hang after recipient connects to donor sync source.
+jsTestLog(`Starting tenant migration: ${tojson(migrationOpts)}`);
+assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+fpAfterConnectingTenantMigrationRecipientInstance.wait();
+
+// The recipient should connect to secondary1, since the other secondaries are hidden.
+const [secondary1, secondary2, secondary3, secondary4] = donorRst.getSecondaries();
+const syncSourceSecondaryHost = getDonorSyncSource(recipientPrimary, migrationId);
+assert.eq(syncSourceSecondaryHost, secondary1.host);
+
+assert.commandWorked(
+ donorPrimary.getCollection(kNs).insert({_id: 0, counter: 0}, {writeConcern: {w: 5}}));
+
+// The default WC is majority and the donor replica set can't satisfy majority writes after we
+// stop replication on the secondaries.
+assert.commandWorked(donorPrimary.adminCommand(
+ {setDefaultRWConcern: 1, defaultWriteConcern: {w: 1}, writeConcern: {w: "majority"}}));
+donorRst.awaitReplication();
+
+// Disable replication on all of the secondaries to manually control the replication progress.
+const stopReplProducerFailpoints = [secondary1, secondary2, secondary3, secondary4].map(
+ conn => configureFailPoint(conn, 'stopReplProducer'));
+
+// While replication is still entirely disabled, additionally arrange for replication to stop
+// partway into the retryable write on the other secondaries. The idea is that while secondary1
+// will apply all of the oplog entries in a single batch, the other secondaries will only apply
+// up to counterMajorityCommitted oplog entries.
+const counterTotal = oplogApplierBatchSize;
+const counterMajorityCommitted = counterTotal - 2;
+jsTestLog(`counterTotal: ${counterTotal}, counterMajorityCommitted: ${counterMajorityCommitted}`);
+const stopReplProducerOnDocumentFailpoints = [secondary2, secondary3, secondary4].map(
+ conn => configureFailPoint(conn,
+ 'stopReplProducerOnDocument',
+ {document: {"diff.u.counter": counterMajorityCommitted + 1}}));
+
+// Perform all the retryable write statements on donor primary.
+const lsid = ({id: UUID()});
+assert.commandWorked(donorPrimary.getCollection(kNs).runCommand("update", {
+ updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
+ lsid,
+ txnNumber: NumberLong(1),
+}));
+
+// Get the oplog entries for the last retryable write statement and for the statement that
+// will become the majority commit point.
+const stmtTotal =
+ donorPrimary.getCollection("local.oplog.rs").findOne({"o.diff.u.counter": counterTotal});
+const stmtMajorityCommitted = donorPrimary.getCollection("local.oplog.rs").findOne({
+ "o.diff.u.counter": counterMajorityCommitted
+});
+
+assert.neq(null, stmtTotal);
+assert.neq(null, stmtMajorityCommitted);
+jsTestLog(`stmtTotal timestamp: ${tojson(stmtTotal.ts)}`);
+jsTestLog(`stmtMajorityCommitted timestamp: ${tojson(stmtMajorityCommitted.ts)}`);
+
+for (const fp of stopReplProducerFailpoints) {
+ fp.off();
+    // Wait for secondary1 to have applied through the counterTotal retryable write statement
+    // and the other secondaries to have applied through the counterMajorityCommitted retryable
+    // write statement. This guarantees that secondary1 will advance its stable_timestamp when
+    // it learns that the other secondaries have also applied through counterMajorityCommitted.
+ assert.soon(() => {
+ const {optimes: {appliedOpTime, durableOpTime}} =
+ assert.commandWorked(fp.conn.adminCommand({replSetGetStatus: 1}));
+
+ print(`${fp.conn.host}: ${tojsononeline({
+ appliedOpTime,
+ durableOpTime,
+ stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
+ })}`);
+
+ const stmtTarget = (fp.conn.host === secondary1.host) ? stmtTotal : stmtMajorityCommitted;
+
+ return bsonWoCompare(appliedOpTime.ts, stmtTarget.ts) >= 0 &&
+ bsonWoCompare(durableOpTime.ts, stmtTarget.ts) >= 0;
+ });
+}
+
+// Wait for secondary1 to have advanced its stable timestamp, and therefore updated the
+// committed snapshot.
+assert.soon(() => {
+ const {lastStableRecoveryTimestamp} =
+ assert.commandWorked(secondary1.adminCommand({replSetGetStatus: 1}));
+
+ print(`${secondary1.host}: ${tojsononeline({
+ lastStableRecoveryTimestamp,
+ stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
+ })}`);
+
+ return bsonWoCompare(lastStableRecoveryTimestamp, stmtMajorityCommitted.ts) >= 0;
+});
+
+// Unpause the migration and hang it right before it starts waiting for the retryable write
+// pre-fetch result to be majority committed.
+fpAfterConnectingTenantMigrationRecipientInstance.off();
+fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted.wait();
+
+const startFetchingDonorOpTime = getStartFetchingDonorOpTime(recipientPrimary, migrationId);
+assert.eq(startFetchingDonorOpTime.ts, stmtMajorityCommitted.ts);
+
+// At this point, the recipient should have fetched retryable writes and put them into the
+// oplog buffer.
+const kOplogBufferNS = "config.repl.migration.oplog_" + migrationOpts.migrationIdString;
+const recipientOplogBuffer = recipientPrimary.getCollection(kOplogBufferNS);
+jsTestLog(`oplog buffer ns: ${kOplogBufferNS}`);
+
+// The number of entries fetched into the oplog buffer is counterMajorityCommitted - 1, since
+// we only fetch entries that occur strictly before startFetchingDonorOpTime, which equals the
+// majority commit point.
+const cursor = recipientOplogBuffer.find();
+const expectedCount = counterMajorityCommitted - 1;
+assert.eq(
+ cursor.itcount(), expectedCount, `Incorrect number of oplog entries: ${cursor.toArray()}`);
+
+// Resume replication on all the secondaries and wait for migration to complete.
+for (const fp of stopReplProducerOnDocumentFailpoints) {
+ fp.off();
+}
+
+fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted.off();
+
+TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
+
+// After the migration, verify that performing the same retryable write statements on the
+// recipient is a no-op: the statements must not be executed again.
+let docAfterMigration = recipientPrimary.getCollection(kNs).findOne({_id: 0});
+assert.eq(docAfterMigration.counter, counterTotal);
+
+assert.commandWorked(recipientPrimary.getCollection(kNs).runCommand("update", {
+ updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
+ lsid,
+ txnNumber: NumberLong(1),
+}));
+
+// The second read should return the same result as the first one, since the recipient fetched
+// the already-executed retryable writes from the donor.
+docAfterMigration = recipientPrimary.getCollection(kNs).findOne({_id: 0});
+assert.eq(docAfterMigration.counter, counterTotal);
+
+donorRst.stopSet();
+tenantMigrationTest.stop();
+})();
diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js
index 3ad8e8e78b7..52a392dc7ae 100644
--- a/jstests/replsets/tenant_migration_retryable_write_retry.js
+++ b/jstests/replsets/tenant_migration_retryable_write_retry.js
@@ -322,17 +322,6 @@ const aggRes = donorPrimary.getDB("config").runCommand({
as: "history",
depthField: "depthForTenantMigration"
}},
- // Now that we have the whole chain, filter out entries that occurred after
- // `startFetchingTimestamp`, since these entries will be fetched during the oplog fetching
- // phase.
- {$set: {
- history: {
- $filter: {
- input: "$history",
- cond: {$lt: ["$$this.ts", startFetchingTimestamp]}
- }
- }
- }},
// Sort the oplog entries in each oplog chain.
{$set: {
history: {$reverseArray: {$reduce: {
@@ -351,6 +340,17 @@ const aggRes = donorPrimary.getDB("config").runCommand({
]},
}}},
}},
+ // Now that we have the whole sorted chain, filter out entries that occurred after
+ // `startFetchingTimestamp`, since these entries will be fetched during the oplog fetching
+ // phase.
+ {$set: {
+ history: {
+ $filter: {
+ input: "$history",
+ cond: {$lt: ["$$this.ts", startFetchingTimestamp]}
+ }
+ }
+ }},
// Combine the oplog entries.
{$set: {history: {$concatArrays: ["$preImageOps", "$history", "$postImageOps"]}}},
// Fetch the complete oplog entries and unwind oplog entries in each chain to the top-level
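The moved $filter stage is easy to exercise in isolation. A minimal, self-contained shell sketch (collection name and timestamps are hypothetical) of the predicate, which keeps only history entries strictly before startFetchingTimestamp:

    // Stand-alone sketch of the $filter predicate, runnable against any test database.
    const startFetchingTimestamp = Timestamp(100, 2);  // hypothetical example value
    const coll = db.getSiblingDB("test").filter_sketch;
    coll.drop();
    assert.commandWorked(coll.insert(
        {history: [{ts: Timestamp(100, 1)}, {ts: Timestamp(100, 2)}, {ts: Timestamp(100, 3)}]}));
    const out = coll.aggregate([{$set: {
        history: {$filter: {
            input: "$history",
            cond: {$lt: ["$$this.ts", startFetchingTimestamp]}
        }}
    }}]).toArray();
    // Only the Timestamp(100, 1) entry survives; later entries are left to the
    // regular oplog fetching phase.
    assert.eq(out[0].history.length, 1);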
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp
index 88bbb6ce478..caccb6bb165 100644
--- a/src/mongo/client/dbclient_cursor.cpp
+++ b/src/mongo/client/dbclient_cursor.cpp
@@ -541,14 +541,16 @@ DBClientCursor::DBClientCursor(DBClientBase* client,
queryOptions,
batchSize,
{},
- readConcernObj) {}
+ readConcernObj,
+ boost::none) {}
DBClientCursor::DBClientCursor(DBClientBase* client,
const NamespaceStringOrUUID& nsOrUuid,
long long cursorId,
int nToReturn,
int queryOptions,
- std::vector<BSONObj> initialBatch)
+ std::vector<BSONObj> initialBatch,
+ boost::optional<Timestamp> operationTime)
: DBClientCursor(client,
nsOrUuid,
BSONObj(), // query
@@ -559,7 +561,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client,
queryOptions,
0,
std::move(initialBatch), // batchSize
- boost::none) {}
+ boost::none,
+ operationTime) {}
DBClientCursor::DBClientCursor(DBClientBase* client,
const NamespaceStringOrUUID& nsOrUuid,
@@ -571,7 +574,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client,
int queryOptions,
int batchSize,
std::vector<BSONObj> initialBatch,
- boost::optional<BSONObj> readConcernObj)
+ boost::optional<BSONObj> readConcernObj,
+ boost::optional<Timestamp> operationTime)
: batch{std::move(initialBatch)},
_client(client),
_originalHost(_client->getServerAddress()),
@@ -589,7 +593,8 @@ DBClientCursor::DBClientCursor(DBClientBase* client,
cursorId(cursorId),
_ownCursor(true),
wasError(false),
- _readConcernObj(readConcernObj) {
+ _readConcernObj(readConcernObj),
+ _operationTime(operationTime) {
if (queryOptions & QueryOptionLocal_forceOpQuery) {
// Legacy OP_QUERY does not support UUIDs.
invariant(!_nsOrUuid.uuid());
@@ -617,12 +622,18 @@ StatusWith<std::unique_ptr<DBClientCursor>> DBClientCursor::fromAggregationReque
firstBatch.emplace_back(elem.Obj().getOwned());
}
+ boost::optional<Timestamp> operationTime = boost::none;
+ if (ret.hasField(LogicalTime::kOperationTimeFieldName)) {
+ operationTime = LogicalTime::fromOperationTime(ret).asTimestamp();
+ }
+
return {std::make_unique<DBClientCursor>(client,
aggRequest.getNamespace(),
cursorId,
0,
useExhaust ? QueryOption_Exhaust : 0,
- firstBatch)};
+ firstBatch,
+ operationTime)};
}
DBClientCursor::~DBClientCursor() {
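The operationTime captured above is the standard field that every command reply from a replica set carries; the cursor simply stashes it from the aggregate reply so the caller can later wait for that time to become majority committed. A quick shell illustration (any database works):

    // On a replica set, each command reply carries an operationTime Timestamp.
    const reply = assert.commandWorked(db.getSiblingDB("config").runCommand(
        {aggregate: "transactions", pipeline: [], cursor: {}}));
    jsTestLog(`aggregate reply operationTime: ${tojson(reply.operationTime)}`);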
diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h
index ff79874fb6e..b4a8fcb4007 100644
--- a/src/mongo/client/dbclient_cursor.h
+++ b/src/mongo/client/dbclient_cursor.h
@@ -161,7 +161,8 @@ public:
long long cursorId,
int nToReturn,
int options,
- std::vector<BSONObj> initialBatch = {});
+ std::vector<BSONObj> initialBatch = {},
+ boost::optional<Timestamp> operationTime = boost::none);
static StatusWith<std::unique_ptr<DBClientCursor>> fromAggregationRequest(
DBClientBase* client,
@@ -289,7 +290,8 @@ private:
int queryOptions,
int bs,
std::vector<BSONObj> initialBatch,
- boost::optional<BSONObj> readConcernObj);
+ boost::optional<BSONObj> readConcernObj,
+ boost::optional<Timestamp> operationTime);
int nextBatchSize();
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 28a054c8425..f76872fc95e 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -143,6 +143,7 @@ MONGO_FAIL_POINT_DEFINE(fpAfterRecordingRecipientPrimaryStartingFCV);
MONGO_FAIL_POINT_DEFINE(fpAfterComparingRecipientAndDonorFCV);
MONGO_FAIL_POINT_DEFINE(fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(fpSetSmallAggregationBatchSize);
+MONGO_FAIL_POINT_DEFINE(fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted);
MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingRetryableWritesBatch);
MONGO_FAIL_POINT_DEFINE(fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime);
MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance);
@@ -1172,11 +1173,21 @@ TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStart
AggregateCommandRequest aggRequest(NamespaceString::kSessionTransactionsTableNamespace,
std::move(serializedPipeline));
+    // Use local read concern. Secondary oplog application coalesces multiple updates to the
+    // same config.transactions record into a single update of the most recent retryable write
+    // statement, and since SERVER-47844 the committed snapshot of a secondary can be in the
+    // middle of a batch. Together, these mean that a majority read on config.transactions from
+    // a secondary does not always reflect the committed retryable writes at that majority
+    // commit point. So we do a local read here to avoid missing any config.transactions
+    // record, and later do a majority read at the donor's last applied operationTime to make
+    // sure the fetched results are majority committed.
auto readConcernArgs = repl::ReadConcernArgs(
- boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
+ boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kLocalReadConcern));
aggRequest.setReadConcern(readConcernArgs.toBSONInner());
// We must set a writeConcern on internal commands.
aggRequest.setWriteConcern(WriteConcernOptions());
+    // Allow the aggregation to write to temporary files in case it exceeds the memory limit.
+ aggRequest.setAllowDiskUse(true);
// Failpoint to set a small batch size on the aggregation request.
if (MONGO_unlikely(fpSetSmallAggregationBatchSize.shouldFail())) {
@@ -1222,6 +1233,29 @@ TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStart
}
}
+    // Do a majority read on the sync source to make sure the pre-fetch result exists on a
+    // majority of nodes in the set. The timestamp we wait on is the donor's last applied
+    // operationTime, which is guaranteed to be at a batch boundary if the sync source is a
+    // secondary. We do not check the rollbackId, because a rollback would cause the sync
+    // source to close its connections, and the migration would then fail and retry.
+ auto operationTime = cursor->getOperationTime();
+ uassert(5663100,
+ "Donor operationTime not available in retryable write pre-fetch result.",
+ operationTime);
+ LOGV2_DEBUG(5663101,
+ 1,
+ "Waiting for retryable write pre-fetch result to be majority committed.",
+ "operationTime"_attr = operationTime);
+
+ fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted.pauseWhileSet();
+
+ BSONObj readResult;
+ BSONObj cmd = ClonerUtils::buildMajorityWaitRequest(*operationTime);
+ _client.get()->runCommand("admin", cmd, readResult, QueryOption_SecondaryOk);
+ uassertStatusOKWithContext(
+ getStatusFromCommandResult(readResult),
+ "Failed to wait for retryable writes pre-fetch result majority committed");
+
// Update _stateDoc to indicate that we've finished the retryable writes oplog entry fetching
// stage.
stdx::lock_guard lk(_mutex);
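In shell terms, the read-then-wait pattern introduced above looks roughly like the sketch below. The host is a placeholder and the shape of the wait command is an assumption; the server builds the real request via ClonerUtils::buildMajorityWaitRequest.

    // 1. Local read: can see config.transactions records coalesced mid-batch.
    const donor = new Mongo("donorSecondaryHost:27017");  // hypothetical placeholder
    const res = assert.commandWorked(donor.getDB("config").runCommand({
        aggregate: "transactions",
        pipeline: [],
        cursor: {},
        readConcern: {level: "local"},
    }));
    // 2. Majority wait: a no-op read at the reply's operationTime with majority read
    // concern returns only once that time is majority committed on the donor.
    assert.commandWorked(donor.getDB("admin").runCommand({
        find: "system.version",
        limit: 1,
        readConcern: {level: "majority", afterClusterTime: res.operationTime},
    }));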
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
index 124f9209c8a..3abf2fb95ac 100644
--- a/src/mongo/db/repl/tenant_migration_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -312,18 +312,7 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations(
.firstElement(),
expCtx));
- // 9. Filter out all oplog entries from the `history` array that occur after
- // `startFetchingTimestamp`. Since the oplog fetching and application stages will already
- // capture entries after `startFetchingTimestamp`, we only need the earlier part of the oplog
- // chain.
- stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
- history: {$filter: {\
- input: '$history',\
- cond: {$lt: ['$$this.ts', " + startFetchingTimestamp.toString() +
- "]}}}}"),
- expCtx));
-
- // 10. Sort the oplog entries in each oplog chain. The $reduce expression sorts the `history`
+ // 9. Sort the oplog entries in each oplog chain. The $reduce expression sorts the `history`
// array in ascending `depthForTenantMigration` order. The $reverseArray expression will
// give an array in ascending timestamp order.
stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
@@ -340,6 +329,17 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations(
{$size: '$history'}]}]}]}}}}}"),
expCtx));
+ // 10. Filter out all oplog entries from the `history` array that occur after
+ // `startFetchingTimestamp`. Since the oplog fetching and application stages will already
+ // capture entries after `startFetchingTimestamp`, we only need the earlier part of the oplog
+ // chain.
+ stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
+ history: {$filter: {\
+ input: '$history',\
+ cond: {$lt: ['$$this.ts', " + startFetchingTimestamp.toString() +
+ "]}}}}"),
+ expCtx));
+
// 11. Combine the oplog entries.
stages.emplace_back(DocumentSourceAddFields::create(fromjson("{\
'history': {$concatArrays: [\
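Moving the filter after the sort matters because the $reduce-based sort indexes into the history array by depthForTenantMigration, so it assumes the array still holds the complete chain. A plain-JavaScript simulation (hypothetical three-entry chain) of roughly what the depth-indexed sort achieves:

    // Each oplog-chain entry carries its depth. The pipeline seeds a scaffold of
    // size $size:'$history', writes each entry at index depthForTenantMigration,
    // then reverses the array to get ascending timestamp order.
    const history = [
        {ts: 3, depthForTenantMigration: 0},  // newest entry, found first
        {ts: 2, depthForTenantMigration: 1},
        {ts: 1, depthForTenantMigration: 2},  // oldest entry, found last
    ];
    const placed = history.reduce((acc, entry) => {
        const out = acc.slice();
        out[entry.depthForTenantMigration] = entry;  // position == depth
        return out;
    }, [...Array(history.length).keys()]);
    const sorted = placed.reverse();
    // sorted is [{ts: 1}, {ts: 2}, {ts: 3}]. Filtering entries out *before* this
    // step would shrink the array while the depths kept their old values, corrupting
    // the placement -- hence the filter now runs after the sort.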
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index 6efa534c401..a5fb0e06ca2 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -3255,8 +3255,11 @@ var ReplSetTest = function(opts) {
if (isObject(opts.nodes)) {
var len = 0;
for (var i in opts.nodes) {
+                // opts.nodeOptions and opts.nodes[i] may contain nested objects that share a
+                // key, e.g. setParameter, so we need to merge them recursively. Object.assign
+                // and Object.merge overwrite a nested object wholesale instead of merging it.
var options = self.nodeOptions["n" + len] =
- Object.merge(opts.nodeOptions, opts.nodes[i]);
+ _deepObjectMerge(opts.nodeOptions, opts.nodes[i]);
if (i.startsWith("a")) {
options.arbiter = true;
}
@@ -3377,6 +3380,26 @@ var ReplSetTest = function(opts) {
} else {
_constructStartNewInstances(opts);
}
+
+ /**
+     * Recursively merges the target and source objects. For non-object leaves, the source
+     * value wins unless it is absent.
+     */
+    function _deepObjectMerge(target, source) {
+        // Non-object target: take the source value unless there is none.
+        if (!(target instanceof Object)) {
+            return (source === undefined || source === null) ? target : source;
+        }
+
+        // Object target but non-object source: keep the target.
+        if (!(source instanceof Object)) {
+            return target;
+        }
+
+        // Both are objects: start from a copy of target and recurse into every key of
+        // source so that nested objects (e.g. setParameter) are merged rather than replaced.
+        let res = Object.assign({}, target);
+        Object.keys(source).forEach(k => {
+            res[k] = _deepObjectMerge(target[k], source[k]);
+        });
+
+        return res;
+    }
};
/**
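To see why the previous shallow merge was wrong, consider this minimal sketch (option names borrowed from the test above; _deepObjectMerge is private to ReplSetTest, so the sketch assumes it is in scope):

    // Object.assign replaces the entire nested setParameter object, silently
    // dropping keys from nodeOptions; _deepObjectMerge keeps both.
    const nodeOptions = {setParameter: {tenantMigrationExcludeDonorHostTimeoutMS: 30000}};
    const perNode = {setParameter: {bgSyncOplogFetcherBatchSize: 1}};
    const shallow = Object.assign({}, nodeOptions, perNode);
    assert.eq(undefined, shallow.setParameter.tenantMigrationExcludeDonorHostTimeoutMS);
    const deep = _deepObjectMerge(nodeOptions, perNode);
    assert.eq(30000, deep.setParameter.tenantMigrationExcludeDonorHostTimeoutMS);
    assert.eq(1, deep.setParameter.bgSyncOplogFetcherBatchSize);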