summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMatt Cotter <matt.cotter@mongodb.com>2016-01-27 16:39:14 -0500
committerMatt Cotter <matt.cotter@mongodb.com>2016-05-20 14:06:28 -0400
commit225a2a57350c79a190c13054a038b6c3c559a558 (patch)
tree6e24efd0f62c580e1c09e17d5b5d7a561dd63e01 /src/mongo
parente6e7e099aaf11afb626d2cd95dd1d339433f6e7f (diff)
downloadmongo-225a2a57350c79a190c13054a038b6c3c559a558.tar.gz
SERVER-23448 create an ASIO based MessagingPort
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/SConscript6
-rw-r--r--src/mongo/client/connection_pool.cpp2
-rw-r--r--src/mongo/client/dbclient.cpp23
-rw-r--r--src/mongo/client/dbclientinterface.h6
-rw-r--r--src/mongo/client/scoped_db_conn_test.cpp1
-rw-r--r--src/mongo/db/client_basic.h2
-rw-r--r--src/mongo/db/curop_test.cpp2
-rw-r--r--src/mongo/db/db.cpp1
-rw-r--r--src/mongo/db/dbmessage.h2
-rw-r--r--src/mongo/db/repl/oplogreader.cpp3
-rw-r--r--src/mongo/db/repl/repl_set_request_votes.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
-rw-r--r--src/mongo/db/repl/replset_commands.cpp6
-rw-r--r--src/mongo/dbtests/mock_dbclient_conn_test.cpp1
-rw-r--r--src/mongo/s/server.h6
-rw-r--r--src/mongo/tools/bridge.cpp21
-rw-r--r--src/mongo/util/net/SConscript25
-rw-r--r--src/mongo/util/net/abstract_message_port.h153
-rw-r--r--src/mongo/util/net/asio_message_port.cpp619
-rw-r--r--src/mongo/util/net/asio_message_port.h159
-rw-r--r--src/mongo/util/net/asio_ssl_context.cpp69
-rw-r--r--src/mongo/util/net/asio_ssl_context.h85
-rw-r--r--src/mongo/util/net/httpclient.cpp1
-rw-r--r--src/mongo/util/net/listen.cpp35
-rw-r--r--src/mongo/util/net/listen.h16
-rw-r--r--src/mongo/util/net/listen_test.cpp87
-rw-r--r--src/mongo/util/net/message.cpp21
-rw-r--r--src/mongo/util/net/message.h8
-rw-r--r--src/mongo/util/net/message_port.cpp112
-rw-r--r--src/mongo/util/net/message_port.h103
-rw-r--r--src/mongo/util/net/message_port_mock.cpp64
-rw-r--r--src/mongo/util/net/message_port_mock.h58
-rw-r--r--src/mongo/util/net/message_port_startup_param.cpp56
-rw-r--r--src/mongo/util/net/message_port_startup_param.h35
-rw-r--r--src/mongo/util/net/message_server_port.cpp51
-rw-r--r--src/mongo/util/net/miniwebserver.cpp7
-rw-r--r--src/mongo/util/net/miniwebserver.h6
-rw-r--r--src/mongo/util/net/sock.h11
-rw-r--r--src/mongo/util/net/sockaddr.h1
-rw-r--r--src/mongo/util/net/ssl_manager.cpp4
-rw-r--r--src/mongo/util/net/ssl_manager.h3
41 files changed, 1545 insertions, 336 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 398d34de72d..d05a1f9c8a2 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -16,7 +16,11 @@ Import("use_system_version_of_library")
# Boost we need everywhere. 's2' is spammed in all over the place by
# db/geo unfortunately. pcre is also used many places.
-env.InjectThirdPartyIncludePaths(libraries=['boost', 's2', 'pcre'])
+env.InjectThirdPartyIncludePaths(libraries=[
+ 'boost',
+ 'pcre',
+ 's2',
+])
env.InjectMongoIncludePaths()
env.SConscript(
diff --git a/src/mongo/client/connection_pool.cpp b/src/mongo/client/connection_pool.cpp
index 9fa5493824f..cfdb872474c 100644
--- a/src/mongo/client/connection_pool.cpp
+++ b/src/mongo/client/connection_pool.cpp
@@ -185,7 +185,7 @@ ConnectionPool::ConnectionList::iterator ConnectionPool::acquireConnection(
conn->setSoTimeout(durationCount<Milliseconds>(timeout) / 1000.0);
uassertStatusOK(conn->connect(target));
- conn->port().tag |= _messagingPortTags;
+ conn->port().setTag(conn->port().getTag() | _messagingPortTags);
if (isInternalAuthSet()) {
conn->auth(getInternalUserAuthParamsWithFallback());
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index 8500868a46f..7e7a7528b76 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -31,6 +31,7 @@
#include "mongo/platform/basic.h"
+#include <algorithm>
#include <utility>
#include "mongo/base/status.h"
@@ -63,10 +64,15 @@
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/log.h"
+#include "mongo/util/net/asio_message_port.h"
+#include "mongo/util/net/message_port.h"
+#include "mongo/util/net/message_port_startup_param.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/net/ssl_options.h"
#include "mongo/util/password_digest.h"
+#include "mongo/util/represent_as.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -948,6 +954,10 @@ Status DBClientConnection::connect(const HostAndPort& serverAddress) {
return Status::OK();
}
+namespace {
+const auto kMaxMillisCount = Milliseconds::max().count();
+} // namespace
+
Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) {
_serverAddress = serverAddress;
_failed = true;
@@ -961,7 +971,14 @@ Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) {
<< serverAddress.host() << ", address is invalid");
}
- _port.reset(new MessagingPort(_so_timeout, _logLevel));
+ if (isMessagePortImplASIO()) {
+ // `_so_timeout` is in seconds.
+ auto ms = representAs<int64_t>(std::floor(_so_timeout * 1000)).value_or(kMaxMillisCount);
+ _port.reset(new ASIOMessagingPort(
+ ms > kMaxMillisCount ? Milliseconds::max() : Milliseconds(ms), _logLevel));
+ } else {
+ _port.reset(new MessagingPort(_so_timeout, _logLevel));
+ }
if (serverAddress.host().empty()) {
return Status(ErrorCodes::InvalidOptions,
@@ -1058,7 +1075,9 @@ void DBClientConnection::_checkConnection() {
void DBClientConnection::setSoTimeout(double timeout) {
_so_timeout = timeout;
if (_port) {
- _port->setSocketTimeout(timeout);
+ // `timeout` is in seconds.
+ auto ms = representAs<int64_t>(std::floor(timeout * 1000)).value_or(kMaxMillisCount);
+ _port->setTimeout(ms > kMaxMillisCount ? Milliseconds::max() : Milliseconds(ms));
}
}
diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h
index 942e5caf5fb..dc6624aef5d 100644
--- a/src/mongo/client/dbclientinterface.h
+++ b/src/mongo/client/dbclientinterface.h
@@ -41,8 +41,8 @@
#include "mongo/rpc/unique_message.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_port.h"
namespace mongo {
@@ -1187,7 +1187,7 @@ public:
return _maxWireVersion;
}
- MessagingPort& port() {
+ AbstractMessagingPort& port() {
verify(_port);
return *_port;
}
@@ -1246,7 +1246,7 @@ protected:
virtual void _auth(const BSONObj& params);
- std::unique_ptr<MessagingPort> _port;
+ std::unique_ptr<AbstractMessagingPort> _port;
bool _failed;
const bool autoReconnect;
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp
index 6862597896b..7057b2ed024 100644
--- a/src/mongo/client/scoped_db_conn_test.cpp
+++ b/src/mongo/client/scoped_db_conn_test.cpp
@@ -44,7 +44,6 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message_port.h"
#include "mongo/util/net/message_server.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/quick_exit.h"
diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h
index 6587227245b..88d7ef4afe8 100644
--- a/src/mongo/db/client_basic.h
+++ b/src/mongo/db/client_basic.h
@@ -32,8 +32,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/util/decorable.h"
+#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/hostandport.h"
-#include "mongo/util/net/message_port.h"
namespace mongo {
diff --git a/src/mongo/db/curop_test.cpp b/src/mongo/db/curop_test.cpp
index e650ff776d2..84c85e3cef9 100644
--- a/src/mongo/db/curop_test.cpp
+++ b/src/mongo/db/curop_test.cpp
@@ -49,7 +49,7 @@ const long long intervalShort = 10 * 1000; // 10ms in micros
class TestListener : public Listener {
public:
TestListener() : Listener("test", "", 0) {} // port 0 => any available high port
- virtual void acceptedMP(MessagingPort* mp) {}
+ void accepted(AbstractMessagingPort* mp) override {}
};
AtomicUInt32 threadInitialized(0);
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 5eca78ef8f4..eef81d4b21a 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -120,6 +120,7 @@
#include "mongo/util/fast_clock_source_factory.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostname_canonicalization_worker.h"
+#include "mongo/util/net/listen.h"
#include "mongo/util/net/message_server.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/ntservice.h"
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index 29629e552fd..7f1a3a2d0f2 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -34,8 +34,8 @@
#include "mongo/client/constants.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/server_options.h"
+#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_port.h"
namespace mongo {
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index e9e7b79c261..749c6112db9 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -98,7 +98,8 @@ bool OplogReader::connect(const HostAndPort& host) {
error() << errmsg << endl;
return false;
}
- _conn->port().tag |= executor::NetworkInterface::kMessagingPortKeepOpen;
+ _conn->port().setTag(_conn->port().getTag() |
+ executor::NetworkInterface::kMessagingPortKeepOpen);
_host = host;
}
return true;
diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp
index 0c0f209d1b6..4b34c351769 100644
--- a/src/mongo/db/repl/repl_set_request_votes.cpp
+++ b/src/mongo/db/repl/repl_set_request_votes.cpp
@@ -67,13 +67,13 @@ private:
unsigned originalTag = 0;
AbstractMessagingPort* mp = txn->getClient()->port();
if (mp) {
- originalTag = mp->tag;
- mp->tag |= executor::NetworkInterface::kMessagingPortKeepOpen;
+ originalTag = mp->getTag();
+ mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen);
}
// Untag the connection on exit.
ON_BLOCK_EXIT([mp, originalTag]() {
if (mp) {
- mp->tag = originalTag;
+ mp->setTag(originalTag);
}
});
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 3d8cc0ffb94..152016b2ddc 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -72,7 +72,7 @@
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
-#include "mongo/util/net/message_port.h"
+#include "mongo/util/net/listen.h"
namespace mongo {
namespace repl {
@@ -387,7 +387,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
}
void ReplicationCoordinatorExternalStateImpl::closeConnections() {
- MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen);
+ Listener::closeMessagingPorts(executor::NetworkInterface::kMessagingPortKeepOpen);
}
void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index 18fd6c3208a..e12f0405751 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -773,14 +773,14 @@ public:
AbstractMessagingPort* mp = txn->getClient()->port();
unsigned originalTag = 0;
if (mp) {
- originalTag = mp->tag;
- mp->tag |= executor::NetworkInterface::kMessagingPortKeepOpen;
+ originalTag = mp->getTag();
+ mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen);
}
// Unset the tag on block exit
ON_BLOCK_EXIT([mp, originalTag]() {
if (mp) {
- mp->tag = originalTag;
+ mp->setTag(originalTag);
}
});
diff --git a/src/mongo/dbtests/mock_dbclient_conn_test.cpp b/src/mongo/dbtests/mock_dbclient_conn_test.cpp
index 60a5dba7220..7febff697a3 100644
--- a/src/mongo/dbtests/mock_dbclient_conn_test.cpp
+++ b/src/mongo/dbtests/mock_dbclient_conn_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/net/sock.h"
#include "mongo/util/timer.h"
#include "mongo/util/net/socket_exception.h"
diff --git a/src/mongo/s/server.h b/src/mongo/s/server.h
index 1b76ac71a15..6466e001d13 100644
--- a/src/mongo/s/server.h
+++ b/src/mongo/s/server.h
@@ -31,12 +31,14 @@
#include <string>
#include "mongo/db/jsobj.h"
-#include "mongo/util/net/message.h"
namespace mongo {
+class AbstractMessagingPort;
+class Message;
+
extern OID serverID;
// from request.cpp
-void processRequest(Message& m, MessagingPort& p);
+void processRequest(Message& m, AbstractMessagingPort& p);
}
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 64ff17387e0..e61fd05d9f9 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -53,6 +53,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/listen.h"
#include "mongo/util/net/message.h"
#include "mongo/util/exit.h"
@@ -86,7 +87,7 @@ boost::optional<HostAndPort> extractHostInfo(const rpc::RequestInterface& reques
class Forwarder {
public:
- Forwarder(MessagingPort* mp,
+ Forwarder(AbstractMessagingPort* mp,
stdx::mutex* settingsMutex,
HostSettingsMap* settings,
int64_t seed)
@@ -114,7 +115,7 @@ public:
warning() << "Unable to establish connection to "
<< mongoBridgeGlobalParams.destUri << " after " << elapsed
<< " seconds: " << status;
- log() << "end connection " << _mp->psock->remoteString();
+ log() << "end connection " << _mp->remote().toString();
_mp->shutdown();
return;
}
@@ -132,7 +133,7 @@ public:
try {
request.reset();
if (!_mp->recv(request)) {
- log() << "end connection " << _mp->psock->remoteString();
+ log() << "end connection " << _mp->remote().toString();
_mp->shutdown();
break;
}
@@ -183,7 +184,7 @@ public:
// Close the connection to 'dest'.
case HostSettings::State::kHangUp:
log() << "Rejecting connection from " << host->toString()
- << ", end connection " << _mp->psock->remoteString();
+ << ", end connection " << _mp->remote().toString();
_mp->shutdown();
return;
// Forward the message to 'dest' with probability '1 - hostSettings.loss'.
@@ -214,7 +215,7 @@ public:
// If there's nothing to respond back to '_mp' with, then close the connection.
if (response.empty()) {
log() << "Received an empty response, end connection "
- << _mp->psock->remoteString();
+ << _mp->remote().toString();
_mp->shutdown();
break;
}
@@ -228,7 +229,7 @@ public:
// connections from 'host', then do so now.
if (hostSettings.state == HostSettings::State::kHangUp) {
log() << "Closing connection from " << host->toString()
- << ", end connection " << _mp->psock->remoteString();
+ << ", end connection " << _mp->remote().toString();
_mp->shutdown();
break;
}
@@ -259,7 +260,7 @@ public:
}
} catch (const DBException& ex) {
error() << "Caught DBException in Forwarder: " << ex << ", end connection "
- << _mp->psock->remoteString();
+ << _mp->remote().toString();
_mp->shutdown();
break;
} catch (...) {
@@ -303,7 +304,7 @@ private:
return {};
}
- MessagingPort* _mp;
+ AbstractMessagingPort* _mp;
stdx::mutex* _settingsMutex;
HostSettingsMap* _settings;
@@ -319,7 +320,7 @@ public:
log() << "Setting random seed: " << mongoBridgeGlobalParams.seed;
}
- void acceptedMP(MessagingPort* mp) final {
+ void accepted(AbstractMessagingPort* mp) override final {
{
stdx::lock_guard<stdx::mutex> lk(_portsMutex);
if (_inShutdown.load()) {
@@ -343,7 +344,7 @@ public:
private:
stdx::mutex _portsMutex;
- std::set<MessagingPort*> _ports;
+ std::set<AbstractMessagingPort*> _ports;
AtomicWord<bool> _inShutdown{false};
stdx::mutex _settingsMutex;
diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript
index 94ff35e50e8..99bd2581fa6 100644
--- a/src/mongo/util/net/SConscript
+++ b/src/mongo/util/net/SConscript
@@ -22,14 +22,23 @@ env.CppUnitTest(
],
)
-env.Library(
+networkEnv = env.Clone();
+
+networkEnv.InjectThirdPartyIncludePaths(libraries=[
+ 'asio',
+])
+
+networkEnv.Library(
target='network',
source=[
+ "asio_message_port.cpp",
+ "asio_ssl_context.cpp",
"hostname_canonicalization.cpp",
"httpclient.cpp",
"listen.cpp",
"message.cpp",
"message_port.cpp",
+ "message_port_startup_param.cpp",
"sock.cpp",
"sockaddr.cpp",
'socket_exception.cpp',
@@ -40,26 +49,18 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/server_options_core',
+ '$BUILD_DIR/mongo/db/server_parameters',
'$BUILD_DIR/mongo/util/background_job',
'$BUILD_DIR/mongo/util/concurrency/ticketholder',
+ '$BUILD_DIR/mongo/util/decorable',
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/foundation',
'$BUILD_DIR/mongo/util/options_parser/options_parser',
+ '$BUILD_DIR/third_party/shim_asio',
'hostandport',
],
)
-env.CppUnitTest(
- target='listen_test',
- source=[
- 'listen_test.cpp',
- ],
- LIBDEPS=[
- 'network',
- ],
- NO_CRUTCH = True,
-)
-
env.Library(
target='message_port_mock',
source=[
diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h
index 372d3fc8485..c4ff5503f41 100644
--- a/src/mongo/util/net/abstract_message_port.h
+++ b/src/mongo/util/net/abstract_message_port.h
@@ -31,48 +31,161 @@
#include <vector>
#include "mongo/config.h"
+#include "mongo/logger/log_severity.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/sockaddr.h"
+#include "mongo/util/time_support.h"
namespace mongo {
+class SSLManagerInterface;
+
class AbstractMessagingPort {
MONGO_DISALLOW_COPYING(AbstractMessagingPort);
+protected:
+ AbstractMessagingPort() = default;
+
public:
- AbstractMessagingPort() : tag(0), _connectionId(0) {}
- virtual ~AbstractMessagingPort() {}
- // like the reply below, but doesn't rely on received.data still being available
+ virtual ~AbstractMessagingPort() = default;
+
+ using Tag = uint32_t;
+
+ /**
+ * Used when closing sockets. This value will not close any tagged sockets.
+ */
+ static const Tag kSkipAllMask = 0xffffffff;
+
+ /**
+ * Sets the maximum amount of time (in ms) to wait for io operations to run.
+ */
+ virtual void setTimeout(Milliseconds millis) = 0;
+
+ /**
+ * Closes the underlying socket.
+ */
+ virtual void shutdown() = 0;
+
+ /**
+ * Sends a message and waits for a response. This is equivalent to calling `say` then `recv`.
+ */
+ virtual bool call(Message& toSend, Message& response) = 0;
+
+ /**
+ * Reads the next message from the socket.
+ */
+ virtual bool recv(Message& m) = 0;
+
+ /**
+ * Sends a message as a reply to a received message.
+ */
virtual void reply(Message& received, Message& response, int32_t responseToMsgId) = 0;
virtual void reply(Message& received, Message& response) = 0;
+ /**
+ * Sends the message.
+ */
+ virtual void say(Message& toSend, int responseTo = 0) = 0;
+
+ /**
+ * Sends the data over the socket.
+ */
+ virtual void send(const char* data, int len, const char* context) = 0;
+ virtual void send(const std::vector<std::pair<char*, int>>& data, const char* context) = 0;
+
+ /**
+ * Connect to the remote endpoint.
+ */
+ virtual bool connect(SockAddr& farEnd) = 0;
+
+ /**
+ * The remote endpoint.
+ */
virtual HostAndPort remote() const = 0;
+
+ /**
+ * The port of the remote endpoint.
+ */
virtual unsigned remotePort() const = 0;
+
+ /**
+ * The remote endpoint.
+ */
virtual SockAddr remoteAddr() const = 0;
+
+ /**
+ * The local endpoint.
+ */
virtual SockAddr localAddr() const = 0;
- void setX509SubjectName(const std::string& x509SubjectName) {
- _x509SubjectName = x509SubjectName;
- }
+ /**
+ * Whether or not this is still connected.
+ */
+ virtual bool isStillConnected() const = 0;
- std::string getX509SubjectName() {
- return _x509SubjectName;
- }
+ /**
+ * Point in time (in micro seconds) when this was created.
+ */
+ virtual uint64_t getSockCreationMicroSec() const = 0;
- long long connectionId() const {
- return _connectionId;
- }
- void setConnectionId(long long connectionId);
+ /**
+ * Sets the severity level for all logging.
+ */
+ virtual void setLogLevel(logger::LogSeverity logLevel) = 0;
-public:
- // TODO make this private with some helpers
+ /**
+ * Clear the io counters.
+ */
+ virtual void clearCounters() = 0;
+
+ /**
+ * The total number of bytes read (since initialization or clearing the counters).
+ */
+ virtual long long getBytesIn() const = 0;
+
+ /**
+ * The total number of bytes written (since initialization or clearing the counters).
+ */
+ virtual long long getBytesOut() const = 0;
+
+ /**
+ * Set the x509 subject name (used for SSL).
+ */
+ virtual void setX509SubjectName(const std::string& x509SubjectName) = 0;
+
+ /**
+ * Get the current x509 subject name (used for SSL).
+ */
+ virtual std::string getX509SubjectName() const = 0;
+
+ /**
+ * Set the connection ID.
+ */
+ virtual void setConnectionId(const long long connectionId) = 0;
+
+ /**
+ * Get the connection ID.
+ */
+ virtual long long connectionId() const = 0;
+
+ /**
+ * Set the tag for this messaging port, used when closing with a mask.
+ * @see Listener::closeMessagingPorts()
+ */
+ virtual void setTag(const Tag tag) = 0;
- /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */
- unsigned tag;
+ /**
+ * Get the tag for this messaging port.
+ */
+ virtual Tag getTag() const = 0;
-private:
- long long _connectionId;
- std::string _x509SubjectName;
+ /**
+ * Initiates the TLS/SSL handshake on this AbstractMessagingPort. When this function returns,
+ * further communication on this AbstractMessagingPort will be encrypted.
+ * ssl - Pointer to the global SSLManager.
+ * remoteHost - The hostname of the remote server.
+ */
+ virtual bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) = 0;
};
} // namespace mongo
diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp
new file mode 100644
index 00000000000..1d685300039
--- /dev/null
+++ b/src/mongo/util/net/asio_message_port.cpp
@@ -0,0 +1,619 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/net/asio_message_port.h"
+
+#include <set>
+
+#include "mongo/base/init.h"
+#include "mongo/config.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/invariant.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/asio_ssl_context.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/net/ssl_options.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+namespace {
+const char kGET[] = "GET";
+const int kHeaderLen = sizeof(MSGHEADER::Value);
+const int kInitialMessageSize = 1024;
+
+class ASIOPorts {
+public:
+ void closeAll(AbstractMessagingPort::Tag skipMask) {
+ stdx::lock_guard<stdx::mutex> lock_guard(mutex);
+ for (auto&& port : _portSet) {
+ if (port->getTag() & skipMask) {
+ LOG(3) << "Skip closing connection # " << port->connectionId();
+ continue;
+ }
+ LOG(3) << "Closing connection # " << port->connectionId();
+ port->shutdown();
+ }
+ }
+
+ void insert(ASIOMessagingPort* p) {
+ stdx::lock_guard<stdx::mutex> lock_guard(mutex);
+ _portSet.insert(p);
+ }
+
+ void erase(ASIOMessagingPort* p) {
+ stdx::lock_guard<stdx::mutex> lock_guard(mutex);
+ _portSet.erase(p);
+ }
+
+private:
+ stdx::mutex mutex;
+ std::unordered_set<ASIOMessagingPort*> _portSet;
+};
+
+// We "new" this so it will still be around when other automatic global vars are being destructed
+// during termination. TODO: This is an artifact from MessagingPort and should be removed when we
+// decide where to put networking global state.
+ASIOPorts& asioPorts = *(new ASIOPorts());
+
+#ifdef MONGO_CONFIG_SSL
+struct ASIOSSLContextPair {
+ ASIOSSLContext server;
+ ASIOSSLContext client;
+};
+
+const auto sslDecoration = SSLManagerInterface::declareDecoration<ASIOSSLContextPair>();
+
+MONGO_INITIALIZER_WITH_PREREQUISITES(ASIOSSLContextSetup, ("SSLManager"))(InitializerContext*) {
+ if (getSSLManager()) {
+ sslDecoration(getSSLManager())
+ .server.init(SSLManagerInterface::ConnectionDirection::kIncoming);
+ sslDecoration(getSSLManager())
+ .client.init(SSLManagerInterface::ConnectionDirection::kOutgoing);
+ }
+ return Status::OK();
+}
+#endif
+
+} // namespace
+
+void ASIOMessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) {
+ asioPorts.closeAll(skipMask);
+}
+
+ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd)
+ : _service(),
+ _inShutdown(0),
+ _timer(_service),
+ _creationTime(curTimeMicros64()),
+ _timeout(),
+ _remote(),
+ _isEncrypted(false),
+ _awaitingHandshake(true),
+ _x509SubjectName(),
+ _bytesIn(0),
+ _bytesOut(0),
+ _logLevel(logger::LogSeverity::Log()),
+ _connectionId(),
+ _tag(),
+#ifdef MONGO_CONFIG_SSL
+ _context(ASIOSSLContext()),
+ _sslSock(_service,
+ getSSLManager() ? sslDecoration(getSSLManager()).server.getContext()
+ : _context->getContext()) {
+ if (getSSLManager()) {
+ _context = boost::none;
+ }
+ _sslSock.lowest_layer().assign(
+ asio::generic::stream_protocol(farEnd.getType(),
+ farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP),
+ fd);
+#else
+ _sock(_service,
+ asio::generic::stream_protocol(farEnd.getType(),
+ farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP),
+ fd) {
+#endif // MONGO_CONFIG_SSL
+ asioPorts.insert(this);
+ _remote = HostAndPort(farEnd.getAddr(), farEnd.getPort());
+ _timer.expires_at(decltype(_timer)::time_point::max());
+ _setTimerCallback();
+}
+
+ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity logLevel)
+ : _service(),
+ _inShutdown(0),
+ _timer(_service),
+ _creationTime(curTimeMicros64()),
+ _timeout(timeout),
+ _remote(),
+ _isEncrypted(false),
+ _awaitingHandshake(true),
+ _x509SubjectName(),
+ _bytesIn(0),
+ _bytesOut(0),
+ _logLevel(logLevel),
+ _connectionId(),
+ _tag(),
+#ifdef MONGO_CONFIG_SSL
+ _context(ASIOSSLContext()),
+ _sslSock(_service,
+ getSSLManager() ? sslDecoration(getSSLManager()).client.getContext()
+ : _context->getContext()) {
+ if (getSSLManager()) {
+ _context = boost::none;
+ }
+#else
+ _sock(_service) {
+#endif // MONGO_CONFIG_SSL
+ asioPorts.insert(this);
+ if (*_timeout == Milliseconds(0)) {
+ _timeout = boost::none;
+ }
+ _timer.expires_at(decltype(_timer)::time_point::max());
+ _setTimerCallback();
+}
+
+ASIOMessagingPort::~ASIOMessagingPort() {
+ shutdown();
+ asioPorts.erase(this);
+}
+
+void ASIOMessagingPort::setTimeout(Milliseconds millis) {
+ if (millis == Milliseconds(0)) {
+ _timeout = boost::none;
+ _timer.expires_at(decltype(_timer)::time_point::max());
+ } else {
+ _timeout = millis;
+ }
+}
+
+void ASIOMessagingPort::shutdown() {
+ if (!_inShutdown.swap(true)) {
+ if (_getSocket().native_handle() >= 0) {
+ _getSocket().close();
+ _awaitingHandshake = true;
+ _isEncrypted = false;
+ }
+ }
+}
+
+asio::error_code ASIOMessagingPort::_read(char* buf, std::size_t size) {
+ invariant(buf);
+ if (_timeout) {
+ _timer.expires_from_now(decltype(_timer)::duration(
+ durationCount<Duration<decltype(_timer)::duration::period>>(*_timeout)));
+ }
+ asio::error_code ec = asio::error::would_block;
+ if (!_isEncrypted) {
+ asio::async_read(_getSocket(),
+ asio::buffer(buf, size),
+ [&ec, size](const asio::error_code& err, std::size_t size_read) {
+ invariant(err || size == size_read);
+ ec = err;
+ });
+ }
+#ifdef MONGO_CONFIG_SSL
+ else {
+ asio::async_read(_sslSock,
+ asio::buffer(buf, size),
+ [&ec, size](const asio::error_code& err, std::size_t size_read) {
+ invariant(err || size == size_read);
+ ec = err;
+ });
+ }
+#endif // MONGO_CONFIG_SSL
+ do {
+ _service.run_one();
+ } while (ec == asio::error::would_block);
+ if (!ec) {
+ _bytesIn += size;
+ }
+ return ec;
+}
+
+asio::error_code ASIOMessagingPort::_write(const char* buf, std::size_t size) {
+ if (_timeout) {
+ _timer.expires_from_now(decltype(_timer)::duration(
+ durationCount<Duration<decltype(_timer)::duration::period>>(*_timeout)));
+ }
+ asio::error_code ec = asio::error::would_block;
+ if (!_isEncrypted) {
+ asio::async_write(_getSocket(),
+ asio::buffer(buf, size),
+ [&ec, &size](const asio::error_code& err, std::size_t size_written) {
+ invariant(err || size == size_written);
+ ec = err;
+ });
+ }
+#ifdef MONGO_CONFIG_SSL
+ else {
+ asio::async_write(_sslSock,
+ asio::buffer(buf, size),
+ [&ec, &size](const asio::error_code& err, std::size_t size_written) {
+ invariant(err || size == size_written);
+ ec = err;
+ });
+ }
+#endif // MONGO_CONFIG_SSL
+ do {
+ _service.run_one();
+ } while (ec == asio::error::would_block);
+ if (!ec) {
+ _bytesOut += size;
+ }
+ return ec;
+}
+
+const asio::generic::stream_protocol::socket& ASIOMessagingPort::_getSocket() const {
+#ifdef MONGO_CONFIG_SSL
+ return _sslSock.next_layer();
+#else
+ return _sock;
+#endif
+}
+
+asio::generic::stream_protocol::socket& ASIOMessagingPort::_getSocket() {
+ return const_cast<asio::generic::stream_protocol::socket&>(
+ const_cast<const ASIOMessagingPort*>(this)->_getSocket());
+}
+
+bool ASIOMessagingPort::recv(Message& m) {
+ try {
+ if (getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->shouldFail()) {
+ throw SocketException(SocketException::RECV_ERROR, "fail point set");
+ }
+ MsgData::View md = reinterpret_cast<char*>(mongoMalloc(kInitialMessageSize));
+ ScopeGuard guard = MakeGuard([&md]() { free(md.view2ptr()); });
+
+ asio::error_code ec = _read(md.view2ptr(), kHeaderLen);
+ if (ec) {
+ if (ec == asio::error::misc_errors::eof) {
+ // When the socket is closed, no need to log an error.
+ return false;
+ }
+ throw asio::system_error(ec);
+ }
+
+ if (_awaitingHandshake) {
+ static_assert(strlen(kGET) <= kHeaderLen,
+ "HTTP GET string must be smaller than the message header.");
+ if (memcmp(md.view2ptr(), kGET, strlen(kGET)) == 0) {
+ std::string httpMsg =
+ "It looks like you are trying to access MongoDB over HTTP on the native driver "
+ "port.\n";
+ LOG(_logLevel) << httpMsg;
+ std::stringstream ss;
+ ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: "
+ "text/plain\r\nContent-Length: " << httpMsg.size() << "\r\n\r\n" << httpMsg;
+ auto s = ss.str();
+ send(s.c_str(), s.size(), nullptr);
+ return false;
+ }
+#ifndef MONGO_CONFIG_SSL
+ // If responseTo is not 0 or -1 for first packet assume SSL
+ if (md.getResponseToMsgId() != 0 && md.getResponseToMsgId() != -1) {
+ uasserted(40130,
+ "SSL handshake requested, SSL feature not available in this build");
+ }
+#else
+ if (md.getResponseToMsgId() != 0 && md.getResponseToMsgId() != -1) {
+ uassert(40131,
+ "SSL handshake received but server is started without SSL support",
+ sslGlobalParams.sslMode.load() != SSLParams::SSLMode_disabled);
+ // `_sslSock.handshake()` throws on failure.
+ _sslSock.handshake(decltype(_sslSock)::server,
+ asio::buffer(md.view2ptr(), kHeaderLen));
+
+ auto swPeerSubjectName =
+ getSSLManager()->parseAndValidatePeerCertificate(_sslSock.native_handle(), "");
+ if (!swPeerSubjectName.isOK()) {
+ throw SocketException(SocketException::CONNECT_ERROR,
+ swPeerSubjectName.getStatus().reason());
+ }
+ setX509SubjectName(swPeerSubjectName.getValue().get_value_or(""));
+
+ _isEncrypted = true;
+ _awaitingHandshake = false;
+ return recv(m);
+ }
+ uassert(40132,
+ "The server is configured to only allow SSL connections",
+ sslGlobalParams.sslMode.load() != SSLParams::SSLMode_requireSSL);
+
+#endif // MONGO_CONFIG_SSL
+ _awaitingHandshake = false;
+ }
+
+ int msgLen = md.getLen();
+
+ if (static_cast<size_t>(msgLen) < sizeof(MSGHEADER::Value) ||
+ static_cast<size_t>(msgLen) > MaxMessageSizeBytes) {
+ LOG(_logLevel) << "recv(): message len " << msgLen << " is invalid. "
+ << "Min: " << sizeof(MSGHEADER::Value)
+ << ", Max: " << MaxMessageSizeBytes;
+ return false;
+ }
+
+ if (msgLen > kInitialMessageSize) {
+ md = reinterpret_cast<char*>(mongoRealloc(md.view2ptr(), msgLen));
+ }
+
+ ec = _read(md.data(), msgLen - kHeaderLen);
+ if (ec) {
+ throw asio::system_error(ec);
+ }
+
+ guard.Dismiss();
+ m.setData(md.view2ptr(), true);
+ return true;
+
+ } catch (const asio::system_error& e) {
+ LOG(_logLevel) << "SocketException: remote: " << remote() << " error: " << e.what();
+ m.reset();
+ return false;
+ }
+}
+
+void ASIOMessagingPort::reply(Message& received, Message& response) {
+ say(response, received.header().getId());
+}
+
+void ASIOMessagingPort::reply(Message& received, Message& response, int32_t responseToMsgId) {
+ say(response, responseToMsgId);
+}
+
+bool ASIOMessagingPort::call(Message& toSend, Message& response) {
+ try {
+ say(toSend);
+ } catch (const asio::system_error&) {
+ return false;
+ }
+ bool success = recv(response);
+ if (success) {
+ if (response.header().getResponseToMsgId() != toSend.header().getId()) {
+ response.reset();
+ uasserted(40133, "Response ID did not match the sent message ID.");
+ }
+ }
+ return success;
+}
+
+void ASIOMessagingPort::say(Message& toSend, int responseTo) {
+ invariant(!toSend.empty());
+ toSend.header().setId(nextMessageId());
+ toSend.header().setResponseToMsgId(responseTo);
+ auto buf = toSend.buf();
+ if (buf) {
+ send(buf, MsgData::ConstView(buf).getLen(), nullptr);
+ } else {
+ send(toSend.dataBuffers(), nullptr);
+ }
+}
+
+unsigned ASIOMessagingPort::remotePort() const {
+ return _remote.port();
+}
+
+
+HostAndPort ASIOMessagingPort::remote() const {
+ return _remote;
+}
+
+SockAddr ASIOMessagingPort::remoteAddr() const {
+ return SockAddr(_remote.host(), _remote.port());
+}
+
+SockAddr ASIOMessagingPort::localAddr() const {
+ auto ep = _getSocket().local_endpoint();
+ switch (ep.protocol().family()) {
+ case AF_INET:
+ case AF_INET6: {
+ asio::ip::tcp::endpoint tcpEP;
+ tcpEP.resize(ep.size());
+ memcpy(tcpEP.data(), ep.data(), ep.size());
+ return SockAddr(tcpEP.address().to_string(), tcpEP.port());
+ }
+#ifndef _WIN32
+ case AF_UNIX: {
+ asio::local::stream_protocol::endpoint localEP;
+ localEP.resize(ep.size());
+ memcpy(localEP.data(), ep.data(), ep.size());
+ return SockAddr(localEP.path(), 0);
+ }
+#endif // _WIN32
+ default: { MONGO_UNREACHABLE; }
+ }
+}
+
+void ASIOMessagingPort::send(const char* data, int len, const char*) {
+ if (getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->shouldFail()) {
+ throw SocketException(SocketException::SEND_ERROR, "fail point set");
+ }
+ asio::error_code ec = _write(data, len);
+ if (ec) {
+ throw SocketException(SocketException::SEND_ERROR, asio::system_error(ec).what());
+ }
+}
+
+void ASIOMessagingPort::send(const std::vector<std::pair<char*, int>>& data, const char*) {
+ for (auto&& pair : data) {
+ send(pair.first, pair.second, nullptr);
+ }
+}
+
+bool ASIOMessagingPort::connect(SockAddr& farEnd) {
+ if (_timeout) {
+ _timer.expires_from_now(decltype(_timer)::duration(
+ durationCount<Duration<decltype(_timer)::duration::period>>(*_timeout)));
+ }
+ _remote = HostAndPort(farEnd.getAddr(), farEnd.getPort());
+
+ asio::ip::tcp::resolver resolver(_service);
+ asio::error_code ec = asio::error::would_block;
+
+ if (farEnd.getType() == AF_UNIX) {
+#ifndef _WIN32
+ asio::local::stream_protocol::endpoint endPoint(farEnd.getAddr());
+ _getSocket().async_connect(endPoint, [&ec](const asio::error_code& err) { ec = err; });
+#else
+ uasserted(40135, "Connect called on a Unix socket under windows build.");
+#endif // _WIN32
+ } else {
+ asio::ip::tcp::resolver::query query(_remote.host(), std::to_string(_remote.port()));
+
+ resolver.async_resolve(
+ query,
+ [&ec, this](const asio::error_code& resolveErr, asio::ip::tcp::resolver::iterator i) {
+ if (!resolveErr) {
+ asio::ip::tcp::endpoint tcpEndpoint(*i);
+ _getSocket().async_connect(tcpEndpoint,
+ [&ec](const asio::error_code& err) { ec = err; });
+ } else if (i == asio::ip::tcp::resolver::iterator()) {
+ ec = asio::error::host_unreachable;
+ }
+ });
+ }
+
+ do {
+ _service.run_one();
+ } while (ec == asio::error::would_block);
+
+ if (ec) {
+ if (ec == asio::error::connection_refused) {
+ LOG(_logLevel) << "Failed to connect to " << _remote << ", reason: Connection refused";
+ } else if (ec == asio::error::connection_aborted && _timeout) {
+ LOG(_logLevel) << "Failed to connect to " << _remote << " after " << _timeout->count()
+ << " milliseconds, giving up.";
+ } else {
+ LOG(_logLevel) << "Failed to connect to " << _remote
+ << ", reason: " << asio::system_error(ec).what();
+ }
+ return false;
+ }
+
+ _creationTime = curTimeMicros64();
+ _awaitingHandshake = false;
+ if (farEnd.getType() != AF_UNIX) {
+ _getSocket().set_option(asio::ip::tcp::no_delay(true));
+ }
+
+ return true;
+}
+
+bool ASIOMessagingPort::secure(SSLManagerInterface* ssl, const std::string& remoteHost) {
+#ifdef MONGO_CONFIG_SSL
+ asio::error_code ec;
+ _sslSock.handshake(decltype(_sslSock)::client, ec);
+ if (ec) {
+ return false;
+ }
+
+ auto swPeerSubjectName =
+ getSSLManager()->parseAndValidatePeerCertificate(_sslSock.native_handle(), remoteHost);
+ if (!swPeerSubjectName.isOK()) {
+ throw SocketException(SocketException::CONNECT_ERROR,
+ swPeerSubjectName.getStatus().reason());
+ }
+ setX509SubjectName(swPeerSubjectName.getValue().get_value_or(""));
+
+ _isEncrypted = true;
+ return true;
+#else
+ return false;
+#endif
+}
+
+bool ASIOMessagingPort::isStillConnected() const {
+ return _getSocket().is_open();
+}
+
+uint64_t ASIOMessagingPort::getSockCreationMicroSec() const {
+ return _creationTime;
+}
+
+void ASIOMessagingPort::setLogLevel(logger::LogSeverity logLevel) {
+ _logLevel = logLevel;
+}
+
+void ASIOMessagingPort::clearCounters() {
+ _bytesIn = 0;
+ _bytesOut = 0;
+}
+
+long long ASIOMessagingPort::getBytesIn() const {
+ return _bytesIn;
+}
+
+long long ASIOMessagingPort::getBytesOut() const {
+ return _bytesOut;
+}
+
+void ASIOMessagingPort::setX509SubjectName(const std::string& x509SubjectName) {
+ _x509SubjectName = x509SubjectName;
+}
+
+std::string ASIOMessagingPort::getX509SubjectName() const {
+ return _x509SubjectName;
+}
+
+void ASIOMessagingPort::setConnectionId(const long long connectionId) {
+ _connectionId = connectionId;
+}
+
+long long ASIOMessagingPort::connectionId() const {
+ return _connectionId;
+}
+
+void ASIOMessagingPort::setTag(const AbstractMessagingPort::Tag tag) {
+ _tag = tag;
+}
+
+AbstractMessagingPort::Tag ASIOMessagingPort::getTag() const {
+ return _tag;
+}
+
+void ASIOMessagingPort::_setTimerCallback() {
+ _timer.async_wait([this](const asio::error_code& ec) {
+ if (ec != asio::error::operation_aborted &&
+ (_timer.expires_at() <= decltype(_timer)::clock_type::now())) {
+ _getSocket().cancel();
+ _timer.expires_at(decltype(_timer)::time_point::max());
+ }
+ _setTimerCallback();
+ });
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/net/asio_message_port.h b/src/mongo/util/net/asio_message_port.h
new file mode 100644
index 00000000000..42bfb9e78f5
--- /dev/null
+++ b/src/mongo/util/net/asio_message_port.h
@@ -0,0 +1,159 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <asio.hpp>
+#include <asio/system_timer.hpp>
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/config.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/net/abstract_message_port.h"
+#include "mongo/util/net/asio_ssl_context.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/net/sockaddr.h"
+#include "mongo/util/time_support.h"
+
+#ifdef MONGO_CONFIG_SSL
+#include <asio/ssl.hpp>
+#endif
+
+namespace mongo {
+
+class ASIOMessagingPort final : public AbstractMessagingPort {
+public:
+ /**
+ * This is the "Ingress Constructor", used by the listener. For this, we already have a file
+ * descriptor and address. This messaging port is already connected, and connect() should not be
+ * called.
+ */
+ ASIOMessagingPort(int fd, SockAddr farEnd);
+
+ /**
+ * This is the "Egress Constructor", used by the dbclient. This messaging port is not connected
+ * to any remote endpoint. In order to do any communications, the connect() method must be
+ * called successfully.
+ */
+ ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity logLevel);
+
+ ~ASIOMessagingPort() override;
+
+ void setTimeout(Milliseconds millis) override;
+
+ void shutdown() override;
+
+ bool call(Message& toSend, Message& response) override;
+
+ bool recv(Message& m) override;
+
+ void reply(Message& received, Message& response, int32_t responseToMsgId) override;
+ void reply(Message& received, Message& response) override;
+
+ void say(Message& toSend, int responseTo = 0) override;
+
+ void send(const char* data, int len, const char*) override;
+ void send(const std::vector<std::pair<char*, int>>& data, const char*) override;
+
+ bool connect(SockAddr& farEnd) override;
+
+ HostAndPort remote() const override;
+
+ unsigned remotePort() const override;
+
+ SockAddr remoteAddr() const override;
+
+ SockAddr localAddr() const override;
+
+ bool isStillConnected() const override;
+
+ uint64_t getSockCreationMicroSec() const override;
+
+ void setLogLevel(logger::LogSeverity logLevel) override;
+
+ void clearCounters() override;
+
+ long long getBytesIn() const override;
+
+ long long getBytesOut() const override;
+
+ void setX509SubjectName(const std::string& x509SubjectName) override;
+
+ std::string getX509SubjectName() const override;
+
+ void setConnectionId(const long long connectionId) override;
+
+ long long connectionId() const override;
+
+ void setTag(const AbstractMessagingPort::Tag tag) override;
+
+ AbstractMessagingPort::Tag getTag() const override;
+
+ bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override;
+
+ static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask);
+
+private:
+ void _setTimerCallback();
+ asio::error_code _read(char* buf, std::size_t size);
+ asio::error_code _write(const char* buf, std::size_t size);
+ const asio::generic::stream_protocol::socket& _getSocket() const;
+ asio::generic::stream_protocol::socket& _getSocket();
+
+ asio::io_service _service;
+
+ AtomicBool _inShutdown;
+
+ asio::system_timer _timer;
+ uint64_t _creationTime;
+ boost::optional<Milliseconds> _timeout;
+
+ HostAndPort _remote;
+
+ bool _isEncrypted;
+ bool _awaitingHandshake;
+ std::string _x509SubjectName;
+
+ long long _bytesIn;
+ long long _bytesOut;
+
+ logger::LogSeverity _logLevel;
+
+ long long _connectionId;
+ AbstractMessagingPort::Tag _tag;
+
+#ifdef MONGO_CONFIG_SSL
+ boost::optional<ASIOSSLContext> _context;
+ asio::ssl::stream<asio::generic::stream_protocol::socket> _sslSock;
+#else
+ asio::generic::stream_protocol::socket _sock;
+#endif
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/net/asio_ssl_context.cpp b/src/mongo/util/net/asio_ssl_context.cpp
new file mode 100644
index 00000000000..eb9f4c2fa68
--- /dev/null
+++ b/src/mongo/util/net/asio_ssl_context.cpp
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/config.h"
+#include "mongo/util/net/asio_ssl_context.h"
+
+#ifdef MONGO_CONFIG_SSL
+
+#include "mongo/base/init.h"
+#include "mongo/stdx/memory.h"
+
+#include <asio.hpp>
+#include <asio/ssl.hpp>
+
+namespace mongo {
+
+ASIOSSLContext::ASIOSSLContext()
+ : _context(stdx::make_unique<asio::ssl::context>(asio::ssl::context::sslv23)),
+ _mode(static_cast<SSLParams::SSLModes>(getSSLGlobalParams().sslMode.load())) {}
+
+ASIOSSLContext::ASIOSSLContext(ASIOSSLContext&& other) = default;
+
+ASIOSSLContext& ASIOSSLContext::operator=(ASIOSSLContext&& other) = default;
+
+void ASIOSSLContext::init(SSLManagerInterface::ConnectionDirection direction) {
+ if (_mode != SSLParams::SSLMode_disabled) {
+ uassertStatusOK(getSSLManager()->initSSLContext(
+ _context->native_handle(), getSSLGlobalParams(), direction));
+ }
+}
+
+asio::ssl::context& ASIOSSLContext::getContext() {
+ return *_context;
+}
+
+SSLParams::SSLModes ASIOSSLContext::sslMode() const {
+ return _mode;
+}
+
+} // namespace mongo
+
+#endif // MONGO_CONFIG_SSL
diff --git a/src/mongo/util/net/asio_ssl_context.h b/src/mongo/util/net/asio_ssl_context.h
new file mode 100644
index 00000000000..53a68f1e1a3
--- /dev/null
+++ b/src/mongo/util/net/asio_ssl_context.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/config.h"
+#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/net/ssl_options.h"
+
+#ifdef MONGO_CONFIG_SSL
+namespace asio {
+namespace ssl {
+class context;
+} // namespace ssl
+} // namespace asio
+
+namespace mongo {
+
+class ASIOSSLContext {
+public:
+ MONGO_DISALLOW_COPYING(ASIOSSLContext);
+
+ /**
+ * A class to house the ASIO SSL context as well as the mongo SSL mode. This will be decorated
+ * on to the SSLManager.
+ */
+ ASIOSSLContext();
+
+ ASIOSSLContext(ASIOSSLContext&& other);
+ ASIOSSLContext& operator=(ASIOSSLContext&& other);
+
+ /**
+ * This must be called before calling `getContext()`. This does all of the setup that requires
+ * the SSLManager (which can't be done in construction due to this class being a decoration).
+ */
+ void init(SSLManagerInterface::ConnectionDirection direction);
+
+ /**
+ * A copy of the ASIO SSL context generated upon construction from the mongo::SSLParams.
+ */
+ asio::ssl::context& getContext();
+
+ /**
+ * The SSL operation mode. See enum SSLModes in ssl_options.h
+ */
+ SSLParams::SSLModes sslMode() const;
+
+private:
+ std::unique_ptr<asio::ssl::context> _context;
+ SSLParams::SSLModes _mode;
+};
+} // namespace mongo
+#else
+namespace mongo {
+
+// This is a dummy class for when we build without SSL.
+class ASIOSSLContext {};
+} // namespace mongo
+#endif // MONGO_CONFIG_SSL
diff --git a/src/mongo/util/net/httpclient.cpp b/src/mongo/util/net/httpclient.cpp
index 9262fb41ff2..2cd94c0635f 100644
--- a/src/mongo/util/net/httpclient.cpp
+++ b/src/mongo/util/net/httpclient.cpp
@@ -35,7 +35,6 @@
#include "mongo/config.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_port.h"
#include "mongo/util/net/sock.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/ssl_manager.h"
diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp
index 8538242faee..2cbee476522 100644
--- a/src/mongo/util/net/listen.cpp
+++ b/src/mongo/util/net/listen.cpp
@@ -34,14 +34,18 @@
#include "mongo/util/net/listen.h"
-
+#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/base/status.h"
#include "mongo/config.h"
#include "mongo/db/server_options.h"
-#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
+#include "mongo/util/net/asio_message_port.h"
#include "mongo/util/net/message_port.h"
+#include "mongo/util/net/message_port_startup_param.h"
#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/net/ssl_options.h"
#include "mongo/util/scopeguard.h"
#ifndef _WIN32
@@ -92,7 +96,7 @@ vector<SockAddr> ipToAddrs(const char* ips, int port, bool useUnixSockets) {
out.push_back(SockAddr("::", port)); // IPv6 all
#ifndef _WIN32
if (useUnixSockets)
- out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket
+ out.push_back(SockAddr(makeUnixSockPath(port), port)); // Unix socket
#endif
return out;
}
@@ -114,7 +118,7 @@ vector<SockAddr> ipToAddrs(const char* ips, int port, bool useUnixSockets) {
#ifndef _WIN32
if (sa.isValid() && useUnixSockets &&
(sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4
- out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port));
+ out.push_back(SockAddr(makeUnixSockPath(port), port));
#endif
}
return out;
@@ -350,7 +354,7 @@ void Listener::initAndListen() {
pnewSock->secureAccepted(_ssl);
}
#endif
- accepted(pnewSock, myConnectionNumber);
+ _accepted(pnewSock, myConnectionNumber);
}
}
}
@@ -566,7 +570,7 @@ void Listener::initAndListen() {
pnewSock->secureAccepted(_ssl);
}
#endif
- accepted(pnewSock, myConnectionNumber);
+ _accepted(pnewSock, myConnectionNumber);
}
}
#endif
@@ -583,14 +587,15 @@ void Listener::waitUntilListening() const {
}
}
-void Listener::accepted(std::shared_ptr<Socket> psocket, long long connectionId) {
- MessagingPort* port = new MessagingPort(psocket);
+void Listener::_accepted(const std::shared_ptr<Socket>& psocket, long long connectionId) {
+ std::unique_ptr<AbstractMessagingPort> port;
+ if (isMessagePortImplASIO()) {
+ port = stdx::make_unique<ASIOMessagingPort>(psocket->stealSD(), psocket->remoteAddr());
+ } else {
+ port = stdx::make_unique<MessagingPort>(psocket);
+ }
port->setConnectionId(connectionId);
- acceptedMP(port);
-}
-
-void Listener::acceptedMP(MessagingPort* mp) {
- verify(!"You must overwrite one of the accepted methods");
+ accepted(port.release());
}
// ----- ListeningSockets -------
@@ -637,6 +642,10 @@ void Listener::checkTicketNumbers() {
globalTicketHolder.resize(want);
}
+void Listener::closeMessagingPorts(AbstractMessagingPort::Tag skipMask) {
+ ASIOMessagingPort::closeSockets(skipMask);
+ MessagingPort::closeSockets(skipMask);
+}
TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN);
AtomicInt64 Listener::globalConnectionNumber;
diff --git a/src/mongo/util/net/listen.h b/src/mongo/util/net/listen.h
index ec8b2f03ac5..49f35dc1ba1 100644
--- a/src/mongo/util/net/listen.h
+++ b/src/mongo/util/net/listen.h
@@ -38,14 +38,13 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/sock.h"
namespace mongo {
const int DEFAULT_MAX_CONN = 1000000;
-class MessagingPort;
-
class Listener {
MONGO_DISALLOW_COPYING(Listener);
@@ -57,8 +56,7 @@ public:
void initAndListen(); // never returns unless error (start a thread)
/* spawn a thread, etc., then return */
- virtual void accepted(std::shared_ptr<Socket> psocket, long long connectionId);
- virtual void acceptedMP(MessagingPort* mp);
+ virtual void accepted(AbstractMessagingPort* mp) = 0;
const int _port;
@@ -122,6 +120,9 @@ private:
// Boolean that indicates whether this Listener is ready to accept incoming network requests
bool _ready;
+
+ virtual void _accepted(const std::shared_ptr<Socket>& psocket, long long connectionId);
+
#ifdef MONGO_CONFIG_SSL
SSLManagerInterface* _ssl;
#endif
@@ -143,6 +144,13 @@ public:
/** makes sure user input is sane */
static void checkTicketNumbers();
+
+ /**
+ * This will close implementations of AbstractMessagingPort, skipping any that have tags
+ * matching `skipMask`.
+ */
+ static void closeMessagingPorts(
+ AbstractMessagingPort::Tag skipMask = AbstractMessagingPort::kSkipAllMask);
};
class ListeningSockets {
diff --git a/src/mongo/util/net/listen_test.cpp b/src/mongo/util/net/listen_test.cpp
deleted file mode 100644
index 4404d0d70b0..00000000000
--- a/src/mongo/util/net/listen_test.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/log.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/net/listen.h"
-#include "mongo/util/scopeguard.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-namespace {
-
-TEST(Listener, ElapsedTimeCheck) {
- const long long kSleepMillis = 5000;
- const long long kEpsilon = 4000;
-
- Listener listener("test_listener", "", 0); // port 0 => any available high port
- listener.setupSockets();
-
- // Start the thread, and add a guard to ensure that we join it on
- // all paths. We call shutdownNoTerminate to set the inShutdown
- // flag so that thread can escape from the listener.
- stdx::thread t([&listener]() { listener.initAndListen(); });
- const auto joint = MakeGuard([&] {
- shutdownNoTerminate();
- t.join();
- });
-
- // Wait in this thread until the listener is active, and then let
- // a little more time elapse to give the timer subsystem a chance
- // to stabilize.
- listener.waitUntilListening();
- sleepmillis(1000);
-
- // Get our start times
- long long listenStart = listener.getMyElapsedTimeMillis();
- long long clockStart = curTimeMillis64();
-
- // Let some time elapse.
- sleepmillis(kSleepMillis);
-
- // Get our current times.
- long long listenDelta = listener.getMyElapsedTimeMillis() - listenStart;
- long long clockDelta = curTimeMillis64() - clockStart;
-
- // Log the times to make it clear in the event of a failure what went wrong.
- log() << "Listener elapsed time: " << listenDelta << std::endl;
- log() << "Clock elapsed time: " << clockDelta << std::endl;
-
- // Fail if we weren't within epsilon.
- ASSERT_APPROX_EQUAL(listenDelta, clockDelta, kEpsilon);
-}
-
-} // namespace
-
-} // namespace mongo
diff --git a/src/mongo/util/net/message.cpp b/src/mongo/util/net/message.cpp
index 515efc9c853..9c91ce0d124 100644
--- a/src/mongo/util/net/message.cpp
+++ b/src/mongo/util/net/message.cpp
@@ -36,29 +36,12 @@
#include <time.h>
#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message_port.h"
namespace mongo {
-void Message::send(MessagingPort& p, const char* context) {
- if (empty()) {
- return;
- }
- if (_buf != 0) {
- p.send(_buf, MsgData::ConstView(_buf).getLen(), context);
- } else {
- p.send(_data, context);
- }
-}
-
+namespace {
AtomicWord<int32_t> NextMsgId;
-
-/*struct MsgStart {
- MsgStart() {
- NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
- verify(MsgDataHeaderSize == 16);
- }
-} msgstart;*/
+} // namespace
int32_t nextMessageId() {
return NextMsgId.fetchAndAdd(1);
diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h
index 3a2cfe95922..2c28aabe5cf 100644
--- a/src/mongo/util/net/message.h
+++ b/src/mongo/util/net/message.h
@@ -51,7 +51,6 @@ namespace mongo {
const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;
class Message;
-class MessagingPort;
enum NetworkOp : int32_t {
opInvalid = 0,
@@ -403,6 +402,8 @@ class Message {
MONGO_DISALLOW_COPYING(Message);
public:
+ using MsgVec = std::vector<std::pair<char*, int>>;
+
// we assume here that a vector with initial size 0 does no allocation (0 is the default, but
// wanted to make it explicit).
Message() = default;
@@ -562,7 +563,9 @@ public:
return _buf;
}
- void send(MessagingPort& p, const char* context);
+ const MsgVec& dataBuffers() const {
+ return _data;
+ }
std::string toString() const;
@@ -575,7 +578,6 @@ private:
char* _buf{nullptr};
// byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage
// instead
- typedef std::vector<std::pair<char*, int>> MsgVec;
MsgVec _data{};
bool _freeIt{false};
};
diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp
index 43ff4074702..225dfd06940 100644
--- a/src/mongo/util/net/message_port.cpp
+++ b/src/mongo/util/net/message_port.cpp
@@ -61,14 +61,6 @@ namespace mongo {
using std::shared_ptr;
using std::string;
-// if you want trace output:
-#define mmm(x)
-
-void AbstractMessagingPort::setConnectionId(long long connectionId) {
- verify(_connectionId == 0);
- _connectionId = connectionId;
-}
-
/* messagingport -------------------------------------------------------------- */
class Ports {
@@ -77,10 +69,10 @@ class Ports {
public:
Ports() : ports() {}
- void closeAll(unsigned skip_mask) {
+ void closeAll(AbstractMessagingPort::Tag skip_mask) {
stdx::lock_guard<stdx::mutex> bl(m);
for (std::set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++) {
- if ((*i)->tag & skip_mask) {
+ if ((*i)->getTag() & skip_mask) {
LOG(3) << "Skip closing connection # " << (*i)->connectionId();
continue;
}
@@ -102,8 +94,8 @@ public:
// are being destructed during termination.
Ports& ports = *(new Ports());
-void MessagingPort::closeAllSockets(unsigned mask) {
- ports.closeAll(mask);
+void MessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) {
+ ports.closeAll(skipMask);
}
MessagingPort::MessagingPort(int fd, const SockAddr& remote)
@@ -112,18 +104,20 @@ MessagingPort::MessagingPort(int fd, const SockAddr& remote)
MessagingPort::MessagingPort(double timeout, logger::LogSeverity ll)
: MessagingPort(std::make_shared<Socket>(timeout, ll)) {}
-MessagingPort::MessagingPort(std::shared_ptr<Socket> sock) : psock(std::move(sock)) {
- SockAddr sa = psock->remoteAddr();
+MessagingPort::MessagingPort(std::shared_ptr<Socket> sock)
+ : _x509SubjectName(), _connectionId(), _tag(), _psock(std::move(sock)) {
+ SockAddr sa = _psock->remoteAddr();
_remoteParsed = HostAndPort(sa.getAddr(), sa.getPort());
ports.insert(this);
}
-void MessagingPort::setSocketTimeout(double timeout) {
- psock->setTimeout(timeout);
+void MessagingPort::setTimeout(Milliseconds millis) {
+ double timeout = double(millis.count()) / 1000;
+ _psock->setTimeout(timeout);
}
void MessagingPort::shutdown() {
- psock->close();
+ _psock->close();
}
MessagingPort::~MessagingPort() {
@@ -136,10 +130,9 @@ bool MessagingPort::recv(Message& m) {
#ifdef MONGO_CONFIG_SSL
again:
#endif
- // mmm( log() << "* recv() sock:" << this->sock << endl; )
MSGHEADER::Value header;
int headerLen = sizeof(MSGHEADER::Value);
- psock->recv((char*)&header, headerLen);
+ _psock->recv((char*)&header, headerLen);
int len = header.constView().getMessageLength();
if (len == 542393671) {
@@ -147,7 +140,7 @@ bool MessagingPort::recv(Message& m) {
string msg =
"It looks like you are trying to access MongoDB over HTTP on the native driver "
"port.\n";
- LOG(psock->getLogLevel()) << msg;
+ LOG(_psock->getLogLevel()) << msg;
std::stringstream ss;
ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: "
"text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
@@ -156,7 +149,7 @@ bool MessagingPort::recv(Message& m) {
return false;
}
// If responseTo is not 0 or -1 for first packet assume SSL
- else if (psock->isAwaitingHandshake()) {
+ else if (_psock->isAwaitingHandshake()) {
#ifndef MONGO_CONFIG_SSL
if (header.constView().getResponseToMsgId() != 0 &&
header.constView().getResponseToMsgId() != -1) {
@@ -170,8 +163,8 @@ bool MessagingPort::recv(Message& m) {
"SSL handshake received but server is started without SSL support",
sslGlobalParams.sslMode.load() != SSLParams::SSLMode_disabled);
setX509SubjectName(
- psock->doSSLHandshake(reinterpret_cast<const char*>(&header), sizeof(header)));
- psock->setHandshakeReceived();
+ _psock->doSSLHandshake(reinterpret_cast<const char*>(&header), sizeof(header)));
+ _psock->setHandshakeReceived();
goto again;
}
uassert(17189,
@@ -186,7 +179,7 @@ bool MessagingPort::recv(Message& m) {
return false;
}
- psock->setHandshakeReceived();
+ _psock->setHandshakeReceived();
int z = (len + 1023) & 0xfffffc00;
verify(z >= len);
MsgData::View md = reinterpret_cast<char*>(mongoMalloc(z));
@@ -196,14 +189,14 @@ bool MessagingPort::recv(Message& m) {
memcpy(md.view2ptr(), &header, headerLen);
int left = len - headerLen;
- psock->recv(md.data(), left);
+ _psock->recv(md.data(), left);
guard.Dismiss();
m.setData(md.view2ptr(), true);
return true;
} catch (const SocketException& e) {
- logger::LogSeverity severity = psock->getLogLevel();
+ logger::LogSeverity severity = _psock->getLogLevel();
if (!e.shouldPrint())
severity = severity.lessSevere();
LOG(severity) << "SocketException: remote: " << remote() << " error: " << e;
@@ -221,38 +214,27 @@ void MessagingPort::reply(Message& received, Message& response, int32_t response
}
bool MessagingPort::call(Message& toSend, Message& response) {
- mmm(log() << "*call()" << endl;) say(toSend);
- return recv(toSend, response);
-}
-
-bool MessagingPort::recv(const Message& toSend, Message& response) {
- while (1) {
- bool ok = recv(response);
- if (!ok) {
- mmm(log() << "recv not ok" << endl;) return false;
+ say(toSend);
+ bool success = recv(response);
+ if (success) {
+ if (response.header().getResponseToMsgId() != toSend.header().getId()) {
+ response.reset();
+ uasserted(40134, "Response ID did not match the sent message ID.");
}
- // log() << "got response: " << response.data->responseTo << endl;
- if (response.header().getResponseToMsgId() == toSend.header().getId())
- break;
- error() << "MessagingPort::call() wrong id got:" << std::hex
- << response.header().getResponseToMsgId() << " expect:" << toSend.header().getId()
- << '\n' << std::dec << " toSend op: " << (unsigned)toSend.operation() << '\n'
- << " response msgid:" << (unsigned)response.header().getId() << '\n'
- << " response len: " << (unsigned)response.header().getLen() << '\n'
- << " response op: " << static_cast<int>(response.operation()) << '\n'
- << " remote: " << psock->remoteString();
- verify(false);
- response.reset();
}
- mmm(log() << "*call() end" << endl;) return true;
+ return success;
}
void MessagingPort::say(Message& toSend, int responseTo) {
verify(!toSend.empty());
- mmm(log() << "* say() thr:" << GetCurrentThreadId() << endl;)
- toSend.header().setId(nextMessageId());
+ toSend.header().setId(nextMessageId());
toSend.header().setResponseToMsgId(responseTo);
- toSend.send(*this, "say");
+ auto buf = toSend.buf();
+ if (buf) {
+ send(buf, MsgData::ConstView(buf).getLen(), "say");
+ } else {
+ send(toSend.dataBuffers(), "say");
+ }
}
HostAndPort MessagingPort::remote() const {
@@ -260,11 +242,35 @@ HostAndPort MessagingPort::remote() const {
}
SockAddr MessagingPort::remoteAddr() const {
- return psock->remoteAddr();
+ return _psock->remoteAddr();
}
SockAddr MessagingPort::localAddr() const {
- return psock->localAddr();
+ return _psock->localAddr();
+}
+
+void MessagingPort::setX509SubjectName(const std::string& x509SubjectName) {
+ _x509SubjectName = x509SubjectName;
+}
+
+std::string MessagingPort::getX509SubjectName() const {
+ return _x509SubjectName;
+}
+
+void MessagingPort::setConnectionId(const long long connectionId) {
+ _connectionId = connectionId;
+}
+
+long long MessagingPort::connectionId() const {
+ return _connectionId;
+}
+
+void MessagingPort::setTag(const AbstractMessagingPort::Tag tag) {
+ _tag = tag;
+}
+
+AbstractMessagingPort::Tag MessagingPort::getTag() const {
+ return _tag;
}
} // namespace mongo
diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h
index c1922b7202a..18931e20750 100644
--- a/src/mongo/util/net/message_port.h
+++ b/src/mongo/util/net/message_port.h
@@ -40,7 +40,7 @@ namespace mongo {
class MessagingPort;
-class MessagingPort : public AbstractMessagingPort {
+class MessagingPort final : public AbstractMessagingPort {
public:
MessagingPort(int fd, const SockAddr& remote);
@@ -51,52 +51,67 @@ public:
MessagingPort(std::shared_ptr<Socket> socket);
- virtual ~MessagingPort();
+ ~MessagingPort() override;
- void setSocketTimeout(double timeout);
+ void setTimeout(Milliseconds millis) override;
- void shutdown();
+ void shutdown() override;
/* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
also, the Message data will go out of scope on the subsequent recv call.
*/
- bool recv(Message& m);
- void reply(Message& received, Message& response, int32_t responseToMsgId);
- void reply(Message& received, Message& response);
- bool call(Message& toSend, Message& response);
+ bool recv(Message& m) override;
+ void reply(Message& received, Message& response, int32_t responseToMsgId) override;
+ void reply(Message& received, Message& response) override;
+ bool call(Message& toSend, Message& response) override;
- void say(Message& toSend, int responseTo = 0);
+ void say(Message& toSend, int responseTo = 0) override;
- /**
- * this is used for doing 'async' queries
- * instead of doing call( to , from )
- * you would do
- * say( to )
- * recv( from )
- * Note: if you fail to call recv and someone else uses this port,
- * horrible things will happen
- */
- bool recv(const Message& sent, Message& response);
+ unsigned remotePort() const override {
+ return _psock->remotePort();
+ }
+ virtual HostAndPort remote() const override;
+ virtual SockAddr remoteAddr() const override;
+ virtual SockAddr localAddr() const override;
- unsigned remotePort() const {
- return psock->remotePort();
+ void send(const char* data, int len, const char* context) override {
+ _psock->send(data, len, context);
+ }
+ void send(const std::vector<std::pair<char*, int>>& data, const char* context) override {
+ _psock->send(data, context);
+ }
+ bool connect(SockAddr& farEnd) override {
+ return _psock->connect(farEnd);
}
- virtual HostAndPort remote() const;
- virtual SockAddr remoteAddr() const;
- virtual SockAddr localAddr() const;
- std::shared_ptr<Socket> psock;
+ void setLogLevel(logger::LogSeverity ll) override {
+ _psock->setLogLevel(ll);
+ }
- void send(const char* data, int len, const char* context) {
- psock->send(data, len, context);
+ void clearCounters() override {
+ _psock->clearCounters();
}
- void send(const std::vector<std::pair<char*, int>>& data, const char* context) {
- psock->send(data, context);
+
+ long long getBytesIn() const override {
+ return _psock->getBytesIn();
}
- bool connect(SockAddr& farEnd) {
- return psock->connect(farEnd);
+
+ long long getBytesOut() const override {
+ return _psock->getBytesOut();
}
-#ifdef MONGO_CONFIG_SSL
+
+ void setX509SubjectName(const std::string& x509SubjectName) override;
+
+ std::string getX509SubjectName() const override;
+
+ void setConnectionId(const long long connectionId) override;
+
+ long long connectionId() const override;
+
+ void setTag(const AbstractMessagingPort::Tag tag) override;
+
+ AbstractMessagingPort::Tag getTag() const override;
+
/**
* Initiates the TLS/SSL handshake on this MessagingPort.
* When this function returns, further communication on this
@@ -104,25 +119,33 @@ public:
* ssl - Pointer to the global SSLManager.
* remoteHost - The hostname of the remote server.
*/
- bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) {
- return psock->secure(ssl, remoteHost);
- }
+ bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override {
+#ifdef MONGO_CONFIG_SSL
+ return _psock->secure(ssl, remoteHost);
+#else
+ return false;
#endif
+ }
- bool isStillConnected() {
- return psock->isStillConnected();
+ bool isStillConnected() const override {
+ return _psock->isStillConnected();
}
- uint64_t getSockCreationMicroSec() const {
- return psock->getSockCreationMicroSec();
+ uint64_t getSockCreationMicroSec() const override {
+ return _psock->getSockCreationMicroSec();
}
private:
// this is the parsed version of remote
HostAndPort _remoteParsed;
+ std::string _x509SubjectName;
+ long long _connectionId;
+ AbstractMessagingPort::Tag _tag;
+ std::shared_ptr<Socket> _psock;
+
public:
- static void closeAllSockets(unsigned tagMask = 0xffffffff);
+ static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask);
};
diff --git a/src/mongo/util/net/message_port_mock.cpp b/src/mongo/util/net/message_port_mock.cpp
index ae6deb90c8e..6869723129f 100644
--- a/src/mongo/util/net/message_port_mock.cpp
+++ b/src/mongo/util/net/message_port_mock.cpp
@@ -39,10 +39,29 @@ using std::string;
MessagingPortMock::MessagingPortMock() : AbstractMessagingPort() {}
MessagingPortMock::~MessagingPortMock() {}
+void MessagingPortMock::setTimeout(Milliseconds millis) {}
-void MessagingPortMock::reply(Message& received, Message& response) {}
+void MessagingPortMock::shutdown() {}
+
+bool MessagingPortMock::call(Message& toSend, Message& response) {
+ return true;
+}
+
+bool MessagingPortMock::recv(Message& m) {
+ return true;
+}
void MessagingPortMock::reply(Message& received, Message& response, int32_t responseToMsgId) {}
+void MessagingPortMock::reply(Message& received, Message& response) {}
+
+void MessagingPortMock::say(Message& toSend, int responseTo) {}
+
+bool MessagingPortMock::connect(SockAddr& farEnd) {
+ return true;
+}
+
+void MessagingPortMock::send(const char* data, int len, const char* context) {}
+void MessagingPortMock::send(const std::vector<std::pair<char*, int>>& data, const char* context) {}
HostAndPort MessagingPortMock::remote() const {
return _remote;
@@ -60,6 +79,49 @@ SockAddr MessagingPortMock::localAddr() const {
return SockAddr{};
}
+bool MessagingPortMock::isStillConnected() const {
+ return true;
+}
+
+void MessagingPortMock::setLogLevel(logger::LogSeverity logLevel) {}
+
+void MessagingPortMock::clearCounters() {}
+
+long long MessagingPortMock::getBytesIn() const {
+ return 0;
+}
+
+long long MessagingPortMock::getBytesOut() const {
+ return 0;
+}
+
+
+uint64_t MessagingPortMock::getSockCreationMicroSec() const {
+ return 0;
+}
+
+void MessagingPortMock::setX509SubjectName(const std::string& x509SubjectName) {}
+
+std::string MessagingPortMock::getX509SubjectName() const {
+ return "mock";
+}
+
+void MessagingPortMock::setConnectionId(const long long connectionId) {}
+
+long long MessagingPortMock::connectionId() const {
+ return 42;
+}
+
+void MessagingPortMock::setTag(const AbstractMessagingPort::Tag tag) {}
+
+AbstractMessagingPort::Tag MessagingPortMock::getTag() const {
+ return 0;
+}
+
+bool MessagingPortMock::secure(SSLManagerInterface* ssl, const std::string& remoteHost) {
+ return true;
+}
+
void MessagingPortMock::setRemote(const HostAndPort& remote) {
_remote = remote;
}
diff --git a/src/mongo/util/net/message_port_mock.h b/src/mongo/util/net/message_port_mock.h
index 45116dbd008..61ab5be8958 100644
--- a/src/mongo/util/net/message_port_mock.h
+++ b/src/mongo/util/net/message_port_mock.h
@@ -30,27 +30,67 @@
#include <vector>
-#include "mongo/config.h"
#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/sockaddr.h"
namespace mongo {
-class MessagingPortMock : public AbstractMessagingPort {
+class MessagingPortMock final : public AbstractMessagingPort {
MONGO_DISALLOW_COPYING(MessagingPortMock);
public:
MessagingPortMock();
- virtual ~MessagingPortMock();
+ ~MessagingPortMock();
- virtual void reply(Message& received, Message& response, int32_t responseToMsgId);
- virtual void reply(Message& received, Message& response);
+ void setTimeout(Milliseconds millis) override;
- virtual HostAndPort remote() const;
- virtual unsigned remotePort() const;
- virtual SockAddr remoteAddr() const;
- virtual SockAddr localAddr() const;
+ void shutdown() override;
+
+ bool call(Message& toSend, Message& response) override;
+
+ bool recv(Message& m) override;
+
+ void reply(Message& received, Message& response, int32_t responseToMsgId) override;
+ void reply(Message& received, Message& response) override;
+
+ void say(Message& toSend, int responseTo = 0) override;
+
+ bool connect(SockAddr& farEnd) override;
+
+ void send(const char* data, int len, const char* context) override;
+ void send(const std::vector<std::pair<char*, int>>& data, const char* context) override;
+
+ HostAndPort remote() const override;
+ unsigned remotePort() const override;
+ SockAddr remoteAddr() const override;
+ SockAddr localAddr() const override;
+
+ bool isStillConnected() const override;
+
+ void setLogLevel(logger::LogSeverity logLevel) override;
+
+ void clearCounters() override;
+
+ long long getBytesIn() const override;
+
+ long long getBytesOut() const override;
+
+ uint64_t getSockCreationMicroSec() const override;
+
+ void setX509SubjectName(const std::string& x509SubjectName) override;
+
+ std::string getX509SubjectName() const override;
+
+ void setConnectionId(const long long connectionId) override;
+
+ long long connectionId() const override;
+
+ void setTag(const AbstractMessagingPort::Tag tag) override;
+
+ AbstractMessagingPort::Tag getTag() const override;
+
+ bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override;
void setRemote(const HostAndPort& remote);
diff --git a/src/mongo/util/net/message_port_startup_param.cpp b/src/mongo/util/net/message_port_startup_param.cpp
new file mode 100644
index 00000000000..704de92b012
--- /dev/null
+++ b/src/mongo/util/net/message_port_startup_param.cpp
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/init.h"
+#include "mongo/db/server_parameters.h"
+
+namespace mongo {
+
+namespace {
+
+const char kMessagePortImplASIO[] = "ASIO";
+const char kMessagePortImplLegacy[] = "legacy";
+
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(messagePortImpl, std::string, kMessagePortImplLegacy);
+
+MONGO_INITIALIZER(messagePortImpl)(InitializerContext*) {
+ if ((messagePortImpl != kMessagePortImplASIO) && (messagePortImpl != kMessagePortImplLegacy)) {
+ return Status(ErrorCodes::BadValue, "unsupported message port option: " + messagePortImpl);
+ }
+ return Status::OK();
+}
+
+} // namespace
+
+bool isMessagePortImplASIO() {
+ return messagePortImpl == kMessagePortImplASIO;
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/net/message_port_startup_param.h b/src/mongo/util/net/message_port_startup_param.h
new file mode 100644
index 00000000000..8a55ab8fbf3
--- /dev/null
+++ b/src/mongo/util/net/message_port_startup_param.h
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+bool isMessagePortImplASIO();
+
+} // namespace mongo
diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp
index 1b9b9f90702..c64c16f3073 100644
--- a/src/mongo/util/net/message_server_port.cpp
+++ b/src/mongo/util/net/message_server_port.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/stats/counters.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/concurrency/thread_name.h"
@@ -48,9 +49,9 @@
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/listen.h"
#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_port.h"
#include "mongo/util/net/message_server.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/ssl_manager.h"
@@ -72,24 +73,7 @@ using std::endl;
namespace {
-class MessagingPortWithHandler : public MessagingPort {
- MONGO_DISALLOW_COPYING(MessagingPortWithHandler);
-
-public:
- MessagingPortWithHandler(const std::shared_ptr<Socket>& socket,
- const std::shared_ptr<MessageHandler> handler,
- long long connectionId)
- : MessagingPort(socket), _handler(handler) {
- setConnectionId(connectionId);
- }
-
- const std::shared_ptr<MessageHandler> getHandler() const {
- return _handler;
- }
-
-private:
- const std::shared_ptr<MessageHandler> _handler;
-};
+using MessagingPortWithHandler = std::pair<AbstractMessagingPort*, std::shared_ptr<MessageHandler>>;
} // namespace
@@ -104,10 +88,9 @@ public:
PortMessageServer(const MessageServer::Options& opts, std::shared_ptr<MessageHandler> handler)
: Listener("", opts.ipList, opts.port), _handler(std::move(handler)) {}
- virtual void accepted(std::shared_ptr<Socket> psocket, long long connectionId) {
+ virtual void accepted(AbstractMessagingPort* mp) {
ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2);
- std::unique_ptr<MessagingPortWithHandler> portWithHandler(
- new MessagingPortWithHandler(psocket, _handler, connectionId));
+ auto portWithHandler = stdx::make_unique<MessagingPortWithHandler>(mp, _handler);
if (!Listener::globalTicketHolder.tryAcquire()) {
log() << "connection refused because too many open connections: "
@@ -201,34 +184,34 @@ private:
invariant(arg);
unique_ptr<MessagingPortWithHandler> portWithHandler(
static_cast<MessagingPortWithHandler*>(arg));
- const std::shared_ptr<MessageHandler> handler = portWithHandler->getHandler();
+ auto mp = portWithHandler->first;
+ auto handler = portWithHandler->second;
- setThreadName(std::string(str::stream() << "conn" << portWithHandler->connectionId()));
- portWithHandler->psock->setLogLevel(logger::LogSeverity::Debug(1));
+ setThreadName(std::string(str::stream() << "conn" << mp->connectionId()));
+ mp->setLogLevel(logger::LogSeverity::Debug(1));
Message m;
int64_t counter = 0;
try {
- handler->connected(portWithHandler.get());
+ handler->connected(mp);
ON_BLOCK_EXIT([handler]() { handler->close(); });
while (!inShutdown()) {
m.reset();
- portWithHandler->psock->clearCounters();
+ mp->clearCounters();
- if (!portWithHandler->recv(m)) {
+ if (!mp->recv(m)) {
if (!serverGlobalParams.quiet) {
int conns = Listener::globalTicketHolder.used() - 1;
const char* word = (conns == 1 ? " connection" : " connections");
- log() << "end connection " << portWithHandler->psock->remoteString() << " ("
- << conns << word << " now open)" << endl;
+ log() << "end connection " << mp->remote().toString() << " (" << conns
+ << word << " now open)" << endl;
}
break;
}
- handler->process(m, portWithHandler.get());
- networkCounter.hit(portWithHandler->psock->getBytesIn(),
- portWithHandler->psock->getBytesOut());
+ handler->process(m, mp);
+ networkCounter.hit(mp->getBytesIn(), mp->getBytesOut());
// Occasionally we want to see if we're using too much memory.
if ((counter++ & 0xf) == 0) {
@@ -247,7 +230,7 @@ private:
error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl;
quickExit(EXIT_UNCAUGHT);
}
- portWithHandler->shutdown();
+ mp->shutdown();
return NULL;
}
diff --git a/src/mongo/util/net/miniwebserver.cpp b/src/mongo/util/net/miniwebserver.cpp
index 736bae25757..e49a6030b44 100644
--- a/src/mongo/util/net/miniwebserver.cpp
+++ b/src/mongo/util/net/miniwebserver.cpp
@@ -36,6 +36,7 @@
#include <pcrecpp.h>
#include "mongo/config.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/hex.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_exception.h"
@@ -129,7 +130,7 @@ bool MiniWebServer::fullReceive(const char* buf) {
return false;
}
-void MiniWebServer::accepted(std::shared_ptr<Socket> psock, long long connectionId) {
+void MiniWebServer::_accepted(const std::shared_ptr<Socket>& psock, long long connectionId) {
char buf[4096];
int len = 0;
try {
@@ -202,6 +203,10 @@ void MiniWebServer::accepted(std::shared_ptr<Socket> psock, long long connection
}
}
+void MiniWebServer::accepted(AbstractMessagingPort* mp) {
+ MONGO_UNREACHABLE;
+}
+
string MiniWebServer::getHeader(const char* req, const std::string& wanted) {
const char* headers = strchr(req, '\n');
if (!headers)
diff --git a/src/mongo/util/net/miniwebserver.h b/src/mongo/util/net/miniwebserver.h
index b9d8c77b286..fce5c9fc1cf 100644
--- a/src/mongo/util/net/miniwebserver.h
+++ b/src/mongo/util/net/miniwebserver.h
@@ -35,7 +35,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/util/net/listen.h"
#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_port.h"
namespace mongo {
@@ -67,8 +66,11 @@ public:
return urlDecode(s.c_str());
}
+ // This is not currently used for the MiniWebServer. See SERVER-24200
+ void accepted(AbstractMessagingPort* mp) override;
+
private:
- void accepted(std::shared_ptr<Socket> psocket, long long connectionId);
+ void _accepted(const std::shared_ptr<Socket>& psocket, long long connectionId) override;
static bool fullReceive(const char* buf);
};
diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h
index 1ef0381c1c8..d607eb8051c 100644
--- a/src/mongo/util/net/sock.h
+++ b/src/mongo/util/net/sock.h
@@ -178,6 +178,17 @@ public:
return _fd;
}
+ /**
+ * This sets the Sock's socket descriptor to be invalid and returns the old descriptor. This
+ * only gets called in listen.cpp in Listener::_accepted(). This gets called on the listener
+ * thread immediately after the thread creates the Sock, so it doesn't need to be thread-safe.
+ */
+ int stealSD() {
+ int tmp = _fd;
+ _fd = -1;
+ return tmp;
+ }
+
void setTimeout(double secs);
bool isStillConnected();
diff --git a/src/mongo/util/net/sockaddr.h b/src/mongo/util/net/sockaddr.h
index ed08dcf50f9..046167e4196 100644
--- a/src/mongo/util/net/sockaddr.h
+++ b/src/mongo/util/net/sockaddr.h
@@ -67,6 +67,7 @@ struct SockAddr {
SockAddr(
const char* ip,
int port); /* EndPoint (remote) side, or if you want to specify which interface locally */
+ SockAddr(const std::string& ip, int port) : SockAddr(ip.c_str(), port) {}
template <typename T>
T& as() {
diff --git a/src/mongo/util/net/ssl_manager.cpp b/src/mongo/util/net/ssl_manager.cpp
index f61789a3ac7..967319c81c7 100644
--- a/src/mongo/util/net/ssl_manager.cpp
+++ b/src/mongo/util/net/ssl_manager.cpp
@@ -581,10 +581,6 @@ void SSLManager::SSL_free(SSLConnection* conn) {
Status SSLManager::initSSLContext(SSL_CTX* context,
const SSLParams& params,
ConnectionDirection direction) {
- if (direction == ConnectionDirection::kIncoming) {
- fassert(34364, context == _serverContext.get());
- }
-
// SSL_OP_ALL - Activate all bug workaround options, to support buggy client SSL's.
// SSL_OP_NO_SSLv2 - Disable SSL v2 support
// SSL_OP_NO_SSLv3 - Disable SSL v3 support
diff --git a/src/mongo/util/net/ssl_manager.h b/src/mongo/util/net/ssl_manager.h
index 2b965b46cc6..ecc393070a8 100644
--- a/src/mongo/util/net/ssl_manager.h
+++ b/src/mongo/util/net/ssl_manager.h
@@ -38,6 +38,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/util/decorable.h"
#include "mongo/util/net/sock.h"
#include "mongo/util/time_support.h"
@@ -86,7 +87,7 @@ struct SSLConfiguration {
bool hasCA = false;
};
-class SSLManagerInterface {
+class SSLManagerInterface : public Decorable<SSLManagerInterface> {
public:
static std::unique_ptr<SSLManagerInterface> create(const SSLParams& params, bool isServer);