summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_donor_util.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_donor_util.cpp')
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.cpp64
1 files changed, 59 insertions, 5 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index c4c0577909a..c33ab2714f8 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -27,11 +27,17 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
#include "mongo/platform/basic.h"
#include "mongo/util/str.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/client/remote_command_targeter_rs.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
@@ -41,12 +47,13 @@
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
namespace mongo {
-namespace tenant_migration {
+namespace tenant_migration_donor {
namespace {
@@ -113,7 +120,7 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
mtab = std::make_shared<TenantMigrationAccessBlocker>(
opCtx->getServiceContext(),
- tenant_migration::makeTenantMigrationExecutor(opCtx->getServiceContext()),
+ tenant_migration_donor::makeTenantMigrationExecutor(opCtx->getServiceContext()),
donorStateDoc.getDatabasePrefix().toString());
mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab);
mtab->startBlockingWrites();
@@ -238,6 +245,54 @@ void updateDonorStateDocument(OperationContext* opCtx,
}));
}
+void sendRecipientSyncDataCommand(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
+ const ConnectionString recipientConnectionString =
+ ConnectionString(donorStateDoc.getRecipientConnectionString().toString(),
+ ConnectionString::ConnectionType::SET);
+ auto dataSyncExecutor = makeTenantMigrationExecutor(opCtx->getServiceContext());
+ dataSyncExecutor->startup();
+
+ auto removeReplicaSetMonitorForRecipientGuard =
+ makeGuard([&] { ReplicaSetMonitor::remove(recipientConnectionString.getSetName()); });
+
+ // Create the command BSON for the recipientSyncData request.
+ BSONObj cmdObj = BSONObj([&]() {
+ const auto donorConnectionString =
+ repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString().toString();
+
+ RecipientSyncData request(donorStateDoc.getId(),
+ donorConnectionString,
+ donorStateDoc.getDatabasePrefix().toString(),
+ ReadPreferenceSetting());
+
+ return request.toBSON(BSONObj());
+ }());
+
+ // Find the host and port of the recipient's primary.
+ HostAndPort recipientHost([&]() {
+ auto targeter = RemoteCommandTargeterRS(recipientConnectionString.getSetName(),
+ recipientConnectionString.getServers());
+ return uassertStatusOK(targeter.findHost(opCtx, ReadPreferenceSetting()));
+ }());
+
+ executor::RemoteCommandRequest request(recipientHost,
+ NamespaceString::kAdminDb.toString(),
+ std::move(cmdObj),
+ rpc::makeEmptyMetadata(),
+ nullptr,
+ Seconds(30));
+
+ executor::RemoteCommandResponse response =
+ Status(ErrorCodes::InternalError, "Internal error running command");
+
+ executor::TaskExecutor::CallbackHandle cbHandle =
+ uassertStatusOK(dataSyncExecutor->scheduleRemoteCommand(
+ request, [&response](const auto& args) { response = args.response; }));
+
+ dataSyncExecutor->wait(cbHandle, opCtx);
+ uassertStatusOK(getStatusFromCommandResult(response.data));
+}
} // namespace
void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorStateDoc) {
@@ -255,8 +310,7 @@ void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorS
// Enter "dataSync" state.
insertDonorStateDocument(opCtx, donorStateDoc);
- // TODO: Send recipientSyncData and wait for success response (i.e. the recipient's view of
- // the data has become consistent).
+ sendRecipientSyncDataCommand(opCtx, donorStateDoc);
// Enter "blocking" state.
mtab = startBlockingWritesForTenant(opCtx, donorStateDoc);
@@ -351,6 +405,6 @@ void onWriteToDatabase(OperationContext* opCtx, StringData dbName) {
}
}
-} // namespace tenant_migration
+} // namespace tenant_migration_donor
} // namespace mongo