From 225a2a57350c79a190c13054a038b6c3c559a558 Mon Sep 17 00:00:00 2001 From: Matt Cotter Date: Wed, 27 Jan 2016 16:39:14 -0500 Subject: SERVER-23448 create an ASIO based MessagingPort --- src/mongo/SConscript | 6 +- src/mongo/client/connection_pool.cpp | 2 +- src/mongo/client/dbclient.cpp | 23 +- src/mongo/client/dbclientinterface.h | 6 +- src/mongo/client/scoped_db_conn_test.cpp | 1 - src/mongo/db/client_basic.h | 2 +- src/mongo/db/curop_test.cpp | 2 +- src/mongo/db/db.cpp | 1 + src/mongo/db/dbmessage.h | 2 +- src/mongo/db/repl/oplogreader.cpp | 3 +- src/mongo/db/repl/repl_set_request_votes.cpp | 6 +- ...replication_coordinator_external_state_impl.cpp | 4 +- src/mongo/db/repl/replset_commands.cpp | 6 +- src/mongo/dbtests/mock_dbclient_conn_test.cpp | 1 + src/mongo/s/server.h | 6 +- src/mongo/tools/bridge.cpp | 21 +- src/mongo/util/net/SConscript | 25 +- src/mongo/util/net/abstract_message_port.h | 153 ++++- src/mongo/util/net/asio_message_port.cpp | 619 +++++++++++++++++++++ src/mongo/util/net/asio_message_port.h | 159 ++++++ src/mongo/util/net/asio_ssl_context.cpp | 69 +++ src/mongo/util/net/asio_ssl_context.h | 85 +++ src/mongo/util/net/httpclient.cpp | 1 - src/mongo/util/net/listen.cpp | 35 +- src/mongo/util/net/listen.h | 16 +- src/mongo/util/net/listen_test.cpp | 87 --- src/mongo/util/net/message.cpp | 21 +- src/mongo/util/net/message.h | 8 +- src/mongo/util/net/message_port.cpp | 112 ++-- src/mongo/util/net/message_port.h | 103 ++-- src/mongo/util/net/message_port_mock.cpp | 64 ++- src/mongo/util/net/message_port_mock.h | 58 +- src/mongo/util/net/message_port_startup_param.cpp | 56 ++ src/mongo/util/net/message_port_startup_param.h | 35 ++ src/mongo/util/net/message_server_port.cpp | 51 +- src/mongo/util/net/miniwebserver.cpp | 7 +- src/mongo/util/net/miniwebserver.h | 6 +- src/mongo/util/net/sock.h | 11 + src/mongo/util/net/sockaddr.h | 1 + src/mongo/util/net/ssl_manager.cpp | 4 - src/mongo/util/net/ssl_manager.h | 3 +- 41 files changed, 1545 insertions(+), 336 deletions(-) create mode 100644 src/mongo/util/net/asio_message_port.cpp create mode 100644 src/mongo/util/net/asio_message_port.h create mode 100644 src/mongo/util/net/asio_ssl_context.cpp create mode 100644 src/mongo/util/net/asio_ssl_context.h delete mode 100644 src/mongo/util/net/listen_test.cpp create mode 100644 src/mongo/util/net/message_port_startup_param.cpp create mode 100644 src/mongo/util/net/message_port_startup_param.h (limited to 'src/mongo') diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 398d34de72d..d05a1f9c8a2 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -16,7 +16,11 @@ Import("use_system_version_of_library") # Boost we need everywhere. 's2' is spammed in all over the place by # db/geo unfortunately. pcre is also used many places. -env.InjectThirdPartyIncludePaths(libraries=['boost', 's2', 'pcre']) +env.InjectThirdPartyIncludePaths(libraries=[ + 'boost', + 'pcre', + 's2', +]) env.InjectMongoIncludePaths() env.SConscript( diff --git a/src/mongo/client/connection_pool.cpp b/src/mongo/client/connection_pool.cpp index 9fa5493824f..cfdb872474c 100644 --- a/src/mongo/client/connection_pool.cpp +++ b/src/mongo/client/connection_pool.cpp @@ -185,7 +185,7 @@ ConnectionPool::ConnectionList::iterator ConnectionPool::acquireConnection( conn->setSoTimeout(durationCount(timeout) / 1000.0); uassertStatusOK(conn->connect(target)); - conn->port().tag |= _messagingPortTags; + conn->port().setTag(conn->port().getTag() | _messagingPortTags); if (isInternalAuthSet()) { conn->auth(getInternalUserAuthParamsWithFallback()); diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 8500868a46f..7e7a7528b76 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -31,6 +31,7 @@ #include "mongo/platform/basic.h" +#include #include #include "mongo/base/status.h" @@ -63,10 +64,15 @@ #include "mongo/util/concurrency/mutex.h" #include "mongo/util/debug_util.h" #include "mongo/util/log.h" +#include "mongo/util/net/asio_message_port.h" +#include "mongo/util/net/message_port.h" +#include "mongo/util/net/message_port_startup_param.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/net/ssl_options.h" #include "mongo/util/password_digest.h" +#include "mongo/util/represent_as.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -948,6 +954,10 @@ Status DBClientConnection::connect(const HostAndPort& serverAddress) { return Status::OK(); } +namespace { +const auto kMaxMillisCount = Milliseconds::max().count(); +} // namespace + Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) { _serverAddress = serverAddress; _failed = true; @@ -961,7 +971,14 @@ Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) { << serverAddress.host() << ", address is invalid"); } - _port.reset(new MessagingPort(_so_timeout, _logLevel)); + if (isMessagePortImplASIO()) { + // `_so_timeout` is in seconds. + auto ms = representAs(std::floor(_so_timeout * 1000)).value_or(kMaxMillisCount); + _port.reset(new ASIOMessagingPort( + ms > kMaxMillisCount ? Milliseconds::max() : Milliseconds(ms), _logLevel)); + } else { + _port.reset(new MessagingPort(_so_timeout, _logLevel)); + } if (serverAddress.host().empty()) { return Status(ErrorCodes::InvalidOptions, @@ -1058,7 +1075,9 @@ void DBClientConnection::_checkConnection() { void DBClientConnection::setSoTimeout(double timeout) { _so_timeout = timeout; if (_port) { - _port->setSocketTimeout(timeout); + // `timeout` is in seconds. + auto ms = representAs(std::floor(timeout * 1000)).value_or(kMaxMillisCount); + _port->setTimeout(ms > kMaxMillisCount ? Milliseconds::max() : Milliseconds(ms)); } } diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index 942e5caf5fb..dc6624aef5d 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -41,8 +41,8 @@ #include "mongo/rpc/unique_message.h" #include "mongo/stdx/functional.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/message_port.h" namespace mongo { @@ -1187,7 +1187,7 @@ public: return _maxWireVersion; } - MessagingPort& port() { + AbstractMessagingPort& port() { verify(_port); return *_port; } @@ -1246,7 +1246,7 @@ protected: virtual void _auth(const BSONObj& params); - std::unique_ptr _port; + std::unique_ptr _port; bool _failed; const bool autoReconnect; diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index 6862597896b..7057b2ed024 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -44,7 +44,6 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_port.h" #include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/quick_exit.h" diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h index 6587227245b..88d7ef4afe8 100644 --- a/src/mongo/db/client_basic.h +++ b/src/mongo/db/client_basic.h @@ -32,8 +32,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/util/decorable.h" +#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/hostandport.h" -#include "mongo/util/net/message_port.h" namespace mongo { diff --git a/src/mongo/db/curop_test.cpp b/src/mongo/db/curop_test.cpp index e650ff776d2..84c85e3cef9 100644 --- a/src/mongo/db/curop_test.cpp +++ b/src/mongo/db/curop_test.cpp @@ -49,7 +49,7 @@ const long long intervalShort = 10 * 1000; // 10ms in micros class TestListener : public Listener { public: TestListener() : Listener("test", "", 0) {} // port 0 => any available high port - virtual void acceptedMP(MessagingPort* mp) {} + void accepted(AbstractMessagingPort* mp) override {} }; AtomicUInt32 threadInitialized(0); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 5eca78ef8f4..eef81d4b21a 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -120,6 +120,7 @@ #include "mongo/util/fast_clock_source_factory.h" #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" +#include "mongo/util/net/listen.h" #include "mongo/util/net/message_server.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/ntservice.h" diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 29629e552fd..7f1a3a2d0f2 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -34,8 +34,8 @@ #include "mongo/client/constants.h" #include "mongo/db/jsobj.h" #include "mongo/db/server_options.h" +#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/message_port.h" namespace mongo { diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index e9e7b79c261..749c6112db9 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -98,7 +98,8 @@ bool OplogReader::connect(const HostAndPort& host) { error() << errmsg << endl; return false; } - _conn->port().tag |= executor::NetworkInterface::kMessagingPortKeepOpen; + _conn->port().setTag(_conn->port().getTag() | + executor::NetworkInterface::kMessagingPortKeepOpen); _host = host; } return true; diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp index 0c0f209d1b6..4b34c351769 100644 --- a/src/mongo/db/repl/repl_set_request_votes.cpp +++ b/src/mongo/db/repl/repl_set_request_votes.cpp @@ -67,13 +67,13 @@ private: unsigned originalTag = 0; AbstractMessagingPort* mp = txn->getClient()->port(); if (mp) { - originalTag = mp->tag; - mp->tag |= executor::NetworkInterface::kMessagingPortKeepOpen; + originalTag = mp->getTag(); + mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); } // Untag the connection on exit. ON_BLOCK_EXIT([mp, originalTag]() { if (mp) { - mp->tag = originalTag; + mp->setTag(originalTag); } }); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 3d8cc0ffb94..152016b2ddc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -72,7 +72,7 @@ #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" -#include "mongo/util/net/message_port.h" +#include "mongo/util/net/listen.h" namespace mongo { namespace repl { @@ -387,7 +387,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen); + Listener::closeMessagingPorts(executor::NetworkInterface::kMessagingPortKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 18fd6c3208a..e12f0405751 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -773,14 +773,14 @@ public: AbstractMessagingPort* mp = txn->getClient()->port(); unsigned originalTag = 0; if (mp) { - originalTag = mp->tag; - mp->tag |= executor::NetworkInterface::kMessagingPortKeepOpen; + originalTag = mp->getTag(); + mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); } // Unset the tag on block exit ON_BLOCK_EXIT([mp, originalTag]() { if (mp) { - mp->tag = originalTag; + mp->setTag(originalTag); } }); diff --git a/src/mongo/dbtests/mock_dbclient_conn_test.cpp b/src/mongo/dbtests/mock_dbclient_conn_test.cpp index 60a5dba7220..7febff697a3 100644 --- a/src/mongo/dbtests/mock_dbclient_conn_test.cpp +++ b/src/mongo/dbtests/mock_dbclient_conn_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/jsobj.h" #include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/net/sock.h" #include "mongo/util/timer.h" #include "mongo/util/net/socket_exception.h" diff --git a/src/mongo/s/server.h b/src/mongo/s/server.h index 1b76ac71a15..6466e001d13 100644 --- a/src/mongo/s/server.h +++ b/src/mongo/s/server.h @@ -31,12 +31,14 @@ #include #include "mongo/db/jsobj.h" -#include "mongo/util/net/message.h" namespace mongo { +class AbstractMessagingPort; +class Message; + extern OID serverID; // from request.cpp -void processRequest(Message& m, MessagingPort& p); +void processRequest(Message& m, AbstractMessagingPort& p); } diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index 64ff17387e0..e61fd05d9f9 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -53,6 +53,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/message.h" #include "mongo/util/exit.h" @@ -86,7 +87,7 @@ boost::optional extractHostInfo(const rpc::RequestInterface& reques class Forwarder { public: - Forwarder(MessagingPort* mp, + Forwarder(AbstractMessagingPort* mp, stdx::mutex* settingsMutex, HostSettingsMap* settings, int64_t seed) @@ -114,7 +115,7 @@ public: warning() << "Unable to establish connection to " << mongoBridgeGlobalParams.destUri << " after " << elapsed << " seconds: " << status; - log() << "end connection " << _mp->psock->remoteString(); + log() << "end connection " << _mp->remote().toString(); _mp->shutdown(); return; } @@ -132,7 +133,7 @@ public: try { request.reset(); if (!_mp->recv(request)) { - log() << "end connection " << _mp->psock->remoteString(); + log() << "end connection " << _mp->remote().toString(); _mp->shutdown(); break; } @@ -183,7 +184,7 @@ public: // Close the connection to 'dest'. case HostSettings::State::kHangUp: log() << "Rejecting connection from " << host->toString() - << ", end connection " << _mp->psock->remoteString(); + << ", end connection " << _mp->remote().toString(); _mp->shutdown(); return; // Forward the message to 'dest' with probability '1 - hostSettings.loss'. @@ -214,7 +215,7 @@ public: // If there's nothing to respond back to '_mp' with, then close the connection. if (response.empty()) { log() << "Received an empty response, end connection " - << _mp->psock->remoteString(); + << _mp->remote().toString(); _mp->shutdown(); break; } @@ -228,7 +229,7 @@ public: // connections from 'host', then do so now. if (hostSettings.state == HostSettings::State::kHangUp) { log() << "Closing connection from " << host->toString() - << ", end connection " << _mp->psock->remoteString(); + << ", end connection " << _mp->remote().toString(); _mp->shutdown(); break; } @@ -259,7 +260,7 @@ public: } } catch (const DBException& ex) { error() << "Caught DBException in Forwarder: " << ex << ", end connection " - << _mp->psock->remoteString(); + << _mp->remote().toString(); _mp->shutdown(); break; } catch (...) { @@ -303,7 +304,7 @@ private: return {}; } - MessagingPort* _mp; + AbstractMessagingPort* _mp; stdx::mutex* _settingsMutex; HostSettingsMap* _settings; @@ -319,7 +320,7 @@ public: log() << "Setting random seed: " << mongoBridgeGlobalParams.seed; } - void acceptedMP(MessagingPort* mp) final { + void accepted(AbstractMessagingPort* mp) override final { { stdx::lock_guard lk(_portsMutex); if (_inShutdown.load()) { @@ -343,7 +344,7 @@ public: private: stdx::mutex _portsMutex; - std::set _ports; + std::set _ports; AtomicWord _inShutdown{false}; stdx::mutex _settingsMutex; diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index 94ff35e50e8..99bd2581fa6 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -22,14 +22,23 @@ env.CppUnitTest( ], ) -env.Library( +networkEnv = env.Clone(); + +networkEnv.InjectThirdPartyIncludePaths(libraries=[ + 'asio', +]) + +networkEnv.Library( target='network', source=[ + "asio_message_port.cpp", + "asio_ssl_context.cpp", "hostname_canonicalization.cpp", "httpclient.cpp", "listen.cpp", "message.cpp", "message_port.cpp", + "message_port_startup_param.cpp", "sock.cpp", "sockaddr.cpp", 'socket_exception.cpp', @@ -40,26 +49,18 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/server_options_core', + '$BUILD_DIR/mongo/db/server_parameters', '$BUILD_DIR/mongo/util/background_job', '$BUILD_DIR/mongo/util/concurrency/ticketholder', + '$BUILD_DIR/mongo/util/decorable', '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/foundation', '$BUILD_DIR/mongo/util/options_parser/options_parser', + '$BUILD_DIR/third_party/shim_asio', 'hostandport', ], ) -env.CppUnitTest( - target='listen_test', - source=[ - 'listen_test.cpp', - ], - LIBDEPS=[ - 'network', - ], - NO_CRUTCH = True, -) - env.Library( target='message_port_mock', source=[ diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h index 372d3fc8485..c4ff5503f41 100644 --- a/src/mongo/util/net/abstract_message_port.h +++ b/src/mongo/util/net/abstract_message_port.h @@ -31,48 +31,161 @@ #include #include "mongo/config.h" +#include "mongo/logger/log_severity.h" #include "mongo/util/net/message.h" #include "mongo/util/net/sockaddr.h" +#include "mongo/util/time_support.h" namespace mongo { +class SSLManagerInterface; + class AbstractMessagingPort { MONGO_DISALLOW_COPYING(AbstractMessagingPort); +protected: + AbstractMessagingPort() = default; + public: - AbstractMessagingPort() : tag(0), _connectionId(0) {} - virtual ~AbstractMessagingPort() {} - // like the reply below, but doesn't rely on received.data still being available + virtual ~AbstractMessagingPort() = default; + + using Tag = uint32_t; + + /** + * Used when closing sockets. This value will not close any tagged sockets. + */ + static const Tag kSkipAllMask = 0xffffffff; + + /** + * Sets the maximum amount of time (in ms) to wait for io operations to run. + */ + virtual void setTimeout(Milliseconds millis) = 0; + + /** + * Closes the underlying socket. + */ + virtual void shutdown() = 0; + + /** + * Sends a message and waits for a response. This is equivalent to calling `say` then `recv`. + */ + virtual bool call(Message& toSend, Message& response) = 0; + + /** + * Reads the next message from the socket. + */ + virtual bool recv(Message& m) = 0; + + /** + * Sends a message as a reply to a received message. + */ virtual void reply(Message& received, Message& response, int32_t responseToMsgId) = 0; virtual void reply(Message& received, Message& response) = 0; + /** + * Sends the message. + */ + virtual void say(Message& toSend, int responseTo = 0) = 0; + + /** + * Sends the data over the socket. + */ + virtual void send(const char* data, int len, const char* context) = 0; + virtual void send(const std::vector>& data, const char* context) = 0; + + /** + * Connect to the remote endpoint. + */ + virtual bool connect(SockAddr& farEnd) = 0; + + /** + * The remote endpoint. + */ virtual HostAndPort remote() const = 0; + + /** + * The port of the remote endpoint. + */ virtual unsigned remotePort() const = 0; + + /** + * The remote endpoint. + */ virtual SockAddr remoteAddr() const = 0; + + /** + * The local endpoint. + */ virtual SockAddr localAddr() const = 0; - void setX509SubjectName(const std::string& x509SubjectName) { - _x509SubjectName = x509SubjectName; - } + /** + * Whether or not this is still connected. + */ + virtual bool isStillConnected() const = 0; - std::string getX509SubjectName() { - return _x509SubjectName; - } + /** + * Point in time (in micro seconds) when this was created. + */ + virtual uint64_t getSockCreationMicroSec() const = 0; - long long connectionId() const { - return _connectionId; - } - void setConnectionId(long long connectionId); + /** + * Sets the severity level for all logging. + */ + virtual void setLogLevel(logger::LogSeverity logLevel) = 0; -public: - // TODO make this private with some helpers + /** + * Clear the io counters. + */ + virtual void clearCounters() = 0; + + /** + * The total number of bytes read (since initialization or clearing the counters). + */ + virtual long long getBytesIn() const = 0; + + /** + * The total number of bytes written (since initialization or clearing the counters). + */ + virtual long long getBytesOut() const = 0; + + /** + * Set the x509 subject name (used for SSL). + */ + virtual void setX509SubjectName(const std::string& x509SubjectName) = 0; + + /** + * Get the current x509 subject name (used for SSL). + */ + virtual std::string getX509SubjectName() const = 0; + + /** + * Set the connection ID. + */ + virtual void setConnectionId(const long long connectionId) = 0; + + /** + * Get the connection ID. + */ + virtual long long connectionId() const = 0; + + /** + * Set the tag for this messaging port, used when closing with a mask. + * @see Listener::closeMessagingPorts() + */ + virtual void setTag(const Tag tag) = 0; - /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */ - unsigned tag; + /** + * Get the tag for this messaging port. + */ + virtual Tag getTag() const = 0; -private: - long long _connectionId; - std::string _x509SubjectName; + /** + * Initiates the TLS/SSL handshake on this AbstractMessagingPort. When this function returns, + * further communication on this AbstractMessagingPort will be encrypted. + * ssl - Pointer to the global SSLManager. + * remoteHost - The hostname of the remote server. + */ + virtual bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) = 0; }; } // namespace mongo diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp new file mode 100644 index 00000000000..1d685300039 --- /dev/null +++ b/src/mongo/util/net/asio_message_port.cpp @@ -0,0 +1,619 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/util/net/asio_message_port.h" + +#include + +#include "mongo/base/init.h" +#include "mongo/config.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/invariant.h" +#include "mongo/util/log.h" +#include "mongo/util/net/asio_ssl_context.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/ssl_manager.h" +#include "mongo/util/net/ssl_options.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +namespace { +const char kGET[] = "GET"; +const int kHeaderLen = sizeof(MSGHEADER::Value); +const int kInitialMessageSize = 1024; + +class ASIOPorts { +public: + void closeAll(AbstractMessagingPort::Tag skipMask) { + stdx::lock_guard lock_guard(mutex); + for (auto&& port : _portSet) { + if (port->getTag() & skipMask) { + LOG(3) << "Skip closing connection # " << port->connectionId(); + continue; + } + LOG(3) << "Closing connection # " << port->connectionId(); + port->shutdown(); + } + } + + void insert(ASIOMessagingPort* p) { + stdx::lock_guard lock_guard(mutex); + _portSet.insert(p); + } + + void erase(ASIOMessagingPort* p) { + stdx::lock_guard lock_guard(mutex); + _portSet.erase(p); + } + +private: + stdx::mutex mutex; + std::unordered_set _portSet; +}; + +// We "new" this so it will still be around when other automatic global vars are being destructed +// during termination. TODO: This is an artifact from MessagingPort and should be removed when we +// decide where to put networking global state. +ASIOPorts& asioPorts = *(new ASIOPorts()); + +#ifdef MONGO_CONFIG_SSL +struct ASIOSSLContextPair { + ASIOSSLContext server; + ASIOSSLContext client; +}; + +const auto sslDecoration = SSLManagerInterface::declareDecoration(); + +MONGO_INITIALIZER_WITH_PREREQUISITES(ASIOSSLContextSetup, ("SSLManager"))(InitializerContext*) { + if (getSSLManager()) { + sslDecoration(getSSLManager()) + .server.init(SSLManagerInterface::ConnectionDirection::kIncoming); + sslDecoration(getSSLManager()) + .client.init(SSLManagerInterface::ConnectionDirection::kOutgoing); + } + return Status::OK(); +} +#endif + +} // namespace + +void ASIOMessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) { + asioPorts.closeAll(skipMask); +} + +ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd) + : _service(), + _inShutdown(0), + _timer(_service), + _creationTime(curTimeMicros64()), + _timeout(), + _remote(), + _isEncrypted(false), + _awaitingHandshake(true), + _x509SubjectName(), + _bytesIn(0), + _bytesOut(0), + _logLevel(logger::LogSeverity::Log()), + _connectionId(), + _tag(), +#ifdef MONGO_CONFIG_SSL + _context(ASIOSSLContext()), + _sslSock(_service, + getSSLManager() ? sslDecoration(getSSLManager()).server.getContext() + : _context->getContext()) { + if (getSSLManager()) { + _context = boost::none; + } + _sslSock.lowest_layer().assign( + asio::generic::stream_protocol(farEnd.getType(), + farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP), + fd); +#else + _sock(_service, + asio::generic::stream_protocol(farEnd.getType(), + farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP), + fd) { +#endif // MONGO_CONFIG_SSL + asioPorts.insert(this); + _remote = HostAndPort(farEnd.getAddr(), farEnd.getPort()); + _timer.expires_at(decltype(_timer)::time_point::max()); + _setTimerCallback(); +} + +ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity logLevel) + : _service(), + _inShutdown(0), + _timer(_service), + _creationTime(curTimeMicros64()), + _timeout(timeout), + _remote(), + _isEncrypted(false), + _awaitingHandshake(true), + _x509SubjectName(), + _bytesIn(0), + _bytesOut(0), + _logLevel(logLevel), + _connectionId(), + _tag(), +#ifdef MONGO_CONFIG_SSL + _context(ASIOSSLContext()), + _sslSock(_service, + getSSLManager() ? sslDecoration(getSSLManager()).client.getContext() + : _context->getContext()) { + if (getSSLManager()) { + _context = boost::none; + } +#else + _sock(_service) { +#endif // MONGO_CONFIG_SSL + asioPorts.insert(this); + if (*_timeout == Milliseconds(0)) { + _timeout = boost::none; + } + _timer.expires_at(decltype(_timer)::time_point::max()); + _setTimerCallback(); +} + +ASIOMessagingPort::~ASIOMessagingPort() { + shutdown(); + asioPorts.erase(this); +} + +void ASIOMessagingPort::setTimeout(Milliseconds millis) { + if (millis == Milliseconds(0)) { + _timeout = boost::none; + _timer.expires_at(decltype(_timer)::time_point::max()); + } else { + _timeout = millis; + } +} + +void ASIOMessagingPort::shutdown() { + if (!_inShutdown.swap(true)) { + if (_getSocket().native_handle() >= 0) { + _getSocket().close(); + _awaitingHandshake = true; + _isEncrypted = false; + } + } +} + +asio::error_code ASIOMessagingPort::_read(char* buf, std::size_t size) { + invariant(buf); + if (_timeout) { + _timer.expires_from_now(decltype(_timer)::duration( + durationCount>(*_timeout))); + } + asio::error_code ec = asio::error::would_block; + if (!_isEncrypted) { + asio::async_read(_getSocket(), + asio::buffer(buf, size), + [&ec, size](const asio::error_code& err, std::size_t size_read) { + invariant(err || size == size_read); + ec = err; + }); + } +#ifdef MONGO_CONFIG_SSL + else { + asio::async_read(_sslSock, + asio::buffer(buf, size), + [&ec, size](const asio::error_code& err, std::size_t size_read) { + invariant(err || size == size_read); + ec = err; + }); + } +#endif // MONGO_CONFIG_SSL + do { + _service.run_one(); + } while (ec == asio::error::would_block); + if (!ec) { + _bytesIn += size; + } + return ec; +} + +asio::error_code ASIOMessagingPort::_write(const char* buf, std::size_t size) { + if (_timeout) { + _timer.expires_from_now(decltype(_timer)::duration( + durationCount>(*_timeout))); + } + asio::error_code ec = asio::error::would_block; + if (!_isEncrypted) { + asio::async_write(_getSocket(), + asio::buffer(buf, size), + [&ec, &size](const asio::error_code& err, std::size_t size_written) { + invariant(err || size == size_written); + ec = err; + }); + } +#ifdef MONGO_CONFIG_SSL + else { + asio::async_write(_sslSock, + asio::buffer(buf, size), + [&ec, &size](const asio::error_code& err, std::size_t size_written) { + invariant(err || size == size_written); + ec = err; + }); + } +#endif // MONGO_CONFIG_SSL + do { + _service.run_one(); + } while (ec == asio::error::would_block); + if (!ec) { + _bytesOut += size; + } + return ec; +} + +const asio::generic::stream_protocol::socket& ASIOMessagingPort::_getSocket() const { +#ifdef MONGO_CONFIG_SSL + return _sslSock.next_layer(); +#else + return _sock; +#endif +} + +asio::generic::stream_protocol::socket& ASIOMessagingPort::_getSocket() { + return const_cast( + const_cast(this)->_getSocket()); +} + +bool ASIOMessagingPort::recv(Message& m) { + try { + if (getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->shouldFail()) { + throw SocketException(SocketException::RECV_ERROR, "fail point set"); + } + MsgData::View md = reinterpret_cast(mongoMalloc(kInitialMessageSize)); + ScopeGuard guard = MakeGuard([&md]() { free(md.view2ptr()); }); + + asio::error_code ec = _read(md.view2ptr(), kHeaderLen); + if (ec) { + if (ec == asio::error::misc_errors::eof) { + // When the socket is closed, no need to log an error. + return false; + } + throw asio::system_error(ec); + } + + if (_awaitingHandshake) { + static_assert(strlen(kGET) <= kHeaderLen, + "HTTP GET string must be smaller than the message header."); + if (memcmp(md.view2ptr(), kGET, strlen(kGET)) == 0) { + std::string httpMsg = + "It looks like you are trying to access MongoDB over HTTP on the native driver " + "port.\n"; + LOG(_logLevel) << httpMsg; + std::stringstream ss; + ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: " + "text/plain\r\nContent-Length: " << httpMsg.size() << "\r\n\r\n" << httpMsg; + auto s = ss.str(); + send(s.c_str(), s.size(), nullptr); + return false; + } +#ifndef MONGO_CONFIG_SSL + // If responseTo is not 0 or -1 for first packet assume SSL + if (md.getResponseToMsgId() != 0 && md.getResponseToMsgId() != -1) { + uasserted(40130, + "SSL handshake requested, SSL feature not available in this build"); + } +#else + if (md.getResponseToMsgId() != 0 && md.getResponseToMsgId() != -1) { + uassert(40131, + "SSL handshake received but server is started without SSL support", + sslGlobalParams.sslMode.load() != SSLParams::SSLMode_disabled); + // `_sslSock.handshake()` throws on failure. + _sslSock.handshake(decltype(_sslSock)::server, + asio::buffer(md.view2ptr(), kHeaderLen)); + + auto swPeerSubjectName = + getSSLManager()->parseAndValidatePeerCertificate(_sslSock.native_handle(), ""); + if (!swPeerSubjectName.isOK()) { + throw SocketException(SocketException::CONNECT_ERROR, + swPeerSubjectName.getStatus().reason()); + } + setX509SubjectName(swPeerSubjectName.getValue().get_value_or("")); + + _isEncrypted = true; + _awaitingHandshake = false; + return recv(m); + } + uassert(40132, + "The server is configured to only allow SSL connections", + sslGlobalParams.sslMode.load() != SSLParams::SSLMode_requireSSL); + +#endif // MONGO_CONFIG_SSL + _awaitingHandshake = false; + } + + int msgLen = md.getLen(); + + if (static_cast(msgLen) < sizeof(MSGHEADER::Value) || + static_cast(msgLen) > MaxMessageSizeBytes) { + LOG(_logLevel) << "recv(): message len " << msgLen << " is invalid. " + << "Min: " << sizeof(MSGHEADER::Value) + << ", Max: " << MaxMessageSizeBytes; + return false; + } + + if (msgLen > kInitialMessageSize) { + md = reinterpret_cast(mongoRealloc(md.view2ptr(), msgLen)); + } + + ec = _read(md.data(), msgLen - kHeaderLen); + if (ec) { + throw asio::system_error(ec); + } + + guard.Dismiss(); + m.setData(md.view2ptr(), true); + return true; + + } catch (const asio::system_error& e) { + LOG(_logLevel) << "SocketException: remote: " << remote() << " error: " << e.what(); + m.reset(); + return false; + } +} + +void ASIOMessagingPort::reply(Message& received, Message& response) { + say(response, received.header().getId()); +} + +void ASIOMessagingPort::reply(Message& received, Message& response, int32_t responseToMsgId) { + say(response, responseToMsgId); +} + +bool ASIOMessagingPort::call(Message& toSend, Message& response) { + try { + say(toSend); + } catch (const asio::system_error&) { + return false; + } + bool success = recv(response); + if (success) { + if (response.header().getResponseToMsgId() != toSend.header().getId()) { + response.reset(); + uasserted(40133, "Response ID did not match the sent message ID."); + } + } + return success; +} + +void ASIOMessagingPort::say(Message& toSend, int responseTo) { + invariant(!toSend.empty()); + toSend.header().setId(nextMessageId()); + toSend.header().setResponseToMsgId(responseTo); + auto buf = toSend.buf(); + if (buf) { + send(buf, MsgData::ConstView(buf).getLen(), nullptr); + } else { + send(toSend.dataBuffers(), nullptr); + } +} + +unsigned ASIOMessagingPort::remotePort() const { + return _remote.port(); +} + + +HostAndPort ASIOMessagingPort::remote() const { + return _remote; +} + +SockAddr ASIOMessagingPort::remoteAddr() const { + return SockAddr(_remote.host(), _remote.port()); +} + +SockAddr ASIOMessagingPort::localAddr() const { + auto ep = _getSocket().local_endpoint(); + switch (ep.protocol().family()) { + case AF_INET: + case AF_INET6: { + asio::ip::tcp::endpoint tcpEP; + tcpEP.resize(ep.size()); + memcpy(tcpEP.data(), ep.data(), ep.size()); + return SockAddr(tcpEP.address().to_string(), tcpEP.port()); + } +#ifndef _WIN32 + case AF_UNIX: { + asio::local::stream_protocol::endpoint localEP; + localEP.resize(ep.size()); + memcpy(localEP.data(), ep.data(), ep.size()); + return SockAddr(localEP.path(), 0); + } +#endif // _WIN32 + default: { MONGO_UNREACHABLE; } + } +} + +void ASIOMessagingPort::send(const char* data, int len, const char*) { + if (getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->shouldFail()) { + throw SocketException(SocketException::SEND_ERROR, "fail point set"); + } + asio::error_code ec = _write(data, len); + if (ec) { + throw SocketException(SocketException::SEND_ERROR, asio::system_error(ec).what()); + } +} + +void ASIOMessagingPort::send(const std::vector>& data, const char*) { + for (auto&& pair : data) { + send(pair.first, pair.second, nullptr); + } +} + +bool ASIOMessagingPort::connect(SockAddr& farEnd) { + if (_timeout) { + _timer.expires_from_now(decltype(_timer)::duration( + durationCount>(*_timeout))); + } + _remote = HostAndPort(farEnd.getAddr(), farEnd.getPort()); + + asio::ip::tcp::resolver resolver(_service); + asio::error_code ec = asio::error::would_block; + + if (farEnd.getType() == AF_UNIX) { +#ifndef _WIN32 + asio::local::stream_protocol::endpoint endPoint(farEnd.getAddr()); + _getSocket().async_connect(endPoint, [&ec](const asio::error_code& err) { ec = err; }); +#else + uasserted(40135, "Connect called on a Unix socket under windows build."); +#endif // _WIN32 + } else { + asio::ip::tcp::resolver::query query(_remote.host(), std::to_string(_remote.port())); + + resolver.async_resolve( + query, + [&ec, this](const asio::error_code& resolveErr, asio::ip::tcp::resolver::iterator i) { + if (!resolveErr) { + asio::ip::tcp::endpoint tcpEndpoint(*i); + _getSocket().async_connect(tcpEndpoint, + [&ec](const asio::error_code& err) { ec = err; }); + } else if (i == asio::ip::tcp::resolver::iterator()) { + ec = asio::error::host_unreachable; + } + }); + } + + do { + _service.run_one(); + } while (ec == asio::error::would_block); + + if (ec) { + if (ec == asio::error::connection_refused) { + LOG(_logLevel) << "Failed to connect to " << _remote << ", reason: Connection refused"; + } else if (ec == asio::error::connection_aborted && _timeout) { + LOG(_logLevel) << "Failed to connect to " << _remote << " after " << _timeout->count() + << " milliseconds, giving up."; + } else { + LOG(_logLevel) << "Failed to connect to " << _remote + << ", reason: " << asio::system_error(ec).what(); + } + return false; + } + + _creationTime = curTimeMicros64(); + _awaitingHandshake = false; + if (farEnd.getType() != AF_UNIX) { + _getSocket().set_option(asio::ip::tcp::no_delay(true)); + } + + return true; +} + +bool ASIOMessagingPort::secure(SSLManagerInterface* ssl, const std::string& remoteHost) { +#ifdef MONGO_CONFIG_SSL + asio::error_code ec; + _sslSock.handshake(decltype(_sslSock)::client, ec); + if (ec) { + return false; + } + + auto swPeerSubjectName = + getSSLManager()->parseAndValidatePeerCertificate(_sslSock.native_handle(), remoteHost); + if (!swPeerSubjectName.isOK()) { + throw SocketException(SocketException::CONNECT_ERROR, + swPeerSubjectName.getStatus().reason()); + } + setX509SubjectName(swPeerSubjectName.getValue().get_value_or("")); + + _isEncrypted = true; + return true; +#else + return false; +#endif +} + +bool ASIOMessagingPort::isStillConnected() const { + return _getSocket().is_open(); +} + +uint64_t ASIOMessagingPort::getSockCreationMicroSec() const { + return _creationTime; +} + +void ASIOMessagingPort::setLogLevel(logger::LogSeverity logLevel) { + _logLevel = logLevel; +} + +void ASIOMessagingPort::clearCounters() { + _bytesIn = 0; + _bytesOut = 0; +} + +long long ASIOMessagingPort::getBytesIn() const { + return _bytesIn; +} + +long long ASIOMessagingPort::getBytesOut() const { + return _bytesOut; +} + +void ASIOMessagingPort::setX509SubjectName(const std::string& x509SubjectName) { + _x509SubjectName = x509SubjectName; +} + +std::string ASIOMessagingPort::getX509SubjectName() const { + return _x509SubjectName; +} + +void ASIOMessagingPort::setConnectionId(const long long connectionId) { + _connectionId = connectionId; +} + +long long ASIOMessagingPort::connectionId() const { + return _connectionId; +} + +void ASIOMessagingPort::setTag(const AbstractMessagingPort::Tag tag) { + _tag = tag; +} + +AbstractMessagingPort::Tag ASIOMessagingPort::getTag() const { + return _tag; +} + +void ASIOMessagingPort::_setTimerCallback() { + _timer.async_wait([this](const asio::error_code& ec) { + if (ec != asio::error::operation_aborted && + (_timer.expires_at() <= decltype(_timer)::clock_type::now())) { + _getSocket().cancel(); + _timer.expires_at(decltype(_timer)::time_point::max()); + } + _setTimerCallback(); + }); +} + +} // namespace mongo diff --git a/src/mongo/util/net/asio_message_port.h b/src/mongo/util/net/asio_message_port.h new file mode 100644 index 00000000000..42bfb9e78f5 --- /dev/null +++ b/src/mongo/util/net/asio_message_port.h @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include +#include +#include +#include + +#include "mongo/config.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/util/net/abstract_message_port.h" +#include "mongo/util/net/asio_ssl_context.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/sockaddr.h" +#include "mongo/util/time_support.h" + +#ifdef MONGO_CONFIG_SSL +#include +#endif + +namespace mongo { + +class ASIOMessagingPort final : public AbstractMessagingPort { +public: + /** + * This is the "Ingress Constructor", used by the listener. For this, we already have a file + * descriptor and address. This messaging port is already connected, and connect() should not be + * called. + */ + ASIOMessagingPort(int fd, SockAddr farEnd); + + /** + * This is the "Egress Constructor", used by the dbclient. This messaging port is not connected + * to any remote endpoint. In order to do any communications, the connect() method must be + * called successfully. + */ + ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity logLevel); + + ~ASIOMessagingPort() override; + + void setTimeout(Milliseconds millis) override; + + void shutdown() override; + + bool call(Message& toSend, Message& response) override; + + bool recv(Message& m) override; + + void reply(Message& received, Message& response, int32_t responseToMsgId) override; + void reply(Message& received, Message& response) override; + + void say(Message& toSend, int responseTo = 0) override; + + void send(const char* data, int len, const char*) override; + void send(const std::vector>& data, const char*) override; + + bool connect(SockAddr& farEnd) override; + + HostAndPort remote() const override; + + unsigned remotePort() const override; + + SockAddr remoteAddr() const override; + + SockAddr localAddr() const override; + + bool isStillConnected() const override; + + uint64_t getSockCreationMicroSec() const override; + + void setLogLevel(logger::LogSeverity logLevel) override; + + void clearCounters() override; + + long long getBytesIn() const override; + + long long getBytesOut() const override; + + void setX509SubjectName(const std::string& x509SubjectName) override; + + std::string getX509SubjectName() const override; + + void setConnectionId(const long long connectionId) override; + + long long connectionId() const override; + + void setTag(const AbstractMessagingPort::Tag tag) override; + + AbstractMessagingPort::Tag getTag() const override; + + bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override; + + static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask); + +private: + void _setTimerCallback(); + asio::error_code _read(char* buf, std::size_t size); + asio::error_code _write(const char* buf, std::size_t size); + const asio::generic::stream_protocol::socket& _getSocket() const; + asio::generic::stream_protocol::socket& _getSocket(); + + asio::io_service _service; + + AtomicBool _inShutdown; + + asio::system_timer _timer; + uint64_t _creationTime; + boost::optional _timeout; + + HostAndPort _remote; + + bool _isEncrypted; + bool _awaitingHandshake; + std::string _x509SubjectName; + + long long _bytesIn; + long long _bytesOut; + + logger::LogSeverity _logLevel; + + long long _connectionId; + AbstractMessagingPort::Tag _tag; + +#ifdef MONGO_CONFIG_SSL + boost::optional _context; + asio::ssl::stream _sslSock; +#else + asio::generic::stream_protocol::socket _sock; +#endif +}; + +} // namespace mongo diff --git a/src/mongo/util/net/asio_ssl_context.cpp b/src/mongo/util/net/asio_ssl_context.cpp new file mode 100644 index 00000000000..eb9f4c2fa68 --- /dev/null +++ b/src/mongo/util/net/asio_ssl_context.cpp @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/config.h" +#include "mongo/util/net/asio_ssl_context.h" + +#ifdef MONGO_CONFIG_SSL + +#include "mongo/base/init.h" +#include "mongo/stdx/memory.h" + +#include +#include + +namespace mongo { + +ASIOSSLContext::ASIOSSLContext() + : _context(stdx::make_unique(asio::ssl::context::sslv23)), + _mode(static_cast(getSSLGlobalParams().sslMode.load())) {} + +ASIOSSLContext::ASIOSSLContext(ASIOSSLContext&& other) = default; + +ASIOSSLContext& ASIOSSLContext::operator=(ASIOSSLContext&& other) = default; + +void ASIOSSLContext::init(SSLManagerInterface::ConnectionDirection direction) { + if (_mode != SSLParams::SSLMode_disabled) { + uassertStatusOK(getSSLManager()->initSSLContext( + _context->native_handle(), getSSLGlobalParams(), direction)); + } +} + +asio::ssl::context& ASIOSSLContext::getContext() { + return *_context; +} + +SSLParams::SSLModes ASIOSSLContext::sslMode() const { + return _mode; +} + +} // namespace mongo + +#endif // MONGO_CONFIG_SSL diff --git a/src/mongo/util/net/asio_ssl_context.h b/src/mongo/util/net/asio_ssl_context.h new file mode 100644 index 00000000000..53a68f1e1a3 --- /dev/null +++ b/src/mongo/util/net/asio_ssl_context.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/config.h" +#include "mongo/util/net/ssl_manager.h" +#include "mongo/util/net/ssl_options.h" + +#ifdef MONGO_CONFIG_SSL +namespace asio { +namespace ssl { +class context; +} // namespace ssl +} // namespace asio + +namespace mongo { + +class ASIOSSLContext { +public: + MONGO_DISALLOW_COPYING(ASIOSSLContext); + + /** + * A class to house the ASIO SSL context as well as the mongo SSL mode. This will be decorated + * on to the SSLManager. + */ + ASIOSSLContext(); + + ASIOSSLContext(ASIOSSLContext&& other); + ASIOSSLContext& operator=(ASIOSSLContext&& other); + + /** + * This must be called before calling `getContext()`. This does all of the setup that requires + * the SSLManager (which can't be done in construction due to this class being a decoration). + */ + void init(SSLManagerInterface::ConnectionDirection direction); + + /** + * A copy of the ASIO SSL context generated upon construction from the mongo::SSLParams. + */ + asio::ssl::context& getContext(); + + /** + * The SSL operation mode. See enum SSLModes in ssl_options.h + */ + SSLParams::SSLModes sslMode() const; + +private: + std::unique_ptr _context; + SSLParams::SSLModes _mode; +}; +} // namespace mongo +#else +namespace mongo { + +// This is a dummy class for when we build without SSL. +class ASIOSSLContext {}; +} // namespace mongo +#endif // MONGO_CONFIG_SSL diff --git a/src/mongo/util/net/httpclient.cpp b/src/mongo/util/net/httpclient.cpp index 9262fb41ff2..2cd94c0635f 100644 --- a/src/mongo/util/net/httpclient.cpp +++ b/src/mongo/util/net/httpclient.cpp @@ -35,7 +35,6 @@ #include "mongo/config.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/message_port.h" #include "mongo/util/net/sock.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/ssl_manager.h" diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp index 8538242faee..2cbee476522 100644 --- a/src/mongo/util/net/listen.cpp +++ b/src/mongo/util/net/listen.cpp @@ -34,14 +34,18 @@ #include "mongo/util/net/listen.h" - +#include "mongo/base/owned_pointer_vector.h" +#include "mongo/base/status.h" #include "mongo/config.h" #include "mongo/db/server_options.h" -#include "mongo/base/owned_pointer_vector.h" +#include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" +#include "mongo/util/net/asio_message_port.h" #include "mongo/util/net/message_port.h" +#include "mongo/util/net/message_port_startup_param.h" #include "mongo/util/net/ssl_manager.h" +#include "mongo/util/net/ssl_options.h" #include "mongo/util/scopeguard.h" #ifndef _WIN32 @@ -92,7 +96,7 @@ vector ipToAddrs(const char* ips, int port, bool useUnixSockets) { out.push_back(SockAddr("::", port)); // IPv6 all #ifndef _WIN32 if (useUnixSockets) - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket + out.push_back(SockAddr(makeUnixSockPath(port), port)); // Unix socket #endif return out; } @@ -114,7 +118,7 @@ vector ipToAddrs(const char* ips, int port, bool useUnixSockets) { #ifndef _WIN32 if (sa.isValid() && useUnixSockets && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); + out.push_back(SockAddr(makeUnixSockPath(port), port)); #endif } return out; @@ -350,7 +354,7 @@ void Listener::initAndListen() { pnewSock->secureAccepted(_ssl); } #endif - accepted(pnewSock, myConnectionNumber); + _accepted(pnewSock, myConnectionNumber); } } } @@ -566,7 +570,7 @@ void Listener::initAndListen() { pnewSock->secureAccepted(_ssl); } #endif - accepted(pnewSock, myConnectionNumber); + _accepted(pnewSock, myConnectionNumber); } } #endif @@ -583,14 +587,15 @@ void Listener::waitUntilListening() const { } } -void Listener::accepted(std::shared_ptr psocket, long long connectionId) { - MessagingPort* port = new MessagingPort(psocket); +void Listener::_accepted(const std::shared_ptr& psocket, long long connectionId) { + std::unique_ptr port; + if (isMessagePortImplASIO()) { + port = stdx::make_unique(psocket->stealSD(), psocket->remoteAddr()); + } else { + port = stdx::make_unique(psocket); + } port->setConnectionId(connectionId); - acceptedMP(port); -} - -void Listener::acceptedMP(MessagingPort* mp) { - verify(!"You must overwrite one of the accepted methods"); + accepted(port.release()); } // ----- ListeningSockets ------- @@ -637,6 +642,10 @@ void Listener::checkTicketNumbers() { globalTicketHolder.resize(want); } +void Listener::closeMessagingPorts(AbstractMessagingPort::Tag skipMask) { + ASIOMessagingPort::closeSockets(skipMask); + MessagingPort::closeSockets(skipMask); +} TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN); AtomicInt64 Listener::globalConnectionNumber; diff --git a/src/mongo/util/net/listen.h b/src/mongo/util/net/listen.h index ec8b2f03ac5..49f35dc1ba1 100644 --- a/src/mongo/util/net/listen.h +++ b/src/mongo/util/net/listen.h @@ -38,14 +38,13 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/ticketholder.h" +#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/sock.h" namespace mongo { const int DEFAULT_MAX_CONN = 1000000; -class MessagingPort; - class Listener { MONGO_DISALLOW_COPYING(Listener); @@ -57,8 +56,7 @@ public: void initAndListen(); // never returns unless error (start a thread) /* spawn a thread, etc., then return */ - virtual void accepted(std::shared_ptr psocket, long long connectionId); - virtual void acceptedMP(MessagingPort* mp); + virtual void accepted(AbstractMessagingPort* mp) = 0; const int _port; @@ -122,6 +120,9 @@ private: // Boolean that indicates whether this Listener is ready to accept incoming network requests bool _ready; + + virtual void _accepted(const std::shared_ptr& psocket, long long connectionId); + #ifdef MONGO_CONFIG_SSL SSLManagerInterface* _ssl; #endif @@ -143,6 +144,13 @@ public: /** makes sure user input is sane */ static void checkTicketNumbers(); + + /** + * This will close implementations of AbstractMessagingPort, skipping any that have tags + * matching `skipMask`. + */ + static void closeMessagingPorts( + AbstractMessagingPort::Tag skipMask = AbstractMessagingPort::kSkipAllMask); }; class ListeningSockets { diff --git a/src/mongo/util/net/listen_test.cpp b/src/mongo/util/net/listen_test.cpp deleted file mode 100644 index 4404d0d70b0..00000000000 --- a/src/mongo/util/net/listen_test.cpp +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/thread.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/log.h" -#include "mongo/util/exit.h" -#include "mongo/util/net/listen.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -namespace { - -TEST(Listener, ElapsedTimeCheck) { - const long long kSleepMillis = 5000; - const long long kEpsilon = 4000; - - Listener listener("test_listener", "", 0); // port 0 => any available high port - listener.setupSockets(); - - // Start the thread, and add a guard to ensure that we join it on - // all paths. We call shutdownNoTerminate to set the inShutdown - // flag so that thread can escape from the listener. - stdx::thread t([&listener]() { listener.initAndListen(); }); - const auto joint = MakeGuard([&] { - shutdownNoTerminate(); - t.join(); - }); - - // Wait in this thread until the listener is active, and then let - // a little more time elapse to give the timer subsystem a chance - // to stabilize. - listener.waitUntilListening(); - sleepmillis(1000); - - // Get our start times - long long listenStart = listener.getMyElapsedTimeMillis(); - long long clockStart = curTimeMillis64(); - - // Let some time elapse. - sleepmillis(kSleepMillis); - - // Get our current times. - long long listenDelta = listener.getMyElapsedTimeMillis() - listenStart; - long long clockDelta = curTimeMillis64() - clockStart; - - // Log the times to make it clear in the event of a failure what went wrong. - log() << "Listener elapsed time: " << listenDelta << std::endl; - log() << "Clock elapsed time: " << clockDelta << std::endl; - - // Fail if we weren't within epsilon. - ASSERT_APPROX_EQUAL(listenDelta, clockDelta, kEpsilon); -} - -} // namespace - -} // namespace mongo diff --git a/src/mongo/util/net/message.cpp b/src/mongo/util/net/message.cpp index 515efc9c853..9c91ce0d124 100644 --- a/src/mongo/util/net/message.cpp +++ b/src/mongo/util/net/message.cpp @@ -36,29 +36,12 @@ #include #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_port.h" namespace mongo { -void Message::send(MessagingPort& p, const char* context) { - if (empty()) { - return; - } - if (_buf != 0) { - p.send(_buf, MsgData::ConstView(_buf).getLen(), context); - } else { - p.send(_data, context); - } -} - +namespace { AtomicWord NextMsgId; - -/*struct MsgStart { - MsgStart() { - NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); - verify(MsgDataHeaderSize == 16); - } -} msgstart;*/ +} // namespace int32_t nextMessageId() { return NextMsgId.fetchAndAdd(1); diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h index 3a2cfe95922..2c28aabe5cf 100644 --- a/src/mongo/util/net/message.h +++ b/src/mongo/util/net/message.h @@ -51,7 +51,6 @@ namespace mongo { const size_t MaxMessageSizeBytes = 48 * 1000 * 1000; class Message; -class MessagingPort; enum NetworkOp : int32_t { opInvalid = 0, @@ -403,6 +402,8 @@ class Message { MONGO_DISALLOW_COPYING(Message); public: + using MsgVec = std::vector>; + // we assume here that a vector with initial size 0 does no allocation (0 is the default, but // wanted to make it explicit). Message() = default; @@ -562,7 +563,9 @@ public: return _buf; } - void send(MessagingPort& p, const char* context); + const MsgVec& dataBuffers() const { + return _data; + } std::string toString() const; @@ -575,7 +578,6 @@ private: char* _buf{nullptr}; // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage // instead - typedef std::vector> MsgVec; MsgVec _data{}; bool _freeIt{false}; }; diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp index 43ff4074702..225dfd06940 100644 --- a/src/mongo/util/net/message_port.cpp +++ b/src/mongo/util/net/message_port.cpp @@ -61,14 +61,6 @@ namespace mongo { using std::shared_ptr; using std::string; -// if you want trace output: -#define mmm(x) - -void AbstractMessagingPort::setConnectionId(long long connectionId) { - verify(_connectionId == 0); - _connectionId = connectionId; -} - /* messagingport -------------------------------------------------------------- */ class Ports { @@ -77,10 +69,10 @@ class Ports { public: Ports() : ports() {} - void closeAll(unsigned skip_mask) { + void closeAll(AbstractMessagingPort::Tag skip_mask) { stdx::lock_guard bl(m); for (std::set::iterator i = ports.begin(); i != ports.end(); i++) { - if ((*i)->tag & skip_mask) { + if ((*i)->getTag() & skip_mask) { LOG(3) << "Skip closing connection # " << (*i)->connectionId(); continue; } @@ -102,8 +94,8 @@ public: // are being destructed during termination. Ports& ports = *(new Ports()); -void MessagingPort::closeAllSockets(unsigned mask) { - ports.closeAll(mask); +void MessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) { + ports.closeAll(skipMask); } MessagingPort::MessagingPort(int fd, const SockAddr& remote) @@ -112,18 +104,20 @@ MessagingPort::MessagingPort(int fd, const SockAddr& remote) MessagingPort::MessagingPort(double timeout, logger::LogSeverity ll) : MessagingPort(std::make_shared(timeout, ll)) {} -MessagingPort::MessagingPort(std::shared_ptr sock) : psock(std::move(sock)) { - SockAddr sa = psock->remoteAddr(); +MessagingPort::MessagingPort(std::shared_ptr sock) + : _x509SubjectName(), _connectionId(), _tag(), _psock(std::move(sock)) { + SockAddr sa = _psock->remoteAddr(); _remoteParsed = HostAndPort(sa.getAddr(), sa.getPort()); ports.insert(this); } -void MessagingPort::setSocketTimeout(double timeout) { - psock->setTimeout(timeout); +void MessagingPort::setTimeout(Milliseconds millis) { + double timeout = double(millis.count()) / 1000; + _psock->setTimeout(timeout); } void MessagingPort::shutdown() { - psock->close(); + _psock->close(); } MessagingPort::~MessagingPort() { @@ -136,10 +130,9 @@ bool MessagingPort::recv(Message& m) { #ifdef MONGO_CONFIG_SSL again: #endif - // mmm( log() << "* recv() sock:" << this->sock << endl; ) MSGHEADER::Value header; int headerLen = sizeof(MSGHEADER::Value); - psock->recv((char*)&header, headerLen); + _psock->recv((char*)&header, headerLen); int len = header.constView().getMessageLength(); if (len == 542393671) { @@ -147,7 +140,7 @@ bool MessagingPort::recv(Message& m) { string msg = "It looks like you are trying to access MongoDB over HTTP on the native driver " "port.\n"; - LOG(psock->getLogLevel()) << msg; + LOG(_psock->getLogLevel()) << msg; std::stringstream ss; ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: " "text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; @@ -156,7 +149,7 @@ bool MessagingPort::recv(Message& m) { return false; } // If responseTo is not 0 or -1 for first packet assume SSL - else if (psock->isAwaitingHandshake()) { + else if (_psock->isAwaitingHandshake()) { #ifndef MONGO_CONFIG_SSL if (header.constView().getResponseToMsgId() != 0 && header.constView().getResponseToMsgId() != -1) { @@ -170,8 +163,8 @@ bool MessagingPort::recv(Message& m) { "SSL handshake received but server is started without SSL support", sslGlobalParams.sslMode.load() != SSLParams::SSLMode_disabled); setX509SubjectName( - psock->doSSLHandshake(reinterpret_cast(&header), sizeof(header))); - psock->setHandshakeReceived(); + _psock->doSSLHandshake(reinterpret_cast(&header), sizeof(header))); + _psock->setHandshakeReceived(); goto again; } uassert(17189, @@ -186,7 +179,7 @@ bool MessagingPort::recv(Message& m) { return false; } - psock->setHandshakeReceived(); + _psock->setHandshakeReceived(); int z = (len + 1023) & 0xfffffc00; verify(z >= len); MsgData::View md = reinterpret_cast(mongoMalloc(z)); @@ -196,14 +189,14 @@ bool MessagingPort::recv(Message& m) { memcpy(md.view2ptr(), &header, headerLen); int left = len - headerLen; - psock->recv(md.data(), left); + _psock->recv(md.data(), left); guard.Dismiss(); m.setData(md.view2ptr(), true); return true; } catch (const SocketException& e) { - logger::LogSeverity severity = psock->getLogLevel(); + logger::LogSeverity severity = _psock->getLogLevel(); if (!e.shouldPrint()) severity = severity.lessSevere(); LOG(severity) << "SocketException: remote: " << remote() << " error: " << e; @@ -221,38 +214,27 @@ void MessagingPort::reply(Message& received, Message& response, int32_t response } bool MessagingPort::call(Message& toSend, Message& response) { - mmm(log() << "*call()" << endl;) say(toSend); - return recv(toSend, response); -} - -bool MessagingPort::recv(const Message& toSend, Message& response) { - while (1) { - bool ok = recv(response); - if (!ok) { - mmm(log() << "recv not ok" << endl;) return false; + say(toSend); + bool success = recv(response); + if (success) { + if (response.header().getResponseToMsgId() != toSend.header().getId()) { + response.reset(); + uasserted(40134, "Response ID did not match the sent message ID."); } - // log() << "got response: " << response.data->responseTo << endl; - if (response.header().getResponseToMsgId() == toSend.header().getId()) - break; - error() << "MessagingPort::call() wrong id got:" << std::hex - << response.header().getResponseToMsgId() << " expect:" << toSend.header().getId() - << '\n' << std::dec << " toSend op: " << (unsigned)toSend.operation() << '\n' - << " response msgid:" << (unsigned)response.header().getId() << '\n' - << " response len: " << (unsigned)response.header().getLen() << '\n' - << " response op: " << static_cast(response.operation()) << '\n' - << " remote: " << psock->remoteString(); - verify(false); - response.reset(); } - mmm(log() << "*call() end" << endl;) return true; + return success; } void MessagingPort::say(Message& toSend, int responseTo) { verify(!toSend.empty()); - mmm(log() << "* say() thr:" << GetCurrentThreadId() << endl;) - toSend.header().setId(nextMessageId()); + toSend.header().setId(nextMessageId()); toSend.header().setResponseToMsgId(responseTo); - toSend.send(*this, "say"); + auto buf = toSend.buf(); + if (buf) { + send(buf, MsgData::ConstView(buf).getLen(), "say"); + } else { + send(toSend.dataBuffers(), "say"); + } } HostAndPort MessagingPort::remote() const { @@ -260,11 +242,35 @@ HostAndPort MessagingPort::remote() const { } SockAddr MessagingPort::remoteAddr() const { - return psock->remoteAddr(); + return _psock->remoteAddr(); } SockAddr MessagingPort::localAddr() const { - return psock->localAddr(); + return _psock->localAddr(); +} + +void MessagingPort::setX509SubjectName(const std::string& x509SubjectName) { + _x509SubjectName = x509SubjectName; +} + +std::string MessagingPort::getX509SubjectName() const { + return _x509SubjectName; +} + +void MessagingPort::setConnectionId(const long long connectionId) { + _connectionId = connectionId; +} + +long long MessagingPort::connectionId() const { + return _connectionId; +} + +void MessagingPort::setTag(const AbstractMessagingPort::Tag tag) { + _tag = tag; +} + +AbstractMessagingPort::Tag MessagingPort::getTag() const { + return _tag; } } // namespace mongo diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h index c1922b7202a..18931e20750 100644 --- a/src/mongo/util/net/message_port.h +++ b/src/mongo/util/net/message_port.h @@ -40,7 +40,7 @@ namespace mongo { class MessagingPort; -class MessagingPort : public AbstractMessagingPort { +class MessagingPort final : public AbstractMessagingPort { public: MessagingPort(int fd, const SockAddr& remote); @@ -51,52 +51,67 @@ public: MessagingPort(std::shared_ptr socket); - virtual ~MessagingPort(); + ~MessagingPort() override; - void setSocketTimeout(double timeout); + void setTimeout(Milliseconds millis) override; - void shutdown(); + void shutdown() override; /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. also, the Message data will go out of scope on the subsequent recv call. */ - bool recv(Message& m); - void reply(Message& received, Message& response, int32_t responseToMsgId); - void reply(Message& received, Message& response); - bool call(Message& toSend, Message& response); + bool recv(Message& m) override; + void reply(Message& received, Message& response, int32_t responseToMsgId) override; + void reply(Message& received, Message& response) override; + bool call(Message& toSend, Message& response) override; - void say(Message& toSend, int responseTo = 0); + void say(Message& toSend, int responseTo = 0) override; - /** - * this is used for doing 'async' queries - * instead of doing call( to , from ) - * you would do - * say( to ) - * recv( from ) - * Note: if you fail to call recv and someone else uses this port, - * horrible things will happen - */ - bool recv(const Message& sent, Message& response); + unsigned remotePort() const override { + return _psock->remotePort(); + } + virtual HostAndPort remote() const override; + virtual SockAddr remoteAddr() const override; + virtual SockAddr localAddr() const override; - unsigned remotePort() const { - return psock->remotePort(); + void send(const char* data, int len, const char* context) override { + _psock->send(data, len, context); + } + void send(const std::vector>& data, const char* context) override { + _psock->send(data, context); + } + bool connect(SockAddr& farEnd) override { + return _psock->connect(farEnd); } - virtual HostAndPort remote() const; - virtual SockAddr remoteAddr() const; - virtual SockAddr localAddr() const; - std::shared_ptr psock; + void setLogLevel(logger::LogSeverity ll) override { + _psock->setLogLevel(ll); + } - void send(const char* data, int len, const char* context) { - psock->send(data, len, context); + void clearCounters() override { + _psock->clearCounters(); } - void send(const std::vector>& data, const char* context) { - psock->send(data, context); + + long long getBytesIn() const override { + return _psock->getBytesIn(); } - bool connect(SockAddr& farEnd) { - return psock->connect(farEnd); + + long long getBytesOut() const override { + return _psock->getBytesOut(); } -#ifdef MONGO_CONFIG_SSL + + void setX509SubjectName(const std::string& x509SubjectName) override; + + std::string getX509SubjectName() const override; + + void setConnectionId(const long long connectionId) override; + + long long connectionId() const override; + + void setTag(const AbstractMessagingPort::Tag tag) override; + + AbstractMessagingPort::Tag getTag() const override; + /** * Initiates the TLS/SSL handshake on this MessagingPort. * When this function returns, further communication on this @@ -104,25 +119,33 @@ public: * ssl - Pointer to the global SSLManager. * remoteHost - The hostname of the remote server. */ - bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) { - return psock->secure(ssl, remoteHost); - } + bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override { +#ifdef MONGO_CONFIG_SSL + return _psock->secure(ssl, remoteHost); +#else + return false; #endif + } - bool isStillConnected() { - return psock->isStillConnected(); + bool isStillConnected() const override { + return _psock->isStillConnected(); } - uint64_t getSockCreationMicroSec() const { - return psock->getSockCreationMicroSec(); + uint64_t getSockCreationMicroSec() const override { + return _psock->getSockCreationMicroSec(); } private: // this is the parsed version of remote HostAndPort _remoteParsed; + std::string _x509SubjectName; + long long _connectionId; + AbstractMessagingPort::Tag _tag; + std::shared_ptr _psock; + public: - static void closeAllSockets(unsigned tagMask = 0xffffffff); + static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask); }; diff --git a/src/mongo/util/net/message_port_mock.cpp b/src/mongo/util/net/message_port_mock.cpp index ae6deb90c8e..6869723129f 100644 --- a/src/mongo/util/net/message_port_mock.cpp +++ b/src/mongo/util/net/message_port_mock.cpp @@ -39,10 +39,29 @@ using std::string; MessagingPortMock::MessagingPortMock() : AbstractMessagingPort() {} MessagingPortMock::~MessagingPortMock() {} +void MessagingPortMock::setTimeout(Milliseconds millis) {} -void MessagingPortMock::reply(Message& received, Message& response) {} +void MessagingPortMock::shutdown() {} + +bool MessagingPortMock::call(Message& toSend, Message& response) { + return true; +} + +bool MessagingPortMock::recv(Message& m) { + return true; +} void MessagingPortMock::reply(Message& received, Message& response, int32_t responseToMsgId) {} +void MessagingPortMock::reply(Message& received, Message& response) {} + +void MessagingPortMock::say(Message& toSend, int responseTo) {} + +bool MessagingPortMock::connect(SockAddr& farEnd) { + return true; +} + +void MessagingPortMock::send(const char* data, int len, const char* context) {} +void MessagingPortMock::send(const std::vector>& data, const char* context) {} HostAndPort MessagingPortMock::remote() const { return _remote; @@ -60,6 +79,49 @@ SockAddr MessagingPortMock::localAddr() const { return SockAddr{}; } +bool MessagingPortMock::isStillConnected() const { + return true; +} + +void MessagingPortMock::setLogLevel(logger::LogSeverity logLevel) {} + +void MessagingPortMock::clearCounters() {} + +long long MessagingPortMock::getBytesIn() const { + return 0; +} + +long long MessagingPortMock::getBytesOut() const { + return 0; +} + + +uint64_t MessagingPortMock::getSockCreationMicroSec() const { + return 0; +} + +void MessagingPortMock::setX509SubjectName(const std::string& x509SubjectName) {} + +std::string MessagingPortMock::getX509SubjectName() const { + return "mock"; +} + +void MessagingPortMock::setConnectionId(const long long connectionId) {} + +long long MessagingPortMock::connectionId() const { + return 42; +} + +void MessagingPortMock::setTag(const AbstractMessagingPort::Tag tag) {} + +AbstractMessagingPort::Tag MessagingPortMock::getTag() const { + return 0; +} + +bool MessagingPortMock::secure(SSLManagerInterface* ssl, const std::string& remoteHost) { + return true; +} + void MessagingPortMock::setRemote(const HostAndPort& remote) { _remote = remote; } diff --git a/src/mongo/util/net/message_port_mock.h b/src/mongo/util/net/message_port_mock.h index 45116dbd008..61ab5be8958 100644 --- a/src/mongo/util/net/message_port_mock.h +++ b/src/mongo/util/net/message_port_mock.h @@ -30,27 +30,67 @@ #include -#include "mongo/config.h" #include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/message.h" #include "mongo/util/net/sockaddr.h" namespace mongo { -class MessagingPortMock : public AbstractMessagingPort { +class MessagingPortMock final : public AbstractMessagingPort { MONGO_DISALLOW_COPYING(MessagingPortMock); public: MessagingPortMock(); - virtual ~MessagingPortMock(); + ~MessagingPortMock(); - virtual void reply(Message& received, Message& response, int32_t responseToMsgId); - virtual void reply(Message& received, Message& response); + void setTimeout(Milliseconds millis) override; - virtual HostAndPort remote() const; - virtual unsigned remotePort() const; - virtual SockAddr remoteAddr() const; - virtual SockAddr localAddr() const; + void shutdown() override; + + bool call(Message& toSend, Message& response) override; + + bool recv(Message& m) override; + + void reply(Message& received, Message& response, int32_t responseToMsgId) override; + void reply(Message& received, Message& response) override; + + void say(Message& toSend, int responseTo = 0) override; + + bool connect(SockAddr& farEnd) override; + + void send(const char* data, int len, const char* context) override; + void send(const std::vector>& data, const char* context) override; + + HostAndPort remote() const override; + unsigned remotePort() const override; + SockAddr remoteAddr() const override; + SockAddr localAddr() const override; + + bool isStillConnected() const override; + + void setLogLevel(logger::LogSeverity logLevel) override; + + void clearCounters() override; + + long long getBytesIn() const override; + + long long getBytesOut() const override; + + uint64_t getSockCreationMicroSec() const override; + + void setX509SubjectName(const std::string& x509SubjectName) override; + + std::string getX509SubjectName() const override; + + void setConnectionId(const long long connectionId) override; + + long long connectionId() const override; + + void setTag(const AbstractMessagingPort::Tag tag) override; + + AbstractMessagingPort::Tag getTag() const override; + + bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override; void setRemote(const HostAndPort& remote); diff --git a/src/mongo/util/net/message_port_startup_param.cpp b/src/mongo/util/net/message_port_startup_param.cpp new file mode 100644 index 00000000000..704de92b012 --- /dev/null +++ b/src/mongo/util/net/message_port_startup_param.cpp @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/init.h" +#include "mongo/db/server_parameters.h" + +namespace mongo { + +namespace { + +const char kMessagePortImplASIO[] = "ASIO"; +const char kMessagePortImplLegacy[] = "legacy"; + +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(messagePortImpl, std::string, kMessagePortImplLegacy); + +MONGO_INITIALIZER(messagePortImpl)(InitializerContext*) { + if ((messagePortImpl != kMessagePortImplASIO) && (messagePortImpl != kMessagePortImplLegacy)) { + return Status(ErrorCodes::BadValue, "unsupported message port option: " + messagePortImpl); + } + return Status::OK(); +} + +} // namespace + +bool isMessagePortImplASIO() { + return messagePortImpl == kMessagePortImplASIO; +} + +} // namespace mongo diff --git a/src/mongo/util/net/message_port_startup_param.h b/src/mongo/util/net/message_port_startup_param.h new file mode 100644 index 00000000000..8a55ab8fbf3 --- /dev/null +++ b/src/mongo/util/net/message_port_startup_param.h @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +namespace mongo { + +bool isMessagePortImplASIO(); + +} // namespace mongo diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp index 1b9b9f90702..c64c16f3073 100644 --- a/src/mongo/util/net/message_server_port.cpp +++ b/src/mongo/util/net/message_server_port.cpp @@ -40,6 +40,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/stats/counters.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/synchronization.h" #include "mongo/util/concurrency/thread_name.h" @@ -48,9 +49,9 @@ #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/message_port.h" #include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/ssl_manager.h" @@ -72,24 +73,7 @@ using std::endl; namespace { -class MessagingPortWithHandler : public MessagingPort { - MONGO_DISALLOW_COPYING(MessagingPortWithHandler); - -public: - MessagingPortWithHandler(const std::shared_ptr& socket, - const std::shared_ptr handler, - long long connectionId) - : MessagingPort(socket), _handler(handler) { - setConnectionId(connectionId); - } - - const std::shared_ptr getHandler() const { - return _handler; - } - -private: - const std::shared_ptr _handler; -}; +using MessagingPortWithHandler = std::pair>; } // namespace @@ -104,10 +88,9 @@ public: PortMessageServer(const MessageServer::Options& opts, std::shared_ptr handler) : Listener("", opts.ipList, opts.port), _handler(std::move(handler)) {} - virtual void accepted(std::shared_ptr psocket, long long connectionId) { + virtual void accepted(AbstractMessagingPort* mp) { ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2); - std::unique_ptr portWithHandler( - new MessagingPortWithHandler(psocket, _handler, connectionId)); + auto portWithHandler = stdx::make_unique(mp, _handler); if (!Listener::globalTicketHolder.tryAcquire()) { log() << "connection refused because too many open connections: " @@ -201,34 +184,34 @@ private: invariant(arg); unique_ptr portWithHandler( static_cast(arg)); - const std::shared_ptr handler = portWithHandler->getHandler(); + auto mp = portWithHandler->first; + auto handler = portWithHandler->second; - setThreadName(std::string(str::stream() << "conn" << portWithHandler->connectionId())); - portWithHandler->psock->setLogLevel(logger::LogSeverity::Debug(1)); + setThreadName(std::string(str::stream() << "conn" << mp->connectionId())); + mp->setLogLevel(logger::LogSeverity::Debug(1)); Message m; int64_t counter = 0; try { - handler->connected(portWithHandler.get()); + handler->connected(mp); ON_BLOCK_EXIT([handler]() { handler->close(); }); while (!inShutdown()) { m.reset(); - portWithHandler->psock->clearCounters(); + mp->clearCounters(); - if (!portWithHandler->recv(m)) { + if (!mp->recv(m)) { if (!serverGlobalParams.quiet) { int conns = Listener::globalTicketHolder.used() - 1; const char* word = (conns == 1 ? " connection" : " connections"); - log() << "end connection " << portWithHandler->psock->remoteString() << " (" - << conns << word << " now open)" << endl; + log() << "end connection " << mp->remote().toString() << " (" << conns + << word << " now open)" << endl; } break; } - handler->process(m, portWithHandler.get()); - networkCounter.hit(portWithHandler->psock->getBytesIn(), - portWithHandler->psock->getBytesOut()); + handler->process(m, mp); + networkCounter.hit(mp->getBytesIn(), mp->getBytesOut()); // Occasionally we want to see if we're using too much memory. if ((counter++ & 0xf) == 0) { @@ -247,7 +230,7 @@ private: error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; quickExit(EXIT_UNCAUGHT); } - portWithHandler->shutdown(); + mp->shutdown(); return NULL; } diff --git a/src/mongo/util/net/miniwebserver.cpp b/src/mongo/util/net/miniwebserver.cpp index 736bae25757..e49a6030b44 100644 --- a/src/mongo/util/net/miniwebserver.cpp +++ b/src/mongo/util/net/miniwebserver.cpp @@ -36,6 +36,7 @@ #include #include "mongo/config.h" +#include "mongo/util/assert_util.h" #include "mongo/util/hex.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" @@ -129,7 +130,7 @@ bool MiniWebServer::fullReceive(const char* buf) { return false; } -void MiniWebServer::accepted(std::shared_ptr psock, long long connectionId) { +void MiniWebServer::_accepted(const std::shared_ptr& psock, long long connectionId) { char buf[4096]; int len = 0; try { @@ -202,6 +203,10 @@ void MiniWebServer::accepted(std::shared_ptr psock, long long connection } } +void MiniWebServer::accepted(AbstractMessagingPort* mp) { + MONGO_UNREACHABLE; +} + string MiniWebServer::getHeader(const char* req, const std::string& wanted) { const char* headers = strchr(req, '\n'); if (!headers) diff --git a/src/mongo/util/net/miniwebserver.h b/src/mongo/util/net/miniwebserver.h index b9d8c77b286..fce5c9fc1cf 100644 --- a/src/mongo/util/net/miniwebserver.h +++ b/src/mongo/util/net/miniwebserver.h @@ -35,7 +35,6 @@ #include "mongo/db/jsobj.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/message_port.h" namespace mongo { @@ -67,8 +66,11 @@ public: return urlDecode(s.c_str()); } + // This is not currently used for the MiniWebServer. See SERVER-24200 + void accepted(AbstractMessagingPort* mp) override; + private: - void accepted(std::shared_ptr psocket, long long connectionId); + void _accepted(const std::shared_ptr& psocket, long long connectionId) override; static bool fullReceive(const char* buf); }; diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h index 1ef0381c1c8..d607eb8051c 100644 --- a/src/mongo/util/net/sock.h +++ b/src/mongo/util/net/sock.h @@ -178,6 +178,17 @@ public: return _fd; } + /** + * This sets the Sock's socket descriptor to be invalid and returns the old descriptor. This + * only gets called in listen.cpp in Listener::_accepted(). This gets called on the listener + * thread immediately after the thread creates the Sock, so it doesn't need to be thread-safe. + */ + int stealSD() { + int tmp = _fd; + _fd = -1; + return tmp; + } + void setTimeout(double secs); bool isStillConnected(); diff --git a/src/mongo/util/net/sockaddr.h b/src/mongo/util/net/sockaddr.h index ed08dcf50f9..046167e4196 100644 --- a/src/mongo/util/net/sockaddr.h +++ b/src/mongo/util/net/sockaddr.h @@ -67,6 +67,7 @@ struct SockAddr { SockAddr( const char* ip, int port); /* EndPoint (remote) side, or if you want to specify which interface locally */ + SockAddr(const std::string& ip, int port) : SockAddr(ip.c_str(), port) {} template T& as() { diff --git a/src/mongo/util/net/ssl_manager.cpp b/src/mongo/util/net/ssl_manager.cpp index f61789a3ac7..967319c81c7 100644 --- a/src/mongo/util/net/ssl_manager.cpp +++ b/src/mongo/util/net/ssl_manager.cpp @@ -581,10 +581,6 @@ void SSLManager::SSL_free(SSLConnection* conn) { Status SSLManager::initSSLContext(SSL_CTX* context, const SSLParams& params, ConnectionDirection direction) { - if (direction == ConnectionDirection::kIncoming) { - fassert(34364, context == _serverContext.get()); - } - // SSL_OP_ALL - Activate all bug workaround options, to support buggy client SSL's. // SSL_OP_NO_SSLv2 - Disable SSL v2 support // SSL_OP_NO_SSLv3 - Disable SSL v3 support diff --git a/src/mongo/util/net/ssl_manager.h b/src/mongo/util/net/ssl_manager.h index 2b965b46cc6..ecc393070a8 100644 --- a/src/mongo/util/net/ssl_manager.h +++ b/src/mongo/util/net/ssl_manager.h @@ -38,6 +38,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" +#include "mongo/util/decorable.h" #include "mongo/util/net/sock.h" #include "mongo/util/time_support.h" @@ -86,7 +87,7 @@ struct SSLConfiguration { bool hasCA = false; }; -class SSLManagerInterface { +class SSLManagerInterface : public Decorable { public: static std::unique_ptr create(const SSLParams& params, bool isServer); -- cgit v1.2.1