author     XueruiFa <xuerui.fa@mongodb.com>                  2021-04-06 14:42:20 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-04-22 01:56:13 +0000
commit     a48b6a726df973f9aa22b90c0f23f846c1fb985c (patch)
tree       034b2a133de1932d6837cda42231fd846569ddfc
parent     8d962a7ce64afc7a0a96da4212506f557792786c (diff)
download   mongo-a48b6a726df973f9aa22b90c0f23f846c1fb985c.tar.gz
SERVER-55355: Retry tenant migration on oplog fetcher errors
(cherry picked from commit e51091e12eec32b8fdb18ec56cf64928b1a61348)
-rw-r--r--  jstests/replsets/tenant_migration_sync_source_too_stale.js  157
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp     26
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h        2
3 files changed, 180 insertions(+), 5 deletions(-)
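
In short, the change teaches the recipient to treat InvalidSyncSource, TooStaleToSyncFromSource and ShutdownInProgress from the oplog fetcher as retriable: the current donor host is excluded as a sync source for tenantMigrationExcludeDonorHostTimeoutMS milliseconds and sync source selection is retried. As a minimal, hedged sketch (the fixture setup here is illustrative, not part of the patch), a jstest can shorten that exclusion window the same way the new test below does, by passing the server parameter through the fixture's sharedOptions:

    load("jstests/replsets/libs/tenant_migration_test.js");

    // Start donor and recipient nodes with a 1 second donor host exclusion window so a retried
    // sync source selection does not have to wait for the default timeout.
    const tenantMigrationTest = new TenantMigrationTest({
        name: jsTestName(),
        sharedOptions: {setParameter: {tenantMigrationExcludeDonorHostTimeoutMS: 1000}},
    });
    // ... run a migration that forces a retriable oplog fetcher error ...
    tenantMigrationTest.stop();
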
diff --git a/jstests/replsets/tenant_migration_sync_source_too_stale.js b/jstests/replsets/tenant_migration_sync_source_too_stale.js
new file mode 100644
index 00000000000..07f8c315d77
--- /dev/null
+++ b/jstests/replsets/tenant_migration_sync_source_too_stale.js
@@ -0,0 +1,157 @@
+/**
+ * Tests that a migration will retry if the oplog fetcher discovers that its sync source is too
+ * stale. We test this with a donor replica set that has two secondaries, 'donorSecondary' and
+ * 'delayedSecondary'. We force the recipient to sync from 'donorSecondary'. Then, after the
+ * recipient has set its 'startFetchingDonorOpTime', we stop replication on 'delayedSecondary' and
+ * advance the OpTime on 'donorSecondary'. Next, we wait until the recipient is about to report that
+ * it has reached a consistent state. At this point, it should have advanced its 'lastFetched' to be
+ * ahead of 'startFetchingDonorOpTime'. After forcing the recipient to restart and sync from
+ * 'delayedSecondary', it should see that it is too stale. As a result, it should retry sync source
+ * selection until it finds a sync source that is no longer too stale.
+ *
+ * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft,
+ * incompatible_with_windows_tls]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/libs/write_concern_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
+load('jstests/replsets/rslib.js');
+
+const donorRst = new ReplSetTest({
+ name: `${jsTestName()}_donor`,
+ nodes: 3,
+ settings: {chainingAllowed: false},
+ nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor,
+});
+donorRst.startSet();
+donorRst.initiateWithHighElectionTimeout();
+
+if (!TenantMigrationUtil.isFeatureFlagEnabled(donorRst.getPrimary())) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ donorRst.stopSet();
+ return;
+}
+
+const tenantMigrationTest = new TenantMigrationTest({
+ name: jsTestName(),
+ donorRst,
+ // Use a low donor host exclusion timeout so that the test doesn't take long to retry sync
+ // source selection.
+ sharedOptions: {setParameter: {tenantMigrationExcludeDonorHostTimeoutMS: 1000}}
+});
+
+const tenantId = "testTenantId";
+const tenantDB = tenantMigrationTest.tenantDB(tenantId, "testDB");
+const collName = "testColl";
+
+const delayedSecondary = donorRst.getSecondaries()[0];
+const donorSecondary = donorRst.getSecondaries()[1];
+
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+const hangRecipientPrimaryAfterCreatingRSM =
+ configureFailPoint(recipientPrimary, 'hangAfterCreatingRSM');
+const hangRecipientPrimaryAfterRetrievingStartOpTimes = configureFailPoint(
+ recipientPrimary, 'fpAfterRetrievingStartOpTimesMigrationRecipientInstance', {action: "hang"});
+const hangRecipientPrimaryBeforeConsistentState = configureFailPoint(
+ recipientPrimary, 'fpBeforeFulfillingDataConsistentPromise', {action: "hang"});
+
+const migrationId = UUID();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ tenantId,
+ // Configure the recipient primary to only choose a secondary as sync source.
+ readPreference: {mode: 'secondary'}
+};
+
+jsTestLog("Starting the tenant migration");
+assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+hangRecipientPrimaryAfterCreatingRSM.wait();
+
+awaitRSClientHosts(recipientPrimary, donorSecondary, {ok: true, secondary: true});
+awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: true, secondary: true});
+
+// Turn on the 'waitInHello' failpoint. This causes the delayed secondary to stop sending hello
+// responses, so the RSM should mark the node as down. This is necessary so that the delayed
+// secondary is not chosen as the sync source here.
+jsTestLog(
+ "Turning on waitInHello failpoint. Delayed donor secondary should stop sending hello responses.");
+const helloFailpoint = configureFailPoint(delayedSecondary, "waitInHello");
+awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: false});
+
+hangRecipientPrimaryAfterCreatingRSM.off();
+hangRecipientPrimaryAfterRetrievingStartOpTimes.wait();
+
+let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+let currOp = res.inprog[0];
+// The migration should not be complete.
+assert.eq(currOp.migrationCompleted, false, tojson(res));
+assert.eq(currOp.dataSyncCompleted, false, tojson(res));
+// The sync source can only be 'donorSecondary'.
+assert.eq(donorSecondary.host, currOp.donorSyncSource, tojson(res));
+
+helloFailpoint.off();
+
+// Stop replicating on one of the secondaries and advance the OpTime on the other nodes in the
+// donor replica set.
+jsTestLog("Stopping replication on delayedSecondary");
+stopServerReplication(delayedSecondary);
+tenantMigrationTest.insertDonorDB(tenantDB, collName);
+
+// Wait for 'lastFetched' to be advanced on the recipient.
+hangRecipientPrimaryAfterRetrievingStartOpTimes.off();
+hangRecipientPrimaryBeforeConsistentState.wait();
+
+const hangRecipientPrimaryAfterRestart = configureFailPoint(
+ recipientPrimary, 'fpAfterRetrievingStartOpTimesMigrationRecipientInstance', {action: "hang"});
+
+jsTestLog("Stopping donorSecondary");
+donorRst.stop(donorSecondary);
+awaitRSClientHosts(recipientPrimary, delayedSecondary, {ok: true, secondary: true});
+awaitRSClientHosts(recipientPrimary, donorSecondary, {ok: false});
+
+hangRecipientPrimaryBeforeConsistentState.off();
+const configRecipientNs = recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS);
+assert.soon(() => {
+ // Wait for the recipient to realize that the donor sync source has been shut down and retry
+ // sync source selection.
+ const recipientDoc = configRecipientNs.find({"_id": migrationId}).toArray();
+ return recipientDoc[0].numRestartsDueToDonorConnectionFailure == 1;
+});
+
+hangRecipientPrimaryAfterRestart.wait();
+
+res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+currOp = res.inprog[0];
+// The migration should not be complete.
+assert.eq(currOp.migrationCompleted, false, tojson(res));
+assert.eq(currOp.dataSyncCompleted, false, tojson(res));
+// Since 'donorSecondary' was shut down, the sync source can only be 'delayedSecondary'.
+assert.eq(delayedSecondary.host, currOp.donorSyncSource, tojson(res));
+
+hangRecipientPrimaryAfterRestart.off();
+
+assert.soon(() => {
+ // Verify that the recipient eventually has to restart again, since its lastFetched is ahead of
+ // the last OpTime on 'delayedSecondary'.
+ const recipientDoc = configRecipientNs.find({"_id": migrationId}).toArray();
+ return recipientDoc[0].numRestartsDueToDonorConnectionFailure == 2;
+});
+
+// Let 'delayedSecondary' catch back up to the recipient's lastFetched OpTime.
+donorRst.remove(donorSecondary);
+restartServerReplication(delayedSecondary);
+donorRst.awaitReplication();
+
+// Verify that the migration eventually commits successfully.
+const migrationRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+assert.eq(migrationRes.state, TenantMigrationTest.DonorState.kCommitted);
+
+donorRst.stopSet();
+tenantMigrationTest.stop();
+})();
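
The test above drives the recipient through its steps with the fail point helpers from jstests/libs/fail_point_util.js. As a rough, hedged sketch of that pattern (assuming a 'recipientPrimary' connection as in the test), a fail point is configured on a node, the test blocks until a server thread actually reaches it, performs its setup, and then releases it:

    load("jstests/libs/fail_point_util.js");

    // Hang the recipient right after it has recorded the donor's start opTimes.
    const fp = configureFailPoint(
        recipientPrimary, "fpAfterRetrievingStartOpTimesMigrationRecipientInstance", {action: "hang"});
    fp.wait();  // block until the migration has hit the fail point
    // ... adjust donor replication state while the recipient is paused ...
    fp.off();   // release the fail point and let the migration continue
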
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index b56dc8bd1b5..a6a58038f27 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -113,6 +113,12 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext*
boost::none); /* collUUID */
}
+bool isRetriableOplogFetcherError(Status oplogFetcherStatus) {
+ return oplogFetcherStatus == ErrorCodes::InvalidSyncSource ||
+ oplogFetcherStatus == ErrorCodes::TooStaleToSyncFromSource ||
+ oplogFetcherStatus == ErrorCodes::ShutdownInProgress;
+}
+
} // namespace
// A convenient place to set test-specific parameters.
@@ -1317,9 +1323,23 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID(),
"error"_attr = oplogFetcherStatus);
+ if (isRetriableOplogFetcherError(oplogFetcherStatus)) {
+ LOGV2_DEBUG(5535500,
+ 1,
+ "Recipient migration service oplog fetcher received retriable error, "
+ "excluding donor host as sync source and retrying",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "error"_attr = oplogFetcherStatus);
+
+ stdx::lock_guard lk(_mutex);
+ const auto now = getGlobalServiceContext()->getFastClockSource()->now();
+ _excludeDonorHost(lk,
+ _client->getServerHostAndPort(),
+ now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS));
+ }
_interrupt(oplogFetcherStatus, /*skipWaitingForForgetMigration=*/false);
}
- _oplogFetcherStatus = oplogFetcherStatus;
}
void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp,
@@ -2102,7 +2122,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
if (_taskState.isInterrupted()) {
status = _taskState.getInterruptStatus();
}
- if (ErrorCodes::isRetriableError(status) && !_taskState.isExternalInterrupt() &&
+ if ((ErrorCodes::isRetriableError(status) || isRetriableOplogFetcherError(status)) &&
+ !_taskState.isExternalInterrupt() &&
_stateDocPersistedPromise.getFuture().isReady()) {
// Reset the task state and clear the interrupt status.
if (!_taskState.isRunning()) {
@@ -2110,7 +2131,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
}
_isRestartingOplogApplier = true;
// Clean up the async components before retrying the future chain.
- _oplogFetcherStatus = boost::none;
std::unique_ptr<OplogFetcher> savedDonorOplogFetcher;
std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier;
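
Each retry triggered by these errors is recorded in the recipient's state document, which is how the new test observes the restarts. A hedged sketch, assuming 'recipientPrimary' and 'migrationId' are defined as in that test, of waiting for the first restart to be recorded:

    const recipientColl =
        recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS);
    assert.soon(() => {
        // numRestartsDueToDonorConnectionFailure is bumped each time the recipient restarts the
        // migration after a donor connection or sync source failure.
        const doc = recipientColl.findOne({_id: migrationId});
        return doc !== null && doc.numRestartsDueToDonorConnectionFailure >= 1;
    });
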
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 1a0b97d8612..0bfa8cd4b80 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -541,8 +541,6 @@ public:
std::unique_ptr<TenantMigrationSharedData> _sharedData; // (S)
// Indicates whether the main task future continuation chain state kicked off by run().
TaskState _taskState; // (M)
- // Used to indicate whether the migration is able to be retried on fetcher error.
- boost::optional<Status> _oplogFetcherStatus; // (M)
// Promise that is resolved when the state document is initialized and persisted.
SharedPromise<void> _stateDocPersistedPromise; // (W)