author     XueruiFa <xuerui.fa@mongodb.com>                    2021-01-20 20:42:01 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-02-05 05:20:16 +0000
commit     709dca02b250194115e5a79fa6c6a4daa9b296a0 (patch)
tree       d7095dade8e6a8d73507855e56ae0a25470cccf7
parent     1736293848e2a251db316bb5b4f529739d5e5a27 (diff)
download   mongo-709dca02b250194115e5a79fa6c6a4daa9b296a0.tar.gz
SERVER-52720: Handle choosing a new donor node when startApplyingOpTime exists
-rw-r--r--  jstests/replsets/libs/tenant_migration_test.js                                       |   3
-rw-r--r--  jstests/replsets/tenant_migration_recipient_cannot_sync_from_stale_donor_hosts.js    | 131
-rw-r--r--  jstests/replsets/tenant_migration_recipient_current_op.js                            |   3
-rw-r--r--  jstests/replsets/tenant_migration_recipient_sync_source_no_available_donor_hosts.js  | 154
-rw-r--r--  src/mongo/client/streamable_replica_set_monitor.cpp                                  |   6
-rw-r--r--  src/mongo/client/streamable_replica_set_monitor.h                                    |   3
-rw-r--r--  src/mongo/db/repl/repl_server_parameters.idl                                         |  10
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp                             | 178
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h                               |  25
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp                        | 133
10 files changed, 549 insertions, 97 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js
index 6062f40afe4..70d6c24c571 100644
--- a/jstests/replsets/libs/tenant_migration_test.js
+++ b/jstests/replsets/libs/tenant_migration_test.js
@@ -418,8 +418,7 @@ function TenantMigrationTest({
const db = this.getDonorPrimary().getDB(dbName);
const coll = db.getCollection(collName);
- assert.commandWorked(coll.insertMany(data));
- this.getDonorRst().awaitReplication();
+ assert.commandWorked(coll.insertMany(data, {writeConcern: {w: 'majority'}}));
};
/**
diff --git a/jstests/replsets/tenant_migration_recipient_cannot_sync_from_stale_donor_hosts.js b/jstests/replsets/tenant_migration_recipient_cannot_sync_from_stale_donor_hosts.js
new file mode 100644
index 00000000000..8fb22cf88e4
--- /dev/null
+++ b/jstests/replsets/tenant_migration_recipient_cannot_sync_from_stale_donor_hosts.js
@@ -0,0 +1,131 @@
+/*
+ * Tests that the recipient primary cannot sync from a donor host that has a majority OpTime that is
+ * earlier than the recipient's stored 'startApplyingDonorOpTime'.
+ *
+ * @tags: [requires_majority_read_concern, requires_fcv_49]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/libs/write_concern_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
+load('jstests/replsets/rslib.js');
+
+const donorRst = new ReplSetTest({
+ name: `${jsTestName()}_donor`,
+ nodes: 3,
+ settings: {chainingAllowed: false},
+ nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor,
+});
+donorRst.startSet();
+donorRst.initiateWithHighElectionTimeout();
+
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
+if (!tenantMigrationTest.isFeatureFlagEnabled()) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ donorRst.stopSet();
+ return;
+}
+const tenantId = "testTenantId";
+const tenantDB = tenantMigrationTest.tenantDB(tenantId, "DB");
+const collName = "testColl";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const delayedSecondary = donorRst.getSecondaries()[0];
+const donorSecondary = donorRst.getSecondaries()[1];
+
+const recipientRst = tenantMigrationTest.getRecipientRst();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+const newRecipientPrimary = recipientRst.getSecondary();
+
+// Stop replicating on one of the secondaries so that its majority OpTime will be behind the
+// recipient's 'startApplyingDonorOpTime'.
+stopServerReplication(delayedSecondary);
+tenantMigrationTest.insertDonorDB(tenantDB, collName);
+
+const hangRecipientPrimaryAfterCreatingRSM =
+ configureFailPoint(recipientPrimary, 'hangAfterCreatingRSM');
+const hangRecipientPrimaryAfterCreatingConnections = configureFailPoint(
+ recipientPrimary, 'fpAfterStartingOplogFetcherMigrationRecipientInstance', {action: "hang"});
+
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(UUID()),
+ tenantId,
+ // The recipient primary can only choose secondaries as sync sources.
+ readPreference: {mode: 'secondary'},
+};
+
+jsTestLog("Starting the tenant migration");
+assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+hangRecipientPrimaryAfterCreatingRSM.wait();
+
+awaitRSClientHosts(recipientPrimary, donorSecondary, {ok: true, secondary: true});
+awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: true, secondary: true});
+
+// Turn on the 'waitInHello' failpoint. This will cause the delayed secondary to cease sending hello
+// responses and the RSM should mark the node as down. This is necessary so that the delayed
+// secondary is not chosen as the sync source here, since we want the 'startApplyingDonorOpTime' to
+// be set to the most advanced majority OpTime.
+jsTestLog(
+ "Turning on waitInHello failpoint. Delayed donor secondary should stop sending hello responses.");
+const helloFailpoint = configureFailPoint(delayedSecondary, "waitInHello");
+awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: false});
+
+hangRecipientPrimaryAfterCreatingRSM.off();
+hangRecipientPrimaryAfterCreatingConnections.wait();
+
+let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+let currOp = res.inprog[0];
+// The migration should not be complete.
+assert.eq(currOp.migrationCompleted, false, tojson(res));
+assert.eq(currOp.dataSyncCompleted, false, tojson(res));
+// The sync source can only be 'donorSecondary'.
+assert.eq(donorSecondary.host, currOp.donorSyncSource, tojson(res));
+
+helloFailpoint.off();
+
+const hangNewRecipientPrimaryAfterCreatingRSM =
+ configureFailPoint(newRecipientPrimary, 'hangAfterCreatingRSM');
+const hangNewRecipientPrimaryAfterCreatingConnections =
+ configureFailPoint(newRecipientPrimary,
+ 'fpAfterRetrievingStartOpTimesMigrationRecipientInstance',
+ {action: "hang"});
+
+// Step up a new primary so that the tenant migration restarts on the new primary, with the
+// 'startApplyingDonorOpTime' field already set in the state doc.
+jsTestLog("Stepping up the recipient secondary");
+recipientRst.awaitLastOpCommitted();
+recipientRst.stepUp(newRecipientPrimary);
+assert.eq(newRecipientPrimary, recipientRst.getPrimary());
+
+// Wait for the new primary to see the state of each donor node.
+hangNewRecipientPrimaryAfterCreatingRSM.wait();
+awaitRSClientHosts(newRecipientPrimary, donorPrimary, {ok: true, ismaster: true});
+awaitRSClientHosts(
+ newRecipientPrimary, [delayedSecondary, donorSecondary], {ok: true, secondary: true});
+hangNewRecipientPrimaryAfterCreatingRSM.off();
+
+jsTestLog("Releasing failpoints");
+hangNewRecipientPrimaryAfterCreatingConnections.wait();
+hangRecipientPrimaryAfterCreatingConnections.off();
+
+res = newRecipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+currOp = res.inprog[0];
+// 'donorSecondary' should always be the chosen sync source, since read preference is 'secondary'
+// and 'delayedSecondary' cannot be chosen because it is too stale.
+assert.eq(donorSecondary.host,
+ currOp.donorSyncSource,
+ `the recipient should always choose the non-lagged secondary as sync source`);
+
+hangNewRecipientPrimaryAfterCreatingConnections.off();
+restartServerReplication(delayedSecondary);
+
+assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
+
+donorRst.stopSet();
+tenantMigrationTest.stop();
+})();
diff --git a/jstests/replsets/tenant_migration_recipient_current_op.js b/jstests/replsets/tenant_migration_recipient_current_op.js
index 43a8ff6f312..b857eb78112 100644
--- a/jstests/replsets/tenant_migration_recipient_current_op.js
+++ b/jstests/replsets/tenant_migration_recipient_current_op.js
@@ -93,6 +93,7 @@ assert(!currOp.startApplyingDonorOpTime, tojson(res));
assert(!currOp.dataConsistentStopDonorOpTime, tojson(res));
assert(!currOp.cloneFinishedRecipientOpTime, tojson(res));
assert(!currOp.expireAt, tojson(res));
+assert(!currOp.donorSyncSource, tojson(res));
fpAfterPersistingStateDoc.off();
// Allow the migration to move to the point where the startFetchingDonorOpTime has been obtained.
@@ -111,6 +112,7 @@ assert(!currOp.expireAt, tojson(res));
// Must exist now.
assert(currOp.startFetchingDonorOpTime, tojson(res));
assert(currOp.startApplyingDonorOpTime, tojson(res));
+assert(currOp.donorSyncSource, tojson(res));
fpAfterRetrievingStartOpTime.off();
// Wait until collection cloning is done, and cloneFinishedRecipientOpTime
@@ -128,6 +130,7 @@ assert(!currOp.expireAt, tojson(res));
// Must exist now.
assert(currOp.startFetchingDonorOpTime, tojson(res));
assert(currOp.startApplyingDonorOpTime, tojson(res));
+assert(currOp.donorSyncSource, tojson(res));
assert(currOp.dataConsistentStopDonorOpTime, tojson(res));
assert(currOp.cloneFinishedRecipientOpTime, tojson(res));
fpAfterCollectionCloner.off();
diff --git a/jstests/replsets/tenant_migration_recipient_sync_source_no_available_donor_hosts.js b/jstests/replsets/tenant_migration_recipient_sync_source_no_available_donor_hosts.js
new file mode 100644
index 00000000000..a076b1f588c
--- /dev/null
+++ b/jstests/replsets/tenant_migration_recipient_sync_source_no_available_donor_hosts.js
@@ -0,0 +1,154 @@
+/*
+ * Tests that the migration cannot complete when at least one donor host has a stale majority OpTime
+ * and all other hosts are considered unavailable. The recipient primary should retry and continue
+ * to wait until a suitable sync source is available on the donor replica set.
+ *
+ * @tags: [requires_majority_read_concern, requires_fcv_49]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/libs/write_concern_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load('jstests/replsets/rslib.js');
+
+const donorRst = new ReplSetTest({
+ name: `${jsTestName()}_donor`,
+ nodes: 3,
+ settings: {chainingAllowed: false},
+ nodeOptions:
+ Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor,
+ {setParameter: {tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000}}),
+});
+donorRst.startSet();
+donorRst.initiateWithHighElectionTimeout();
+
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
+if (!tenantMigrationTest.isFeatureFlagEnabled()) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ donorRst.stopSet();
+ return;
+}
+const tenantId = "testTenantId";
+const tenantDB = tenantMigrationTest.tenantDB(tenantId, "DB");
+const collName = "testColl";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const delayedSecondary = donorRst.getSecondaries()[0];
+const donorSecondary = donorRst.getSecondaries()[1];
+
+const recipientRst = tenantMigrationTest.getRecipientRst();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+const newRecipientPrimary = recipientRst.getSecondary();
+
+// Stop replicating on one of the secondaries so that its majority OpTime will be behind the
+// recipient's 'startApplyingDonorOpTime'.
+stopServerReplication(delayedSecondary);
+tenantMigrationTest.insertDonorDB(tenantDB, collName);
+
+const hangRecipientPrimaryAfterCreatingRSM =
+ configureFailPoint(recipientPrimary, 'hangAfterCreatingRSM');
+const hangRecipientPrimaryAfterCreatingConnections = configureFailPoint(
+ recipientPrimary, 'fpAfterStartingOplogFetcherMigrationRecipientInstance', {action: "hang"});
+
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(UUID()),
+ tenantId,
+ // The recipient primary can only choose secondaries as sync sources.
+ readPreference: {mode: 'secondary'},
+};
+
+jsTestLog("Starting the tenant migration");
+assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+hangRecipientPrimaryAfterCreatingRSM.wait();
+
+awaitRSClientHosts(recipientPrimary, donorSecondary, {ok: true, secondary: true});
+awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: true, secondary: true});
+
+// Turn on the 'waitInHello' failpoint. This will cause the delayed secondary to cease sending hello
+// responses and the RSM should mark the node as down. This is necessary so that the delayed
+// secondary is not chosen as the sync source here, since we want the 'startApplyingDonorOpTime' to
+// be set to the most advanced majority OpTime.
+jsTestLog(
+ "Turning on waitInHello failpoint. Delayed donor secondary should stop sending hello responses.");
+const helloFailpoint = configureFailPoint(delayedSecondary, "waitInHello");
+awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: false});
+
+hangRecipientPrimaryAfterCreatingRSM.off();
+hangRecipientPrimaryAfterCreatingConnections.wait();
+
+let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+let currOp = res.inprog[0];
+// The migration should not be complete.
+assert.eq(currOp.migrationCompleted, false, tojson(res));
+assert.eq(currOp.dataSyncCompleted, false, tojson(res));
+// The sync source can only be 'donorSecondary'.
+assert.eq(donorSecondary.host, currOp.donorSyncSource, tojson(res));
+
+helloFailpoint.off();
+
+const hangNewRecipientPrimaryAfterCreatingRSM =
+ configureFailPoint(newRecipientPrimary, 'hangAfterCreatingRSM');
+const hangNewRecipientPrimaryAfterCreatingConnections =
+ configureFailPoint(newRecipientPrimary,
+ 'fpAfterRetrievingStartOpTimesMigrationRecipientInstance',
+ {action: "hang"});
+
+// Step up a new primary so that the tenant migration restarts on the new primary, with the
+// 'startApplyingDonorOpTime' field already set in the state doc.
+jsTestLog("Stepping up the recipient secondary");
+recipientRst.awaitLastOpCommitted();
+recipientRst.stepUp(newRecipientPrimary);
+assert.eq(newRecipientPrimary, recipientRst.getPrimary());
+
+jsTestLog("Stopping the non-lagged secondary");
+donorRst.stop(donorSecondary);
+
+// Wait for the new primary to see the state of each donor node. 'donorSecondary' should return
+// '{ok: false}' since it has been shut down.
+hangNewRecipientPrimaryAfterCreatingRSM.wait();
+awaitRSClientHosts(newRecipientPrimary, donorPrimary, {ok: true, ismaster: true});
+awaitRSClientHosts(newRecipientPrimary, delayedSecondary, {ok: true, secondary: true});
+awaitRSClientHosts(newRecipientPrimary, donorSecondary, {ok: false});
+
+jsTestLog("Releasing failpoints");
+hangNewRecipientPrimaryAfterCreatingRSM.off();
+hangRecipientPrimaryAfterCreatingConnections.off();
+
+res = newRecipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+currOp = res.inprog[0];
+// The migration should not be complete and there should be no sync source stored, since the new
+// recipient primary does not have a valid sync source to choose from.
+assert.eq(currOp.migrationCompleted, false, tojson(res));
+assert.eq(currOp.dataSyncCompleted, false, tojson(res));
+assert(!currOp.donorSyncSource, tojson(res));
+
+jsTestLog("Restarting replication on 'delayedSecondary'");
+restartServerReplication(delayedSecondary);
+// The recipient should eventually be able to connect to the lagged secondary, after the secondary
+// has caught up and the exclude timeout has expired.
+// TODO (SERVER-54256): After we add retries in sync source selection for the recipient primary,
+// uncomment these lines.
+// hangNewRecipientPrimaryAfterCreatingConnections.wait();
+
+// res = newRecipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+// currOp = res.inprog[0];
+// assert.eq(
+// delayedSecondary.host,
+// currOp.donorSyncSource,
+// `the new recipient primary should only be able to choose 'delayedSecondary' as sync source`);
+
+// hangNewRecipientPrimaryAfterCreatingConnections.off();
+
+assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
+
+// Remove 'donorSecondary' so that the test can complete properly.
+donorRst.remove(donorSecondary);
+donorRst.stopSet();
+tenantMigrationTest.stop();
+})();
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 9d4764979be..3e118440cc6 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -356,18 +356,20 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr
return {*immediateResult};
}
- return _enqueueOutstandingQuery(lk, criteria, cancelToken, deadline);
+ return _enqueueOutstandingQuery(lk, criteria, excludedHosts, cancelToken, deadline);
});
}
SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutstandingQuery(
WithLock,
const ReadPreferenceSetting& criteria,
+ const std::vector<HostAndPort>& excludedHosts,
const CancelationToken& cancelToken,
const Date_t& deadline) {
auto query = std::make_shared<HostQuery>();
query->criteria = criteria;
+ query->excludedHosts = excludedHosts;
auto pf = makePromiseFuture<std::vector<HostAndPort>>();
query->promise = std::move(pf.promise);
@@ -783,7 +785,7 @@ void StreamableReplicaSetMonitor::_processOutstanding(
// If query has not been canceled yet, try to satisfy it.
if (!query->hasBeenResolved()) {
- auto result = _getHosts(topologyDescription, query->criteria);
+ auto result = _getHosts(topologyDescription, query->criteria, query->excludedHosts);
if (result) {
if (query->tryResolveWithSuccess(std::move(*result))) {
const auto latency = _executor->now() - query->start;
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index a1b0368981d..55cb63b87df 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -180,6 +180,8 @@ private:
ReadPreferenceSetting criteria;
+ std::vector<HostAndPort> excludedHosts;
+
// Used to compute latency.
Date_t start;
@@ -201,6 +203,7 @@ private:
SemiFuture<std::vector<HostAndPort>> _enqueueOutstandingQuery(
WithLock,
const ReadPreferenceSetting& criteria,
+ const std::vector<HostAndPort>& excludedHosts,
const CancelationToken& cancelToken,
const Date_t& deadline);
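The replica set monitor changes above thread a caller-supplied 'excludedHosts' list through getHostsOrRefresh() and the queued HostQuery, so a caller such as the tenant migration recipient can ask for hosts matching its read preference while skipping donors it has temporarily ruled out. The following is a minimal standalone sketch of that filtering idea only, not the real StreamableReplicaSetMonitor code; the HostAndPort alias and filterExcludedHosts helper are stand-ins.

    // Standalone illustration of host filtering against an exclusion list.
    // HostAndPort and filterExcludedHosts are stand-ins, not MongoDB APIs.
    #include <algorithm>
    #include <iostream>
    #include <iterator>
    #include <string>
    #include <vector>

    using HostAndPort = std::string;

    std::vector<HostAndPort> filterExcludedHosts(const std::vector<HostAndPort>& candidates,
                                                 const std::vector<HostAndPort>& excludedHosts) {
        std::vector<HostAndPort> eligible;
        std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(eligible),
                     [&](const HostAndPort& host) {
                         // Keep only hosts that are not currently excluded.
                         return std::find(excludedHosts.begin(), excludedHosts.end(), host) ==
                             excludedHosts.end();
                     });
        return eligible;
    }

    int main() {
        std::vector<HostAndPort> candidates{"donor0:27017", "donor1:27017", "donor2:27017"};
        std::vector<HostAndPort> excluded{"donor1:27017"};
        for (const auto& host : filterExcludedHosts(candidates, excluded))
            std::cout << host << "\n";  // donor0:27017 and donor2:27017 remain eligible
    }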
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index e58efd020c0..4f9b975ce2e 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -453,6 +453,16 @@ server_parameters:
cpp_varname: tenantMigrationDisableX509Auth
default: false
+ tenantMigrationExcludeDonorHostTimeoutMS:
+ description: >-
+ Period of time, in milliseconds, that a donor host should be excluded for.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: tenantMigrationExcludeDonorHostTimeoutMS
+ default: 60000
+ validator:
+ gte: 1
+
feature_flags:
featureFlagTenantMigrations:
description: >-
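The new parameter is declared with set_at: startup, so it can only be changed when the process starts. The jstest above lowers it to 30 seconds through the replica set's node options; assuming a plain mongod invocation, the equivalent startup flag would be:

    mongod --setParameter tenantMigrationExcludeDonorHostTimeoutMS=30000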
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 0587e7865b8..e7d9d632b45 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -95,6 +95,7 @@ MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion);
MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration);
+MONGO_FAIL_POINT_DEFINE(hangAfterCreatingRSM);
namespace {
// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine
@@ -280,6 +281,10 @@ boost::optional<BSONObj> TenantMigrationRecipientService::Instance::reportForCur
if (_stateDoc.getExpireAt())
bob.append("expireAt", _stateDoc.getExpireAt()->toString());
+ if (_client) {
+ bob.append("donorSyncSource", _client->getServerAddress());
+ }
+
return bob.obj();
}
@@ -388,6 +393,22 @@ std::unique_ptr<DBClientConnection> TenantMigrationRecipientService::Instance::_
return client;
}
+OpTime TenantMigrationRecipientService::Instance::_getDonorMajorityOpTime(
+ std::unique_ptr<mongo::DBClientConnection>& client) {
+ auto oplogOpTimeFields =
+ BSON(OplogEntry::kTimestampFieldName << 1 << OplogEntry::kTermFieldName << 1);
+ auto majorityOpTimeBson =
+ client->findOne(NamespaceString::kRsOplogNamespace.ns(),
+ Query().sort("$natural", -1),
+ &oplogOpTimeFields,
+ QueryOption_SecondaryOk,
+ ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+ uassert(5272003, "Found no entries in the remote oplog", !majorityOpTimeBson.isEmpty());
+
+ auto majorityOpTime = uassertStatusOK(OpTime::parseFromOplogEntry(majorityOpTimeBson));
+ return majorityOpTime;
+}
+
SemiFuture<TenantMigrationRecipientService::Instance::ConnectionPair>
TenantMigrationRecipientService::Instance::_createAndConnectClients() {
LOGV2_DEBUG(4880401,
@@ -414,53 +435,118 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
.getAsync([getHostCancelSource](auto) mutable { getHostCancelSource.cancel(); });
});
- // Get all donor hosts that we have excluded.
- const auto& excludedHosts = _getExcludedDonorHosts(lk);
+ if (MONGO_unlikely(hangAfterCreatingRSM.shouldFail())) {
+ LOGV2(5272004, "hangAfterCreatingRSM failpoint enabled");
+ hangAfterCreatingRSM.pauseWhileSet();
+ }
- return _donorReplicaSetMonitor
- ->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token())
- .thenRunOn(**_scopedExecutor)
- .then([this, self = shared_from_this()](const HostAndPort& serverAddress) {
- // Application name is constructed such that it doesn't exceeds
- // kMaxApplicationNameByteLength (128 bytes).
- // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) +
- // <migrationUuid> (36 bytes) = 114 bytes length.
- // Note: Since the total length of tenant database name (<tenantId>_<user provided db
- // name>) can't exceed 63 bytes and the user provided db name should be at least one
- // character long, the maximum length of tenantId can only be 61 bytes.
- auto applicationName =
- "TenantMigration_" + getTenantId() + "_" + getMigrationUUID().toString();
-
- auto client = _connectAndAuth(serverAddress, applicationName);
-
- // Application name is constructed such that it doesn't exceeds
- // kMaxApplicationNameByteLength (128 bytes).
- // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) +
- // <migrationUuid> (36 bytes) + _oplogFetcher" (13 bytes) = 127 bytes length.
- applicationName += "_oplogFetcher";
- auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName);
- return ConnectionPair(std::move(client), std::move(oplogFetcherClient));
- })
- .onError(
- [this, self = shared_from_this()](const Status& status) -> SemiFuture<ConnectionPair> {
+ const auto kDelayedMajorityOpTimeErrorCode = 5272000;
+
+ return AsyncTry([this,
+ self = shared_from_this(),
+ getHostCancelSource,
+ kDelayedMajorityOpTimeErrorCode] {
+ stdx::lock_guard lk(_mutex);
+
+ // Get all donor hosts that we have excluded.
+ const auto& excludedHosts = _getExcludedDonorHosts(lk);
+
+ return _donorReplicaSetMonitor
+ ->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token())
+ .thenRunOn(**_scopedExecutor)
+ .then([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode](
+ const HostAndPort& serverAddress) {
+ LOGV2(5272002,
+ "Attempting to connect to donor host",
+ "donorHost"_attr = serverAddress,
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID());
+                // Application name is constructed such that it doesn't exceed
+ // kMaxApplicationNameByteLength (128 bytes).
+ // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) +
+ // <migrationUuid> (36 bytes) = 114 bytes length.
+ // Note: Since the total length of tenant database name (<tenantId>_<user
+ // provided db name>) can't exceed 63 bytes and the user provided db name
+ // should be at least one character long, the maximum length of tenantId can
+ // only be 61 bytes.
+ auto applicationName =
+ "TenantMigration_" + getTenantId() + "_" + getMigrationUUID().toString();
+
+ auto client = _connectAndAuth(serverAddress, applicationName);
+
+ boost::optional<repl::OpTime> startApplyingOpTime;
+ {
+ stdx::lock_guard lk(_mutex);
+ startApplyingOpTime = _stateDoc.getStartApplyingDonorOpTime();
+ }
+
+ if (startApplyingOpTime) {
+ auto majoritySnapshotOpTime = _getDonorMajorityOpTime(client);
+
+ if (majoritySnapshotOpTime < *startApplyingOpTime) {
+ stdx::lock_guard lk(_mutex);
+ const auto now =
+ getGlobalServiceContext()->getFastClockSource()->now();
+ _excludeDonorHost(
+ lk,
+ serverAddress,
+ now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS));
+ uasserted(
+ kDelayedMajorityOpTimeErrorCode,
+ str::stream()
+ << "majoritySnapshotOpTime on donor host must not be behind "
+ "startApplyingDonorOpTime, majoritySnapshotOpTime: "
+ << majoritySnapshotOpTime.toString()
+ << "; startApplyingDonorOpTime: "
+ << (*startApplyingOpTime).toString());
+ }
+ }
+
+ // Application name is constructed such that it doesn't exceed
+ // kMaxApplicationNameByteLength (128 bytes).
+ // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) +
+ // <migrationUuid> (36 bytes) + _oplogFetcher" (13 bytes) = 127 bytes
+ // length.
+ applicationName += "_oplogFetcher";
+ auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName);
+ return ConnectionPair(std::move(client), std::move(oplogFetcherClient));
+ });
+ })
+ .until([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode](
+ const StatusWith<ConnectionPair>& status) {
+ if (!status.isOK()) {
LOGV2_ERROR(4880404,
"Connecting to donor failed",
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID(),
- "error"_attr = status);
+ "error"_attr = status.getStatus());
// Make sure we don't end up with a partially initialized set of connections.
stdx::lock_guard lk(_mutex);
_client = nullptr;
_oplogFetcherClient = nullptr;
- return status;
- })
+
+ // If the future chain has been interrupted, stop retrying.
+ if (_taskState.isInterrupted()) {
+ return true;
+ }
+
+ // If the connection failed because the majority snapshot OpTime on the donor host
+            // was not ahead of our stored 'startApplyingDonorOpTime', choose another donor host
+ // to connect to.
+ if (status.getStatus() == ErrorCodes::Error(kDelayedMajorityOpTimeErrorCode)) {
+ return false;
+ }
+ }
+ return true;
+ })
+ .on(**_scopedExecutor, CancelationToken::uncancelable())
.semi();
}
-void TenantMigrationRecipientService::Instance::excludeDonorHost(const HostAndPort& host,
- Date_t until) {
- stdx::lock_guard lk(_mutex);
+void TenantMigrationRecipientService::Instance::_excludeDonorHost(WithLock,
+ const HostAndPort& host,
+ Date_t until) {
LOGV2_DEBUG(5271800,
2,
"Excluding donor host",
@@ -546,22 +632,13 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo
invariant(!_stateDoc.getStartFetchingDonorOpTime().has_value());
// Get the last oplog entry at the read concern majority optime in the remote oplog. It
// does not matter which tenant it is for.
- auto oplogOpTimeFields =
- BSON(OplogEntry::kTimestampFieldName << 1 << OplogEntry::kTermFieldName << 1);
- auto lastOplogEntry1Bson =
- _client->findOne(NamespaceString::kRsOplogNamespace.ns(),
- Query().sort("$natural", -1),
- &oplogOpTimeFields,
- QueryOption_SecondaryOk,
- ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
- uassert(4880601, "Found no entries in the remote oplog", !lastOplogEntry1Bson.isEmpty());
+ auto lastOplogEntry1OpTime = _getDonorMajorityOpTime(_client);
LOGV2_DEBUG(4880600,
2,
"Found last oplog entry at read concern majority optime on remote node",
"migrationId"_attr = getMigrationUUID(),
"tenantId"_attr = _stateDoc.getTenantId(),
- "lastOplogEntry"_attr = lastOplogEntry1Bson);
- auto lastOplogEntry1OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry1Bson));
+ "lastOplogEntry"_attr = lastOplogEntry1OpTime.toBSON());
// Get the optime of the earliest transaction that was open at the read concern majority optime
// As with the last oplog entry, it does not matter that this may be for a different tenant; an
@@ -589,21 +666,14 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo
// We need to fetch the last oplog entry both before and after getting the transaction
// table entry, as otherwise there is a potential race where we may try to apply
// a commit for which we have not fetched a previous transaction oplog entry.
- auto lastOplogEntry2Bson =
- _client->findOne(NamespaceString::kRsOplogNamespace.ns(),
- Query().sort("$natural", -1),
- &oplogOpTimeFields,
- QueryOption_SecondaryOk,
- ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
- uassert(4880603, "Found no entries in the remote oplog", !lastOplogEntry2Bson.isEmpty());
+ auto lastOplogEntry2OpTime = _getDonorMajorityOpTime(_client);
LOGV2_DEBUG(4880604,
2,
"Found last oplog entry at the read concern majority optime (after reading txn "
"table) on remote node",
"migrationId"_attr = getMigrationUUID(),
"tenantId"_attr = _stateDoc.getTenantId(),
- "lastOplogEntry"_attr = lastOplogEntry2Bson);
- auto lastOplogEntry2OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry2Bson));
+ "lastOplogEntry"_attr = lastOplogEntry2OpTime.toBSON());
_stateDoc.setStartApplyingDonorOpTime(lastOplogEntry2OpTime);
OpTime startFetchingDonorOpTime = lastOplogEntry1OpTime;
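The core of this change is the AsyncTry/until loop above: when the state document already carries a startApplyingDonorOpTime (the migration is resuming, for example after a recipient failover), the newly selected donor host must have a majority-committed optime that is not behind it; otherwise the host is excluded for tenantMigrationExcludeDonorHostTimeoutMS and host selection is retried. The snippet below is a simplified, self-contained sketch of that decision only; OpTime, the exclusion map, and shouldExcludeAndRetry are stand-ins for the real repl::OpTime and service internals.

    // Simplified model of the stale-donor check added to _createAndConnectClients().
    // Not MongoDB code: the OpTime ordering and exclusion map are stand-ins.
    #include <chrono>
    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>
    #include <tuple>

    using Clock = std::chrono::steady_clock;

    struct OpTime {
        long long timestamp;  // stand-in for the {Timestamp, term} pair
        long long term;
        bool operator<(const OpTime& rhs) const {
            return std::tie(timestamp, term) < std::tie(rhs.timestamp, rhs.term);
        }
    };

    // Returns true when the candidate donor must be excluded and selection retried:
    // its majority-committed optime is behind the stored startApplyingDonorOpTime.
    bool shouldExcludeAndRetry(const std::string& donorHost,
                               const OpTime& donorMajorityOpTime,
                               const std::optional<OpTime>& startApplyingDonorOpTime,
                               std::map<std::string, Clock::time_point>& excludedUntil,
                               Clock::duration excludeTimeout) {
        if (!startApplyingDonorOpTime || !(donorMajorityOpTime < *startApplyingDonorOpTime)) {
            return false;  // host is usable; proceed to create the oplog fetcher client
        }
        excludedUntil[donorHost] = Clock::now() + excludeTimeout;  // suppress this host for now
        return true;  // corresponds to uasserting with kDelayedMajorityOpTimeErrorCode
    }

    int main() {
        std::map<std::string, Clock::time_point> excludedUntil;
        const OpTime donorMajority{5, 1};
        const OpTime startApplying{6, 1};
        bool retry = shouldExcludeAndRetry("delayedSecondary:27017", donorMajority, startApplying,
                                           excludedUntil, std::chrono::seconds(60));
        std::cout << std::boolalpha << retry << "\n";  // true: choose a different donor host
    }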
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index e801c0a7fc2..9de9ebc73c1 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -151,11 +151,6 @@ public:
const Timestamp& donorTs) const;
/*
- * Suppresses selecting 'host' as the donor sync source, until 'until'.
- */
- void excludeDonorHost(const HostAndPort& host, Date_t until);
-
- /*
* Set the oplog creator functor, to allow use of a mock oplog fetcher.
*/
void setCreateOplogFetcherFn_forTest(
@@ -171,6 +166,14 @@ public:
_tenantOplogApplier->shutdown();
}
+ /*
+ * Suppresses selecting 'host' as the donor sync source, until 'until'.
+ */
+ void excludeDonorHost_forTest(const HostAndPort& host, Date_t until) {
+ stdx::lock_guard lk(_mutex);
+ _excludeDonorHost(lk, host, until);
+ }
+
private:
friend class TenantMigrationRecipientServiceTest;
@@ -391,10 +394,15 @@ public:
void _cleanupOnDataSyncCompletion(Status status);
/*
+ * Suppresses selecting 'host' as the donor sync source, until 'until'.
+ */
+ void _excludeDonorHost(WithLock, const HostAndPort& host, Date_t until);
+
+ /*
* Returns a vector of currently excluded donor hosts. Also removes hosts from the list of
* excluded donor nodes, if the exclude duration has expired.
*/
- std::vector<HostAndPort> _getExcludedDonorHosts(WithLock lk);
+ std::vector<HostAndPort> _getExcludedDonorHosts(WithLock);
/*
* Makes the failpoint to stop or hang based on failpoint data "action" field.
@@ -406,6 +414,11 @@ public:
*/
SharedSemiFuture<void> _updateStateDocForMajority(WithLock lk) const;
+ /*
+ * Returns the majority OpTime on the donor node that 'client' is connected to.
+ */
+ OpTime _getDonorMajorityOpTime(std::unique_ptr<mongo::DBClientConnection>& client);
+
mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex");
// All member variables are labeled with one of the following codes indicating the
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index 31f833b75f8..342201ce67e 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -306,32 +306,45 @@ protected:
ASSERT_OK(persistedStateDocWithStatus.getStatus());
ASSERT_BSONOBJ_EQ(memoryStateDoc.toBSON(), persistedStateDocWithStatus.getValue().toBSON());
}
- void insertToAllNodes(MockReplicaSet* replSet, const std::string& nss, BSONObj obj) {
- for (const auto& host : replSet->getHosts()) {
+ void insertToNodes(MockReplicaSet* replSet,
+ const std::string& nss,
+ BSONObj obj,
+ const std::vector<HostAndPort>& hosts) {
+ for (const auto& host : hosts) {
replSet->getNode(host.toString())->insert(nss, obj);
}
}
- void clearCollectionAllNodes(MockReplicaSet* replSet, const std::string& nss) {
- for (const auto& host : replSet->getHosts()) {
+ void insertToAllNodes(MockReplicaSet* replSet, const std::string& nss, BSONObj obj) {
+ insertToNodes(replSet, nss, obj, replSet->getHosts());
+ }
+
+ void clearCollection(MockReplicaSet* replSet,
+ const std::string& nss,
+ const std::vector<HostAndPort>& hosts) {
+ for (const auto& host : hosts) {
replSet->getNode(host.toString())->remove(nss, Query());
}
}
- void insertTopOfOplog(MockReplicaSet* replSet, const OpTime& topOfOplogOpTime) {
+ void insertTopOfOplog(MockReplicaSet* replSet,
+ const OpTime& topOfOplogOpTime,
+ const std::vector<HostAndPort> hosts = {}) {
+ const auto targetHosts = hosts.empty() ? replSet->getHosts() : hosts;
// The MockRemoteDBService does not actually implement the database, so to make our
// find work correctly we must make sure there's only one document to find.
- clearCollectionAllNodes(replSet, NamespaceString::kRsOplogNamespace.ns());
- insertToAllNodes(replSet,
- NamespaceString::kRsOplogNamespace.ns(),
- makeOplogEntry(topOfOplogOpTime,
- OpTypeEnum::kNoop,
- {} /* namespace */,
- boost::none /* uuid */,
- BSONObj() /* o */,
- boost::none /* o2 */)
- .getEntry()
- .toBSON());
+ clearCollection(replSet, NamespaceString::kRsOplogNamespace.ns(), targetHosts);
+ insertToNodes(replSet,
+ NamespaceString::kRsOplogNamespace.ns(),
+ makeOplogEntry(topOfOplogOpTime,
+ OpTypeEnum::kNoop,
+ {} /* namespace */,
+ boost::none /* uuid */,
+ BSONObj() /* o */,
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON(),
+ targetHosts);
}
// Accessors to class private members
@@ -648,7 +661,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
// Mark the primary as excluded.
auto hosts = replSet.getHosts();
auto now = opCtx->getServiceContext()->getFastClockSource()->now();
- instance->excludeDonorHost(hosts.at(0), now + Milliseconds(500));
+ instance->excludeDonorHost_forTest(hosts.at(0), now + Milliseconds(500));
AtomicWord<bool> runReplMonitor{true};
// Keep scanning the replica set while waiting to reach the failpoint. This would normally
@@ -722,7 +735,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
auto hosts = replSet.getHosts();
auto now = opCtx->getServiceContext()->getFastClockSource()->now();
auto excludeTime = Milliseconds(500);
- instance->excludeDonorHost(hosts.at(0), now + excludeTime);
+ instance->excludeDonorHost_forTest(hosts.at(0), now + excludeTime);
// Advance the clock past excludeTime.
advanceTime(excludeTime + Milliseconds(500));
@@ -789,7 +802,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
auto hosts = replSet.getHosts();
for (const auto& host : hosts) {
const auto now = opCtx->getServiceContext()->getFastClockSource()->now();
- instance->excludeDonorHost(host, now + Milliseconds(500));
+ instance->excludeDonorHost_forTest(host, now + Milliseconds(500));
}
AtomicWord<bool> runReplMonitor{true};
@@ -864,7 +877,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
auto hosts = replSet.getHosts();
auto now = opCtx->getServiceContext()->getFastClockSource()->now();
auto excludeTime = Milliseconds(500);
- instance->excludeDonorHost(hosts.at(0), now + excludeTime);
+ instance->excludeDonorHost_forTest(hosts.at(0), now + excludeTime);
hangFp->setMode(FailPoint::off);
taskFp->waitForTimesEntered(taskFpInitialTimesEntered + 1);
@@ -930,7 +943,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
auto hosts = replSet.getHosts();
auto now = opCtx->getServiceContext()->getFastClockSource()->now();
auto excludeTime = Milliseconds(500);
- instance->excludeDonorHost(hosts.at(0), now + excludeTime);
+ instance->excludeDonorHost_forTest(hosts.at(0), now + excludeTime);
// Advance the clock past excludeTime.
advanceTime(excludeTime + Milliseconds(500));
@@ -961,6 +974,61 @@ TEST_F(TenantMigrationRecipientServiceTest,
ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
+TEST_F(TenantMigrationRecipientServiceTest,
+ TenantMigrationRecipientConnection_RemoteMajorityOpTimeBehindStartApplying) {
+ stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance");
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime remoteMajorityOpTime(Timestamp(5, 1), 1);
+ const OpTime startApplyingOpTime(Timestamp(6, 1), 1);
+
+ auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion");
+ auto timesEntered = taskFp->setMode(FailPoint::alwaysOn, 0);
+
+ // Insert the remote majority optime into the oplogs of the first two hosts.
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ const auto hosts = replSet.getHosts();
+ const std::vector<HostAndPort> advancedOpTimeHosts = {hosts.begin(), hosts.begin() + 2};
+
+ insertTopOfOplog(&replSet, remoteMajorityOpTime, advancedOpTimeHosts);
+ insertTopOfOplog(&replSet, startApplyingOpTime, {hosts.at(2)});
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryPreferred));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+ initialStateDocument.setStartApplyingDonorOpTime(startApplyingOpTime);
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
+
+ taskFp->waitForTimesEntered(timesEntered + 1);
+
+ auto* client = getClient(instance.get());
+ auto* oplogFetcherClient = getOplogFetcherClient(instance.get());
+ // Both clients should be populated.
+ ASSERT(client);
+ ASSERT(oplogFetcherClient);
+
+ // Clients should be distinct.
+ ASSERT(client != oplogFetcherClient);
+
+ // Clients should be connected to donor node at index 2.
+ auto donorHost = hosts[2].toString();
+ ASSERT_EQ(donorHost, client->getServerAddress());
+ ASSERT(client->isStillConnected());
+ ASSERT_EQ(donorHost, oplogFetcherClient->getServerAddress());
+ ASSERT(oplogFetcherClient->isStillConnected());
+
+ taskFp->setMode(FailPoint::off, 0);
+}
+
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFailsOver) {
stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance");
@@ -1506,14 +1574,13 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromTopOfOplogBuf
TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFrom) {
const UUID migrationUUID = UUID::gen();
- const OpTime initialOpTime(Timestamp(1, 1), 1);
const OpTime startFetchingOpTime(Timestamp(2, 1), 1);
const OpTime clonerFinishedOpTime(Timestamp(3, 1), 1);
const OpTime resumeFetchingOpTime(Timestamp(4, 1), 1);
const OpTime dataConsistentOpTime(Timestamp(5, 1), 1);
MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- insertTopOfOplog(&replSet, initialOpTime);
+ insertTopOfOplog(&replSet, startFetchingOpTime);
TenantMigrationRecipientDocument initialStateDocument(
migrationUUID,
@@ -1612,13 +1679,12 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFro
TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplogEntry) {
const UUID migrationUUID = UUID::gen();
- const OpTime initialOpTime(Timestamp(1, 1), 1);
const OpTime clonerFinishedOpTime(Timestamp(2, 1), 1);
const OpTime resumeOpTime(Timestamp(3, 1), 1);
const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- insertTopOfOplog(&replSet, initialOpTime);
+ insertTopOfOplog(&replSet, clonerFinishedOpTime);
TenantMigrationRecipientDocument initialStateDocument(
migrationUUID,
@@ -1633,7 +1699,7 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplog
clonerFinishedOpTime /* cloneFinishedRecipientOpTime */,
dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
clonerFinishedOpTime /* startApplyingDonorOpTime */,
- initialOpTime /* startFetchingDonorOpTime */);
+ clonerFinishedOpTime /* startFetchingDonorOpTime */);
auto opCtx = makeOperationContext();
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
@@ -1732,12 +1798,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplog
TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApplyingOpTime) {
const UUID migrationUUID = UUID::gen();
- const OpTime initialOpTime(Timestamp(1, 1), 1);
const OpTime startApplyingOpTime(Timestamp(2, 1), 1);
const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- insertTopOfOplog(&replSet, initialOpTime);
+ insertTopOfOplog(&replSet, startApplyingOpTime);
TenantMigrationRecipientDocument initialStateDocument(
migrationUUID,
@@ -1749,10 +1814,12 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp
// We skip cloning here as a way to simulate that the recipient service has detected an existing
// migration on startup and will attempt to resume oplog fetching from the appropriate optime.
updateStateDocToCloningFinished(initialStateDocument,
- OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime */,
+ OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime
+ */
+ ,
dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
startApplyingOpTime /* startApplyingDonorOpTime */,
- initialOpTime /* startFetchingDonorOpTime */);
+ startApplyingOpTime /* startFetchingDonorOpTime */);
auto opCtx = makeOperationContext();
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
@@ -1775,8 +1842,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp
// field. These oplog entries do not satisfy the conditions for the oplog applier to resume from
// so we default to resuming from 'startDonorApplyingOpTime'.
const auto insertNss = NamespaceString("tenantA_foo.bar");
+ const auto beforeStartApplyingOpTime = OpTime(Timestamp(1, 1), 1);
const auto entryBeforeStartApplyingOpTime = makeOplogEntry(
- initialOpTime,
+ beforeStartApplyingOpTime,
OpTypeEnum::kInsert,
insertNss,
UUID::gen(),
@@ -1886,12 +1954,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp
TEST_F(TenantMigrationRecipientServiceTest,
OplogFetcherResumesFromStartFetchingOpTimeWithDocInBuffer) {
const UUID migrationUUID = UUID::gen();
- const OpTime initialOpTime(Timestamp(1, 1), 1);
const OpTime startFetchingOpTime(Timestamp(2, 1), 1);
const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- insertTopOfOplog(&replSet, initialOpTime);
+ insertTopOfOplog(&replSet, startFetchingOpTime);
TenantMigrationRecipientDocument initialStateDocument(
migrationUUID,