diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2020-12-16 17:09:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-05 05:58:56 +0000 |
commit | 1dfe8355a2b034ded045191f4e3d4be827365621 (patch) | |
tree | dc6342e153caa69d5c59c71a6d8f1fdbfae37267 /src/mongo | |
parent | 2640ec0b0d800bf0a1e761fdb9c249b5d1ffa86a (diff) | |
download | mongo-1dfe8355a2b034ded045191f4e3d4be827365621.tar.gz |
SERVER-52706 Make tenant migration donor use a separate NetworkInterface and x509 certificate to connect to recipient
Diffstat (limited to 'src/mongo')
6 files changed, 94 insertions, 14 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 43d7e2e1fe0..827b2c16a88 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -33,6 +33,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/config.h" #include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" @@ -43,8 +44,11 @@ #include "mongo/db/repl/tenant_migration_donor_util.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/util/cancelation.h" #include "mongo/util/future_util.h" @@ -112,8 +116,49 @@ ExecutorFuture<void> TenantMigrationDonorService::_rebuildService( TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext, const BSONObj& initialState) - : repl::PrimaryOnlyService::TypedInstance<Instance>(), _serviceContext(serviceContext) { - _stateDoc = tenant_migration_donor::parseDonorStateDocument(initialState); + : repl::PrimaryOnlyService::TypedInstance<Instance>(), + _serviceContext(serviceContext), + _stateDoc(tenant_migration_donor::parseDonorStateDocument(initialState)), + _instanceName(kServiceName + "-" + _stateDoc.getTenantId()), + _recipientUri( + uassertStatusOK(MongoURI::parse(_stateDoc.getRecipientConnectionString().toString()))) { + ThreadPool::Options threadPoolOptions(_recipientCmdThreadPoolLimit); + threadPoolOptions.threadNamePrefix = _instanceName + "-"; + threadPoolOptions.poolName = _instanceName + "ThreadPool"; + threadPoolOptions.onCreateThread = [this](const std::string& threadName) { + Client::initThread(threadName.c_str()); + auto client = Client::getCurrent(); + AuthorizationSession::get(*client)->grantInternalAuthorization(&cc()); + + // Ideally, we should also associate the client created by _recipientCmdExecutor with the + // TenantMigrationDonorService to make the opCtxs created by the task executor get + // registered in the TenantMigrationDonorService, and killed on stepdown. But that would + // require passing the pointer to the TenantMigrationService into the Instance and making + // constructInstance not const so we can set the client's decoration here. Right now there + // is no need for that since the task executor is only used with scheduleRemoteCommand and + // no opCtx will be created (the cancelation token is responsible for canceling the + // outstanding work on the task executor). + stdx::lock_guard<Client> lk(*client); + client->setSystemOperationKillableByStepdown(lk); + }; + + auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); + + auto connPoolOptions = executor::ConnectionPool::Options(); +#ifdef MONGO_CONFIG_SSL + auto donorCertificate = _stateDoc.getDonorCertificateForRecipient(); + auto donorSSLClusterPEMPayload = donorCertificate.getCertificate().toString() + "\n" + + donorCertificate.getPrivateKey().toString(); + connPoolOptions.transientSSLParams = + TransientSSLParams{_recipientUri.connectionString(), std::move(donorSSLClusterPEMPayload)}; +#endif + + _recipientCmdExecutor = std::make_shared<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(threadPoolOptions), + executor::makeNetworkInterface( + _instanceName + "-Network", nullptr, std::move(hookList), connPoolOptions)); + _recipientCmdExecutor->startup(); + if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) { // The migration was resumed on stepup. stdx::lock_guard<Latch> lg(_mutex); @@ -134,6 +179,15 @@ TenantMigrationDonorService::Instance::~Instance() { stdx::lock_guard<Latch> lg(_mutex); invariant(_initialDonorStateDurablePromise.getFuture().isReady()); invariant(_receiveDonorForgetMigrationPromise.getFuture().isReady()); + + // Unlike the TenantMigrationDonorService's scoped task executor which is shut down on stepdown + // and joined on stepup, _recipientCmdExecutor is only shut down and joined when the Instance + // is destroyed. This is safe since ThreadPoolTaskExecutor::shutdown() only cancels the + // outstanding work on the task executor which the cancelation token will already do, and the + // Instance will be destroyed on stepup so this is equivalent to joining the task executor on + // stepup. + _recipientCmdExecutor->shutdown(); + _recipientCmdExecutor->join(); } boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrentOp( @@ -423,7 +477,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi nullptr, kRecipientSyncDataTimeout); - return (**executor) + request.sslMode = transport::kEnableSSL; + + return (_recipientCmdExecutor) ->scheduleRemoteCommand(std::move(request), token) .then([this, self = shared_from_this()](const auto& response) -> Status { @@ -487,10 +543,8 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) noexcept { - auto recipientUri = - uassertStatusOK(MongoURI::parse(_stateDoc.getRecipientConnectionString().toString())); - auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>(recipientUri.getSetName(), - recipientUri.getServers()); + auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>( + _recipientUri.getSetName(), _recipientUri.getServers()); return ExecutorFuture<void>(**executor) .then([this, self = shared_from_this(), executor, token] { @@ -628,7 +682,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "migrationId"_attr = _stateDoc.getId(), "expireAt"_attr = _stateDoc.getExpireAt()); - stdx::lock_guard lk(_mutex); + stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here return; diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 2f0f613a343..1e6630f1c9a 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -76,7 +76,7 @@ public: boost::optional<Status> abortReason; }; - Instance(ServiceContext* serviceContext, const BSONObj& initialState); + explicit Instance(ServiceContext* serviceContext, const BSONObj& initialState); ~Instance(); @@ -176,6 +176,15 @@ public: ServiceContext* _serviceContext; TenantMigrationDonorDocument _stateDoc; + const std::string _instanceName; + const MongoURI _recipientUri; + + // Task executor used for executing commands against the recipient using SSL connection + // created using the migration-specific certificate. + std::shared_ptr<executor::TaskExecutor> _recipientCmdExecutor; + // TODO (SERVER-50438): Limit the size of TenantMigrationDonorService thread pool. + const ThreadPool::Limits _recipientCmdThreadPoolLimit{}; + boost::optional<Status> _abortReason; // Protects the durable state and the promises below. diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp index 7577c7c2a1e..62e452e8c69 100644 --- a/src/mongo/executor/network_interface_integration_fixture.cpp +++ b/src/mongo/executor/network_interface_integration_fixture.cpp @@ -179,9 +179,11 @@ RemoteCommandResponse NetworkInterfaceIntegrationFixture::runCommandSync( void NetworkInterfaceIntegrationFixture::assertCommandOK(StringData db, const BSONObj& cmd, - Milliseconds timeoutMillis) { + Milliseconds timeoutMillis, + transport::ConnectSSLMode sslMode) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; + request.sslMode = sslMode; auto res = runCommandSync(request); ASSERT_OK(res.status); ASSERT_OK(getStatusFromCommandResult(res.data)); diff --git a/src/mongo/executor/network_interface_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h index c98b7e70393..226bce34a5c 100644 --- a/src/mongo/executor/network_interface_integration_fixture.h +++ b/src/mongo/executor/network_interface_integration_fixture.h @@ -97,7 +97,8 @@ public: void assertCommandOK(StringData db, const BSONObj& cmd, - Milliseconds timeoutMillis = Minutes(5)); + Milliseconds timeoutMillis = Minutes(5), + transport::ConnectSSLMode sslMode = transport::kGlobalSSLMode); void assertCommandFailsOnClient(StringData db, const BSONObj& cmd, ErrorCodes::Error reason, diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 12f21911b56..02d7e7b6655 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -575,6 +575,13 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect( const ReactorHandle& reactor, Milliseconds timeout, std::shared_ptr<const SSLConnectionContext> transientSSLContext) { + if (transientSSLContext) { + uassert(ErrorCodes::InvalidSSLConfiguration, + "Specified transient SSL context but connection SSL mode is not set", + sslMode == kEnableSSL); + LOGV2_DEBUG( + 5270601, 2, "Connecting to peer using transient SSL connection", "peer"_attr = peer); + } struct AsyncConnectState { AsyncConnectState(HostAndPort peer, @@ -1242,6 +1249,14 @@ TransportLayerASIO::_createSSLContext(std::shared_ptr<SSLManagerInterface>& mana } if (_listenerOptions.isEgress() && newSSLContext->manager) { + if (!transientEgressSSLParams.sslClusterPEMPayload.empty()) { + LOGV2_DEBUG(5270602, + 2, + "Initializing transient egress SSL context", + "targetClusterConnectionString"_attr = + transientEgressSSLParams.targetedClusterConnectionString); + } + newSSLContext->egress = std::make_unique<asio::ssl::context>(asio::ssl::context::sslv23); Status status = newSSLContext->manager->initSSLContext( newSSLContext->egress->native_handle(), diff --git a/src/mongo/util/net/network_interface_ssl_test.cpp b/src/mongo/util/net/network_interface_ssl_test.cpp index b6eeebf1295..afe6eeb6b18 100644 --- a/src/mongo/util/net/network_interface_ssl_test.cpp +++ b/src/mongo/util/net/network_interface_ssl_test.cpp @@ -60,8 +60,6 @@ public: internalSecurity.user = user; - // Force all connections to use SSL for outgoing - sslGlobalParams.sslMode.store(static_cast<int>(SSLParams::SSLModes::SSLMode_requireSSL)); sslGlobalParams.sslCAFile = "jstests/libs/ca.pem"; // Set a client cert that should be ignored if we use the transient cert correctly. sslGlobalParams.sslPEMKeyFile = "jstests/libs/client.pem"; @@ -84,7 +82,8 @@ public: }; TEST_F(NetworkInterfaceSSLFixture, Ping) { - assertCommandOK("admin", BSON("ping" << 1)); + assertCommandOK( + "admin", BSON("ping" << 1), RemoteCommandRequest::kNoTimeout, transport::kEnableSSL); } |