summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuis Osta <luis.osta@mongodb.com>2020-08-04 17:03:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-04 18:07:13 +0000
commit0e3bd22e59a51dcdfc7fdac6d96dcda22e2e6647 (patch)
tree9fe03e0ad3a5f88e68919122ac34a310d3b56143
parentb4db8c01a13fd70997a05857be17548b0adec020 (diff)
downloadmongo-0e3bd22e59a51dcdfc7fdac6d96dcda22e2e6647.tar.gz
SERVER-49789 Make tenant migration donor use a StreamableReplicaSetMonitor to monitor and send commands to the recipient
-rw-r--r--jstests/replsets/db_reads_while_recovering_all_commands.js1
-rw-r--r--jstests/replsets/reads_during_tenant_migration.js24
-rw-r--r--jstests/replsets/tenant_migration_donor_state_machine.js28
-rw-r--r--jstests/replsets/writes_during_tenant_migration.js23
-rw-r--r--jstests/sharding/read_write_concern_defaults_application.js1
-rw-r--r--src/mongo/db/commands/SConscript18
-rw-r--r--src/mongo/db/commands/tenant_migration_donor_cmds.cpp (renamed from src/mongo/db/commands/tenant_migration_cmds.cpp)4
-rw-r--r--src/mongo/db/commands/tenant_migration_donor_cmds.idl (renamed from src/mongo/db/commands/tenant_migration_cmds.idl)0
-rw-r--r--src/mongo/db/commands/tenant_migration_recipient_cmds.cpp73
-rw-r--r--src/mongo/db/commands/tenant_migration_recipient_cmds.idl64
-rw-r--r--src/mongo/db/op_observer_impl.cpp2
-rw-r--r--src/mongo/db/repl/SConscript4
-rw-r--r--src/mongo/db/repl/oplog.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.cpp64
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.h6
-rw-r--r--src/mongo/db/service_entry_point_common.cpp10
-rw-r--r--src/mongo/util/uuid.h12
17 files changed, 290 insertions, 46 deletions
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 467b06c1363..6670bc1458c 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -241,6 +241,7 @@ const allCommands = {
prepareTransaction: {skip: isPrimaryOnly},
profile: {skip: isPrimaryOnly},
reapLogicalSessionCacheNow: {skip: isNotAUserDataRead},
+ recipientSyncData: {skip: isPrimaryOnly},
refreshLogicalSessionCacheNow: {skip: isNotAUserDataRead},
refreshSessions: {skip: isNotAUserDataRead},
reIndex: {skip: isNotAUserDataRead},
diff --git a/jstests/replsets/reads_during_tenant_migration.js b/jstests/replsets/reads_during_tenant_migration.js
index 49f602a30c1..7fc6d455cb1 100644
--- a/jstests/replsets/reads_during_tenant_migration.js
+++ b/jstests/replsets/reads_during_tenant_migration.js
@@ -14,8 +14,19 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
+const donorRst = new ReplSetTest(
+ {nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], name: 'donor'});
+const recipientRst = new ReplSetTest({nodes: 1, name: 'recipient'});
+
+donorRst.startSet();
+donorRst.initiate();
+recipientRst.startSet();
+recipientRst.initiate();
+
+const kCollName = "testColl";
+const kRecipientConnString = recipientRst.getURL();
+
const kMaxTimeMS = 5 * 1000;
-const kRecipientConnString = "testConnString";
const kConfigDonorsNS = "config.tenantMigrationDonors";
function startMigration(host, dbName, recipientConnString) {
@@ -355,12 +366,6 @@ const testCases = {
}
};
-const rst = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
-rst.startSet();
-rst.initiate();
-
-const kCollName = "testColl";
-
// Run test cases.
const testFuncs = {
inCommitted: testReadIsRejectedIfSentAfterMigrationHasCommitted,
@@ -373,9 +378,10 @@ const testFuncs = {
for (const [testName, testFunc] of Object.entries(testFuncs)) {
for (const [commandName, testCase] of Object.entries(testCases)) {
let dbName = commandName + "-" + testName + "0";
- testFunc(rst, testCase, dbName, kCollName);
+ testFunc(donorRst, testCase, dbName, kCollName);
}
}
-rst.stopSet();
+donorRst.stopSet();
+recipientRst.stopSet();
})();
diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js
index 831b1697e7d..c4d73d7506f 100644
--- a/jstests/replsets/tenant_migration_donor_state_machine.js
+++ b/jstests/replsets/tenant_migration_donor_state_machine.js
@@ -19,14 +19,17 @@ const accessState = {
kReject: 3
};
-const donorRst =
- new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
-const recipientRst = new ReplSetTest({nodes: 1});
+const donorRst = new ReplSetTest(
+ {nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], name: 'donor'});
+const recipientRst = new ReplSetTest({nodes: 1, name: 'recipient'});
donorRst.startSet();
donorRst.initiate();
+recipientRst.startSet();
+recipientRst.initiate();
const donorPrimary = donorRst.getPrimary();
+const recipientPrimary = recipientRst.getPrimary();
const kRecipientConnString = recipientRst.getURL();
const kDBPrefix = 'testDb';
const kConfigDonorsNS = "config.tenantMigrationDonors";
@@ -77,6 +80,15 @@ const kConfigDonorsNS = "config.tenantMigrationDonors";
donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
assert.eq(donorDoc.state, "committed");
assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts);
+
+ const donorRecipientMonitorPoolStats =
+ donorPrimary.adminCommand({connPoolStats: 1}).replicaSets;
+ assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0);
+
+ const recipientSyncDataMetrics =
+ recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData;
+ assert.eq(recipientSyncDataMetrics.failed, 0);
+ assert.neq(recipientSyncDataMetrics.total, 0);
})();
(() => {
@@ -103,7 +115,17 @@ const kConfigDonorsNS = "config.tenantMigrationDonors";
donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
assert.eq(donorDoc.state, "aborted");
assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts);
+
+ const donorRecipientMonitorPoolStats =
+ donorPrimary.adminCommand({connPoolStats: 1}).replicaSets;
+ assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0);
+
+ const recipientSyncDataMetrics =
+ recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData;
+ assert.eq(recipientSyncDataMetrics.failed, 0);
+ assert.neq(recipientSyncDataMetrics.total, 0);
})();
donorRst.stopSet();
+recipientRst.stopSet();
})();
diff --git a/jstests/replsets/writes_during_tenant_migration.js b/jstests/replsets/writes_during_tenant_migration.js
index cddd67c04fc..14582bd8319 100644
--- a/jstests/replsets/writes_during_tenant_migration.js
+++ b/jstests/replsets/writes_during_tenant_migration.js
@@ -11,6 +11,18 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
+const donorRst = new ReplSetTest({nodes: 1, name: 'donor'});
+const recipientRst = new ReplSetTest({nodes: 1, name: 'recipient'});
+
+donorRst.startSet();
+donorRst.initiate();
+recipientRst.startSet();
+recipientRst.initiate();
+
+const primary = donorRst.getPrimary();
+const kRecipientConnString = recipientRst.getURL();
+const kCollName = "testColl";
+
const kTestDoc = {
x: -1
};
@@ -28,7 +40,6 @@ const kTestIndex = {
const kNumInitialDocs = 2; // num initial docs to insert into test collections.
const kMaxSize = 1024; // max size of capped collections.
const kTxnNumber = NumberLong(0);
-const kRecipientConnString = "testConnString";
const kMaxTimeMS = 1 * 1000;
function startMigration(host, dbName, recipientConnString) {
@@ -810,13 +821,6 @@ const testCases = {
whatsmyuri: {skip: isNotRunOnUserDatabase}
};
-const rst = new ReplSetTest({nodes: 1});
-rst.startSet();
-rst.initiate();
-const primary = rst.getPrimary();
-
-const kCollName = "testColl";
-
// Validate test cases for all commands.
for (let command of Object.keys(testCases)) {
validateTestCase(testCases[command]);
@@ -857,5 +861,6 @@ for (const [testName, testFunc] of Object.entries(testFuncs)) {
}
}
-rst.stopSet();
+donorRst.stopSet();
+recipientRst.stopSet();
})();
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index 40b76f35a48..c45fbc86f43 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -502,6 +502,7 @@ let testCases = {
profile: {skip: "does not accept read or write concern"},
reIndex: {skip: "does not accept read or write concern"},
reapLogicalSessionCacheNow: {skip: "does not accept read or write concern"},
+ recipientSyncData: {skip: "does not accept read or write concern"},
refineCollectionShardKey: {skip: "does not accept read or write concern"},
refreshLogicalSessionCacheNow: {skip: "does not accept read or write concern"},
refreshSessions: {skip: "does not accept read or write concern"},
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index a4a1ebdc4ca..70f9707fb8f 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -409,16 +409,17 @@ env.Library(
"sharded_index_consistency_server_status.cpp",
"shutdown_d.cpp",
"snapshot_management.cpp",
+ "tenant_migration_donor_cmds.cpp",
+ "tenant_migration_recipient_cmds.cpp",
"top_command.cpp",
- "tenant_migration_cmds.cpp",
"txn_cmds.cpp",
"user_management_commands.cpp",
"vote_commit_index_build_command.cpp",
env.Idlc('internal_rename_if_options_and_indexes_match.idl')[0],
- env.Idlc('tenant_migration_cmds.idl')[0],
env.Idlc('vote_commit_index_build.idl')[0],
],
LIBDEPS=[
+ 'tenant_migration_cmds_request',
'txn_cmd_request',
],
LIBDEPS_PRIVATE=[
@@ -445,6 +446,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/dbcheck',
'$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_donor',
'$BUILD_DIR/mongo/db/rw_concern_d',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/server_options_core',
@@ -527,6 +529,18 @@ env.Library(
)
env.Library(
+ target='tenant_migration_cmds_request',
+ source=[
+ env.Idlc('tenant_migration_donor_cmds.idl')[0],
+ env.Idlc('tenant_migration_recipient_cmds.idl')[0],
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/mongo/client/read_preference',
+ ]
+)
+env.Library(
target='txn_cmd_request',
source=[
env.Idlc("txn_cmds.idl")[0],
diff --git a/src/mongo/db/commands/tenant_migration_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
index 9b12ae0ad99..16e773b2e0f 100644
--- a/src/mongo/db/commands/tenant_migration_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
@@ -28,7 +28,7 @@
*/
#include "mongo/db/commands.h"
-#include "mongo/db/commands/tenant_migration_cmds_gen.h"
+#include "mongo/db/commands/tenant_migration_donor_cmds_gen.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
namespace mongo {
@@ -51,7 +51,7 @@ public:
requestBody.getDatabasePrefix().toString(),
TenantMigrationDonorStateEnum::kDataSync);
- tenant_migration::startMigration(opCtx, donorStateDoc);
+ tenant_migration_donor::startMigration(opCtx, donorStateDoc);
}
diff --git a/src/mongo/db/commands/tenant_migration_cmds.idl b/src/mongo/db/commands/tenant_migration_donor_cmds.idl
index 96fff6640db..96fff6640db 100644
--- a/src/mongo/db/commands/tenant_migration_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.idl
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
new file mode 100644
index 00000000000..1ea9c6e8994
--- /dev/null
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/commands.h"
+#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
+
+namespace mongo {
+namespace {
+
+class RecipientSyncDataCmd : public TypedCommand<RecipientSyncDataCmd> {
+public:
+ using Request = RecipientSyncData;
+
+ class Invocation : public InvocationBase {
+
+ public:
+ using InvocationBase::InvocationBase;
+ void typedRun(OperationContext* opCtx) {}
+
+
+ void doCheckAuthorization(OperationContext* opCtx) const {}
+
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+ NamespaceString ns() const {
+ return NamespaceString(request().getDbName(), "");
+ }
+ };
+
+ std::string help() const {
+ return "";
+ }
+ bool adminOnly() const override {
+ return true;
+ }
+
+ BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return BasicCommand::AllowedOnSecondary::kNever;
+ }
+
+} recipientSyncDataCmd;
+
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
new file mode 100644
index 00000000000..4d267f500e2
--- /dev/null
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
@@ -0,0 +1,64 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/client/read_preference.h"
+ - "mongo/db/repl/tenant_migration_util.h"
+
+imports:
+ - "mongo/client/read_preference_setting.idl"
+ - "mongo/idl/basic_types.idl"
+ - "mongo/s/sharding_types.idl"
+ - "mongo/db/repl/replication_types.idl"
+
+commands:
+ recipientSyncData:
+ description: "Parser for the 'recipientSyncData' command."
+ strict: true
+ namespace: ignored
+ fields:
+ migrationId:
+ description: "Unique identifier for the tenant migration."
+ type: uuid
+ donorConnectionString:
+ description: "The URI string that the donor will utilize to create a connection with the recipient."
+ type: string
+ databasePrefix:
+ description: "The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed."
+ type: string
+ validator:
+ callback: "validateDbPrefix"
+ readPreference:
+ description: "The read preference settings that the donor will pass on to the recipient."
+ type: readPreference
+ returnAfterReachingOpTime:
+ description: "If provided, the recipient should return after syncing up to this OpTime. Otherwise, the recipient will return once its copy of the data is consistent."
+ type: timestamp
+ optional: true
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 6f8d1479823..193fa3885da 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -574,7 +574,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc);
} else if (args.nss == NamespaceString::kTenantMigrationDonorsNamespace) {
- tenant_migration::onDonorStateTransition(opCtx, args.updateArgs.updatedDoc);
+ tenant_migration_donor::onDonorStateTransition(opCtx, args.updateArgs.updatedDoc);
}
}
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 25d80ca26f0..a63a654fc25 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -44,6 +44,7 @@ env.Library(
'repl_coordinator_interface',
'repl_server_parameters',
'repl_settings',
+ 'tenant_migration_donor',
'timestamp_block',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
@@ -55,7 +56,6 @@ env.Library(
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/op_observer',
- '$BUILD_DIR/mongo/db/repl/tenant_migration_donor',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/stats/server_read_concern_write_concern_metrics',
'$BUILD_DIR/mongo/db/transaction',
@@ -1241,8 +1241,10 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/remote_command_targeter',
'$BUILD_DIR/mongo/db/catalog_raii',
'$BUILD_DIR/mongo/db/commands/server_status',
+ '$BUILD_DIR/mongo/db/commands/tenant_migration_cmds_request',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/rw_concern_d',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index d3749f0be47..61ce9f4da5d 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -230,7 +230,7 @@ void _logOpsInner(OperationContext* opCtx,
// The oplogEntry for renameCollection has nss set to the fromCollection's ns. renameCollection
// can be across databases, but a tenant will never be able to rename into a database with a
// different prefix, so it is safe to use the fromCollection's db's prefix for this check.
- tenant_migration::onWriteToDatabase(opCtx, nss.db());
+ tenant_migration_donor::onWriteToDatabase(opCtx, nss.db());
Status result = oplogCollection->insertDocumentsForOplog(opCtx, records, timestamps);
if (!result.isOK()) {
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index c4c0577909a..c33ab2714f8 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -27,11 +27,17 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
#include "mongo/platform/basic.h"
#include "mongo/util/str.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/client/remote_command_targeter_rs.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
@@ -41,12 +47,13 @@
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
namespace mongo {
-namespace tenant_migration {
+namespace tenant_migration_donor {
namespace {
@@ -113,7 +120,7 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
mtab = std::make_shared<TenantMigrationAccessBlocker>(
opCtx->getServiceContext(),
- tenant_migration::makeTenantMigrationExecutor(opCtx->getServiceContext()),
+ tenant_migration_donor::makeTenantMigrationExecutor(opCtx->getServiceContext()),
donorStateDoc.getDatabasePrefix().toString());
mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab);
mtab->startBlockingWrites();
@@ -238,6 +245,54 @@ void updateDonorStateDocument(OperationContext* opCtx,
}));
}
+void sendRecipientSyncDataCommand(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
+ const ConnectionString recipientConnectionString =
+ ConnectionString(donorStateDoc.getRecipientConnectionString().toString(),
+ ConnectionString::ConnectionType::SET);
+ auto dataSyncExecutor = makeTenantMigrationExecutor(opCtx->getServiceContext());
+ dataSyncExecutor->startup();
+
+ auto removeReplicaSetMonitorForRecipientGuard =
+ makeGuard([&] { ReplicaSetMonitor::remove(recipientConnectionString.getSetName()); });
+
+ // Create the command BSON for the recipientSyncData request.
+ BSONObj cmdObj = BSONObj([&]() {
+ const auto donorConnectionString =
+ repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString().toString();
+
+ RecipientSyncData request(donorStateDoc.getId(),
+ donorConnectionString,
+ donorStateDoc.getDatabasePrefix().toString(),
+ ReadPreferenceSetting());
+
+ return request.toBSON(BSONObj());
+ }());
+
+ // Find the host and port of the recipient's primary.
+ HostAndPort recipientHost([&]() {
+ auto targeter = RemoteCommandTargeterRS(recipientConnectionString.getSetName(),
+ recipientConnectionString.getServers());
+ return uassertStatusOK(targeter.findHost(opCtx, ReadPreferenceSetting()));
+ }());
+
+ executor::RemoteCommandRequest request(recipientHost,
+ NamespaceString::kAdminDb.toString(),
+ std::move(cmdObj),
+ rpc::makeEmptyMetadata(),
+ nullptr,
+ Seconds(30));
+
+ executor::RemoteCommandResponse response =
+ Status(ErrorCodes::InternalError, "Internal error running command");
+
+ executor::TaskExecutor::CallbackHandle cbHandle =
+ uassertStatusOK(dataSyncExecutor->scheduleRemoteCommand(
+ request, [&response](const auto& args) { response = args.response; }));
+
+ dataSyncExecutor->wait(cbHandle, opCtx);
+ uassertStatusOK(getStatusFromCommandResult(response.data));
+}
} // namespace
void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorStateDoc) {
@@ -255,8 +310,7 @@ void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorS
// Enter "dataSync" state.
insertDonorStateDocument(opCtx, donorStateDoc);
- // TODO: Send recipientSyncData and wait for success response (i.e. the recipient's view of
- // the data has become consistent).
+ sendRecipientSyncDataCommand(opCtx, donorStateDoc);
// Enter "blocking" state.
mtab = startBlockingWritesForTenant(opCtx, donorStateDoc);
@@ -351,6 +405,6 @@ void onWriteToDatabase(OperationContext* opCtx, StringData dbName) {
}
}
-} // namespace tenant_migration
+} // namespace tenant_migration_donor
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 5055bbaec8b..cd42d025fae 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -30,7 +30,7 @@
#pragma once
#include "mongo/db/commands.h"
-#include "mongo/db/commands/tenant_migration_cmds_gen.h"
+#include "mongo/db/commands/tenant_migration_donor_cmds_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/tenant_migration_access_blocker_by_prefix.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
@@ -41,7 +41,7 @@
namespace mongo {
-namespace tenant_migration {
+namespace tenant_migration_donor {
/**
* Starts a tenant migration as defined in the given donor's state document.
@@ -114,6 +114,6 @@ void migrationConflictRetry(OperationContext* opCtx,
}
}
-} // namespace tenant_migration
+} // namespace tenant_migration_donor
} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 612139a2bb9..6717fc5589d 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -519,8 +519,8 @@ void invokeWithNoSession(OperationContext* opCtx,
const OpMsgRequest& request,
CommandInvocation* invocation,
rpc::ReplyBuilderInterface* replyBuilder) {
- tenant_migration::checkIfCanReadOrBlock(opCtx, request.getDatabase());
- tenant_migration::migrationConflictRetry(
+ tenant_migration_donor::checkIfCanReadOrBlock(opCtx, request.getDatabase());
+ tenant_migration_donor::migrationConflictRetry(
opCtx,
[&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
replyBuilder);
@@ -627,10 +627,10 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
}
}
- tenant_migration::checkIfCanReadOrBlock(opCtx, request.getDatabase());
+ tenant_migration_donor::checkIfCanReadOrBlock(opCtx, request.getDatabase());
try {
- tenant_migration::migrationConflictRetry(
+ tenant_migration_donor::migrationConflictRetry(
opCtx,
[&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
replyBuilder);
@@ -874,7 +874,7 @@ bool runCommandImpl(OperationContext* opCtx,
});
behaviors.waitForLinearizableReadConcern(opCtx);
- tenant_migration::checkIfLinearizableReadWasAllowedOrThrow(opCtx, request.getDatabase());
+ tenant_migration_donor::checkIfLinearizableReadWasAllowedOrThrow(opCtx, request.getDatabase());
// Wait for data to satisfy the read concern level, if necessary.
behaviors.waitForSpeculativeMajorityReadConcern(opCtx);
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index 7fb8471be2c..4330eee758e 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -65,9 +65,10 @@ class UUID {
// Make the IDL generated parser a friend
friend class ConfigsvrShardCollectionResponse;
- friend class ShardsvrShardCollectionResponse;
- friend class ShardsvrRenameCollection;
friend class CommonReshardingMetadata;
+ friend class DonorStartMigration;
+ friend class DonorWaitForMigrationToCommit;
+ friend class DonorForgetMigration;
friend class DatabaseVersion;
friend class DbCheckOplogCollection;
friend class EncryptionPlaceholder;
@@ -86,15 +87,16 @@ class UUID {
friend class repl::OplogEntryBase;
friend class repl::DurableReplOperation;
friend class repl::InitialSyncIdDocument;
+ friend class RecipientSyncData;
friend class ResumeIndexInfo;
friend class ResumeTokenInternal;
friend class ShardCollectionTypeBase;
+ friend class ShardsvrShardCollectionResponse;
+ friend class ShardsvrRenameCollection;
friend class TenantMigrationDonorDocument;
friend class TenantMigrationRecipientDocument;
friend class VoteCommitIndexBuild;
- friend class DonorStartMigration;
- friend class DonorWaitForMigrationToCommit;
- friend class DonorForgetMigration;
+
public:
/**