summary refs log tree commit diff
path: root/src/mongo/db/repl/tenant_migration_donor_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_donor_service.cpp')
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_service.cpp 166
1 files changed, 80 insertions, 86 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 9470ac2b036..920538fadd6 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/repl/tenant_migration_donor_service.h"
+#include "mongo/client/async_remote_command_targeter_adapter.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/config.h"
@@ -49,6 +50,8 @@
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_migration_statistics.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/executor/async_rpc.h"
+#include "mongo/executor/async_rpc_retry_policy.h"
#include "mongo/executor/connection_pool.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/logv2/log.h"
@@ -91,25 +94,50 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn
const int kMaxRecipientKeyDocsFindAttempts = 10;
-bool shouldStopSendingRecipientForgetMigrationCommand(Status status) {
- return status.isOK() ||
- !(ErrorCodes::isRetriableError(status) || ErrorCodes::isNetworkTimeoutError(status) ||
- // Returned if findHost() is unable to target the recipient in 15 seconds, which may
- // happen after a failover.
- status == ErrorCodes::FailedToSatisfyReadPreference ||
- ErrorCodes::isInterruption(status));
-}
+/**
+ * Encapsulates the retry logic for sending the ForgetMigration command.
+ *
+ * The policy derives from async_rpc::RetryWithBackoffOnErrorCategories instantiated with the
+ * RetriableError, NetworkTimeoutError and Interruption error categories, and inherits that
+ * base class's constructors.
+ *
+ * recordAndEvaluateRetry(status) returns:
+ *  - false when 'status' is OK;
+ *  - otherwise, whatever the base policy returns for the error obtained from
+ *    async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status), or true when that
+ *    unpacked error is ErrorCodes::FailedToSatisfyReadPreference.
+ *
+ * The base policy is consulted first; the FailedToSatisfyReadPreference comparison only runs
+ * when the base policy declines to retry.
+ */
+class RecipientForgetMigrationRetryPolicy
+ : public async_rpc::RetryWithBackoffOnErrorCategories<ErrorCategory::RetriableError,
+ ErrorCategory::NetworkTimeoutError,
+ ErrorCategory::Interruption> {
+public:
+ using RetryWithBackoffOnErrorCategories::RetryWithBackoffOnErrorCategories;
+ bool recordAndEvaluateRetry(Status status) override {
+ if (status.isOK()) {  // Success: nothing to retry.
+ return false;
+ }
+ auto underlyingError = async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status);
+ // Returned if findHost() is unable to target the recipient in 15 seconds, which may
+ // happen after a failover.
+ return RetryWithBackoffOnErrorCategories::recordAndEvaluateRetry(underlyingError) ||
+ underlyingError == ErrorCodes::FailedToSatisfyReadPreference;
+ }
+};
-bool shouldStopSendingRecipientSyncDataCommand(Status status, MigrationProtocolEnum protocol) {
- if (status.isOK() || protocol == MigrationProtocolEnum::kShardMerge) {
- return true;
+/**
+ * Encapsulates the retry logic for sending the SyncData command.
+ *
+ * The policy derives from async_rpc::RetryWithBackoffOnErrorCategories instantiated with the
+ * RetriableError and NetworkTimeoutError error categories. It is constructed from the
+ * migration protocol and the Backoff that is forwarded to the base class.
+ *
+ * recordAndEvaluateRetry(status) returns:
+ *  - false when the protocol is MigrationProtocolEnum::kShardMerge or 'status' is OK;
+ *  - otherwise, whatever the base policy returns for the error obtained from
+ *    async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status), or true when that
+ *    unpacked error is ErrorCodes::FailedToSatisfyReadPreference.
+ */
+class RecipientSyncDataRetryPolicy
+ : public async_rpc::RetryWithBackoffOnErrorCategories<ErrorCategory::RetriableError,
+ ErrorCategory::NetworkTimeoutError> {
+public:
+ RecipientSyncDataRetryPolicy(MigrationProtocolEnum p, Backoff b)
+ : RetryWithBackoffOnErrorCategories(b), _protocol{p} {}
+
+ /** Returns true if we should retry sending SyncData given the error */
+ bool recordAndEvaluateRetry(Status status) override {
+ if (_protocol == MigrationProtocolEnum::kShardMerge || status.isOK()) {
+ return false;
+ }
+ auto underlyingError = async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status);
+ // Evaluate the unpacked error rather than the wrapped RPC status, matching
+ // RecipientForgetMigrationRetryPolicy, so the category check sees the real failure.
+ // FailedToSatisfyReadPreference is returned if findHost() is unable to target the
+ // recipient in 15 seconds, which may happen after a failover.
+ return RetryWithBackoffOnErrorCategories::recordAndEvaluateRetry(underlyingError) ||
+ underlyingError == ErrorCodes::FailedToSatisfyReadPreference;
}
- return !(ErrorCodes::isRetriableError(status) || ErrorCodes::isNetworkTimeoutError(status) ||
- // Returned if findHost() is unable to target the recipient in 15 seconds, which may
- // happen after a failover.
- status == ErrorCodes::FailedToSatisfyReadPreference);
-}
+private:
+ MigrationProtocolEnum _protocol;
+};
bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status) {
return status.isOK() ||
@@ -744,90 +772,48 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit
});
}
-ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient(
- std::shared_ptr<executor::ScopedTaskExecutor> executor,
- std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const BSONObj& cmdObj,
- const CancellationToken& token) {
- const bool isRecipientSyncDataCmd = cmdObj.hasField(RecipientSyncData::kCommandName);
- return AsyncTry(
- [this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj, token] {
- return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token)
- .thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, cmdObj, token](
- auto recipientHost) {
- executor::RemoteCommandRequest request(
- std::move(recipientHost),
- NamespaceString::kAdminDb.toString(),
- std::move(cmdObj),
- rpc::makeEmptyMetadata(),
- nullptr);
- request.sslMode = _sslMode;
-
- return (_recipientCmdExecutor)
- ->scheduleRemoteCommand(std::move(request), token)
- .then([this,
- self = shared_from_this()](const auto& response) -> Status {
- if (!response.isOK()) {
- return response.status;
- }
- auto commandStatus = getStatusFromCommandResult(response.data);
- commandStatus.addContext(
- "Tenant migration recipient command failed");
- return commandStatus;
- });
- });
- })
- .until([this, self = shared_from_this(), token, cmdObj, isRecipientSyncDataCmd](
- Status status) {
- if (isRecipientSyncDataCmd) {
- return shouldStopSendingRecipientSyncDataCommand(status, getProtocol());
- } else {
- // If the recipient command is not 'recipientSyncData', it must be
- // 'recipientForgetMigration'.
- invariant(cmdObj.hasField(RecipientForgetMigration::kCommandName));
- return shouldStopSendingRecipientForgetMigrationCommand(status);
- }
- })
- .withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, token);
-}
-
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
- std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<executor::ScopedTaskExecutor> exec,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const CancellationToken& token) {
+ auto donorConnString =
+ repl::ReplicationCoordinator::get(_serviceContext)->getConfigConnectionString();
- const auto cmdObj = [&] {
- auto donorConnString =
- repl::ReplicationCoordinator::get(_serviceContext)->getConfigConnectionString();
-
- RecipientSyncData request;
- request.setDbName(NamespaceString::kAdminDb);
+ RecipientSyncData request;  // Typed command object handed to async_rpc::sendCommand below.
+ request.setDbName(NamespaceString::kAdminDb);
- MigrationRecipientCommonData commonData(
- _migrationUuid, donorConnString.toString(), _readPreference);
- commonData.setRecipientCertificateForDonor(_recipientCertificateForDonor);
- if (_protocol == MigrationProtocolEnum::kMultitenantMigrations) {
- commonData.setTenantId(boost::optional<StringData>(_tenantId));
- }
+ MigrationRecipientCommonData commonData(
+ _migrationUuid, donorConnString.toString(), _readPreference);
+ commonData.setRecipientCertificateForDonor(_recipientCertificateForDonor);
+ if (_protocol == MigrationProtocolEnum::kMultitenantMigrations) {  // tenantId is only set for this protocol
+ commonData.setTenantId(boost::optional<StringData>(_tenantId));
+ }
- stdx::lock_guard<Latch> lg(_mutex);
- commonData.setProtocol(_protocol);
- request.setMigrationRecipientCommonData(commonData);
+ commonData.setProtocol(_protocol);
+ request.setMigrationRecipientCommonData(commonData);
+ {  // Limit the _mutex scope to the _stateDoc reads below.
+ stdx::lock_guard<Latch> lg(_mutex);
invariant(_stateDoc.getStartMigrationDonorTimestamp());
request.setStartMigrationDonorTimestamp(*_stateDoc.getStartMigrationDonorTimestamp());
request.setReturnAfterReachingDonorTimestamp(_stateDoc.getBlockTimestamp());
- return request.toBSON(BSONObj());
- }();
+ }
- return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj, token);
+ auto asyncTargeter = std::make_unique<async_rpc::AsyncRemoteCommandTargeterAdapter>(  // wraps recipientTargeterRS
+ kPrimaryOnlyReadPreference, recipientTargeterRS);
+ auto retryPolicy =  // Retry decisions are delegated to RecipientSyncDataRetryPolicy.
+ std::make_shared<RecipientSyncDataRetryPolicy>(getProtocol(), kExponentialBackoff);
+ auto cmdRes = async_rpc::sendCommand(
+ request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy);
+ return std::move(cmdRes).ignoreValue().onError([](Status status) {  // reply value is dropped; errors are unpacked and annotated
+ return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext(
+ "Tenant migration recipient command failed");
+ });
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand(
- std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<executor::ScopedTaskExecutor> exec,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const CancellationToken& token) {
@@ -847,7 +833,15 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
commonData.setProtocol(_protocol);
request.setMigrationRecipientCommonData(commonData);
- return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token);
+ auto asyncTargeter = std::make_unique<async_rpc::AsyncRemoteCommandTargeterAdapter>(
+ kPrimaryOnlyReadPreference, recipientTargeterRS);
+ auto retryPolicy = std::make_shared<RecipientForgetMigrationRetryPolicy>(kExponentialBackoff);
+ auto cmdRes = async_rpc::sendCommand(
+ request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy);
+ return std::move(cmdRes).ignoreValue().onError([](Status status) {
+ return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext(
+ "Tenant migration recipient command failed");
+ });
}
CancellationToken TenantMigrationDonorService::Instance::_initAbortMigrationSource(