diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_donor_service.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 166 |
1 files changed, 80 insertions, 86 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 9470ac2b036..920538fadd6 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -30,6 +30,7 @@ #include "mongo/db/repl/tenant_migration_donor_service.h" +#include "mongo/client/async_remote_command_targeter_adapter.h" #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/config.h" @@ -49,6 +50,8 @@ #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/tenant_migration_statistics.h" #include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/executor/async_rpc.h" +#include "mongo/executor/async_rpc_retry_policy.h" #include "mongo/executor/connection_pool.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/logv2/log.h" @@ -91,25 +94,50 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn const int kMaxRecipientKeyDocsFindAttempts = 10; -bool shouldStopSendingRecipientForgetMigrationCommand(Status status) { - return status.isOK() || - !(ErrorCodes::isRetriableError(status) || ErrorCodes::isNetworkTimeoutError(status) || - // Returned if findHost() is unable to target the recipient in 15 seconds, which may - // happen after a failover. - status == ErrorCodes::FailedToSatisfyReadPreference || - ErrorCodes::isInterruption(status)); -} +/** + * Encapsulates the retry logic for sending the ForgetMigration command. + */ +class RecipientForgetMigrationRetryPolicy + : public async_rpc::RetryWithBackoffOnErrorCategories<ErrorCategory::RetriableError, + ErrorCategory::NetworkTimeoutError, + ErrorCategory::Interruption> { +public: + using RetryWithBackoffOnErrorCategories::RetryWithBackoffOnErrorCategories; + bool recordAndEvaluateRetry(Status status) override { + if (status.isOK()) { + return false; + } + auto underlyingError = async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status); + // Returned if findHost() is unable to target the recipient in 15 seconds, which may + // happen after a failover. + return RetryWithBackoffOnErrorCategories::recordAndEvaluateRetry(underlyingError) || + underlyingError == ErrorCodes::FailedToSatisfyReadPreference; + } +}; -bool shouldStopSendingRecipientSyncDataCommand(Status status, MigrationProtocolEnum protocol) { - if (status.isOK() || protocol == MigrationProtocolEnum::kShardMerge) { - return true; +/** + * Encapsulates the retry logic for sending the SyncData command. + */ +class RecipientSyncDataRetryPolicy + : public async_rpc::RetryWithBackoffOnErrorCategories<ErrorCategory::RetriableError, + ErrorCategory::NetworkTimeoutError> { +public: + RecipientSyncDataRetryPolicy(MigrationProtocolEnum p, Backoff b) + : RetryWithBackoffOnErrorCategories(b), _protocol{p} {} + + /** Returns true if we should retry sending SyncData given the error */ + bool recordAndEvaluateRetry(Status status) { + if (_protocol == MigrationProtocolEnum::kShardMerge || status.isOK()) { + return false; + } + auto underlyingError = async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status); + return RetryWithBackoffOnErrorCategories::recordAndEvaluateRetry(status) || + underlyingError == ErrorCodes::FailedToSatisfyReadPreference; } - return !(ErrorCodes::isRetriableError(status) || ErrorCodes::isNetworkTimeoutError(status) || - // Returned if findHost() is unable to target the recipient in 15 seconds, which may - // happen after a failover. - status == ErrorCodes::FailedToSatisfyReadPreference); -} +private: + MigrationProtocolEnum _protocol; +}; bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status) { return status.isOK() || @@ -744,90 +772,48 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit }); } -ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient( - std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const BSONObj& cmdObj, - const CancellationToken& token) { - const bool isRecipientSyncDataCmd = cmdObj.hasField(RecipientSyncData::kCommandName); - return AsyncTry( - [this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj, token] { - return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token) - .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor, cmdObj, token]( - auto recipientHost) { - executor::RemoteCommandRequest request( - std::move(recipientHost), - NamespaceString::kAdminDb.toString(), - std::move(cmdObj), - rpc::makeEmptyMetadata(), - nullptr); - request.sslMode = _sslMode; - - return (_recipientCmdExecutor) - ->scheduleRemoteCommand(std::move(request), token) - .then([this, - self = shared_from_this()](const auto& response) -> Status { - if (!response.isOK()) { - return response.status; - } - auto commandStatus = getStatusFromCommandResult(response.data); - commandStatus.addContext( - "Tenant migration recipient command failed"); - return commandStatus; - }); - }); - }) - .until([this, self = shared_from_this(), token, cmdObj, isRecipientSyncDataCmd]( - Status status) { - if (isRecipientSyncDataCmd) { - return shouldStopSendingRecipientSyncDataCommand(status, getProtocol()); - } else { - // If the recipient command is not 'recipientSyncData', it must be - // 'recipientForgetMigration'. - invariant(cmdObj.hasField(RecipientForgetMigration::kCommandName)); - return shouldStopSendingRecipientForgetMigrationCommand(status); - } - }) - .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, token); -} - ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( - std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<executor::ScopedTaskExecutor> exec, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const CancellationToken& token) { + auto donorConnString = + repl::ReplicationCoordinator::get(_serviceContext)->getConfigConnectionString(); - const auto cmdObj = [&] { - auto donorConnString = - repl::ReplicationCoordinator::get(_serviceContext)->getConfigConnectionString(); - - RecipientSyncData request; - request.setDbName(NamespaceString::kAdminDb); + RecipientSyncData request; + request.setDbName(NamespaceString::kAdminDb); - MigrationRecipientCommonData commonData( - _migrationUuid, donorConnString.toString(), _readPreference); - commonData.setRecipientCertificateForDonor(_recipientCertificateForDonor); - if (_protocol == MigrationProtocolEnum::kMultitenantMigrations) { - commonData.setTenantId(boost::optional<StringData>(_tenantId)); - } + MigrationRecipientCommonData commonData( + _migrationUuid, donorConnString.toString(), _readPreference); + commonData.setRecipientCertificateForDonor(_recipientCertificateForDonor); + if (_protocol == MigrationProtocolEnum::kMultitenantMigrations) { + commonData.setTenantId(boost::optional<StringData>(_tenantId)); + } - stdx::lock_guard<Latch> lg(_mutex); - commonData.setProtocol(_protocol); - request.setMigrationRecipientCommonData(commonData); + commonData.setProtocol(_protocol); + request.setMigrationRecipientCommonData(commonData); + { + stdx::lock_guard<Latch> lg(_mutex); invariant(_stateDoc.getStartMigrationDonorTimestamp()); request.setStartMigrationDonorTimestamp(*_stateDoc.getStartMigrationDonorTimestamp()); request.setReturnAfterReachingDonorTimestamp(_stateDoc.getBlockTimestamp()); - return request.toBSON(BSONObj()); - }(); + } - return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj, token); + auto asyncTargeter = std::make_unique<async_rpc::AsyncRemoteCommandTargeterAdapter>( + kPrimaryOnlyReadPreference, recipientTargeterRS); + auto retryPolicy = + std::make_shared<RecipientSyncDataRetryPolicy>(getProtocol(), kExponentialBackoff); + auto cmdRes = async_rpc::sendCommand( + request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy); + return std::move(cmdRes).ignoreValue().onError([](Status status) { + return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext( + "Tenant migration recipient command failed"); + }); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand( - std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<executor::ScopedTaskExecutor> exec, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const CancellationToken& token) { @@ -847,7 +833,15 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget commonData.setProtocol(_protocol); request.setMigrationRecipientCommonData(commonData); - return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token); + auto asyncTargeter = std::make_unique<async_rpc::AsyncRemoteCommandTargeterAdapter>( + kPrimaryOnlyReadPreference, recipientTargeterRS); + auto retryPolicy = std::make_shared<RecipientForgetMigrationRetryPolicy>(kExponentialBackoff); + auto cmdRes = async_rpc::sendCommand( + request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy); + return std::move(cmdRes).ignoreValue().onError([](Status status) { + return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext( + "Tenant migration recipient command failed"); + }); } CancellationToken TenantMigrationDonorService::Instance::_initAbortMigrationSource( |