diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_donor_util.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.cpp | 64 |
1 files changed, 59 insertions, 5 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp index c4c0577909a..c33ab2714f8 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp @@ -27,11 +27,17 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + #include "mongo/platform/basic.h" #include "mongo/util/str.h" #include "mongo/db/repl/tenant_migration_donor_util.h" +#include "mongo/client/connection_string.h" +#include "mongo/client/remote_command_targeter_rs.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" @@ -41,12 +47,13 @@ #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/logv2/log.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" namespace mongo { -namespace tenant_migration { +namespace tenant_migration_donor { namespace { @@ -113,7 +120,7 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen mtab = std::make_shared<TenantMigrationAccessBlocker>( opCtx->getServiceContext(), - tenant_migration::makeTenantMigrationExecutor(opCtx->getServiceContext()), + tenant_migration_donor::makeTenantMigrationExecutor(opCtx->getServiceContext()), donorStateDoc.getDatabasePrefix().toString()); mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab); mtab->startBlockingWrites(); @@ -238,6 +245,54 @@ void updateDonorStateDocument(OperationContext* opCtx, })); } +void sendRecipientSyncDataCommand(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { + const ConnectionString recipientConnectionString = + ConnectionString(donorStateDoc.getRecipientConnectionString().toString(), + ConnectionString::ConnectionType::SET); + auto dataSyncExecutor = makeTenantMigrationExecutor(opCtx->getServiceContext()); + dataSyncExecutor->startup(); + + auto removeReplicaSetMonitorForRecipientGuard = + makeGuard([&] { ReplicaSetMonitor::remove(recipientConnectionString.getSetName()); }); + + // Create the command BSON for the recipientSyncData request. + BSONObj cmdObj = BSONObj([&]() { + const auto donorConnectionString = + repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString().toString(); + + RecipientSyncData request(donorStateDoc.getId(), + donorConnectionString, + donorStateDoc.getDatabasePrefix().toString(), + ReadPreferenceSetting()); + + return request.toBSON(BSONObj()); + }()); + + // Find the host and port of the recipient's primary. + HostAndPort recipientHost([&]() { + auto targeter = RemoteCommandTargeterRS(recipientConnectionString.getSetName(), + recipientConnectionString.getServers()); + return uassertStatusOK(targeter.findHost(opCtx, ReadPreferenceSetting())); + }()); + + executor::RemoteCommandRequest request(recipientHost, + NamespaceString::kAdminDb.toString(), + std::move(cmdObj), + rpc::makeEmptyMetadata(), + nullptr, + Seconds(30)); + + executor::RemoteCommandResponse response = + Status(ErrorCodes::InternalError, "Internal error running command"); + + executor::TaskExecutor::CallbackHandle cbHandle = + uassertStatusOK(dataSyncExecutor->scheduleRemoteCommand( + request, [&response](const auto& args) { response = args.response; })); + + dataSyncExecutor->wait(cbHandle, opCtx); + uassertStatusOK(getStatusFromCommandResult(response.data)); +} } // namespace void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorStateDoc) { @@ -255,8 +310,7 @@ void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorS // Enter "dataSync" state. insertDonorStateDocument(opCtx, donorStateDoc); - // TODO: Send recipientSyncData and wait for success response (i.e. the recipient's view of - // the data has become consistent). + sendRecipientSyncDataCommand(opCtx, donorStateDoc); // Enter "blocking" state. mtab = startBlockingWritesForTenant(opCtx, donorStateDoc); @@ -351,6 +405,6 @@ void onWriteToDatabase(OperationContext* opCtx, StringData dbName) { } } -} // namespace tenant_migration +} // namespace tenant_migration_donor } // namespace mongo |