summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTyler Seip <Tyler.Seip@mongodb.com>2021-11-19 14:30:19 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-21 19:04:50 +0000
commit71ba91ab5cdbcd3c7536a9a97c267e1492875b15 (patch)
tree1c8b3a5f19a0fa8f11cfd9a91a307600dc13c228
parent1eca071330ff75bcba8c4d45c7192ef9cb220ee8 (diff)
downloadmongo-71ba91ab5cdbcd3c7536a9a97c267e1492875b15.tar.gz
SERVER-60679: Parse proxy protocol header on proxied connections
(cherry picked from commit 3a18d295d22b377cc7bc4c97bd3b6884d065bb85)
-rw-r--r--src/mongo/db/s/sharding_logging.cpp5
-rw-r--r--src/mongo/s/SConscript3
-rw-r--r--src/mongo/s/load_balancer_support.cpp33
-rw-r--r--src/mongo/s/load_balancer_support.h5
-rw-r--r--src/mongo/s/mongos_main.cpp13
-rw-r--r--src/mongo/s/mongos_server_parameters.idl10
-rw-r--r--src/mongo/transport/mock_session.h4
-rw-r--r--src/mongo/transport/proxy_protocol_header_parser.h9
-rw-r--r--src/mongo/transport/session.h5
-rw-r--r--src/mongo/transport/session_asio.cpp44
-rw-r--r--src/mongo/transport/session_asio.h8
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp70
-rw-r--r--src/mongo/transport/transport_layer_asio.h8
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp5
-rw-r--r--src/mongo/transport/transport_layer_manager.h8
15 files changed, 182 insertions, 48 deletions
diff --git a/src/mongo/db/s/sharding_logging.cpp b/src/mongo/db/s/sharding_logging.cpp
index afcd551327e..e6c56ad0538 100644
--- a/src/mongo/db/s/sharding_logging.cpp
+++ b/src/mongo/db/s/sharding_logging.cpp
@@ -127,8 +127,11 @@ Status ShardingLogging::_log(OperationContext* opCtx,
const BSONObj& detail,
const WriteConcernOptions& writeConcern) {
Date_t now = Grid::get(opCtx)->getNetwork()->now();
+
+ const auto& session = opCtx->getClient()->session();
+ const int port = session ? session->local().port() : serverGlobalParams.port;
const std::string serverName = str::stream()
- << Grid::get(opCtx)->getNetwork()->getHostName() << ":" << serverGlobalParams.port;
+ << Grid::get(opCtx)->getNetwork()->getHostName() << ":" << port;
const std::string changeId = str::stream()
<< serverName << "-" << now.toString() << "-" << OID::gen();
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 4eeb40e6e38..ecd759ebaee 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -149,7 +149,8 @@ env.Library(
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/idl/feature_flag',
- "load_balancer_feature_flag",
+ 'load_balancer_feature_flag',
+ 'mongos_server_parameters',
],
)
diff --git a/src/mongo/s/load_balancer_support.cpp b/src/mongo/s/load_balancer_support.cpp
index 5eaf7131123..b6c9a50dd05 100644
--- a/src/mongo/s/load_balancer_support.cpp
+++ b/src/mongo/s/load_balancer_support.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/hello_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/s/load_balancer_feature_flag_gen.h"
+#include "mongo/s/mongos_server_parameters_gen.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
@@ -56,12 +57,7 @@ struct PerService {
class PerClient {
public:
- bool isFromLoadBalancer() const {
- if (MONGO_unlikely(clientIsFromLoadBalancer.shouldFail())) {
- return true;
- }
- return _isFromLoadBalancer;
- }
+ bool isFromLoadBalancer() const;
void setIsFromLoadBalancer() {
_isFromLoadBalancer = true;
@@ -96,6 +92,19 @@ private:
const auto getPerServiceState = ServiceContext::declareDecoration<PerService>();
const auto getPerClientState = Client::declareDecoration<PerClient>();
+
+bool PerClient::isFromLoadBalancer() const {
+ if (!isEnabled()) {
+ return false;
+ }
+ if (MONGO_unlikely(clientIsFromLoadBalancer.shouldFail())) {
+ return true;
+ }
+ const auto& session = getPerClientState.owner(this)->session();
+
+ return session && session->isFromLoadBalancer();
+}
+
} // namespace
bool isEnabled() {
@@ -103,11 +112,13 @@ bool isEnabled() {
serverGlobalParams.featureCompatibility);
}
-void setClientIsFromLoadBalancer(Client* client) {
- if (!isEnabled())
- return;
- auto& perClient = getPerClientState(client);
- perClient.setIsFromLoadBalancer();
+boost::optional<int> getLoadBalancerPort() {
+ if (isEnabled()) {
+ auto val = loadBalancerPort.load();
+ if (val != 0)
+ return val;
+ }
+ return {};
}
void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasLoadBalancedOption) {
diff --git a/src/mongo/s/load_balancer_support.h b/src/mongo/s/load_balancer_support.h
index 57928e8d6e3..59f4aae3c5d 100644
--- a/src/mongo/s/load_balancer_support.h
+++ b/src/mongo/s/load_balancer_support.h
@@ -35,10 +35,9 @@
namespace mongo::load_balancer_support {
/**
- * When a connection is made, we identify whether it came in through a load
- * balancer. We associate this information with the `client`.
+ * Gets the load balancer port, if we are configured to enable one.
*/
-void setClientIsFromLoadBalancer(Client* client);
+boost::optional<int> getLoadBalancerPort();
/**
* Helper for handling the `hello` command on mongos.
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
index 817d26bc3c4..f5074a77f15 100644
--- a/src/mongo/s/mongos_main.cpp
+++ b/src/mongo/s/mongos_main.cpp
@@ -83,6 +83,7 @@
#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
+#include "mongo/s/load_balancer_support.h"
#include "mongo/s/mongos_options.h"
#include "mongo/s/mongos_server_parameters_gen.h"
#include "mongo/s/mongos_topology_coordinator.h"
@@ -663,8 +664,16 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongos>(serviceContext));
- auto tl =
- transport::TransportLayerManager::createWithConfig(&serverGlobalParams, serviceContext);
+ const auto loadBalancerPort = load_balancer_support::getLoadBalancerPort();
+ if (loadBalancerPort && *loadBalancerPort == serverGlobalParams.port) {
+ LOGV2_ERROR(6067901,
+ "Load balancer port must be different from the normal ingress port.",
+ "port"_attr = serverGlobalParams.port);
+ quickExit(EXIT_BADOPTIONS);
+ }
+
+ auto tl = transport::TransportLayerManager::createWithConfig(
+ &serverGlobalParams, serviceContext, loadBalancerPort);
auto res = tl->setup();
if (!res.isOK()) {
LOGV2_ERROR(22856,
diff --git a/src/mongo/s/mongos_server_parameters.idl b/src/mongo/s/mongos_server_parameters.idl
index da4bf6673d8..e51b9ccb393 100644
--- a/src/mongo/s/mongos_server_parameters.idl
+++ b/src/mongo/s/mongos_server_parameters.idl
@@ -62,3 +62,13 @@ server_parameters:
default: 15000
validator:
gte: 0
+
+ loadBalancerPort:
+ description: >-
+ The port over which proxied connections using the Proxy Protocol may be made.
+ A value of 0 disables the port entirely.
+ set_at: [ startup ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: "loadBalancerPort"
+ default: 0
+ validator: { gte: 0, lte: 65535 }
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index 2b5eeefb35d..139db86c4ad 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -78,6 +78,10 @@ public:
return true;
}
+ bool isFromLoadBalancer() const override {
+ return false;
+ }
+
#ifdef MONGO_CONFIG_SSL
const SSLConfiguration* getSSLConfiguration() const override {
return nullptr;
diff --git a/src/mongo/transport/proxy_protocol_header_parser.h b/src/mongo/transport/proxy_protocol_header_parser.h
index f3a3e6126ab..4ccbec882fa 100644
--- a/src/mongo/transport/proxy_protocol_header_parser.h
+++ b/src/mongo/transport/proxy_protocol_header_parser.h
@@ -44,6 +44,12 @@
namespace mongo::transport {
/**
+ * The maximum number of bytes ever needed by a proxy protocol header; represents
+ * the minimum TCP MTU.
+ */
+static constexpr size_t kProxyProtocolHeaderSizeUpperBound = 536;
+
+/**
* Represents the true endpoints that a proxy using the Proxy Protocol is proxying for us.
*/
struct ProxiedEndpoints {
@@ -77,9 +83,6 @@ struct ParserResults {
boost::optional<ParserResults> parseProxyProtocolHeader(StringData buffer);
namespace proxy_protocol_details {
-// The maximum number of bytes ever needed by a proxy protocol header; represents
-// the minimum TCP MTU.
-static constexpr size_t kBytesToFetch = 536;
static constexpr size_t kMaxUnixPathLength = 108;
template <typename AddrUn = sockaddr_un>
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 445c0fef78b..12a8c983615 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -157,6 +157,11 @@ public:
*/
virtual bool isConnected() = 0;
+ /**
+ * Returns true if this session was connected through an L4 load balancer.
+ */
+ virtual bool isFromLoadBalancer() const = 0;
+
virtual const HostAndPort& remote() const = 0;
virtual const HostAndPort& local() const = 0;
diff --git a/src/mongo/transport/session_asio.cpp b/src/mongo/transport/session_asio.cpp
index c9eee2cb5c0..c0c9dfe0a6a 100644
--- a/src/mongo/transport/session_asio.cpp
+++ b/src/mongo/transport/session_asio.cpp
@@ -34,7 +34,9 @@
#include "mongo/config.h"
#include "mongo/logv2/log.h"
#include "mongo/transport/asio_utils.h"
+#include "mongo/transport/proxy_protocol_header_parser.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/future_util.h"
namespace mongo::transport {
@@ -118,6 +120,10 @@ TransportLayerASIO::ASIOSession::ASIOSession(
}
_local = HostAndPort(_localAddr.toString(true));
+ if (tl->loadBalancerPort()) {
+ _isFromLoadBalancer = _local.port() == *tl->loadBalancerPort();
+ }
+
_remote = HostAndPort(_remoteAddr.toString(true));
#ifdef MONGO_CONFIG_SSL
_sslContext = transientSSLContext ? transientSSLContext : *tl->_sslContext;
@@ -377,6 +383,44 @@ auto TransportLayerASIO::ASIOSession::getSocket() -> GenericSocket& {
return _socket;
}
+ExecutorFuture<void> TransportLayerASIO::ASIOSession::parseProxyProtocolHeader(
+ const ReactorHandle& reactor) {
+ invariant(_isIngressSession);
+ invariant(reactor);
+ auto buffer = std::make_shared<std::array<char, kProxyProtocolHeaderSizeUpperBound>>();
+ return AsyncTry([this, buffer] {
+ const auto bytesRead = peekASIOStream(
+ _socket, asio::buffer(buffer->data(), kProxyProtocolHeaderSizeUpperBound));
+ return transport::parseProxyProtocolHeader(StringData(buffer->data(), bytesRead));
+ })
+ .until([](StatusWith<boost::optional<ParserResults>> sw) {
+ return !sw.isOK() || sw.getValue();
+ })
+ .on(reactor, CancellationToken::uncancelable())
+ .then([this, buffer](const boost::optional<ParserResults>& results) mutable {
+ invariant(results);
+
+ // There may not be any endpoints if this connection is directly
+ // from the proxy itself or the information isn't available.
+ if (results->endpoints) {
+ _proxiedSrcEndpoint = results->endpoints->sourceAddress;
+ _proxiedDstEndpoint = results->endpoints->destinationAddress;
+ } else {
+ _proxiedSrcEndpoint = {};
+ _proxiedDstEndpoint = {};
+ }
+
+ // Drain the read buffer.
+ opportunisticRead(_socket, asio::buffer(buffer.get(), results->bytesParsed)).get();
+ })
+ .onError([this](Status s) {
+ LOGV2_ERROR(
+ 6067900, "Error while parsing proxy protocol header", "error"_attr = redact(s));
+ end();
+ return s;
+ });
+}
+
Future<Message> TransportLayerASIO::ASIOSession::sourceMessageImpl(const BatonHandle& baton) {
static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 59248cde259..6b85ffc5b0b 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -135,6 +135,10 @@ public:
bool isConnected() override;
+ bool isFromLoadBalancer() const override {
+ return _isFromLoadBalancer;
+ }
+
#ifdef MONGO_CONFIG_SSL
const SSLConfiguration* getSSLConfiguration() const override;
@@ -164,6 +168,7 @@ protected:
private:
GenericSocket& getSocket();
+ ExecutorFuture<void> parseProxyProtocolHeader(const ReactorHandle& reactor);
Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr);
template <typename MutableBufferSequence>
@@ -255,6 +260,9 @@ private:
TransportLayerASIO* const _tl;
bool _isIngressSession;
+ bool _isFromLoadBalancer = false;
+ boost::optional<SockAddr> _proxiedSrcEndpoint;
+ boost::optional<SockAddr> _proxiedDstEndpoint;
};
} // namespace mongo::transport
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 26f4bdd5d00..cfdd234d0a1 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -282,8 +282,10 @@ private:
thread_local TransportLayerASIO::ASIOReactor* TransportLayerASIO::ASIOReactor::_reactorForThread =
nullptr;
-TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
+TransportLayerASIO::Options::Options(const ServerGlobalParams* params,
+ boost::optional<int> loadBalancerPort)
: port(params->port),
+ loadBalancerPort(loadBalancerPort),
ipList(params->bind_ips),
#ifndef _WIN32
useUnixSockets(!params->noUnixSocket),
@@ -957,7 +959,11 @@ Status TransportLayerASIO::setup() {
#ifndef _WIN32
if (_listenerOptions.useUnixSockets && _listenerOptions.isIngress()) {
- listenAddrs.emplace_back(makeUnixSockPath(_listenerOptions.port));
+ listenAddrs.push_back(makeUnixSockPath(_listenerOptions.port));
+
+ if (_listenerOptions.loadBalancerPort) {
+ listenAddrs.push_back(makeUnixSockPath(*_listenerOptions.loadBalancerPort));
+ }
}
#endif
@@ -973,28 +979,35 @@ Status TransportLayerASIO::setup() {
_listenerPort = _listenerOptions.port;
WrappedResolver resolver(*_acceptorReactor);
+ std::vector<int> ports = {_listenerPort};
+ if (_listenerOptions.loadBalancerPort) {
+ ports.push_back(*_listenerOptions.loadBalancerPort);
+ }
+
// Self-deduplicating list of unique endpoint addresses.
std::set<WrappedEndpoint> endpoints;
- for (auto& ip : listenAddrs) {
- if (ip.empty()) {
- LOGV2_WARNING(23020, "Skipping empty bind address");
- continue;
- }
+ for (const auto& port : ports) {
+ for (const auto& listenAddr : listenAddrs) {
+ if (listenAddr.empty()) {
+ LOGV2_WARNING(23020, "Skipping empty bind address");
+ continue;
+ }
- auto swAddrs =
- resolver.resolve(HostAndPort(ip, _listenerPort), _listenerOptions.enableIPv6);
- if (!swAddrs.isOK()) {
- LOGV2_WARNING(23021,
- "Found no addresses for {peer}",
- "Found no addresses for peer",
- "peer"_attr = swAddrs.getStatus());
- continue;
+ const auto& swAddrs =
+ resolver.resolve(HostAndPort(listenAddr, port), _listenerOptions.enableIPv6);
+ if (!swAddrs.isOK()) {
+ LOGV2_WARNING(23021,
+ "Found no addresses for {peer}",
+ "Found no addresses for peer",
+ "peer"_attr = swAddrs.getStatus());
+ continue;
+ }
+ const auto& addrs = swAddrs.getValue();
+ endpoints.insert(addrs.begin(), addrs.end());
}
- auto& addrs = swAddrs.getValue();
- endpoints.insert(addrs.begin(), addrs.end());
}
- for (auto& addr : endpoints) {
+ for (const auto& addr : endpoints) {
#ifndef _WIN32
if (addr.family() == AF_UNIX) {
if (::unlink(addr.toString().c_str()) == -1 && errno != ENOENT) {
@@ -1018,9 +1031,15 @@ Status TransportLayerASIO::setup() {
} catch (std::exception&) {
// Allow the server to start when "ipv6: true" and "bindIpAll: true", but the platform
// does not support ipv6 (e.g., ipv6 kernel module is not loaded in Linux).
- const auto bindAllIPv6Addr = ":::"_sd + std::to_string(_listenerPort);
+ auto bindAllFmt = [](auto p) { return fmt::format(":::{}", p); };
+ bool addrIsBindAll = addr.toString() == bindAllFmt(_listenerPort);
+
+ if (!addrIsBindAll && _listenerOptions.loadBalancerPort) {
+ addrIsBindAll = (addr.toString() == bindAllFmt(*_listenerOptions.loadBalancerPort));
+ }
+
if (errno == EAFNOSUPPORT && _listenerOptions.enableIPv6 && addr.family() == AF_INET6 &&
- addr.toString() == bindAllIPv6Addr) {
+ addrIsBindAll) {
LOGV2_WARNING(4206501,
"Failed to bind to address as the platform does not support ipv6",
"Failed to bind to {address} as the platform does not support ipv6",
@@ -1264,7 +1283,16 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
try {
std::shared_ptr<ASIOSession> session(
new ASIOSession(this, std::move(peerSocket), true));
- _sep->startSession(std::move(session));
+ if (session->isFromLoadBalancer()) {
+ session->parseProxyProtocolHeader(_acceptorReactor)
+ .getAsync([this, session = std::move(session)](Status s) {
+ if (s.isOK()) {
+ _sep->startSession(std::move(session));
+ }
+ });
+ } else {
+ _sep->startSession(std::move(session));
+ }
} catch (const asio::system_error& e) {
// Swallow connection reset errors. Connection reset errors classically present as
// asio::error::eof, but can bubble up as asio::error::invalid_argument when calling
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 34fbb23820c..adedd3eb308 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -91,7 +91,8 @@ public:
constexpr static auto kIngress = 0x1;
constexpr static auto kEgress = 0x10;
- explicit Options(const ServerGlobalParams* params);
+ explicit Options(const ServerGlobalParams* params) : Options(params, {}) {}
+ Options(const ServerGlobalParams* params, boost::optional<int> loadBalancerPort);
Options() = default;
int mode = kIngress | kEgress;
@@ -105,6 +106,7 @@ public:
}
int port = ServerGlobalParams::DefaultDBPort; // port to bind to
+ boost::optional<int> loadBalancerPort; // accepts load balancer connections
std::vector<std::string> ipList; // addresses to bind to
#ifndef _WIN32
bool useUnixSockets = true; // whether to allow UNIX sockets in ipList
@@ -145,6 +147,10 @@ public:
return _listenerPort;
}
+ boost::optional<int> loadBalancerPort() const {
+ return _listenerOptions.loadBalancerPort;
+ }
+
#ifdef __linux__
BatonHandle makeBaton(OperationContext* opCtx) const override;
#endif
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 536c7b39148..d209026db82 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -134,12 +134,11 @@ std::unique_ptr<TransportLayer> TransportLayerManager::makeAndStartDefaultEgress
}
std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
- const ServerGlobalParams* config, ServiceContext* ctx) {
+ const ServerGlobalParams* config, ServiceContext* ctx, boost::optional<int> loadBalancerPort) {
auto sep = ctx->getServiceEntryPoint();
- transport::TransportLayerASIO::Options opts(config);
+ transport::TransportLayerASIO::Options opts(config, loadBalancerPort);
opts.transportMode = transport::Mode::kSynchronous;
-
std::vector<std::unique_ptr<TransportLayer>> retVector;
retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
return std::make_unique<TransportLayerManager>(std::move(retVector));
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 3cc6538a6c5..8bcd645ff5d 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -39,6 +39,8 @@
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/time_support.h"
+#include <boost/optional.hpp>
+
namespace mongo {
struct ServerGlobalParams;
class ServiceContext;
@@ -94,8 +96,10 @@ public:
* serviceContext->setTransportLayer(std::move(tl));
* serviceContext->getTransportLayer->start();
*/
- static std::unique_ptr<TransportLayer> createWithConfig(const ServerGlobalParams* config,
- ServiceContext* ctx);
+ static std::unique_ptr<TransportLayer> createWithConfig(
+ const ServerGlobalParams* config,
+ ServiceContext* ctx,
+ boost::optional<int> loadBalancerPort = {});
static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer();