diff options
author | Tyler Seip <Tyler.Seip@mongodb.com> | 2021-11-19 14:30:19 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-21 19:04:50 +0000 |
commit | 71ba91ab5cdbcd3c7536a9a97c267e1492875b15 (patch) | |
tree | 1c8b3a5f19a0fa8f11cfd9a91a307600dc13c228 | |
parent | 1eca071330ff75bcba8c4d45c7192ef9cb220ee8 (diff) | |
download | mongo-71ba91ab5cdbcd3c7536a9a97c267e1492875b15.tar.gz |
SERVER-60679: Parse proxy protocol header on proxied connections
(cherry picked from commit 3a18d295d22b377cc7bc4c97bd3b6884d065bb85)
-rw-r--r-- | src/mongo/db/s/sharding_logging.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/load_balancer_support.cpp | 33 | ||||
-rw-r--r-- | src/mongo/s/load_balancer_support.h | 5 | ||||
-rw-r--r-- | src/mongo/s/mongos_main.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/mongos_server_parameters.idl | 10 | ||||
-rw-r--r-- | src/mongo/transport/mock_session.h | 4 | ||||
-rw-r--r-- | src/mongo/transport/proxy_protocol_header_parser.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/session.h | 5 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.cpp | 44 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.h | 8 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.cpp | 70 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 8 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.cpp | 5 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.h | 8 |
15 files changed, 182 insertions, 48 deletions
diff --git a/src/mongo/db/s/sharding_logging.cpp b/src/mongo/db/s/sharding_logging.cpp index afcd551327e..e6c56ad0538 100644 --- a/src/mongo/db/s/sharding_logging.cpp +++ b/src/mongo/db/s/sharding_logging.cpp @@ -127,8 +127,11 @@ Status ShardingLogging::_log(OperationContext* opCtx, const BSONObj& detail, const WriteConcernOptions& writeConcern) { Date_t now = Grid::get(opCtx)->getNetwork()->now(); + + const auto& session = opCtx->getClient()->session(); + const int port = session ? session->local().port() : serverGlobalParams.port; const std::string serverName = str::stream() - << Grid::get(opCtx)->getNetwork()->getHostName() << ":" << serverGlobalParams.port; + << Grid::get(opCtx)->getNetwork()->getHostName() << ":" << port; const std::string changeId = str::stream() << serverName << "-" << now.toString() << "-" << OID::gen(); diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 4eeb40e6e38..ecd759ebaee 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -149,7 +149,8 @@ env.Library( '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/idl/feature_flag', - "load_balancer_feature_flag", + 'load_balancer_feature_flag', + 'mongos_server_parameters', ], ) diff --git a/src/mongo/s/load_balancer_support.cpp b/src/mongo/s/load_balancer_support.cpp index 5eaf7131123..b6c9a50dd05 100644 --- a/src/mongo/s/load_balancer_support.cpp +++ b/src/mongo/s/load_balancer_support.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/hello_gen.h" #include "mongo/db/service_context.h" #include "mongo/s/load_balancer_feature_flag_gen.h" +#include "mongo/s/mongos_server_parameters_gen.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" @@ -56,12 +57,7 @@ struct PerService { class PerClient { public: - bool isFromLoadBalancer() const { - if (MONGO_unlikely(clientIsFromLoadBalancer.shouldFail())) { - return true; - } - return _isFromLoadBalancer; - } + bool isFromLoadBalancer() const; void setIsFromLoadBalancer() { _isFromLoadBalancer = true; @@ -96,6 +92,19 @@ private: const auto getPerServiceState = ServiceContext::declareDecoration<PerService>(); const auto getPerClientState = Client::declareDecoration<PerClient>(); + +bool PerClient::isFromLoadBalancer() const { + if (!isEnabled()) { + return false; + } + if (MONGO_unlikely(clientIsFromLoadBalancer.shouldFail())) { + return true; + } + const auto& session = getPerClientState.owner(this)->session(); + + return session && session->isFromLoadBalancer(); +} + } // namespace bool isEnabled() { @@ -103,11 +112,13 @@ bool isEnabled() { serverGlobalParams.featureCompatibility); } -void setClientIsFromLoadBalancer(Client* client) { - if (!isEnabled()) - return; - auto& perClient = getPerClientState(client); - perClient.setIsFromLoadBalancer(); +boost::optional<int> getLoadBalancerPort() { + if (isEnabled()) { + auto val = loadBalancerPort.load(); + if (val != 0) + return val; + } + return {}; } void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasLoadBalancedOption) { diff --git a/src/mongo/s/load_balancer_support.h b/src/mongo/s/load_balancer_support.h index 57928e8d6e3..59f4aae3c5d 100644 --- a/src/mongo/s/load_balancer_support.h +++ b/src/mongo/s/load_balancer_support.h @@ -35,10 +35,9 @@ namespace mongo::load_balancer_support { /** - * When a connection is made, we identify whether it came in through a load - * balancer. We associate this information with the `client`. + * Gets the load balancer port, if we are configured to enable one. */ -void setClientIsFromLoadBalancer(Client* client); +boost::optional<int> getLoadBalancerPort(); /** * Helper for handling the `hello` command on mongos. diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp index 817d26bc3c4..f5074a77f15 100644 --- a/src/mongo/s/mongos_main.cpp +++ b/src/mongo/s/mongos_main.cpp @@ -83,6 +83,7 @@ #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" +#include "mongo/s/load_balancer_support.h" #include "mongo/s/mongos_options.h" #include "mongo/s/mongos_server_parameters_gen.h" #include "mongo/s/mongos_topology_coordinator.h" @@ -663,8 +664,16 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongos>(serviceContext)); - auto tl = - transport::TransportLayerManager::createWithConfig(&serverGlobalParams, serviceContext); + const auto loadBalancerPort = load_balancer_support::getLoadBalancerPort(); + if (loadBalancerPort && *loadBalancerPort == serverGlobalParams.port) { + LOGV2_ERROR(6067901, + "Load balancer port must be different from the normal ingress port.", + "port"_attr = serverGlobalParams.port); + quickExit(EXIT_BADOPTIONS); + } + + auto tl = transport::TransportLayerManager::createWithConfig( + &serverGlobalParams, serviceContext, loadBalancerPort); auto res = tl->setup(); if (!res.isOK()) { LOGV2_ERROR(22856, diff --git a/src/mongo/s/mongos_server_parameters.idl b/src/mongo/s/mongos_server_parameters.idl index da4bf6673d8..e51b9ccb393 100644 --- a/src/mongo/s/mongos_server_parameters.idl +++ b/src/mongo/s/mongos_server_parameters.idl @@ -62,3 +62,13 @@ server_parameters: default: 15000 validator: gte: 0 + + loadBalancerPort: + description: >- + The port over which proxied connections using the Proxy Protocol may be made. + A value of 0 disables the port entirely. + set_at: [ startup ] + cpp_vartype: AtomicWord<int> + cpp_varname: "loadBalancerPort" + default: 0 + validator: { gte: 0, lte: 65535 } diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index 2b5eeefb35d..139db86c4ad 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -78,6 +78,10 @@ public: return true; } + bool isFromLoadBalancer() const override { + return false; + } + #ifdef MONGO_CONFIG_SSL const SSLConfiguration* getSSLConfiguration() const override { return nullptr; diff --git a/src/mongo/transport/proxy_protocol_header_parser.h b/src/mongo/transport/proxy_protocol_header_parser.h index f3a3e6126ab..4ccbec882fa 100644 --- a/src/mongo/transport/proxy_protocol_header_parser.h +++ b/src/mongo/transport/proxy_protocol_header_parser.h @@ -44,6 +44,12 @@ namespace mongo::transport { /** + * The maximum number of bytes ever needed by a proxy protocol header; represents + * the minimum TCP MTU. + */ +static constexpr size_t kProxyProtocolHeaderSizeUpperBound = 536; + +/** * Represents the true endpoints that a proxy using the Proxy Protocol is proxying for us. */ struct ProxiedEndpoints { @@ -77,9 +83,6 @@ struct ParserResults { boost::optional<ParserResults> parseProxyProtocolHeader(StringData buffer); namespace proxy_protocol_details { -// The maximum number of bytes ever needed by a proxy protocol header; represents -// the minimum TCP MTU. -static constexpr size_t kBytesToFetch = 536; static constexpr size_t kMaxUnixPathLength = 108; template <typename AddrUn = sockaddr_un> diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 445c0fef78b..12a8c983615 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -157,6 +157,11 @@ public: */ virtual bool isConnected() = 0; + /** + * Returns true if this session was connected through an L4 load balancer. + */ + virtual bool isFromLoadBalancer() const = 0; + virtual const HostAndPort& remote() const = 0; virtual const HostAndPort& local() const = 0; diff --git a/src/mongo/transport/session_asio.cpp b/src/mongo/transport/session_asio.cpp index c9eee2cb5c0..c0c9dfe0a6a 100644 --- a/src/mongo/transport/session_asio.cpp +++ b/src/mongo/transport/session_asio.cpp @@ -34,7 +34,9 @@ #include "mongo/config.h" #include "mongo/logv2/log.h" #include "mongo/transport/asio_utils.h" +#include "mongo/transport/proxy_protocol_header_parser.h" #include "mongo/util/assert_util.h" +#include "mongo/util/future_util.h" namespace mongo::transport { @@ -118,6 +120,10 @@ TransportLayerASIO::ASIOSession::ASIOSession( } _local = HostAndPort(_localAddr.toString(true)); + if (tl->loadBalancerPort()) { + _isFromLoadBalancer = _local.port() == *tl->loadBalancerPort(); + } + _remote = HostAndPort(_remoteAddr.toString(true)); #ifdef MONGO_CONFIG_SSL _sslContext = transientSSLContext ? transientSSLContext : *tl->_sslContext; @@ -377,6 +383,44 @@ auto TransportLayerASIO::ASIOSession::getSocket() -> GenericSocket& { return _socket; } +ExecutorFuture<void> TransportLayerASIO::ASIOSession::parseProxyProtocolHeader( + const ReactorHandle& reactor) { + invariant(_isIngressSession); + invariant(reactor); + auto buffer = std::make_shared<std::array<char, kProxyProtocolHeaderSizeUpperBound>>(); + return AsyncTry([this, buffer] { + const auto bytesRead = peekASIOStream( + _socket, asio::buffer(buffer->data(), kProxyProtocolHeaderSizeUpperBound)); + return transport::parseProxyProtocolHeader(StringData(buffer->data(), bytesRead)); + }) + .until([](StatusWith<boost::optional<ParserResults>> sw) { + return !sw.isOK() || sw.getValue(); + }) + .on(reactor, CancellationToken::uncancelable()) + .then([this, buffer](const boost::optional<ParserResults>& results) mutable { + invariant(results); + + // There may not be any endpoints if this connection is directly + // from the proxy itself or the information isn't available. + if (results->endpoints) { + _proxiedSrcEndpoint = results->endpoints->sourceAddress; + _proxiedDstEndpoint = results->endpoints->destinationAddress; + } else { + _proxiedSrcEndpoint = {}; + _proxiedDstEndpoint = {}; + } + + // Drain the read buffer. + opportunisticRead(_socket, asio::buffer(buffer.get(), results->bytesParsed)).get(); + }) + .onError([this](Status s) { + LOGV2_ERROR( + 6067900, "Error while parsing proxy protocol header", "error"_attr = redact(s)); + end(); + return s; + }); +} + Future<Message> TransportLayerASIO::ASIOSession::sourceMessageImpl(const BatonHandle& baton) { static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value); diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 59248cde259..6b85ffc5b0b 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -135,6 +135,10 @@ public: bool isConnected() override; + bool isFromLoadBalancer() const override { + return _isFromLoadBalancer; + } + #ifdef MONGO_CONFIG_SSL const SSLConfiguration* getSSLConfiguration() const override; @@ -164,6 +168,7 @@ protected: private: GenericSocket& getSocket(); + ExecutorFuture<void> parseProxyProtocolHeader(const ReactorHandle& reactor); Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr); template <typename MutableBufferSequence> @@ -255,6 +260,9 @@ private: TransportLayerASIO* const _tl; bool _isIngressSession; + bool _isFromLoadBalancer = false; + boost::optional<SockAddr> _proxiedSrcEndpoint; + boost::optional<SockAddr> _proxiedDstEndpoint; }; } // namespace mongo::transport diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 26f4bdd5d00..cfdd234d0a1 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -282,8 +282,10 @@ private: thread_local TransportLayerASIO::ASIOReactor* TransportLayerASIO::ASIOReactor::_reactorForThread = nullptr; -TransportLayerASIO::Options::Options(const ServerGlobalParams* params) +TransportLayerASIO::Options::Options(const ServerGlobalParams* params, + boost::optional<int> loadBalancerPort) : port(params->port), + loadBalancerPort(loadBalancerPort), ipList(params->bind_ips), #ifndef _WIN32 useUnixSockets(!params->noUnixSocket), @@ -957,7 +959,11 @@ Status TransportLayerASIO::setup() { #ifndef _WIN32 if (_listenerOptions.useUnixSockets && _listenerOptions.isIngress()) { - listenAddrs.emplace_back(makeUnixSockPath(_listenerOptions.port)); + listenAddrs.push_back(makeUnixSockPath(_listenerOptions.port)); + + if (_listenerOptions.loadBalancerPort) { + listenAddrs.push_back(makeUnixSockPath(*_listenerOptions.loadBalancerPort)); + } } #endif @@ -973,28 +979,35 @@ Status TransportLayerASIO::setup() { _listenerPort = _listenerOptions.port; WrappedResolver resolver(*_acceptorReactor); + std::vector<int> ports = {_listenerPort}; + if (_listenerOptions.loadBalancerPort) { + ports.push_back(*_listenerOptions.loadBalancerPort); + } + // Self-deduplicating list of unique endpoint addresses. std::set<WrappedEndpoint> endpoints; - for (auto& ip : listenAddrs) { - if (ip.empty()) { - LOGV2_WARNING(23020, "Skipping empty bind address"); - continue; - } + for (const auto& port : ports) { + for (const auto& listenAddr : listenAddrs) { + if (listenAddr.empty()) { + LOGV2_WARNING(23020, "Skipping empty bind address"); + continue; + } - auto swAddrs = - resolver.resolve(HostAndPort(ip, _listenerPort), _listenerOptions.enableIPv6); - if (!swAddrs.isOK()) { - LOGV2_WARNING(23021, - "Found no addresses for {peer}", - "Found no addresses for peer", - "peer"_attr = swAddrs.getStatus()); - continue; + const auto& swAddrs = + resolver.resolve(HostAndPort(listenAddr, port), _listenerOptions.enableIPv6); + if (!swAddrs.isOK()) { + LOGV2_WARNING(23021, + "Found no addresses for {peer}", + "Found no addresses for peer", + "peer"_attr = swAddrs.getStatus()); + continue; + } + const auto& addrs = swAddrs.getValue(); + endpoints.insert(addrs.begin(), addrs.end()); } - auto& addrs = swAddrs.getValue(); - endpoints.insert(addrs.begin(), addrs.end()); } - for (auto& addr : endpoints) { + for (const auto& addr : endpoints) { #ifndef _WIN32 if (addr.family() == AF_UNIX) { if (::unlink(addr.toString().c_str()) == -1 && errno != ENOENT) { @@ -1018,9 +1031,15 @@ Status TransportLayerASIO::setup() { } catch (std::exception&) { // Allow the server to start when "ipv6: true" and "bindIpAll: true", but the platform // does not support ipv6 (e.g., ipv6 kernel module is not loaded in Linux). - const auto bindAllIPv6Addr = ":::"_sd + std::to_string(_listenerPort); + auto bindAllFmt = [](auto p) { return fmt::format(":::{}", p); }; + bool addrIsBindAll = addr.toString() == bindAllFmt(_listenerPort); + + if (!addrIsBindAll && _listenerOptions.loadBalancerPort) { + addrIsBindAll = (addr.toString() == bindAllFmt(*_listenerOptions.loadBalancerPort)); + } + if (errno == EAFNOSUPPORT && _listenerOptions.enableIPv6 && addr.family() == AF_INET6 && - addr.toString() == bindAllIPv6Addr) { + addrIsBindAll) { LOGV2_WARNING(4206501, "Failed to bind to address as the platform does not support ipv6", "Failed to bind to {address} as the platform does not support ipv6", @@ -1264,7 +1283,16 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { try { std::shared_ptr<ASIOSession> session( new ASIOSession(this, std::move(peerSocket), true)); - _sep->startSession(std::move(session)); + if (session->isFromLoadBalancer()) { + session->parseProxyProtocolHeader(_acceptorReactor) + .getAsync([this, session = std::move(session)](Status s) { + if (s.isOK()) { + _sep->startSession(std::move(session)); + } + }); + } else { + _sep->startSession(std::move(session)); + } } catch (const asio::system_error& e) { // Swallow connection reset errors. Connection reset errors classically present as // asio::error::eof, but can bubble up as asio::error::invalid_argument when calling diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 34fbb23820c..adedd3eb308 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -91,7 +91,8 @@ public: constexpr static auto kIngress = 0x1; constexpr static auto kEgress = 0x10; - explicit Options(const ServerGlobalParams* params); + explicit Options(const ServerGlobalParams* params) : Options(params, {}) {} + Options(const ServerGlobalParams* params, boost::optional<int> loadBalancerPort); Options() = default; int mode = kIngress | kEgress; @@ -105,6 +106,7 @@ public: } int port = ServerGlobalParams::DefaultDBPort; // port to bind to + boost::optional<int> loadBalancerPort; // accepts load balancer connections std::vector<std::string> ipList; // addresses to bind to #ifndef _WIN32 bool useUnixSockets = true; // whether to allow UNIX sockets in ipList @@ -145,6 +147,10 @@ public: return _listenerPort; } + boost::optional<int> loadBalancerPort() const { + return _listenerOptions.loadBalancerPort; + } + #ifdef __linux__ BatonHandle makeBaton(OperationContext* opCtx) const override; #endif diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index 536c7b39148..d209026db82 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -134,12 +134,11 @@ std::unique_ptr<TransportLayer> TransportLayerManager::makeAndStartDefaultEgress } std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig( - const ServerGlobalParams* config, ServiceContext* ctx) { + const ServerGlobalParams* config, ServiceContext* ctx, boost::optional<int> loadBalancerPort) { auto sep = ctx->getServiceEntryPoint(); - transport::TransportLayerASIO::Options opts(config); + transport::TransportLayerASIO::Options opts(config, loadBalancerPort); opts.transportMode = transport::Mode::kSynchronous; - std::vector<std::unique_ptr<TransportLayer>> retVector; retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep)); return std::make_unique<TransportLayerManager>(std::move(retVector)); diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 3cc6538a6c5..8bcd645ff5d 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -39,6 +39,8 @@ #include "mongo/util/hierarchical_acquisition.h" #include "mongo/util/time_support.h" +#include <boost/optional.hpp> + namespace mongo { struct ServerGlobalParams; class ServiceContext; @@ -94,8 +96,10 @@ public: * serviceContext->setTransportLayer(std::move(tl)); * serviceContext->getTransportLayer->start(); */ - static std::unique_ptr<TransportLayer> createWithConfig(const ServerGlobalParams* config, - ServiceContext* ctx); + static std::unique_ptr<TransportLayer> createWithConfig( + const ServerGlobalParams* config, + ServiceContext* ctx, + boost::optional<int> loadBalancerPort = {}); static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer(); |