diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 117 |
1 files changed, 74 insertions, 43 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 57ec62819a1..c6c4d4d2958 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -76,6 +76,7 @@ #include "mongo/db/transaction/transaction_participant.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/db/write_concern_options.h" +#include "mongo/executor/task_executor.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/assert_util.h" @@ -93,6 +94,8 @@ const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex"; const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd; constexpr int kBackupCursorFileFetcherRetryAttempts = 10; +constexpr int kCheckpointTsBackupCursorErrorCode = 6929900; +constexpr int kCloseCursorBeforeOpenErrorCode = 50886; NamespaceString getOplogBufferNs(const UUID& migrationUUID) { return NamespaceString(NamespaceString::kConfigDb, @@ -979,25 +982,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor() nullptr); request.sslMode = _donorUri.getSSLMode(); - auto scheduleResult = - (_recipientService->getInstanceCleanupExecutor()) - ->scheduleRemoteCommand( - request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - if (!args.response.isOK()) { - LOGV2_WARNING(6113005, - "killCursors command task failed", - "error"_attr = redact(args.response.status)); - return; - } - auto status = getStatusFromCommandResult(args.response.data); - if (status.isOK()) { - LOGV2_INFO(6113415, "Killed backup cursor"); - } else { - LOGV2_WARNING(6113006, - "killCursors command failed", - "error"_attr = redact(status)); - } - }); + const auto scheduleResult = _scheduleKillBackupCursorWithLock( + lk, _recipientService->getInstanceCleanupExecutor()); if (!scheduleResult.isOK()) { LOGV2_WARNING(6113004, "Failed to run killCursors command on backup cursor", @@ -1009,13 +995,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor() SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor( const CancellationToken& token) { - stdx::lock_guard lk(_mutex); - LOGV2_DEBUG(6113000, - 1, - "Trying to open backup cursor on donor primary", - "migrationId"_attr = _stateDoc.getId(), - "donorConnectionString"_attr = _stateDoc.getDonorConnectionString()); - const auto cmdObj = [] { + + const auto aggregateCommandRequestObj = [] { AggregateCommandRequest aggRequest( NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb), {BSON("$backupCursor" << BSONObj())}); @@ -1024,11 +1005,18 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor( return aggRequest.toBSON(BSONObj()); }(); - auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp(); + stdx::lock_guard lk(_mutex); + LOGV2_DEBUG(6113000, + 1, + "Trying to open backup cursor on donor primary", + "migrationId"_attr = _stateDoc.getId(), + "donorConnectionString"_attr = _stateDoc.getDonorConnectionString()); + + const auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp(); auto fetchStatus = std::make_shared<boost::optional<Status>>(); auto uniqueMetadataInfo = std::make_unique<boost::optional<shard_merge_utils::MetadataInfo>>(); - auto fetcherCallback = + const auto fetcherCallback = [ this, self = shared_from_this(), @@ -1043,8 +1031,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor( uassertStatusOK(dataStatus); uassert(ErrorCodes::CallbackCanceled, "backup cursor interrupted", !token.isCanceled()); - auto uniqueOpCtx = cc().makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); + const auto uniqueOpCtx = cc().makeOperationContext(); + const auto opCtx = uniqueOpCtx.get(); const auto& data = dataStatus.getValue(); for (const BSONObj& doc : data.documents) { @@ -1059,14 +1047,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor( "backupCursorId"_attr = data.cursorId, "backupCursorCheckpointTimestamp"_attr = checkpointTimestamp); - // This ensures that the recipient won’t receive any 2 phase index build donor - // oplog entries during the migration. We also have a check in the tenant oplog - // applier to detect such oplog entries. Adding a check here helps us to detect - // the problem earlier. - uassert(6929900, - "backupCursorCheckpointTimestamp should be greater than or equal to " - "startMigrationDonorTimestamp", - checkpointTimestamp >= startMigrationDonorTimestamp); { stdx::lock_guard lk(_mutex); stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); @@ -1075,6 +1055,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor( BackupCursorInfo{data.cursorId, data.nss, checkpointTimestamp}); } + // This ensures that the recipient won’t receive any 2 phase index build donor + // oplog entries during the migration. We also have a check in the tenant oplog + // applier to detect such oplog entries. Adding a check here helps us to detect + // the problem earlier. + uassert(kCheckpointTsBackupCursorErrorCode, + "backupCursorCheckpointTimestamp should be greater than or equal to " + "startMigrationDonorTimestamp", + checkpointTimestamp >= startMigrationDonorTimestamp); + invariant(metadataInfoPtr && !*metadataInfoPtr); (*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo( getMigrationUUID(), _client->getServerAddress(), metadata); @@ -1133,10 +1122,10 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor( }; _donorFilenameBackupCursorFileFetcher = std::make_unique<Fetcher>( - (**_scopedExecutor).get(), + _backupCursorExecutor.get(), _client->getServerHostAndPort(), NamespaceString::kAdminDb.toString(), - cmdObj, + aggregateCommandRequestObj, fetcherCallback, ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(), executor::RemoteCommandRequest::kNoTimeout, /* aggregateTimeout */ @@ -1160,6 +1149,35 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor( .semi(); } +StatusWith<executor::TaskExecutor::CallbackHandle> +TenantMigrationRecipientService::Instance::_scheduleKillBackupCursorWithLock( + WithLock lk, std::shared_ptr<executor::TaskExecutor> executor) { + auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk); + executor::RemoteCommandRequest killCursorsRequest( + _client->getServerHostAndPort(), + donorBackupCursorInfo.nss.db().toString(), + BSON("killCursors" << donorBackupCursorInfo.nss.coll().toString() << "cursors" + << BSON_ARRAY(donorBackupCursorInfo.cursorId)), + nullptr); + killCursorsRequest.sslMode = _donorUri.getSSLMode(); + + return executor->scheduleRemoteCommand( + killCursorsRequest, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + if (!args.response.isOK()) { + LOGV2_WARNING(6113005, + "killCursors command task failed", + "error"_attr = redact(args.response.status)); + return; + } + auto status = getStatusFromCommandResult(args.response.data); + if (status.isOK()) { + LOGV2_INFO(6113415, "Killed backup cursor"); + } else { + LOGV2_WARNING(6113006, "killCursors command failed", "error"_attr = redact(status)); + } + }); +} + SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWithRetry( const CancellationToken& token) { return AsyncTry([this, self = shared_from_this(), token] { return _openBackupCursor(token); }) @@ -1169,8 +1187,20 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWit "Retrying backup cursor creation after transient error", "migrationId"_attr = getMigrationUUID(), "status"_attr = status); - // A checkpoint took place while opening a backup cursor. We - // should retry and *not* cancel migration. + + return false; + } else if (status.code() == kCheckpointTsBackupCursorErrorCode || + status.code() == kCloseCursorBeforeOpenErrorCode) { + LOGV2_INFO(6955100, + "Closing backup cursor and retrying after getting retryable error", + "migrationId"_attr = getMigrationUUID(), + "status"_attr = status); + + stdx::lock_guard lk(_mutex); + const auto scheduleResult = + _scheduleKillBackupCursorWithLock(lk, _backupCursorExecutor); + uassertStatusOK(scheduleResult); + return false; } @@ -1199,7 +1229,7 @@ void TenantMigrationRecipientService::Instance::_keepBackupCursorAlive( auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk); _backupCursorKeepAliveFuture = shard_merge_utils::keepBackupCursorAlive(_backupCursorKeepAliveCancellation, - **_scopedExecutor, + _backupCursorExecutor, _client->getServerHostAndPort(), donorBackupCursorInfo.cursorId, donorBackupCursorInfo.nss); @@ -2703,6 +2733,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept { _scopedExecutor = executor; + _backupCursorExecutor = **_scopedExecutor; auto scopedOutstandingMigrationCounter = TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingReceivingCount(); |