summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2020-12-16 17:09:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-05 05:58:56 +0000
commit1dfe8355a2b034ded045191f4e3d4be827365621 (patch)
treedc6342e153caa69d5c59c71a6d8f1fdbfae37267 /src
parent2640ec0b0d800bf0a1e761fdb9c249b5d1ffa86a (diff)
downloadmongo-1dfe8355a2b034ded045191f4e3d4be827365621.tar.gz
SERVER-52706 Make tenant migration donor use a separate NetworkInterface and x509 certificate to connect to recipient
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp70
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.h11
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.cpp4
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.h3
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp15
-rw-r--r--src/mongo/util/net/network_interface_ssl_test.cpp5
6 files changed, 94 insertions, 14 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 43d7e2e1fe0..827b2c16a88 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/config.h"
#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
@@ -43,8 +44,11 @@
#include "mongo/db/repl/tenant_migration_donor_util.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/util/cancelation.h"
#include "mongo/util/future_util.h"
@@ -112,8 +116,49 @@ ExecutorFuture<void> TenantMigrationDonorService::_rebuildService(
TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext,
const BSONObj& initialState)
- : repl::PrimaryOnlyService::TypedInstance<Instance>(), _serviceContext(serviceContext) {
- _stateDoc = tenant_migration_donor::parseDonorStateDocument(initialState);
+ : repl::PrimaryOnlyService::TypedInstance<Instance>(),
+ _serviceContext(serviceContext),
+ _stateDoc(tenant_migration_donor::parseDonorStateDocument(initialState)),
+ _instanceName(kServiceName + "-" + _stateDoc.getTenantId()),
+ _recipientUri(
+ uassertStatusOK(MongoURI::parse(_stateDoc.getRecipientConnectionString().toString()))) {
+ ThreadPool::Options threadPoolOptions(_recipientCmdThreadPoolLimit);
+ threadPoolOptions.threadNamePrefix = _instanceName + "-";
+ threadPoolOptions.poolName = _instanceName + "ThreadPool";
+ threadPoolOptions.onCreateThread = [this](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ auto client = Client::getCurrent();
+ AuthorizationSession::get(*client)->grantInternalAuthorization(&cc());
+
+ // Ideally, we should also associate the client created by _recipientCmdExecutor with the
+ // TenantMigrationDonorService to make the opCtxs created by the task executor get
+ // registered in the TenantMigrationDonorService, and killed on stepdown. But that would
+ // require passing the pointer to the TenantMigrationService into the Instance and making
+ // constructInstance not const so we can set the client's decoration here. Right now there
+ // is no need for that since the task executor is only used with scheduleRemoteCommand and
+ // no opCtx will be created (the cancelation token is responsible for canceling the
+ // outstanding work on the task executor).
+ stdx::lock_guard<Client> lk(*client);
+ client->setSystemOperationKillableByStepdown(lk);
+ };
+
+ auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+
+ auto connPoolOptions = executor::ConnectionPool::Options();
+#ifdef MONGO_CONFIG_SSL
+ auto donorCertificate = _stateDoc.getDonorCertificateForRecipient();
+ auto donorSSLClusterPEMPayload = donorCertificate.getCertificate().toString() + "\n" +
+ donorCertificate.getPrivateKey().toString();
+ connPoolOptions.transientSSLParams =
+ TransientSSLParams{_recipientUri.connectionString(), std::move(donorSSLClusterPEMPayload)};
+#endif
+
+ _recipientCmdExecutor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(threadPoolOptions),
+ executor::makeNetworkInterface(
+ _instanceName + "-Network", nullptr, std::move(hookList), connPoolOptions));
+ _recipientCmdExecutor->startup();
+
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) {
// The migration was resumed on stepup.
stdx::lock_guard<Latch> lg(_mutex);
@@ -134,6 +179,15 @@ TenantMigrationDonorService::Instance::~Instance() {
stdx::lock_guard<Latch> lg(_mutex);
invariant(_initialDonorStateDurablePromise.getFuture().isReady());
invariant(_receiveDonorForgetMigrationPromise.getFuture().isReady());
+
+ // Unlike the TenantMigrationDonorService's scoped task executor which is shut down on stepdown
+ // and joined on stepup, _recipientCmdExecutor is only shut down and joined when the Instance
+ // is destroyed. This is safe since ThreadPoolTaskExecutor::shutdown() only cancels the
+ // outstanding work on the task executor which the cancelation token will already do, and the
+ // Instance will be destroyed on stepup so this is equivalent to joining the task executor on
+ // stepup.
+ _recipientCmdExecutor->shutdown();
+ _recipientCmdExecutor->join();
}
boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrentOp(
@@ -423,7 +477,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi
nullptr,
kRecipientSyncDataTimeout);
- return (**executor)
+ request.sslMode = transport::kEnableSSL;
+
+ return (_recipientCmdExecutor)
->scheduleRemoteCommand(std::move(request), token)
.then([this,
self = shared_from_this()](const auto& response) -> Status {
@@ -487,10 +543,8 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancelationToken& token) noexcept {
- auto recipientUri =
- uassertStatusOK(MongoURI::parse(_stateDoc.getRecipientConnectionString().toString()));
- auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>(recipientUri.getSetName(),
- recipientUri.getServers());
+ auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>(
+ _recipientUri.getSetName(), _recipientUri.getServers());
return ExecutorFuture<void>(**executor)
.then([this, self = shared_from_this(), executor, token] {
@@ -628,7 +682,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"migrationId"_attr = _stateDoc.getId(),
"expireAt"_attr = _stateDoc.getExpireAt());
- stdx::lock_guard lk(_mutex);
+ stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here
return;
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 2f0f613a343..1e6630f1c9a 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -76,7 +76,7 @@ public:
boost::optional<Status> abortReason;
};
- Instance(ServiceContext* serviceContext, const BSONObj& initialState);
+ explicit Instance(ServiceContext* serviceContext, const BSONObj& initialState);
~Instance();
@@ -176,6 +176,15 @@ public:
ServiceContext* _serviceContext;
TenantMigrationDonorDocument _stateDoc;
+ const std::string _instanceName;
+ const MongoURI _recipientUri;
+
+ // Task executor used for executing commands against the recipient using SSL connection
+ // created using the migration-specific certificate.
+ std::shared_ptr<executor::TaskExecutor> _recipientCmdExecutor;
+ // TODO (SERVER-50438): Limit the size of TenantMigrationDonorService thread pool.
+ const ThreadPool::Limits _recipientCmdThreadPoolLimit{};
+
boost::optional<Status> _abortReason;
// Protects the durable state and the promises below.
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index 7577c7c2a1e..62e452e8c69 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -179,9 +179,11 @@ RemoteCommandResponse NetworkInterfaceIntegrationFixture::runCommandSync(
void NetworkInterfaceIntegrationFixture::assertCommandOK(StringData db,
const BSONObj& cmd,
- Milliseconds timeoutMillis) {
+ Milliseconds timeoutMillis,
+ transport::ConnectSSLMode sslMode) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
+ request.sslMode = sslMode;
auto res = runCommandSync(request);
ASSERT_OK(res.status);
ASSERT_OK(getStatusFromCommandResult(res.data));
diff --git a/src/mongo/executor/network_interface_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h
index c98b7e70393..226bce34a5c 100644
--- a/src/mongo/executor/network_interface_integration_fixture.h
+++ b/src/mongo/executor/network_interface_integration_fixture.h
@@ -97,7 +97,8 @@ public:
void assertCommandOK(StringData db,
const BSONObj& cmd,
- Milliseconds timeoutMillis = Minutes(5));
+ Milliseconds timeoutMillis = Minutes(5),
+ transport::ConnectSSLMode sslMode = transport::kGlobalSSLMode);
void assertCommandFailsOnClient(StringData db,
const BSONObj& cmd,
ErrorCodes::Error reason,
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 12f21911b56..02d7e7b6655 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -575,6 +575,13 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(
const ReactorHandle& reactor,
Milliseconds timeout,
std::shared_ptr<const SSLConnectionContext> transientSSLContext) {
+ if (transientSSLContext) {
+ uassert(ErrorCodes::InvalidSSLConfiguration,
+ "Specified transient SSL context but connection SSL mode is not set",
+ sslMode == kEnableSSL);
+ LOGV2_DEBUG(
+ 5270601, 2, "Connecting to peer using transient SSL connection", "peer"_attr = peer);
+ }
struct AsyncConnectState {
AsyncConnectState(HostAndPort peer,
@@ -1242,6 +1249,14 @@ TransportLayerASIO::_createSSLContext(std::shared_ptr<SSLManagerInterface>& mana
}
if (_listenerOptions.isEgress() && newSSLContext->manager) {
+ if (!transientEgressSSLParams.sslClusterPEMPayload.empty()) {
+ LOGV2_DEBUG(5270602,
+ 2,
+ "Initializing transient egress SSL context",
+ "targetClusterConnectionString"_attr =
+ transientEgressSSLParams.targetedClusterConnectionString);
+ }
+
newSSLContext->egress = std::make_unique<asio::ssl::context>(asio::ssl::context::sslv23);
Status status = newSSLContext->manager->initSSLContext(
newSSLContext->egress->native_handle(),
diff --git a/src/mongo/util/net/network_interface_ssl_test.cpp b/src/mongo/util/net/network_interface_ssl_test.cpp
index b6eeebf1295..afe6eeb6b18 100644
--- a/src/mongo/util/net/network_interface_ssl_test.cpp
+++ b/src/mongo/util/net/network_interface_ssl_test.cpp
@@ -60,8 +60,6 @@ public:
internalSecurity.user = user;
- // Force all connections to use SSL for outgoing
- sslGlobalParams.sslMode.store(static_cast<int>(SSLParams::SSLModes::SSLMode_requireSSL));
sslGlobalParams.sslCAFile = "jstests/libs/ca.pem";
// Set a client cert that should be ignored if we use the transient cert correctly.
sslGlobalParams.sslPEMKeyFile = "jstests/libs/client.pem";
@@ -84,7 +82,8 @@ public:
};
TEST_F(NetworkInterfaceSSLFixture, Ping) {
- assertCommandOK("admin", BSON("ping" << 1));
+ assertCommandOK(
+ "admin", BSON("ping" << 1), RemoteCommandRequest::kNoTimeout, transport::kEnableSSL);
}