author | XueruiFa <xuerui.fa@mongodb.com> | 2021-01-20 20:42:01 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-05 05:20:16 +0000
commit | 709dca02b250194115e5a79fa6c6a4daa9b296a0 (patch)
tree | d7095dade8e6a8d73507855e56ae0a25470cccf7
parent | 1736293848e2a251db316bb5b4f529739d5e5a27 (diff)
download | mongo-709dca02b250194115e5a79fa6c6a4daa9b296a0.tar.gz
SERVER-52720: Handle choosing a new donor node when startApplyingOpTime exists
10 files changed, 549 insertions, 97 deletions
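The change below makes the recipient retry donor sync source selection when a candidate host's majority OpTime is behind the stored startApplyingDonorOpTime, temporarily excluding such hosts (bounded by the new tenantMigrationExcludeDonorHostTimeoutMS parameter). As an illustration only — not code from this commit — here is a minimal, self-contained C++ sketch of that select-and-exclude pattern. The names SyncSourceSelector, DonorHost, and chooseDonorHost are hypothetical; the real implementation lives in TenantMigrationRecipientService::Instance::_createAndConnectClients and uses AsyncTry together with the replica set monitor's excluded-hosts list.

```cpp
// Simplified sketch (not MongoDB's actual classes): skip donor hosts whose majority
// OpTime is behind startApplyingDonorOpTime, exclude them for a bounded period, and
// let the caller retry until an eligible host exists.
#include <chrono>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

struct OpTime {
    long long ts;  // stand-in for {timestamp, term}
    bool operator<(const OpTime& other) const { return ts < other.ts; }
};

struct DonorHost {
    std::string name;
    OpTime majorityOpTime;  // what a majority read of the host's oplog would return
};

class SyncSourceSelector {
public:
    explicit SyncSourceSelector(std::chrono::milliseconds excludeTimeout)
        : _timeout(excludeTimeout) {}

    // Returns the first non-excluded host whose majority OpTime is not behind
    // startApplyingOpTime (when set); stale hosts are excluded until the timeout expires.
    std::optional<std::string> chooseDonorHost(const std::vector<DonorHost>& hosts,
                                               std::optional<OpTime> startApplyingOpTime) {
        const auto now = Clock::now();
        for (const auto& host : hosts) {
            auto it = _excludedUntil.find(host.name);
            if (it != _excludedUntil.end()) {
                if (now < it->second) {
                    continue;  // still excluded
                }
                _excludedUntil.erase(it);  // exclusion expired; host is eligible again
            }
            if (startApplyingOpTime && host.majorityOpTime < *startApplyingOpTime) {
                // Stale donor: exclude it for a while so retries prefer other hosts.
                _excludedUntil[host.name] = now + _timeout;
                continue;
            }
            return host.name;
        }
        return std::nullopt;  // no eligible host; caller should wait and retry
    }

private:
    std::chrono::milliseconds _timeout;
    std::map<std::string, Clock::time_point> _excludedUntil;
};

int main() {
    SyncSourceSelector selector(std::chrono::milliseconds(60000));
    std::vector<DonorHost> donors = {{"delayedSecondary:27017", {5}},
                                     {"donorSecondary:27017", {6}}};
    OpTime startApplying{6};

    auto chosen = selector.chooseDonorHost(donors, startApplying);
    std::cout << (chosen ? *chosen : std::string("none available, retry later")) << "\n";
    return 0;
}
```

Excluding a stale host only for a bounded period keeps it eligible again once it catches up, which matches the scenario the new jstests construct with stopServerReplication/restartServerReplication on the delayed donor secondary.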
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js index 6062f40afe4..70d6c24c571 100644 --- a/jstests/replsets/libs/tenant_migration_test.js +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -418,8 +418,7 @@ function TenantMigrationTest({ const db = this.getDonorPrimary().getDB(dbName); const coll = db.getCollection(collName); - assert.commandWorked(coll.insertMany(data)); - this.getDonorRst().awaitReplication(); + assert.commandWorked(coll.insertMany(data, {writeConcern: {w: 'majority'}})); }; /** diff --git a/jstests/replsets/tenant_migration_recipient_cannot_sync_from_stale_donor_hosts.js b/jstests/replsets/tenant_migration_recipient_cannot_sync_from_stale_donor_hosts.js new file mode 100644 index 00000000000..8fb22cf88e4 --- /dev/null +++ b/jstests/replsets/tenant_migration_recipient_cannot_sync_from_stale_donor_hosts.js @@ -0,0 +1,131 @@ +/* + * Tests that the recipient primary cannot sync from a donor host that has a majority OpTime that is + * earlier than the recipient's stored 'startApplyingDonorOpTime'. + * + * @tags: [requires_majority_read_concern, requires_fcv_49] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/libs/write_concern_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); +load('jstests/replsets/rslib.js'); + +const donorRst = new ReplSetTest({ + name: `${jsTestName()}_donor`, + nodes: 3, + settings: {chainingAllowed: false}, + nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor, +}); +donorRst.startSet(); +donorRst.initiateWithHighElectionTimeout(); + +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); +if (!tenantMigrationTest.isFeatureFlagEnabled()) { + jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); + donorRst.stopSet(); + return; +} +const tenantId = "testTenantId"; +const tenantDB = tenantMigrationTest.tenantDB(tenantId, "DB"); +const collName = "testColl"; + +const donorPrimary = tenantMigrationTest.getDonorPrimary(); +const delayedSecondary = donorRst.getSecondaries()[0]; +const donorSecondary = donorRst.getSecondaries()[1]; + +const recipientRst = tenantMigrationTest.getRecipientRst(); +const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); +const newRecipientPrimary = recipientRst.getSecondary(); + +// Stop replicating on one of the secondaries so that its majority OpTime will be behind the +// recipient's 'startApplyingDonorOpTime'. +stopServerReplication(delayedSecondary); +tenantMigrationTest.insertDonorDB(tenantDB, collName); + +const hangRecipientPrimaryAfterCreatingRSM = + configureFailPoint(recipientPrimary, 'hangAfterCreatingRSM'); +const hangRecipientPrimaryAfterCreatingConnections = configureFailPoint( + recipientPrimary, 'fpAfterStartingOplogFetcherMigrationRecipientInstance', {action: "hang"}); + +const migrationOpts = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantId, + // The recipient primary can only choose secondaries as sync sources. + readPreference: {mode: 'secondary'}, +}; + +jsTestLog("Starting the tenant migration"); +assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); +hangRecipientPrimaryAfterCreatingRSM.wait(); + +awaitRSClientHosts(recipientPrimary, donorSecondary, {ok: true, secondary: true}); +awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: true, secondary: true}); + +// Turn on the 'waitInHello' failpoint. 
This will cause the delayed secondary to cease sending hello +// responses and the RSM should mark the node as down. This is necessary so that the delayed +// secondary is not chosen as the sync source here, since we want the 'startApplyingDonorOpTime' to +// be set to the most advanced majority OpTime. +jsTestLog( + "Turning on waitInHello failpoint. Delayed donor secondary should stop sending hello responses."); +const helloFailpoint = configureFailPoint(delayedSecondary, "waitInHello"); +awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: false}); + +hangRecipientPrimaryAfterCreatingRSM.off(); +hangRecipientPrimaryAfterCreatingConnections.wait(); + +let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); +let currOp = res.inprog[0]; +// The migration should not be complete. +assert.eq(currOp.migrationCompleted, false, tojson(res)); +assert.eq(currOp.dataSyncCompleted, false, tojson(res)); +// The sync source can only be 'donorSecondary'. +assert.eq(donorSecondary.host, currOp.donorSyncSource, tojson(res)); + +helloFailpoint.off(); + +const hangNewRecipientPrimaryAfterCreatingRSM = + configureFailPoint(newRecipientPrimary, 'hangAfterCreatingRSM'); +const hangNewRecipientPrimaryAfterCreatingConnections = + configureFailPoint(newRecipientPrimary, + 'fpAfterRetrievingStartOpTimesMigrationRecipientInstance', + {action: "hang"}); + +// Step up a new primary so that the tenant migration restarts on the new primary, with the +// 'startApplyingDonorOpTime' field already set in the state doc. +jsTestLog("Stepping up the recipient secondary"); +recipientRst.awaitLastOpCommitted(); +recipientRst.stepUp(newRecipientPrimary); +assert.eq(newRecipientPrimary, recipientRst.getPrimary()); + +// Wait for the new primary to see the state of each donor node. +hangNewRecipientPrimaryAfterCreatingRSM.wait(); +awaitRSClientHosts(newRecipientPrimary, donorPrimary, {ok: true, ismaster: true}); +awaitRSClientHosts( + newRecipientPrimary, [delayedSecondary, donorSecondary], {ok: true, secondary: true}); +hangNewRecipientPrimaryAfterCreatingRSM.off(); + +jsTestLog("Releasing failpoints"); +hangNewRecipientPrimaryAfterCreatingConnections.wait(); +hangRecipientPrimaryAfterCreatingConnections.off(); + +res = newRecipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); +currOp = res.inprog[0]; +// 'donorSecondary' should always be the chosen sync source, since read preference is 'secondary' +// and 'delayedSecondary' cannot be chosen because it is too stale. 
+assert.eq(donorSecondary.host, + currOp.donorSyncSource, + `the recipient should always choose the non-lagged secondary as sync source`); + +hangNewRecipientPrimaryAfterCreatingConnections.off(); +restartServerReplication(delayedSecondary); + +assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); +assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); + +donorRst.stopSet(); +tenantMigrationTest.stop(); +})(); diff --git a/jstests/replsets/tenant_migration_recipient_current_op.js b/jstests/replsets/tenant_migration_recipient_current_op.js index 43a8ff6f312..b857eb78112 100644 --- a/jstests/replsets/tenant_migration_recipient_current_op.js +++ b/jstests/replsets/tenant_migration_recipient_current_op.js @@ -93,6 +93,7 @@ assert(!currOp.startApplyingDonorOpTime, tojson(res)); assert(!currOp.dataConsistentStopDonorOpTime, tojson(res)); assert(!currOp.cloneFinishedRecipientOpTime, tojson(res)); assert(!currOp.expireAt, tojson(res)); +assert(!currOp.donorSyncSource, tojson(res)); fpAfterPersistingStateDoc.off(); // Allow the migration to move to the point where the startFetchingDonorOpTime has been obtained. @@ -111,6 +112,7 @@ assert(!currOp.expireAt, tojson(res)); // Must exist now. assert(currOp.startFetchingDonorOpTime, tojson(res)); assert(currOp.startApplyingDonorOpTime, tojson(res)); +assert(currOp.donorSyncSource, tojson(res)); fpAfterRetrievingStartOpTime.off(); // Wait until collection cloning is done, and cloneFinishedRecipientOpTime @@ -128,6 +130,7 @@ assert(!currOp.expireAt, tojson(res)); // Must exist now. assert(currOp.startFetchingDonorOpTime, tojson(res)); assert(currOp.startApplyingDonorOpTime, tojson(res)); +assert(currOp.donorSyncSource, tojson(res)); assert(currOp.dataConsistentStopDonorOpTime, tojson(res)); assert(currOp.cloneFinishedRecipientOpTime, tojson(res)); fpAfterCollectionCloner.off(); diff --git a/jstests/replsets/tenant_migration_recipient_sync_source_no_available_donor_hosts.js b/jstests/replsets/tenant_migration_recipient_sync_source_no_available_donor_hosts.js new file mode 100644 index 00000000000..a076b1f588c --- /dev/null +++ b/jstests/replsets/tenant_migration_recipient_sync_source_no_available_donor_hosts.js @@ -0,0 +1,154 @@ +/* + * Tests that the migration cannot complete when at least one donor host has a stale majority OpTime + * and all other hosts are considered unavailable. The recipient primary should retry and continue + * to wait until a suitable sync source is available on the donor replica set. 
+ * + * @tags: [requires_majority_read_concern, requires_fcv_49] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/libs/write_concern_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load('jstests/replsets/rslib.js'); + +const donorRst = new ReplSetTest({ + name: `${jsTestName()}_donor`, + nodes: 3, + settings: {chainingAllowed: false}, + nodeOptions: + Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, + {setParameter: {tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000}}), +}); +donorRst.startSet(); +donorRst.initiateWithHighElectionTimeout(); + +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); +if (!tenantMigrationTest.isFeatureFlagEnabled()) { + jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); + donorRst.stopSet(); + return; +} +const tenantId = "testTenantId"; +const tenantDB = tenantMigrationTest.tenantDB(tenantId, "DB"); +const collName = "testColl"; + +const donorPrimary = tenantMigrationTest.getDonorPrimary(); +const delayedSecondary = donorRst.getSecondaries()[0]; +const donorSecondary = donorRst.getSecondaries()[1]; + +const recipientRst = tenantMigrationTest.getRecipientRst(); +const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); +const newRecipientPrimary = recipientRst.getSecondary(); + +// Stop replicating on one of the secondaries so that its majority OpTime will be behind the +// recipient's 'startApplyingDonorOpTime'. +stopServerReplication(delayedSecondary); +tenantMigrationTest.insertDonorDB(tenantDB, collName); + +const hangRecipientPrimaryAfterCreatingRSM = + configureFailPoint(recipientPrimary, 'hangAfterCreatingRSM'); +const hangRecipientPrimaryAfterCreatingConnections = configureFailPoint( + recipientPrimary, 'fpAfterStartingOplogFetcherMigrationRecipientInstance', {action: "hang"}); + +const migrationOpts = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantId, + // The recipient primary can only choose secondaries as sync sources. + readPreference: {mode: 'secondary'}, +}; + +jsTestLog("Starting the tenant migration"); +assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); +hangRecipientPrimaryAfterCreatingRSM.wait(); + +awaitRSClientHosts(recipientPrimary, donorSecondary, {ok: true, secondary: true}); +awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: true, secondary: true}); + +// Turn on the 'waitInHello' failpoint. This will cause the delayed secondary to cease sending hello +// responses and the RSM should mark the node as down. This is necessary so that the delayed +// secondary is not chosen as the sync source here, since we want the 'startApplyingDonorOpTime' to +// be set to the most advanced majority OpTime. +jsTestLog( + "Turning on waitInHello failpoint. Delayed donor secondary should stop sending hello responses."); +const helloFailpoint = configureFailPoint(delayedSecondary, "waitInHello"); +awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: false}); + +hangRecipientPrimaryAfterCreatingRSM.off(); +hangRecipientPrimaryAfterCreatingConnections.wait(); + +let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); +let currOp = res.inprog[0]; +// The migration should not be complete. 
+assert.eq(currOp.migrationCompleted, false, tojson(res)); +assert.eq(currOp.dataSyncCompleted, false, tojson(res)); +// The sync source can only be 'donorSecondary'. +assert.eq(donorSecondary.host, currOp.donorSyncSource, tojson(res)); + +helloFailpoint.off(); + +const hangNewRecipientPrimaryAfterCreatingRSM = + configureFailPoint(newRecipientPrimary, 'hangAfterCreatingRSM'); +const hangNewRecipientPrimaryAfterCreatingConnections = + configureFailPoint(newRecipientPrimary, + 'fpAfterRetrievingStartOpTimesMigrationRecipientInstance', + {action: "hang"}); + +// Step up a new primary so that the tenant migration restarts on the new primary, with the +// 'startApplyingDonorOpTime' field already set in the state doc. +jsTestLog("Stepping up the recipient secondary"); +recipientRst.awaitLastOpCommitted(); +recipientRst.stepUp(newRecipientPrimary); +assert.eq(newRecipientPrimary, recipientRst.getPrimary()); + +jsTestLog("Stopping the non-lagged secondary"); +donorRst.stop(donorSecondary); + +// Wait for the new primary to see the state of each donor node. 'donorSecondary' should return +// '{ok: false}' since it has been shut down. +hangNewRecipientPrimaryAfterCreatingRSM.wait(); +awaitRSClientHosts(newRecipientPrimary, donorPrimary, {ok: true, ismaster: true}); +awaitRSClientHosts(newRecipientPrimary, delayedSecondary, {ok: true, secondary: true}); +awaitRSClientHosts(newRecipientPrimary, donorSecondary, {ok: false}); + +jsTestLog("Releasing failpoints"); +hangNewRecipientPrimaryAfterCreatingRSM.off(); +hangRecipientPrimaryAfterCreatingConnections.off(); + +res = newRecipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); +currOp = res.inprog[0]; +// The migration should not be complete and there should be no sync source stored, since the new +// recipient primary does not have a valid sync source to choose from. +assert.eq(currOp.migrationCompleted, false, tojson(res)); +assert.eq(currOp.dataSyncCompleted, false, tojson(res)); +assert(!currOp.donorSyncSource, tojson(res)); + +jsTestLog("Restarting replication on 'delayedSecondary'"); +restartServerReplication(delayedSecondary); +// The recipient should eventually be able to connect to the lagged secondary, after the secondary +// has caught up and the exclude timeout has expired. +// TODO (SERVER-54256): After we add retries in sync source selection for the recipient primary, +// uncomment these lines. +// hangNewRecipientPrimaryAfterCreatingConnections.wait(); + +// res = newRecipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); +// currOp = res.inprog[0]; +// assert.eq( +// delayedSecondary.host, +// currOp.donorSyncSource, +// `the new recipient primary should only be able to choose 'delayedSecondary' as sync source`); + +// hangNewRecipientPrimaryAfterCreatingConnections.off(); + +assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); +assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); + +// Remove 'donorSecondary' so that the test can complete properly. 
+donorRst.remove(donorSecondary); +donorRst.stopSet(); +tenantMigrationTest.stop(); +})(); diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 9d4764979be..3e118440cc6 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -356,18 +356,20 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr return {*immediateResult}; } - return _enqueueOutstandingQuery(lk, criteria, cancelToken, deadline); + return _enqueueOutstandingQuery(lk, criteria, excludedHosts, cancelToken, deadline); }); } SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutstandingQuery( WithLock, const ReadPreferenceSetting& criteria, + const std::vector<HostAndPort>& excludedHosts, const CancelationToken& cancelToken, const Date_t& deadline) { auto query = std::make_shared<HostQuery>(); query->criteria = criteria; + query->excludedHosts = excludedHosts; auto pf = makePromiseFuture<std::vector<HostAndPort>>(); query->promise = std::move(pf.promise); @@ -783,7 +785,7 @@ void StreamableReplicaSetMonitor::_processOutstanding( // If query has not been canceled yet, try to satisfy it. if (!query->hasBeenResolved()) { - auto result = _getHosts(topologyDescription, query->criteria); + auto result = _getHosts(topologyDescription, query->criteria, query->excludedHosts); if (result) { if (query->tryResolveWithSuccess(std::move(*result))) { const auto latency = _executor->now() - query->start; diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index a1b0368981d..55cb63b87df 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -180,6 +180,8 @@ private: ReadPreferenceSetting criteria; + std::vector<HostAndPort> excludedHosts; + // Used to compute latency. Date_t start; @@ -201,6 +203,7 @@ private: SemiFuture<std::vector<HostAndPort>> _enqueueOutstandingQuery( WithLock, const ReadPreferenceSetting& criteria, + const std::vector<HostAndPort>& excludedHosts, const CancelationToken& cancelToken, const Date_t& deadline); diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index e58efd020c0..4f9b975ce2e 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -453,6 +453,16 @@ server_parameters: cpp_varname: tenantMigrationDisableX509Auth default: false + tenantMigrationExcludeDonorHostTimeoutMS: + description: >- + Period of time, in milliseconds, that a donor host should be excluded for. 
+ set_at: startup + cpp_vartype: int + cpp_varname: tenantMigrationExcludeDonorHostTimeoutMS + default: 60000 + validator: + gte: 1 + feature_flags: featureFlagTenantMigrations: description: >- diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 0587e7865b8..e7d9d632b45 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -95,6 +95,7 @@ MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion); MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration); +MONGO_FAIL_POINT_DEFINE(hangAfterCreatingRSM); namespace { // We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine @@ -280,6 +281,10 @@ boost::optional<BSONObj> TenantMigrationRecipientService::Instance::reportForCur if (_stateDoc.getExpireAt()) bob.append("expireAt", _stateDoc.getExpireAt()->toString()); + if (_client) { + bob.append("donorSyncSource", _client->getServerAddress()); + } + return bob.obj(); } @@ -388,6 +393,22 @@ std::unique_ptr<DBClientConnection> TenantMigrationRecipientService::Instance::_ return client; } +OpTime TenantMigrationRecipientService::Instance::_getDonorMajorityOpTime( + std::unique_ptr<mongo::DBClientConnection>& client) { + auto oplogOpTimeFields = + BSON(OplogEntry::kTimestampFieldName << 1 << OplogEntry::kTermFieldName << 1); + auto majorityOpTimeBson = + client->findOne(NamespaceString::kRsOplogNamespace.ns(), + Query().sort("$natural", -1), + &oplogOpTimeFields, + QueryOption_SecondaryOk, + ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); + uassert(5272003, "Found no entries in the remote oplog", !majorityOpTimeBson.isEmpty()); + + auto majorityOpTime = uassertStatusOK(OpTime::parseFromOplogEntry(majorityOpTimeBson)); + return majorityOpTime; +} + SemiFuture<TenantMigrationRecipientService::Instance::ConnectionPair> TenantMigrationRecipientService::Instance::_createAndConnectClients() { LOGV2_DEBUG(4880401, @@ -414,53 +435,118 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { .getAsync([getHostCancelSource](auto) mutable { getHostCancelSource.cancel(); }); }); - // Get all donor hosts that we have excluded. - const auto& excludedHosts = _getExcludedDonorHosts(lk); + if (MONGO_unlikely(hangAfterCreatingRSM.shouldFail())) { + LOGV2(5272004, "hangAfterCreatingRSM failpoint enabled"); + hangAfterCreatingRSM.pauseWhileSet(); + } - return _donorReplicaSetMonitor - ->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token()) - .thenRunOn(**_scopedExecutor) - .then([this, self = shared_from_this()](const HostAndPort& serverAddress) { - // Application name is constructed such that it doesn't exceeds - // kMaxApplicationNameByteLength (128 bytes). - // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) + - // <migrationUuid> (36 bytes) = 114 bytes length. - // Note: Since the total length of tenant database name (<tenantId>_<user provided db - // name>) can't exceed 63 bytes and the user provided db name should be at least one - // character long, the maximum length of tenantId can only be 61 bytes. 
- auto applicationName = - "TenantMigration_" + getTenantId() + "_" + getMigrationUUID().toString(); - - auto client = _connectAndAuth(serverAddress, applicationName); - - // Application name is constructed such that it doesn't exceeds - // kMaxApplicationNameByteLength (128 bytes). - // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) + - // <migrationUuid> (36 bytes) + _oplogFetcher" (13 bytes) = 127 bytes length. - applicationName += "_oplogFetcher"; - auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName); - return ConnectionPair(std::move(client), std::move(oplogFetcherClient)); - }) - .onError( - [this, self = shared_from_this()](const Status& status) -> SemiFuture<ConnectionPair> { + const auto kDelayedMajorityOpTimeErrorCode = 5272000; + + return AsyncTry([this, + self = shared_from_this(), + getHostCancelSource, + kDelayedMajorityOpTimeErrorCode] { + stdx::lock_guard lk(_mutex); + + // Get all donor hosts that we have excluded. + const auto& excludedHosts = _getExcludedDonorHosts(lk); + + return _donorReplicaSetMonitor + ->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token()) + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode]( + const HostAndPort& serverAddress) { + LOGV2(5272002, + "Attempting to connect to donor host", + "donorHost"_attr = serverAddress, + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID()); + // Application name is constructed such that it doesn't exceeds + // kMaxApplicationNameByteLength (128 bytes). + // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) + + // <migrationUuid> (36 bytes) = 114 bytes length. + // Note: Since the total length of tenant database name (<tenantId>_<user + // provided db name>) can't exceed 63 bytes and the user provided db name + // should be at least one character long, the maximum length of tenantId can + // only be 61 bytes. + auto applicationName = + "TenantMigration_" + getTenantId() + "_" + getMigrationUUID().toString(); + + auto client = _connectAndAuth(serverAddress, applicationName); + + boost::optional<repl::OpTime> startApplyingOpTime; + { + stdx::lock_guard lk(_mutex); + startApplyingOpTime = _stateDoc.getStartApplyingDonorOpTime(); + } + + if (startApplyingOpTime) { + auto majoritySnapshotOpTime = _getDonorMajorityOpTime(client); + + if (majoritySnapshotOpTime < *startApplyingOpTime) { + stdx::lock_guard lk(_mutex); + const auto now = + getGlobalServiceContext()->getFastClockSource()->now(); + _excludeDonorHost( + lk, + serverAddress, + now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS)); + uasserted( + kDelayedMajorityOpTimeErrorCode, + str::stream() + << "majoritySnapshotOpTime on donor host must not be behind " + "startApplyingDonorOpTime, majoritySnapshotOpTime: " + << majoritySnapshotOpTime.toString() + << "; startApplyingDonorOpTime: " + << (*startApplyingOpTime).toString()); + } + } + + // Application name is constructed such that it doesn't exceed + // kMaxApplicationNameByteLength (128 bytes). + // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) + + // <migrationUuid> (36 bytes) + _oplogFetcher" (13 bytes) = 127 bytes + // length. 
+ applicationName += "_oplogFetcher"; + auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName); + return ConnectionPair(std::move(client), std::move(oplogFetcherClient)); + }); + }) + .until([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode]( + const StatusWith<ConnectionPair>& status) { + if (!status.isOK()) { LOGV2_ERROR(4880404, "Connecting to donor failed", "tenantId"_attr = getTenantId(), "migrationId"_attr = getMigrationUUID(), - "error"_attr = status); + "error"_attr = status.getStatus()); // Make sure we don't end up with a partially initialized set of connections. stdx::lock_guard lk(_mutex); _client = nullptr; _oplogFetcherClient = nullptr; - return status; - }) + + // If the future chain has been interrupted, stop retrying. + if (_taskState.isInterrupted()) { + return true; + } + + // If the connection failed because the majority snapshot OpTime on the donor host + // was not ahead of our stored 'startApplyingDonorOpTime, choose another donor host + // to connect to. + if (status.getStatus() == ErrorCodes::Error(kDelayedMajorityOpTimeErrorCode)) { + return false; + } + } + return true; + }) + .on(**_scopedExecutor, CancelationToken::uncancelable()) .semi(); } -void TenantMigrationRecipientService::Instance::excludeDonorHost(const HostAndPort& host, - Date_t until) { - stdx::lock_guard lk(_mutex); +void TenantMigrationRecipientService::Instance::_excludeDonorHost(WithLock, + const HostAndPort& host, + Date_t until) { LOGV2_DEBUG(5271800, 2, "Excluding donor host", @@ -546,22 +632,13 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo invariant(!_stateDoc.getStartFetchingDonorOpTime().has_value()); // Get the last oplog entry at the read concern majority optime in the remote oplog. It // does not matter which tenant it is for. - auto oplogOpTimeFields = - BSON(OplogEntry::kTimestampFieldName << 1 << OplogEntry::kTermFieldName << 1); - auto lastOplogEntry1Bson = - _client->findOne(NamespaceString::kRsOplogNamespace.ns(), - Query().sort("$natural", -1), - &oplogOpTimeFields, - QueryOption_SecondaryOk, - ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); - uassert(4880601, "Found no entries in the remote oplog", !lastOplogEntry1Bson.isEmpty()); + auto lastOplogEntry1OpTime = _getDonorMajorityOpTime(_client); LOGV2_DEBUG(4880600, 2, "Found last oplog entry at read concern majority optime on remote node", "migrationId"_attr = getMigrationUUID(), "tenantId"_attr = _stateDoc.getTenantId(), - "lastOplogEntry"_attr = lastOplogEntry1Bson); - auto lastOplogEntry1OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry1Bson)); + "lastOplogEntry"_attr = lastOplogEntry1OpTime.toBSON()); // Get the optime of the earliest transaction that was open at the read concern majority optime // As with the last oplog entry, it does not matter that this may be for a different tenant; an @@ -589,21 +666,14 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo // We need to fetch the last oplog entry both before and after getting the transaction // table entry, as otherwise there is a potential race where we may try to apply // a commit for which we have not fetched a previous transaction oplog entry. 
- auto lastOplogEntry2Bson = - _client->findOne(NamespaceString::kRsOplogNamespace.ns(), - Query().sort("$natural", -1), - &oplogOpTimeFields, - QueryOption_SecondaryOk, - ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); - uassert(4880603, "Found no entries in the remote oplog", !lastOplogEntry2Bson.isEmpty()); + auto lastOplogEntry2OpTime = _getDonorMajorityOpTime(_client); LOGV2_DEBUG(4880604, 2, "Found last oplog entry at the read concern majority optime (after reading txn " "table) on remote node", "migrationId"_attr = getMigrationUUID(), "tenantId"_attr = _stateDoc.getTenantId(), - "lastOplogEntry"_attr = lastOplogEntry2Bson); - auto lastOplogEntry2OpTime = uassertStatusOK(OpTime::parseFromOplogEntry(lastOplogEntry2Bson)); + "lastOplogEntry"_attr = lastOplogEntry2OpTime.toBSON()); _stateDoc.setStartApplyingDonorOpTime(lastOplogEntry2OpTime); OpTime startFetchingDonorOpTime = lastOplogEntry1OpTime; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index e801c0a7fc2..9de9ebc73c1 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -151,11 +151,6 @@ public: const Timestamp& donorTs) const; /* - * Suppresses selecting 'host' as the donor sync source, until 'until'. - */ - void excludeDonorHost(const HostAndPort& host, Date_t until); - - /* * Set the oplog creator functor, to allow use of a mock oplog fetcher. */ void setCreateOplogFetcherFn_forTest( @@ -171,6 +166,14 @@ public: _tenantOplogApplier->shutdown(); } + /* + * Suppresses selecting 'host' as the donor sync source, until 'until'. + */ + void excludeDonorHost_forTest(const HostAndPort& host, Date_t until) { + stdx::lock_guard lk(_mutex); + _excludeDonorHost(lk, host, until); + } + private: friend class TenantMigrationRecipientServiceTest; @@ -391,10 +394,15 @@ public: void _cleanupOnDataSyncCompletion(Status status); /* + * Suppresses selecting 'host' as the donor sync source, until 'until'. + */ + void _excludeDonorHost(WithLock, const HostAndPort& host, Date_t until); + + /* * Returns a vector of currently excluded donor hosts. Also removes hosts from the list of * excluded donor nodes, if the exclude duration has expired. */ - std::vector<HostAndPort> _getExcludedDonorHosts(WithLock lk); + std::vector<HostAndPort> _getExcludedDonorHosts(WithLock); /* * Makes the failpoint to stop or hang based on failpoint data "action" field. @@ -406,6 +414,11 @@ public: */ SharedSemiFuture<void> _updateStateDocForMajority(WithLock lk) const; + /* + * Returns the majority OpTime on the donor node that 'client' is connected to. 
+ */ + OpTime _getDonorMajorityOpTime(std::unique_ptr<mongo::DBClientConnection>& client); + mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex"); // All member variables are labeled with one of the following codes indicating the diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index 31f833b75f8..342201ce67e 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -306,32 +306,45 @@ protected: ASSERT_OK(persistedStateDocWithStatus.getStatus()); ASSERT_BSONOBJ_EQ(memoryStateDoc.toBSON(), persistedStateDocWithStatus.getValue().toBSON()); } - void insertToAllNodes(MockReplicaSet* replSet, const std::string& nss, BSONObj obj) { - for (const auto& host : replSet->getHosts()) { + void insertToNodes(MockReplicaSet* replSet, + const std::string& nss, + BSONObj obj, + const std::vector<HostAndPort>& hosts) { + for (const auto& host : hosts) { replSet->getNode(host.toString())->insert(nss, obj); } } - void clearCollectionAllNodes(MockReplicaSet* replSet, const std::string& nss) { - for (const auto& host : replSet->getHosts()) { + void insertToAllNodes(MockReplicaSet* replSet, const std::string& nss, BSONObj obj) { + insertToNodes(replSet, nss, obj, replSet->getHosts()); + } + + void clearCollection(MockReplicaSet* replSet, + const std::string& nss, + const std::vector<HostAndPort>& hosts) { + for (const auto& host : hosts) { replSet->getNode(host.toString())->remove(nss, Query()); } } - void insertTopOfOplog(MockReplicaSet* replSet, const OpTime& topOfOplogOpTime) { + void insertTopOfOplog(MockReplicaSet* replSet, + const OpTime& topOfOplogOpTime, + const std::vector<HostAndPort> hosts = {}) { + const auto targetHosts = hosts.empty() ? replSet->getHosts() : hosts; // The MockRemoteDBService does not actually implement the database, so to make our // find work correctly we must make sure there's only one document to find. - clearCollectionAllNodes(replSet, NamespaceString::kRsOplogNamespace.ns()); - insertToAllNodes(replSet, - NamespaceString::kRsOplogNamespace.ns(), - makeOplogEntry(topOfOplogOpTime, - OpTypeEnum::kNoop, - {} /* namespace */, - boost::none /* uuid */, - BSONObj() /* o */, - boost::none /* o2 */) - .getEntry() - .toBSON()); + clearCollection(replSet, NamespaceString::kRsOplogNamespace.ns(), targetHosts); + insertToNodes(replSet, + NamespaceString::kRsOplogNamespace.ns(), + makeOplogEntry(topOfOplogOpTime, + OpTypeEnum::kNoop, + {} /* namespace */, + boost::none /* uuid */, + BSONObj() /* o */, + boost::none /* o2 */) + .getEntry() + .toBSON(), + targetHosts); } // Accessors to class private members @@ -648,7 +661,7 @@ TEST_F(TenantMigrationRecipientServiceTest, // Mark the primary as excluded. auto hosts = replSet.getHosts(); auto now = opCtx->getServiceContext()->getFastClockSource()->now(); - instance->excludeDonorHost(hosts.at(0), now + Milliseconds(500)); + instance->excludeDonorHost_forTest(hosts.at(0), now + Milliseconds(500)); AtomicWord<bool> runReplMonitor{true}; // Keep scanning the replica set while waiting to reach the failpoint. 
This would normally @@ -722,7 +735,7 @@ TEST_F(TenantMigrationRecipientServiceTest, auto hosts = replSet.getHosts(); auto now = opCtx->getServiceContext()->getFastClockSource()->now(); auto excludeTime = Milliseconds(500); - instance->excludeDonorHost(hosts.at(0), now + excludeTime); + instance->excludeDonorHost_forTest(hosts.at(0), now + excludeTime); // Advance the clock past excludeTime. advanceTime(excludeTime + Milliseconds(500)); @@ -789,7 +802,7 @@ TEST_F(TenantMigrationRecipientServiceTest, auto hosts = replSet.getHosts(); for (const auto& host : hosts) { const auto now = opCtx->getServiceContext()->getFastClockSource()->now(); - instance->excludeDonorHost(host, now + Milliseconds(500)); + instance->excludeDonorHost_forTest(host, now + Milliseconds(500)); } AtomicWord<bool> runReplMonitor{true}; @@ -864,7 +877,7 @@ TEST_F(TenantMigrationRecipientServiceTest, auto hosts = replSet.getHosts(); auto now = opCtx->getServiceContext()->getFastClockSource()->now(); auto excludeTime = Milliseconds(500); - instance->excludeDonorHost(hosts.at(0), now + excludeTime); + instance->excludeDonorHost_forTest(hosts.at(0), now + excludeTime); hangFp->setMode(FailPoint::off); taskFp->waitForTimesEntered(taskFpInitialTimesEntered + 1); @@ -930,7 +943,7 @@ TEST_F(TenantMigrationRecipientServiceTest, auto hosts = replSet.getHosts(); auto now = opCtx->getServiceContext()->getFastClockSource()->now(); auto excludeTime = Milliseconds(500); - instance->excludeDonorHost(hosts.at(0), now + excludeTime); + instance->excludeDonorHost_forTest(hosts.at(0), now + excludeTime); // Advance the clock past excludeTime. advanceTime(excludeTime + Milliseconds(500)); @@ -961,6 +974,61 @@ TEST_F(TenantMigrationRecipientServiceTest, ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } +TEST_F(TenantMigrationRecipientServiceTest, + TenantMigrationRecipientConnection_RemoteMajorityOpTimeBehindStartApplying) { + stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance"); + + const UUID migrationUUID = UUID::gen(); + const OpTime remoteMajorityOpTime(Timestamp(5, 1), 1); + const OpTime startApplyingOpTime(Timestamp(6, 1), 1); + + auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); + auto timesEntered = taskFp->setMode(FailPoint::alwaysOn, 0); + + // Insert the remote majority optime into the oplogs of the first two hosts. + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + const auto hosts = replSet.getHosts(); + const std::vector<HostAndPort> advancedOpTimeHosts = {hosts.begin(), hosts.begin() + 2}; + + insertTopOfOplog(&replSet, remoteMajorityOpTime, advancedOpTimeHosts); + insertTopOfOplog(&replSet, startApplyingOpTime, {hosts.at(2)}); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryPreferred)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + initialStateDocument.setStartApplyingDonorOpTime(startApplyingOpTime); + + // Create and start the instance. 
+ auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); + + taskFp->waitForTimesEntered(timesEntered + 1); + + auto* client = getClient(instance.get()); + auto* oplogFetcherClient = getOplogFetcherClient(instance.get()); + // Both clients should be populated. + ASSERT(client); + ASSERT(oplogFetcherClient); + + // Clients should be distinct. + ASSERT(client != oplogFetcherClient); + + // Clients should be connected to donor node at index 2. + auto donorHost = hosts[2].toString(); + ASSERT_EQ(donorHost, client->getServerAddress()); + ASSERT(client->isStillConnected()); + ASSERT_EQ(donorHost, oplogFetcherClient->getServerAddress()); + ASSERT(oplogFetcherClient->isStillConnected()); + + taskFp->setMode(FailPoint::off, 0); +} + TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFailsOver) { stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance"); @@ -1506,14 +1574,13 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromTopOfOplogBuf TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFrom) { const UUID migrationUUID = UUID::gen(); - const OpTime initialOpTime(Timestamp(1, 1), 1); const OpTime startFetchingOpTime(Timestamp(2, 1), 1); const OpTime clonerFinishedOpTime(Timestamp(3, 1), 1); const OpTime resumeFetchingOpTime(Timestamp(4, 1), 1); const OpTime dataConsistentOpTime(Timestamp(5, 1), 1); MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - insertTopOfOplog(&replSet, initialOpTime); + insertTopOfOplog(&replSet, startFetchingOpTime); TenantMigrationRecipientDocument initialStateDocument( migrationUUID, @@ -1612,13 +1679,12 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFro TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplogEntry) { const UUID migrationUUID = UUID::gen(); - const OpTime initialOpTime(Timestamp(1, 1), 1); const OpTime clonerFinishedOpTime(Timestamp(2, 1), 1); const OpTime resumeOpTime(Timestamp(3, 1), 1); const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - insertTopOfOplog(&replSet, initialOpTime); + insertTopOfOplog(&replSet, clonerFinishedOpTime); TenantMigrationRecipientDocument initialStateDocument( migrationUUID, @@ -1633,7 +1699,7 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplog clonerFinishedOpTime /* cloneFinishedRecipientOpTime */, dataConsistentOpTime /* dataConsistentStopDonorOpTime */, clonerFinishedOpTime /* startApplyingDonorOpTime */, - initialOpTime /* startFetchingDonorOpTime */); + clonerFinishedOpTime /* startFetchingDonorOpTime */); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; @@ -1732,12 +1798,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplog TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApplyingOpTime) { const UUID migrationUUID = UUID::gen(); - const OpTime initialOpTime(Timestamp(1, 1), 1); const OpTime startApplyingOpTime(Timestamp(2, 1), 1); const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - 
insertTopOfOplog(&replSet, initialOpTime); + insertTopOfOplog(&replSet, startApplyingOpTime); TenantMigrationRecipientDocument initialStateDocument( migrationUUID, @@ -1749,10 +1814,12 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp // We skip cloning here as a way to simulate that the recipient service has detected an existing // migration on startup and will attempt to resume oplog fetching from the appropriate optime. updateStateDocToCloningFinished(initialStateDocument, - OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime */, + OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime + */ + , dataConsistentOpTime /* dataConsistentStopDonorOpTime */, startApplyingOpTime /* startApplyingDonorOpTime */, - initialOpTime /* startFetchingDonorOpTime */); + startApplyingOpTime /* startFetchingDonorOpTime */); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; @@ -1775,8 +1842,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp // field. These oplog entries do not satisfy the conditions for the oplog applier to resume from // so we default to resuming from 'startDonorApplyingOpTime'. const auto insertNss = NamespaceString("tenantA_foo.bar"); + const auto beforeStartApplyingOpTime = OpTime(Timestamp(1, 1), 1); const auto entryBeforeStartApplyingOpTime = makeOplogEntry( - initialOpTime, + beforeStartApplyingOpTime, OpTypeEnum::kInsert, insertNss, UUID::gen(), @@ -1886,12 +1954,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromStartFetchingOpTimeWithDocInBuffer) { const UUID migrationUUID = UUID::gen(); - const OpTime initialOpTime(Timestamp(1, 1), 1); const OpTime startFetchingOpTime(Timestamp(2, 1), 1); const OpTime dataConsistentOpTime(Timestamp(4, 1), 1); MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - insertTopOfOplog(&replSet, initialOpTime); + insertTopOfOplog(&replSet, startFetchingOpTime); TenantMigrationRecipientDocument initialStateDocument( migrationUUID, |