diff options
author | Christopher Caplinger <christopher.caplinger@mongodb.com> | 2022-01-05 22:32:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-05 23:11:11 +0000 |
commit | 3ad8827a2f8763f4ff2eac719e6b0420f0627b33 (patch) | |
tree | 8765d1f78efbd308d2e2b776afcdded6202ded03 | |
parent | ad4f490665681d18c1291ce52e8ba6a01f20788b (diff) | |
download | mongo-3ad8827a2f8763f4ff2eac719e6b0420f0627b33.tar.gz |
SERVER-61131: Store backup cursor results and set state to 'learned filenames'
23 files changed, 233 insertions, 73 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js index 9cbb7523559..0e70ca11dbc 100644 --- a/jstests/replsets/libs/tenant_migration_test.js +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -594,11 +594,20 @@ TenantMigrationTest.DonorState = { }; TenantMigrationTest.RecipientState = { + kUninitialized: "uninitialized", kStarted: "started", kConsistent: "consistent", kDone: "done", + kLearnedFilenames: "learned filenames", + kCopiedFiles: "copied files", }; +TenantMigrationTest.RecipientStateEnum = + Object.keys(TenantMigrationTest.RecipientState).reduce((acc, key, idx) => { + acc[key] = idx; + return acc; + }, {}); + TenantMigrationTest.State = TenantMigrationTest.DonorState; TenantMigrationTest.DonorAccessState = { diff --git a/jstests/replsets/tenant_migration_cloner_stats_with_failover.js b/jstests/replsets/tenant_migration_cloner_stats_with_failover.js index 6b7235ae54d..0f6fe130bf9 100644 --- a/jstests/replsets/tenant_migration_cloner_stats_with_failover.js +++ b/jstests/replsets/tenant_migration_cloner_stats_with_failover.js @@ -13,6 +13,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, @@ -146,4 +147,4 @@ assert.eq(currOp.databases.databasesClonedBeforeFailover, 1, res); assert.eq(currOp.databases[dbName2].clonedCollectionsBeforeFailover, 1, res); tenantMigrationTest.stop(); -})();
\ No newline at end of file +})(); diff --git a/jstests/replsets/tenant_migration_donor_retry.js b/jstests/replsets/tenant_migration_donor_retry.js index 1fb1a36c526..28dfb5692a0 100644 --- a/jstests/replsets/tenant_migration_donor_retry.js +++ b/jstests/replsets/tenant_migration_donor_retry.js @@ -5,6 +5,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_donor_try_abort.js b/jstests/replsets/tenant_migration_donor_try_abort.js index 3d6ada6316c..2f4ba6b64d2 100644 --- a/jstests/replsets/tenant_migration_donor_try_abort.js +++ b/jstests/replsets/tenant_migration_donor_try_abort.js @@ -79,9 +79,15 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); jsTestLog( "Test sending donorAbortMigration during a tenant migration while recipientSyncData " + "command repeatedly fails with retryable errors."); - const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); + if (TenantMigrationUtil.isShardMergeEnabled( + tenantMigrationTest.getDonorPrimary().getDB("admin"))) { + tenantMigrationTest.stop(); + jsTestLog("Skipping test, Shard Merge does not support retry"); + return; + } + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); let fp = configureFailPoint(recipientPrimary, "failCommand", { failInternalCommands: true, @@ -114,9 +120,15 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); (() => { jsTestLog("Test sending donorAbortMigration during a tenant migration while find command " + "against admin.system.keys repeatedly fails with retryable errors."); - const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); + if (TenantMigrationUtil.isShardMergeEnabled( + tenantMigrationTest.getDonorPrimary().getDB("admin"))) { + tenantMigrationTest.stop(); + jsTestLog("Skipping test, Shard Merge does not support retry"); + return; + } + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); let fp = configureFailPoint(recipientPrimary, "failCommand", { failInternalCommands: true, @@ -521,6 +533,9 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); assert(findRes); }); + TenantMigrationTest.assertCommitted( + tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + tenantMigrationTest.stop(); donorRst.stopSet(); })(); diff --git a/jstests/replsets/tenant_migration_recipient_aborts_merge_on_donor_failure.js b/jstests/replsets/tenant_migration_recipient_aborts_merge_on_donor_failure.js index 2408162a2b2..51615707003 100644 --- a/jstests/replsets/tenant_migration_recipient_aborts_merge_on_donor_failure.js +++ b/jstests/replsets/tenant_migration_recipient_aborts_merge_on_donor_failure.js @@ -26,6 +26,7 @@ load("jstests/replsets/libs/tenant_migration_util.js"); if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { tenantMigrationTest.stop(); + jsTestLog("Skipping Shard Merge-specific test"); return; } @@ -56,11 +57,12 @@ load("jstests/replsets/libs/tenant_migration_util.js"); waitInFailPoint.wait(); jsTestLog("Stopping the donor primary"); donorRst.stop(donorPrimary); - waitInFailPoint.off(); // wait until the completion path has started after the abort const hangBeforeTaskCompletion = configureFailPoint(recipientPrimary, "hangBeforeTaskCompletion", {action: "hang"}); + + waitInFailPoint.off(); hangBeforeTaskCompletion.wait(); // step up a secondary so that the migration will complete and the diff --git a/jstests/replsets/tenant_migration_recipient_current_op.js b/jstests/replsets/tenant_migration_recipient_current_op.js index 975aa1fc8ea..378bbd2400d 100644 --- a/jstests/replsets/tenant_migration_recipient_current_op.js +++ b/jstests/replsets/tenant_migration_recipient_current_op.js @@ -25,13 +25,6 @@ load("jstests/replsets/libs/tenant_migration_util.js"); const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); -// An object that mirrors the recipient migration states. -const migrationStates = { - kStarted: 1, - kConsistent: 2, - kDone: 3 -}; - const kMigrationId = UUID(); const kTenantId = 'testTenantId'; const kReadPreference = { @@ -45,6 +38,9 @@ const migrationOpts = { const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); +const shardMergeIsEnabled = + TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin")); + // Initial inserts to test cloner stats. const dbsToClone = ["db0", "db1", "db2"]; const collsToClone = ["coll0", "coll1"]; @@ -134,7 +130,7 @@ fpAfterPersistingStateDoc.wait(); let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); checkStandardFieldsOK(res); let currOp = res.inprog[0]; -assert.eq(currOp.state, migrationStates.kStarted, res); +assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); assert.eq(currOp.migrationCompleted, false, res); assert.eq(currOp.dataSyncCompleted, false, res); assert(!currOp.hasOwnProperty("startFetchingDonorOpTime"), res); @@ -157,7 +153,13 @@ res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient mi checkStandardFieldsOK(res); currOp = res.inprog[0]; assert.gt(new Date(), currOp.receiveStart, tojson(res)); -assert.eq(currOp.state, migrationStates.kStarted, res); + +if (shardMergeIsEnabled) { + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kLearnedFilenames, res); +} else { + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); +} + assert.eq(currOp.migrationCompleted, false, res); assert.eq(currOp.dataSyncCompleted, false, res); assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); @@ -185,7 +187,13 @@ fpAfterCollectionCloner.wait(); res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); checkStandardFieldsOK(res); currOp = res.inprog[0]; -assert.eq(currOp.state, migrationStates.kStarted, res); + +if (shardMergeIsEnabled) { + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kLearnedFilenames, res); +} else { + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); +} + assert.eq(currOp.migrationCompleted, false, res); assert.eq(currOp.dataSyncCompleted, false, res); assert(!currOp.hasOwnProperty("expireAt"), res); @@ -228,7 +236,7 @@ checkStandardFieldsOK(res); checkPostConsistentFieldsOK(res); currOp = res.inprog[0]; // State should have changed. -assert.eq(currOp.state, migrationStates.kConsistent, res); +assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); assert.eq(currOp.migrationCompleted, false, res); assert.eq(currOp.dataSyncCompleted, false, res); assert(!currOp.hasOwnProperty("expireAt"), res); @@ -242,7 +250,7 @@ checkStandardFieldsOK(res); checkPostConsistentFieldsOK(res); currOp = res.inprog[0]; // State should have changed. -assert.eq(currOp.state, migrationStates.kConsistent, res); +assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); assert.eq(currOp.migrationCompleted, false, res); assert.eq(currOp.dataSyncCompleted, false, res); assert(!currOp.hasOwnProperty("expireAt"), res); @@ -268,7 +276,7 @@ res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient mi checkStandardFieldsOK(res); checkPostConsistentFieldsOK(res); currOp = res.inprog[0]; -assert.eq(currOp.state, migrationStates.kConsistent, res); +assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); assert.eq(currOp.migrationCompleted, false, res); // dataSyncCompleted should have changed. assert.eq(currOp.dataSyncCompleted, true, res); @@ -284,7 +292,7 @@ checkPostConsistentFieldsOK(res); currOp = res.inprog[0]; assert.eq(currOp.dataSyncCompleted, true, res); // State, completion status and expireAt should have changed. -assert.eq(currOp.state, migrationStates.kDone, res); +assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kDone, res); assert.eq(currOp.migrationCompleted, true, res); assert(currOp.hasOwnProperty("expireAt") && currOp.expireAt instanceof Date, res); assert(currOp.hasOwnProperty("databases")); diff --git a/jstests/replsets/tenant_migration_recipient_failover_before_creating_oplog_buffer.js b/jstests/replsets/tenant_migration_recipient_failover_before_creating_oplog_buffer.js index 3a1b29998e7..76662d80993 100644 --- a/jstests/replsets/tenant_migration_recipient_failover_before_creating_oplog_buffer.js +++ b/jstests/replsets/tenant_migration_recipient_failover_before_creating_oplog_buffer.js @@ -5,6 +5,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_persistence, * requires_replication, @@ -55,4 +56,4 @@ jsTestLog("Waiting for migration to complete."); TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); tenantMigrationTest.stop(); -})();
\ No newline at end of file +})(); diff --git a/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js b/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js index 710d20f200d..4afde043e12 100644 --- a/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js +++ b/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js @@ -6,6 +6,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js index 9d792cdbf8c..9337fe5999f 100644 --- a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js +++ b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js @@ -7,6 +7,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_recipient_retryable_writes_failover.js b/jstests/replsets/tenant_migration_recipient_retryable_writes_failover.js index 3fc35f5d772..02f6b4ddab1 100644 --- a/jstests/replsets/tenant_migration_recipient_retryable_writes_failover.js +++ b/jstests/replsets/tenant_migration_recipient_retryable_writes_failover.js @@ -5,6 +5,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_recipient_rollback_recovery.js b/jstests/replsets/tenant_migration_recipient_rollback_recovery.js index 0f694c40912..f57c9c68d56 100644 --- a/jstests/replsets/tenant_migration_recipient_rollback_recovery.js +++ b/jstests/replsets/tenant_migration_recipient_rollback_recovery.js @@ -4,6 +4,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge.js b/jstests/replsets/tenant_migration_recipient_shard_merge.js new file mode 100644 index 00000000000..36f20529d03 --- /dev/null +++ b/jstests/replsets/tenant_migration_recipient_shard_merge.js @@ -0,0 +1,72 @@ +/** + * Tests recipient behavior for shard merge + * + * @tags: [ + * incompatible_with_eft, + * incompatible_with_macos, + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * ] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); + +(() => { + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), sharedOptions: {nodes: 3}}); + + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + + if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { + tenantMigrationTest.stop(); + jsTestLog("Skipping Shard Merge-specific test"); + return; + } + + jsTestLog( + "Test that recipient state is correctly set to 'learned filenames' after creating the backup cursor"); + const tenantId = "testTenantId"; + const tenantDB = tenantMigrationTest.tenantDB(tenantId, "DB"); + const collName = "testColl"; + + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + const donorSecondary = donorRst.getSecondary(); + + tenantMigrationTest.insertDonorDB(tenantDB, collName); + + const failpoint = "fpAfterRetrievingStartOpTimesMigrationRecipientInstance"; + const waitInFailPoint = configureFailPoint(recipientPrimary, failpoint, {action: "hang"}); + + const migrationUuid = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationUuid), + tenantId, + readPreference: {mode: 'primary'} + }; + + jsTestLog(`Starting the tenant migration to wait in failpoint: ${failpoint}`); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); + + waitInFailPoint.wait(); + + const res = + recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + assert.eq(res.inprog.length, 1); + const [currOp] = res.inprog; + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kLearnedFilenames, res); + waitInFailPoint.off(); + + TenantMigrationTest.assertCommitted( + tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + + tenantMigrationTest.stop(); +})(); +})(); diff --git a/jstests/replsets/tenant_migration_recipient_startup_recovery.js b/jstests/replsets/tenant_migration_recipient_startup_recovery.js index 553cfb0b91f..d73dd0c975e 100644 --- a/jstests/replsets/tenant_migration_recipient_startup_recovery.js +++ b/jstests/replsets/tenant_migration_recipient_startup_recovery.js @@ -7,6 +7,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js index 97684dc8a07..26eb2e7fd87 100644 --- a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js +++ b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js @@ -7,6 +7,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_persistence, * requires_replication, diff --git a/jstests/replsets/tenant_migration_recipient_sync_source_reconnect_delayed_secondary.js b/jstests/replsets/tenant_migration_recipient_sync_source_reconnect_delayed_secondary.js index 7fd34589f07..9d3c3a35f4b 100644 --- a/jstests/replsets/tenant_migration_recipient_sync_source_reconnect_delayed_secondary.js +++ b/jstests/replsets/tenant_migration_recipient_sync_source_reconnect_delayed_secondary.js @@ -10,6 +10,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_recipient_sync_source_restart_donor_secondary.js b/jstests/replsets/tenant_migration_recipient_sync_source_restart_donor_secondary.js index 8cf2c415337..c36cb71e67b 100644 --- a/jstests/replsets/tenant_migration_recipient_sync_source_restart_donor_secondary.js +++ b/jstests/replsets/tenant_migration_recipient_sync_source_restart_donor_secondary.js @@ -9,6 +9,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js b/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js index 62d51b1258a..bd6feb7e8e5 100644 --- a/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js +++ b/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js @@ -4,6 +4,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js b/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js index e50e9c48c40..e3a2b36a612 100644 --- a/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js +++ b/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js @@ -4,6 +4,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/jstests/replsets/tenant_migration_resume_oplog_application.js b/jstests/replsets/tenant_migration_resume_oplog_application.js index da857c9cd70..207be5f0968 100644 --- a/jstests/replsets/tenant_migration_resume_oplog_application.js +++ b/jstests/replsets/tenant_migration_resume_oplog_application.js @@ -4,6 +4,7 @@ * @tags: [ * incompatible_with_eft, * incompatible_with_macos, + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 70fc5c1384f..54273ec611f 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -94,9 +94,11 @@ bool shouldStopSendingRecipientForgetMigrationCommand(Status status) { ErrorCodes::isInterruption(status)); } -bool shouldStopSendingRecipientSyncDataCommand(Status status) { +bool shouldStopSendingRecipientSyncDataCommand(Status status, MigrationProtocolEnum protocol) { + auto isRetriable = + ErrorCodes::isRetriableError(status) && protocol != MigrationProtocolEnum::kShardMerge; return status.isOK() || - !(ErrorCodes::isRetriableError(status) || + !(isRetriable || // Returned if findHost() is unable to target the recipient in 15 seconds, which may // happen after a failover. status == ErrorCodes::FailedToSatisfyReadPreference); @@ -726,9 +728,11 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi }); }); }) - .until([token, cmdObj, isRecipientSyncDataCmd](Status status) { + .until([this, self = shared_from_this(), token, cmdObj, isRecipientSyncDataCmd]( + Status status) { if (isRecipientSyncDataCmd) { - return shouldStopSendingRecipientSyncDataCommand(status); + stdx::lock_guard<Latch> lg(_mutex); + return shouldStopSendingRecipientSyncDataCommand(status, getProtocol()); } else { // If the recipient command is not 'recipientSyncData', it must be // 'recipientForgetMigration'. diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index 1c741c5d914..3de36b59ce3 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -125,7 +125,7 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, << "Bad state " << TenantMigrationRecipientState_serializer(state) << " for protocol '" << MigrationProtocol_serializer(protocol) << "'", - protocol == MigrationProtocolEnum::kMultitenantMigrations); + protocol == MigrationProtocolEnum::kShardMerge); break; case TenantMigrationRecipientStateEnum::kStarted: createAccessBlockerIfNeeded(opCtx, recipientStateDoc); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 77f469aa819..8d6316c88ea 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -88,6 +88,7 @@ using namespace fmt; const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex"; const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd; +constexpr StringData kDonatedFilesPrefix = "donatedFiles."_sd; constexpr int kBackupCursorFileFetcherRetryAttempts = 10; NamespaceString getOplogBufferNs(const UUID& migrationUUID) { @@ -95,6 +96,11 @@ NamespaceString getOplogBufferNs(const UUID& migrationUUID) { kOplogBufferPrefix + migrationUUID.toString()); } +NamespaceString getDonatedFilesNs(const UUID& migrationUUID) { + return NamespaceString(NamespaceString::kConfigDb, + kDonatedFilesPrefix + migrationUUID.toString()); +} + boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx) { StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; @@ -905,18 +911,18 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc( } void TenantMigrationRecipientService::Instance::_killBackupCursor(WithLock lk) { - if (!_backupCursorId || _backupCursorNamespaceString.isEmpty()) { + if (!_donorFilenameBackupCursorId || _donorFilenameBackupCursorNamespaceString.isEmpty()) { return; } - // TODO (SERVER-61131) likely want to cancel getMore/keepalive here as well + // TODO (SERVER-61132) likely want to cancel getMore/keepalive here as well - executor::RemoteCommandRequest request(_client->getServerHostAndPort(), - _backupCursorNamespaceString.db().toString(), - BSON("killCursors" - << _backupCursorNamespaceString.coll().toString() - << "cursors" << BSON_ARRAY(_backupCursorId)), - nullptr); + executor::RemoteCommandRequest request( + _client->getServerHostAndPort(), + _donorFilenameBackupCursorNamespaceString.db().toString(), + BSON("killCursors" << _donorFilenameBackupCursorNamespaceString.coll().toString() + << "cursors" << BSON_ARRAY(_donorFilenameBackupCursorId)), + nullptr); request.sslMode = transport::kGlobalSSLMode; auto scheduleResult = @@ -942,7 +948,7 @@ void TenantMigrationRecipientService::Instance::_killBackupCursor(WithLock lk) { } } -ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetcher( +ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenames( const CancellationToken& token) { stdx::lock_guard lk(_mutex); LOGV2_DEBUG(6113000, @@ -959,33 +965,27 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch return aggRequest.toBSON(BSONObj()); }(); - // TODO (SERVER-61131) store or pass in returnedFiles so that we can access it when this - // work is done - auto returnedFiles = std::make_shared<std::vector<BSONObj>>(); - auto fetchStatus = std::make_shared<boost::optional<Status>>(); - auto fetcherCallback = [this, self = shared_from_this(), fetchStatus, returnedFiles, token]( + auto fetcherCallback = [this, self = shared_from_this(), fetchStatus, token]( const Fetcher::QueryResponseStatus& dataStatus, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { if (!dataStatus.isOK()) { *fetchStatus = dataStatus.getStatus(); - returnedFiles->clear(); LOGV2_ERROR(6113003, "backup cursor failed", "error"_attr = dataStatus.getStatus()); return; } if (token.isCanceled()) { *fetchStatus = Status(ErrorCodes::CallbackCanceled, "backup cursor interrupted"); - returnedFiles->clear(); return; } stdx::lock_guard lk(_mutex); const auto& data = dataStatus.getValue(); - _backupCursorId = data.cursorId; - _backupCursorNamespaceString = data.nss; + _donorFilenameBackupCursorId = data.cursorId; + _donorFilenameBackupCursorNamespaceString = data.nss; for (const BSONObj& doc : data.documents) { if (doc["metadata"]) { @@ -993,7 +993,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch const auto& metadata = doc["metadata"].Obj(); auto startApplyingDonorOpTime = OpTime(metadata["checkpointTimestamp"].timestamp(), OpTime::kUninitializedTerm); - // TODO (SERVER-61131) Uncomment the following lines when we skip + // TODO (SERVER-61132) Uncomment the following lines when we skip // _getStartopTimesFromDonor entirely // _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime); // _stateDoc.setStartFetchingDonorOpTime(startApplyingDonorOpTime); @@ -1009,7 +1009,42 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch "migrationId"_attr = _stateDoc.getId(), "filename"_attr = doc["filename"].String(), "backupCursorId"_attr = data.cursorId); - returnedFiles->emplace_back(doc.getOwned()); + + auto uniqueOpCtx = cc().makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + auto donatedFilesNs = getDonatedFilesNs(getMigrationUUID()); + auto status = writeConflictRetry( + opCtx, + "insertBackupCursorEntry", + donatedFilesNs.ns(), + [opCtx, donatedFilesNs, doc]() -> Status { + Lock::GlobalWrite lk(opCtx); + AutoGetDb autoDb(opCtx, donatedFilesNs.db(), mongo::MODE_X); + auto db = autoDb.ensureDbExists(opCtx); + CollectionPtr collection = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespace( + opCtx, donatedFilesNs); + WriteUnitOfWork wuow(opCtx); + if (!collection) { + CollectionOptions emptyCollOptions; + uassertStatusOK( + db->userCreateNS(opCtx, donatedFilesNs, emptyCollOptions)); + collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace( + opCtx, donatedFilesNs); + } + invariant(collection); + + auto obj = + doc.addField(BSON("_id" << OID::gen()).firstElement()).getOwned(); + + OpDebug* const nullOpDebug = nullptr; + uassertStatusOK(collection->insertDocument( + opCtx, InsertStatement(obj), nullOpDebug, false)); + wuow.commit(); + + return Status::OK(); + }); + uassertStatusOK(status); } } @@ -1026,7 +1061,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch getMoreBob->append("collection", data.nss.coll()); }; - _backupCursorFileFetcher = std::make_unique<Fetcher>( + _donorFilenameBackupCursorFileFetcher = std::make_unique<Fetcher>( (**_scopedExecutor).get(), _client->getServerHostAndPort(), NamespaceString::kAdminDb.toString(), @@ -1039,27 +1074,17 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch kBackupCursorFileFetcherRetryAttempts, executor::RemoteCommandRequest::kNoTimeout), transport::kGlobalSSLMode); - uassertStatusOK(_backupCursorFileFetcher->schedule()); + uassertStatusOK(_donorFilenameBackupCursorFileFetcher->schedule()); - return _backupCursorFileFetcher->onCompletion() + return _donorFilenameBackupCursorFileFetcher->onCompletion() .thenRunOn(**_scopedExecutor) - .then([this, self = shared_from_this(), fetchStatus] { + .then([fetchStatus] { if (!*fetchStatus) { // the callback was never invoked uasserted(6113007, "Internal error running cursor callback in command"); } - auto status = fetchStatus->get(); - if (!status.isOK() && status.code() != 50915) { - // In the event of 50915: A checkpoint took place while - // opening a backup cursor, we should retry and *not* cancel - // migration. See https://jira.mongodb.org/browse/SERVER-61964 - // TODO (SERVER-61964): remove conditional check for 50915 error - // and cancel migration if !status.isOK() - this->cancelMigration(); - } - - uassertStatusOK(status); + uassertStatusOK(fetchStatus->get()); }); } @@ -1073,8 +1098,8 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo // We only expect to already have start optimes populated if we are not // resuming a migration and this is a multitenant migration. - // TODO (SERVER-61131) Eventually we'll skip _getStartopTimesFromDonor entirely - // for shard merge, but currently _createFileFetcher will populate optimes for + // TODO (SERVER-61132) Eventually we'll skip _getStartopTimesFromDonor entirely + // for shard merge, but currently _getDonorFilenames will populate optimes for // the shard merge case. We can just overwrite here since we aren't doing anything // with the backup cursor results yet. auto isShardMerge = _stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge; @@ -2100,8 +2125,8 @@ void TenantMigrationRecipientService::Instance::_interrupt(Status status, _cancelRemainingWork(lk); - if (_backupCursorFileFetcher) { - _backupCursorFileFetcher->shutdown(); + if (_donorFilenameBackupCursorFileFetcher) { + _donorFilenameBackupCursorFileFetcher->shutdown(); } // If the task is running, then setting promise result will be taken care by the main task @@ -2191,8 +2216,8 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta swap(savedTenantOplogApplier, _tenantOplogApplier); swap(savedWriterPool, _writerPool); - _backupCursorId = 0; - _backupCursorFileFetcher = nullptr; + _donorFilenameBackupCursorId = 0; + _donorFilenameBackupCursorFileFetcher = nullptr; } // Perform join outside the lock to avoid deadlocks. @@ -2510,7 +2535,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( } return AsyncTry([this, self = shared_from_this(), token] { - return _createFileFetcher(token); + return _getDonorFilenames(token); }) .until([](Status status) { if (status.code() == 50915) { @@ -2530,11 +2555,19 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( .on(**_scopedExecutor, token); }) .then([this, self = shared_from_this()] { - // TODO (SERVER-61131) temporarily stop fetcher/backup cursor here for + if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { + return; + } + + stdx::lock_guard lk(_mutex); + _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames); + }) + .then([this, self = shared_from_this()] { + stdx::lock_guard lk(_mutex); + // TODO (SERVER-61133) temporarily stop fetcher/backup cursor here for // now. We shut down the backup cursor in onCompletion continuation, but // some tests fail unless we do this here, punting on dealing with those // tests until later ticket(s) - stdx::lock_guard lk(_mutex); _killBackupCursor(lk); _getStartOpTimesFromDonor(lk); return _updateStateDocForMajority(lk); @@ -2700,7 +2733,10 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( if (_taskState.isInterrupted()) { status = _taskState.getInterruptStatus(); } - if ((ErrorCodes::isRetriableError(status) || isRetriableOplogFetcherError(status)) && + + // shard merge is not resumable for any replica set state transitions or network errors + if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge && + (ErrorCodes::isRetriableError(status) || isRetriableOplogFetcherError(status)) && !_taskState.isExternalInterrupt() && _stateDocPersistedPromise.getFuture().isReady()) { // Reset the task state and clear the interrupt status. diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 41518acb6e8..39171fd2f90 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -371,7 +371,7 @@ public: /** * Creates a backup cursor wrapped in a Fetcher. */ - ExecutorFuture<void> _createFileFetcher(const CancellationToken& token); + ExecutorFuture<void> _getDonorFilenames(const CancellationToken& token); /** * Kills the Donor backup cursor @@ -578,9 +578,9 @@ public: std::unique_ptr<DBClientConnection> _client; // (S) std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (S) - CursorId _backupCursorId; // (M) - NamespaceString _backupCursorNamespaceString; // (M) - std::unique_ptr<Fetcher> _backupCursorFileFetcher; // (M) + CursorId _donorFilenameBackupCursorId; // (M) + NamespaceString _donorFilenameBackupCursorNamespaceString; // (M) + std::unique_ptr<Fetcher> _donorFilenameBackupCursorFileFetcher; // (M) std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn = std::make_unique<CreateOplogFetcherFn>(); // (M) |