summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSamantha Ritter <samantha.ritter@10gen.com>2016-05-31 14:05:17 -0400
committerJason Carey <jcarey@argv.me>2016-07-12 18:38:37 -0400
commitc263ce1f95586f8652058e6202015a77f9becc49 (patch)
treed623fb9da9fd5da3cc4e20cac0653f1fa4af00eb /src/mongo
parentdead3cf8b4b3cb5528ad1abb9eeb722b395e3632 (diff)
downloadmongo-c263ce1f95586f8652058e6202015a77f9becc49.tar.gz
SERVER-24162 Integrate TransportLayer
Expand the transport layer as needed to replace uses of abstract message port for ingress networking.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/SConscript6
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/client/SConscript2
-rw-r--r--src/mongo/client/scoped_db_conn_test.cpp71
-rw-r--r--src/mongo/db/SConscript4
-rw-r--r--src/mongo/db/auth/sasl_commands.cpp4
-rw-r--r--src/mongo/db/client.cpp19
-rw-r--r--src/mongo/db/client.h18
-rw-r--r--src/mongo/db/client_basic.cpp4
-rw-r--r--src/mongo/db/client_basic.h18
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/authentication_commands.cpp7
-rw-r--r--src/mongo/db/commands/server_status.cpp9
-rw-r--r--src/mongo/db/db.cpp109
-rw-r--r--src/mongo/db/dbmessage.cpp31
-rw-r--r--src/mongo/db/dbmessage.h17
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/repl_set_request_votes.cpp19
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
-rw-r--r--src/mongo/db/repl/replication_info.cpp7
-rw-r--r--src/mongo/db/repl/replset_commands.cpp18
-rw-r--r--src/mongo/db/s/collection_metadata_test.cpp10
-rw-r--r--src/mongo/db/s/metadata_loader_test.cpp2
-rw-r--r--src/mongo/db/service_context.cpp18
-rw-r--r--src/mongo/db/service_context.h35
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp153
-rw-r--r--src/mongo/db/service_entry_point_mongod.h63
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp3
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp2
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp4
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp2
-rw-r--r--src/mongo/s/chunk_manager_tests.cpp2
-rw-r--r--src/mongo/s/commands/request.cpp9
-rw-r--r--src/mongo/s/commands/request.h12
-rw-r--r--src/mongo/s/commands/strategy.cpp16
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp2
-rw-r--r--src/mongo/s/server.cpp82
-rw-r--r--src/mongo/s/service_entry_point_mongos.cpp141
-rw-r--r--src/mongo/s/service_entry_point_mongos.h63
-rw-r--r--src/mongo/s/sharding_test_fixture.cpp19
-rw-r--r--src/mongo/s/sharding_test_fixture.h11
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp2
-rw-r--r--src/mongo/tools/bridge.cpp6
-rw-r--r--src/mongo/transport/SConscript60
-rw-r--r--src/mongo/transport/service_entry_point_mock.cpp8
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.cpp34
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.h19
-rw-r--r--src/mongo/transport/service_entry_point_utils.cpp150
-rw-r--r--src/mongo/transport/service_entry_point_utils.h (renamed from src/mongo/transport/transport_layer.cpp)16
-rw-r--r--src/mongo/transport/session.cpp24
-rw-r--r--src/mongo/transport/session.h68
-rw-r--r--src/mongo/transport/session_id.h39
-rw-r--r--src/mongo/transport/ticket.cpp17
-rw-r--r--src/mongo/transport/ticket.h41
-rw-r--r--src/mongo/transport/ticket_impl.h5
-rw-r--r--src/mongo/transport/transport_layer.h79
-rw-r--r--src/mongo/transport/transport_layer_legacy.cpp307
-rw-r--r--src/mongo/transport/transport_layer_legacy.h174
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp129
-rw-r--r--src/mongo/transport/transport_layer_manager.h90
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp95
-rw-r--r--src/mongo/transport/transport_layer_mock.h90
-rw-r--r--src/mongo/util/exit.cpp18
-rw-r--r--src/mongo/util/exit.h5
-rw-r--r--src/mongo/util/net/SConscript11
-rw-r--r--src/mongo/util/net/abstract_message_port.h6
-rw-r--r--src/mongo/util/net/asio_message_port.cpp46
-rw-r--r--src/mongo/util/net/asio_message_port.h3
-rw-r--r--src/mongo/util/net/listen.cpp11
-rw-r--r--src/mongo/util/net/listen.h12
-rw-r--r--src/mongo/util/net/message.h5
-rw-r--r--src/mongo/util/net/message_port.cpp46
-rw-r--r--src/mongo/util/net/message_port.h6
-rw-r--r--src/mongo/util/net/message_port_mock.cpp1
-rw-r--r--src/mongo/util/net/message_port_mock.h1
-rw-r--r--src/mongo/util/net/message_server.h83
-rw-r--r--src/mongo/util/net/message_server_port.cpp244
-rw-r--r--src/mongo/util/net/miniwebserver.cpp2
-rw-r--r--src/mongo/util/net/miniwebserver.h2
-rw-r--r--src/mongo/util/net/sockaddr.cpp19
-rw-r--r--src/mongo/util/ntservice_test.cpp1
-rw-r--r--src/mongo/util/tcmalloc_server_status_section.cpp5
84 files changed, 2186 insertions, 819 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index a13e463dd2b..a1a1a609e46 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -254,6 +254,8 @@ mongodLibDeps = [
"db/repl/storage_interface_impl",
"executor/network_interface_factory",
's/commands/shared_cluster_commands',
+ "transport/transport_layer_legacy",
+ "transport/service_entry_point_utils",
"util/clock_sources",
"util/ntservice",
]
@@ -266,6 +268,7 @@ mongod = env.Program(
source=[
"db/db.cpp",
"db/mongod_options_init.cpp",
+ "db/service_entry_point_mongod.cpp",
],
LIBDEPS=mongodLibDeps,
)
@@ -297,6 +300,7 @@ env.Install(
's/client/sharding_connection_hook_for_mongos.cpp',
's/mongos_options_init.cpp',
's/server.cpp',
+ 's/service_entry_point_mongos.cpp',
],
LIBDEPS=[
'db/conn_pool_options',
@@ -309,6 +313,8 @@ env.Install(
's/coreshard',
's/mongoscore',
's/sharding_initialization',
+ 'transport/service_entry_point_utils',
+ 'transport/transport_layer_legacy',
'util/clock_sources',
'util/ntservice',
'util/options_parser/options_parser_init',
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index fa413cfd6dd..ab0fbc2c2f7 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -170,6 +170,7 @@ error_code("OperatorNotSupportedOnView", 168)
error_code("CommandOnShardedViewNotSupportedOnMongos", 169)
error_code("TooManyMatchingDocuments", 170)
error_code("CannotIndexParallelArrays", 171)
+error_code("TransportSessionNotFound", 172)
# Non-sequential error codes (for compatibility only)
error_code("SocketException", 9001)
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 78601d0b1b6..3a0d63c4320 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -256,7 +256,7 @@ env.CppUnitTest(
'clientdriver',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/service_context_noop_init',
- '$BUILD_DIR/mongo/util/net/message_server_port',
+ '$BUILD_DIR/mongo/transport/transport_layer_legacy',
'$BUILD_DIR/mongo/util/net/network',
],
)
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp
index 9ee72a63953..ba54e4ec733 100644
--- a/src/mongo/client/scoped_db_conn_test.cpp
+++ b/src/mongo/client/scoped_db_conn_test.cpp
@@ -41,11 +41,14 @@
#include "mongo/rpc/request_interface.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message_server.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/time_support.h"
@@ -72,12 +75,30 @@ namespace {
const string TARGET_HOST = "localhost:27017";
const int TARGET_PORT = 27017;
-class DummyMessageHandler final : public MessageHandler {
+class DummyServiceEntryPoint : public ServiceEntryPoint {
+ MONGO_DISALLOW_COPYING(DummyServiceEntryPoint);
+
public:
- virtual void connected(AbstractMessagingPort* p) {}
+ DummyServiceEntryPoint() {}
+
+ virtual ~DummyServiceEntryPoint() {
+ for (auto& t : _threads) {
+ t.join();
+ }
+ }
+
+ void startSession(transport::Session&& session) override {
+ _threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session));
+ }
+
+private:
+ void run(transport::Session&& session) {
+ Message inMessage;
+ if (!session.sourceMessage(&inMessage).wait().isOK()) {
+ return;
+ }
- virtual void process(Message& m, AbstractMessagingPort* port) {
- auto request = rpc::makeRequest(&m);
+ auto request = rpc::makeRequest(&inMessage);
auto reply = rpc::makeReplyBuilder(request->getProtocol());
BSONObjBuilder commandResponse;
@@ -92,10 +113,14 @@ public:
.setMetadata(rpc::makeEmptyMetadata())
.done();
- port->reply(m, response);
+ response.header().setResponseToMsgId(inMessage.header().getId());
+
+ if (!session.sinkMessage(response).wait().isOK()) {
+ return;
+ }
}
- virtual void close() {}
+ std::vector<stdx::thread> _threads;
};
// TODO: Take this out and make it as a reusable class in a header file. The only
@@ -130,15 +155,15 @@ public:
* @param messageHandler the message handler to use for this server. Ownership
* of this object is passed to this server.
*/
- void run(std::shared_ptr<MessageHandler> messsageHandler) {
- if (_server != NULL) {
+ void run(std::shared_ptr<ServiceEntryPoint> serviceEntryPoint) {
+ if (_server) {
return;
}
- MessageServer::Options options;
+ transport::TransportLayerLegacy::Options options;
options.port = _port;
- _server.reset(createServer(options, std::move(messsageHandler), getGlobalServiceContext()));
+ _server = stdx::make_unique<transport::TransportLayerLegacy>(options, serviceEntryPoint);
_serverThread = stdx::thread(runServer, _server.get());
}
@@ -164,23 +189,23 @@ public:
sleepmillis(500);
connCount = Listener::globalTicketHolder.used();
}
-
+ _server->shutdown();
_server.reset();
}
/**
* Helper method for running the server on a separate thread.
*/
- static void runServer(MessageServer* server) {
- server->setupSockets();
- server->run();
+ static void runServer(transport::TransportLayerLegacy* server) {
+ server->setup();
+ server->start();
}
private:
const int _port;
stdx::thread _serverThread;
- unique_ptr<MessageServer> _server;
+ unique_ptr<transport::TransportLayerLegacy> _server;
};
/**
@@ -190,9 +215,9 @@ class DummyServerFixture : public unittest::Test {
public:
void setUp() {
_maxPoolSizePerHost = globalConnPool.getMaxPoolSize();
- _dummyServer = new DummyServer(TARGET_PORT);
+ _dummyServer = stdx::make_unique<DummyServer>(TARGET_PORT);
- auto dummyHandler = std::make_shared<DummyMessageHandler>();
+ auto dummyHandler = std::make_shared<DummyServiceEntryPoint>();
_dummyServer->run(std::move(dummyHandler));
DBClientConnection conn;
Timer timer;
@@ -212,7 +237,7 @@ public:
void tearDown() {
ScopedDbConnection::clearPool();
- delete _dummyServer;
+ _dummyServer.reset();
globalConnPool.setMaxPoolSize(_maxPoolSizePerHost);
}
@@ -272,12 +297,12 @@ protected:
}
private:
- static void runServer(MessageServer* server) {
- server->setupSockets();
- server->run();
+ static void runServer(transport::TransportLayerLegacy* server) {
+ server->setup();
+ server->start();
}
- DummyServer* _dummyServer;
+ std::unique_ptr<DummyServer> _dummyServer;
uint32_t _maxPoolSizePerHost;
};
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 92f97280964..ba8c1e855bd 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -100,6 +100,7 @@ env.Library(
"dbmessage.cpp",
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/net/network',
]
)
@@ -402,7 +403,6 @@ env.Library(
"$BUILD_DIR/mongo/rpc/command_reply",
"$BUILD_DIR/mongo/rpc/command_request",
"$BUILD_DIR/mongo/rpc/metadata",
- "$BUILD_DIR/mongo/util/net/message_server_port",
"$BUILD_DIR/mongo/util/net/miniwebserver",
"$BUILD_DIR/mongo/util/processinfo",
"$BUILD_DIR/mongo/util/signal_handlers",
@@ -473,6 +473,8 @@ env.Library(
'$BUILD_DIR/mongo/util/decorable',
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/net/hostandport',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
+ '$BUILD_DIR/mongo/transport/transport_layer_manager',
],
)
diff --git a/src/mongo/db/auth/sasl_commands.cpp b/src/mongo/db/auth/sasl_commands.cpp
index f006cded7d0..0fefd3b6cc9 100644
--- a/src/mongo/db/auth/sasl_commands.cpp
+++ b/src/mongo/db/auth/sasl_commands.cpp
@@ -187,10 +187,10 @@ Status doSaslStep(const ClientBasic* client,
status = session->step(payload, &responsePayload);
if (!status.isOK()) {
- const SockAddr clientAddr = client->port()->remoteAddr();
log() << session->getMechanism() << " authentication failed for "
<< session->getPrincipalId() << " on " << session->getAuthenticationDatabase()
- << " from client " << clientAddr.getAddr() << " ; " << status.toString() << std::endl;
+ << " from client " << client->getRemote().toString() << " ; " << status.toString()
+ << std::endl;
sleepmillis(saslGlobalParams.authFailedDelay);
// All the client needs to know is that authentication has failed.
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 807a7650aa5..2a665268811 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/lasterror.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/session.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/exit.h"
#include "mongo/util/mongoutils/str.h"
@@ -63,16 +64,16 @@ void Client::initThreadIfNotAlready() {
initThreadIfNotAlready(getThreadName().c_str());
}
-void Client::initThread(const char* desc, AbstractMessagingPort* mp) {
- initThread(desc, getGlobalServiceContext(), mp);
+void Client::initThread(const char* desc, transport::Session* session) {
+ initThread(desc, getGlobalServiceContext(), session);
}
-void Client::initThread(const char* desc, ServiceContext* service, AbstractMessagingPort* mp) {
+void Client::initThread(const char* desc, ServiceContext* service, transport::Session* session) {
invariant(currentClient.getMake()->get() == nullptr);
std::string fullDesc;
- if (mp != NULL) {
- fullDesc = str::stream() << desc << mp->connectionId();
+ if (session) {
+ fullDesc = str::stream() << desc << session->id();
} else {
fullDesc = desc;
}
@@ -80,7 +81,7 @@ void Client::initThread(const char* desc, ServiceContext* service, AbstractMessa
setThreadName(fullDesc.c_str());
// Create the client obj, attach to thread
- *currentClient.get() = service->makeClient(fullDesc, mp);
+ *currentClient.get() = service->makeClient(fullDesc, session);
}
void Client::destroy() {
@@ -98,11 +99,11 @@ int64_t generateSeed(const std::string& desc) {
}
} // namespace
-Client::Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p)
- : ClientBasic(serviceContext, p),
+Client::Client(std::string desc, ServiceContext* serviceContext, transport::Session* session)
+ : ClientBasic(serviceContext, session),
_desc(std::move(desc)),
_threadId(stdx::this_thread::get_id()),
- _connectionId(p ? p->connectionId() : 0),
+ _connectionId(session ? session->id() : 0),
_prng(generateSeed(_desc)) {}
void Client::reportState(BSONObjBuilder& builder) {
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index 407a0bac868..a7523981e47 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -51,6 +51,10 @@ class AbstractMessagingPort;
class Collection;
class OperationContext;
+namespace transport {
+class Session;
+} // namespace transport
+
typedef long long ConnectionId;
/** the database's concept of an outside "client" */
@@ -59,16 +63,16 @@ public:
/**
* Creates a Client object and stores it in TLS for the current thread.
*
- * An unowned pointer to a AbstractMessagingPort may optionally be provided. If 'mp' is
- * non-null, then it will be used to augment the thread name, and for reporting purposes.
+ * An unowned pointer to a transport::Session may optionally be provided. If 'session'
+ * is non-null, then it will be used to augment the thread name, and for reporting purposes.
*
- * If provided, 'mp' must outlive the newly-created Client object. Client::destroy() may be used
- * to help enforce that the Client does not outlive 'mp'.
+ * If provided, 'session' must outlive the newly-created Client object. Client::destroy() may
+ * be used to help enforce that the Client does not outlive 'session.'
*/
- static void initThread(const char* desc, AbstractMessagingPort* mp = 0);
+ static void initThread(const char* desc, transport::Session* session = nullptr);
static void initThread(const char* desc,
ServiceContext* serviceContext,
- AbstractMessagingPort* mp);
+ transport::Session* session);
/**
* Inits a thread if that thread has not already been init'd, setting the thread name to
@@ -159,7 +163,7 @@ public:
private:
friend class ServiceContext;
- Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p = 0);
+ Client(std::string desc, ServiceContext* serviceContext, transport::Session* session = nullptr);
// Description for the client (e.g. conn8)
diff --git a/src/mongo/db/client_basic.cpp b/src/mongo/db/client_basic.cpp
index 06e619597c3..578ebdf668d 100644
--- a/src/mongo/db/client_basic.cpp
+++ b/src/mongo/db/client_basic.cpp
@@ -32,8 +32,8 @@
namespace mongo {
-ClientBasic::ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort)
- : _serviceContext(serviceContext), _messagingPort(messagingPort) {}
+ClientBasic::ClientBasic(ServiceContext* serviceContext, transport::Session* session)
+ : _serviceContext(serviceContext), _session(session) {}
ClientBasic::~ClientBasic() = default;
diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h
index 88d7ef4afe8..09f31b5da42 100644
--- a/src/mongo/db/client_basic.h
+++ b/src/mongo/db/client_basic.h
@@ -31,6 +31,7 @@
#include <memory>
#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/session.h"
#include "mongo/util/decorable.h"
#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/hostandport.h"
@@ -58,11 +59,12 @@ public:
}
bool hasRemote() const {
- return _messagingPort;
+ return _session;
}
+
HostAndPort getRemote() const {
- verify(_messagingPort);
- return _messagingPort->remote();
+ verify(_session);
+ return _session->remote();
}
/**
@@ -73,20 +75,20 @@ public:
}
/**
- * Returns the AbstractMessagePort to which this client session is bound, if any.
+ * Returns the Session to which this client is bound, if any.
*/
- AbstractMessagingPort* port() const {
- return _messagingPort;
+ transport::Session* session() const {
+ return _session;
}
static ClientBasic* getCurrent();
protected:
- ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort);
+ ClientBasic(ServiceContext* serviceContext, transport::Session* session);
~ClientBasic();
private:
ServiceContext* const _serviceContext;
- AbstractMessagingPort* const _messagingPort;
+ transport::Session* const _session;
};
}
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 919510c4b79..87c472dc519 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -71,6 +71,7 @@ env.Library(
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/write_ops/batch_write_types',
'$BUILD_DIR/mongo/scripting/scripting_common',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils',
'$BUILD_DIR/mongo/util/foundation',
'$BUILD_DIR/mongo/util/ntservice',
diff --git a/src/mongo/db/commands/authentication_commands.cpp b/src/mongo/db/commands/authentication_commands.cpp
index 6fc6268a807..2f05971ef56 100644
--- a/src/mongo/db/commands/authentication_commands.cpp
+++ b/src/mongo/db/commands/authentication_commands.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/server_options.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/session.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/log.h"
#include "mongo/util/md5.hpp"
@@ -314,17 +315,17 @@ Status CmdAuthenticate::_authenticateX509(OperationContext* txn,
ClientBasic* client = ClientBasic::getCurrent();
AuthorizationSession* authorizationSession = AuthorizationSession::get(client);
- std::string clientSubjectName = client->port()->getX509SubjectName();
+ auto clientName = client->session()->getX509SubjectName();
if (!getSSLManager()->getSSLConfiguration().hasCA) {
return Status(ErrorCodes::AuthenticationFailed,
"Unable to verify x.509 certificate, as no CA has been provided.");
- } else if (user.getUser() != clientSubjectName) {
+ } else if (user.getUser() != clientName) {
return Status(ErrorCodes::AuthenticationFailed,
"There is no x.509 client certificate matching the user.");
} else {
// Handle internal cluster member auth, only applies to server-server connections
- if (getSSLManager()->getSSLConfiguration().isClusterMember(clientSubjectName)) {
+ if (getSSLManager()->getSSLConfiguration().isClusterMember(clientName)) {
int clusterAuthMode = serverGlobalParams.clusterAuthMode.load();
if (clusterAuthMode == ServerGlobalParams::ClusterAuthMode_undefined ||
clusterAuthMode == ServerGlobalParams::ClusterAuthMode_keyFile) {
diff --git a/src/mongo/db/commands/server_status.cpp b/src/mongo/db/commands/server_status.cpp
index bbe7e9930a7..e567a15b18b 100644
--- a/src/mongo/db/commands/server_status.cpp
+++ b/src/mongo/db/commands/server_status.cpp
@@ -47,9 +47,9 @@
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/platform/process_id.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostname_canonicalization_worker.h"
-#include "mongo/util/net/listen.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/processinfo.h"
#include "mongo/util/ramlog.h"
@@ -228,9 +228,10 @@ public:
BSONObj generateSection(OperationContext* txn, const BSONElement& configElement) const {
BSONObjBuilder bb;
- bb.append("current", Listener::globalTicketHolder.used());
- bb.append("available", Listener::globalTicketHolder.available());
- bb.append("totalCreated", Listener::globalConnectionNumber.load());
+ auto stats = txn->getServiceContext()->getTransportLayer()->sessionStats();
+ bb.append("current", static_cast<int>(stats.numOpenSessions));
+ bb.append("available", static_cast<int>(stats.numAvailableSessions));
+ bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions));
return bb.obj();
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index a10b0be5a39..56dff7cecaf 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -96,6 +96,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d.h"
#include "mongo/db/service_context_d.h"
+#include "mongo/db/service_entry_point_mongod.h"
#include "mongo/db/startup_warnings_mongod.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/snapshots.h"
@@ -114,6 +115,7 @@
#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/cmdline_utils/censor_cmdline.h"
#include "mongo/util/concurrency/task.h"
@@ -124,7 +126,6 @@
#include "mongo/util/log.h"
#include "mongo/util/net/hostname_canonicalization_worker.h"
#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message_server.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/ntservice.h"
#include "mongo/util/options_parser/startup_options.h"
@@ -169,65 +170,6 @@ ntservice::NtServiceDefaultStrings defaultServiceStrings = {
Timer startupSrandTimer;
-class MyMessageHandler : public MessageHandler {
-public:
- virtual void connected(AbstractMessagingPort* p) {
- Client::initThread("conn", p);
- }
-
- virtual void process(Message& m, AbstractMessagingPort* port) {
- while (true) {
- if (inShutdown()) {
- log() << "got request after shutdown()" << endl;
- break;
- }
-
- DbResponse dbresponse;
- {
- auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
- assembleResponse(opCtx.get(), m, dbresponse, port->remote());
-
- // opCtx must go out of scope here so that the operation cannot show up in currentOp
- // results after the response reaches the client
- }
-
- if (!dbresponse.response.empty()) {
- port->reply(m, dbresponse.response, dbresponse.responseToMsgId);
- if (dbresponse.exhaustNS.size() > 0) {
- MsgData::View header = dbresponse.response.header();
- QueryResult::View qr = header.view2ptr();
- long long cursorid = qr.getCursorId();
- if (cursorid) {
- verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
- string ns = dbresponse.exhaustNS; // before reset() free's it...
- m.reset();
- BufBuilder b(512);
- b.appendNum((int)0 /*size set later*/);
- b.appendNum(header.getId());
- b.appendNum(header.getResponseToMsgId());
- b.appendNum((int)dbGetMore);
- b.appendNum((int)0);
- b.appendStr(ns);
- b.appendNum((int)0); // ntoreturn
- b.appendNum(cursorid);
-
- MsgData::View header = b.buf();
- header.setLen(b.len());
- m.setData(b.release());
- DEV log() << "exhaust=true sending more";
- continue; // this goes back to top loop
- }
- }
- }
- break;
- }
- }
-
- virtual void close() {
- Client::destroy();
- }
-};
-
static void logStartup(OperationContext* txn) {
BSONObjBuilder toLog;
stringstream id;
@@ -530,7 +472,7 @@ static void _initWireSpec() {
}
-static void _initAndListen(int listenPort) {
+static ExitCode _initAndListen(int listenPort) {
Client::initThread("initandlisten");
_initWireSpec();
@@ -570,20 +512,18 @@ static void _initAndListen(int listenPort) {
checked_cast<ServiceContextMongoD*>(getGlobalServiceContext())->createLockFile();
- // Due to SERVER-15389, we must setupSockets first thing at startup in order to avoid
- // obtaining too high a file descriptor for our calls to select().
- MessageServer::Options options;
+ transport::TransportLayerLegacy::Options options;
options.port = listenPort;
options.ipList = serverGlobalParams.bind_ip;
- auto handler = std::make_shared<MyMessageHandler>();
- MessageServer* server = createServer(options, std::move(handler), getGlobalServiceContext());
-
- // This is what actually creates the sockets, but does not yet listen on them because we
- // do not want connections to just hang if recovery takes a very long time.
- if (!server->setupSockets()) {
- error() << "Failed to set up sockets during startup.";
- return;
+ // Create, start, and attach the TL
+ auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(
+ options,
+ std::make_shared<ServiceEntryPointMongod>(getGlobalServiceContext()->getTransportLayer()));
+ auto res = transportLayer->setup();
+ if (!res.isOK()) {
+ error() << "Failed to set up listener: " << res.toString();
+ return EXIT_NET_ERROR;
}
std::shared_ptr<DbWebServer> dbWebServer;
@@ -594,7 +534,7 @@ static void _initAndListen(int listenPort) {
new RestAdminAccess()));
if (!dbWebServer->setupSockets()) {
error() << "Failed to set up sockets for HTTP interface during startup.";
- return;
+ return EXIT_NET_ERROR;
}
}
@@ -674,7 +614,7 @@ static void _initAndListen(int listenPort) {
}
if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalRecoverOnly)
- return;
+ return EXIT_NET_ERROR;
if (mongodGlobalParams.scriptingEnabled) {
ScriptEngine::setup();
@@ -799,14 +739,19 @@ static void _initAndListen(int listenPort) {
// MessageServer::run will return when exit code closes its socket and we don't need the
// operation context anymore
startupOpCtx.reset();
- server->run();
+
+ auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer));
+ if (!start.isOK()) {
+ error() << "Failed to start the listener: " << start.toString();
+ return EXIT_NET_ERROR;
+ }
+
+ return waitForShutdown();
}
ExitCode initAndListen(int listenPort) {
try {
- _initAndListen(listenPort);
-
- return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR;
+ return _initAndListen(listenPort);
} catch (DBException& e) {
log() << "exception in initAndListen: " << e.toString() << ", terminating" << endl;
return EXIT_UNCAUGHT;
@@ -1001,7 +946,6 @@ static void reportEventToSystemImpl(const char* msg) {
// registerShutdownTask is called below. It must not depend on the
// prior execution of mongo initializers or the existence of threads.
static void shutdownTask() {
-
auto serviceContext = getGlobalServiceContext();
Client::initThreadIfNotAlready();
@@ -1014,6 +958,7 @@ static void shutdownTask() {
txn = uniqueTxn.get();
}
+ getGlobalServiceContext()->getTransportLayer()->shutdown();
log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl;
ListeningSockets::get()->closeAll();
@@ -1037,12 +982,6 @@ static void shutdownTask() {
// will do it for us when the process terminates.
stdx::packaged_task<void()> dryOutTask([] {
-
- // Walk all open sockets and close them. This should unblock any that are sitting in
- // recv. Other sockets on which there are active operations should see the inShutdown flag
- // as true when they complete the operation.
- Listener::closeMessagingPorts(0);
-
// There isn't currently a way to wait on the TicketHolder to have all its tickets back,
// unfortunately. So, busy wait in this detached thread.
while (true) {
diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp
index 67a5548dbfb..f16ec2b6276 100644
--- a/src/mongo/db/dbmessage.cpp
+++ b/src/mongo/db/dbmessage.cpp
@@ -30,7 +30,9 @@
#include "mongo/platform/basic.h"
#include "mongo/db/dbmessage.h"
+#include "mongo/db/operation_context.h"
#include "mongo/platform/strnlen.h"
+#include "mongo/transport/session.h"
namespace mongo {
@@ -173,20 +175,23 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) {
_buffer.skip(sizeof(QueryResult::Value));
}
-void OpQueryReplyBuilder::send(AbstractMessagingPort* destination,
+void OpQueryReplyBuilder::send(transport::Session* session,
int queryResultFlags,
- Message& requestMsg,
+ const Message& requestMsg,
int nReturned,
int startingFrom,
long long cursorId) {
Message response;
putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId);
- destination->reply(requestMsg, response, requestMsg.header().getId());
+
+ response.header().setId(nextMessageId());
+ response.header().setResponseToMsgId(requestMsg.header().getId());
+
+ uassertStatusOK(session->sinkMessage(response).wait());
}
-void OpQueryReplyBuilder::sendCommandReply(AbstractMessagingPort* destination,
- Message& requestMsg) {
- send(destination, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1);
+void OpQueryReplyBuilder::sendCommandReply(transport::Session* session, const Message& requestMsg) {
+ send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1);
}
void OpQueryReplyBuilder::putInMessage(
@@ -202,7 +207,7 @@ void OpQueryReplyBuilder::putInMessage(
}
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const void* data,
int size,
@@ -211,15 +216,19 @@ void replyToQuery(int queryResultFlags,
long long cursorId) {
OpQueryReplyBuilder reply;
reply.bufBuilderForResults().appendBuf(data, size);
- reply.send(p, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId);
+ reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId);
}
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const BSONObj& responseObj) {
- replyToQuery(
- queryResultFlags, p, requestMsg, (void*)responseObj.objdata(), responseObj.objsize(), 1);
+ replyToQuery(queryResultFlags,
+ session,
+ requestMsg,
+ (void*)responseObj.objdata(),
+ responseObj.objsize(),
+ 1);
}
void replyToQuery(int queryResultFlags, Message& m, DbResponse& dbresponse, BSONObj obj) {
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index 7f1a3a2d0f2..7fabf117421 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -39,6 +39,12 @@
namespace mongo {
+class OperationContext;
+
+namespace transport {
+class Session;
+} // namespace transport
+
/* db response format
Query or GetMore: // see struct QueryResult
@@ -352,9 +358,9 @@ public:
/**
* Finishes the reply and sends the message out to 'destination'.
*/
- void send(AbstractMessagingPort* destination,
+ void send(transport::Session* session,
int queryResultFlags,
- Message& requestMsg, // should be const but MessagePort::reply takes non-const.
+ const Message& requestMsg,
int nReturned,
int startingFrom = 0,
long long cursorId = 0);
@@ -362,14 +368,14 @@ public:
/**
* Similar to send() but used for replying to a command.
*/
- void sendCommandReply(AbstractMessagingPort* destination, Message& requestMsg);
+ void sendCommandReply(transport::Session* session, const Message& requestMsg);
private:
BufBuilder _buffer;
};
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const void* data,
int size,
@@ -377,10 +383,9 @@ void replyToQuery(int queryResultFlags,
int startingFrom = 0,
long long cursorId = 0);
-
/* object reply helper. */
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const BSONObj& responseObj);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b6a2981dde0..b92ec300cf8 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -398,6 +398,7 @@ env.Library('repl_coordinator_impl',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/rpc/metadata',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/fail_point',
'collection_cloner',
'data_replicator',
@@ -627,6 +628,7 @@ env.Library('replica_set_messages',
'$BUILD_DIR/mongo/client/connection_string',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/server_options_core',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/net/hostandport',
'optime',
'read_concern_args',
diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp
index 4b34c351769..c01628466c0 100644
--- a/src/mongo/db/repl/repl_set_request_votes.cpp
+++ b/src/mongo/db/repl/repl_set_request_votes.cpp
@@ -35,6 +35,8 @@
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -64,16 +66,17 @@ private:
// We want to keep request vote connection open when relinquishing primary.
// Tag it here.
- unsigned originalTag = 0;
- AbstractMessagingPort* mp = txn->getClient()->port();
- if (mp) {
- originalTag = mp->getTag();
- mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen);
+ transport::Session::TagMask originalTag = 0;
+ transport::Session* session = txn->getClient()->session();
+ if (session) {
+ originalTag = session->getTags();
+ session->replaceTags(originalTag | transport::Session::kKeepOpen);
}
+
// Untag the connection on exit.
- ON_BLOCK_EXIT([mp, originalTag]() {
- if (mp) {
- mp->setTag(originalTag);
+ ON_BLOCK_EXIT([session, originalTag]() {
+ if (session) {
+ session->replaceTags(originalTag);
}
});
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 582b8f45970..245b37cca7f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -192,8 +192,8 @@ public:
virtual HostAndPort getClientHostAndPort(const OperationContext* txn) = 0;
/**
- * Closes all connections except those marked with the keepOpen property, which should
- * just be connections used for heartbeating.
+ * Closes all connections in the given TransportLayer except those marked with the
+ * keepOpen property, which should just be connections used for heartbeating.
* This is used during stepdown, and transition out of primary.
*/
virtual void closeConnections() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 81ed9c16bf3..f140580fbcc 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -76,6 +76,8 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
@@ -449,7 +451,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
}
void ReplicationCoordinatorExternalStateImpl::closeConnections() {
- Listener::closeMessagingPorts(executor::NetworkInterface::kMessagingPortKeepOpen);
+ getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen);
}
void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index cc844eaf35c..fd36395c017 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -234,9 +234,10 @@ public:
// Tag connections to avoid closing them on stepdown.
auto hangUpElement = cmdObj["hangUpOnStepDown"];
if (!hangUpElement.eoo() && !hangUpElement.trueValue()) {
- AbstractMessagingPort* mp = txn->getClient()->port();
- if (mp) {
- mp->setTag(mp->getTag() | executor::NetworkInterface::kMessagingPortKeepOpen);
+ auto session = txn->getClient()->session();
+ if (session) {
+ session->replaceTags(session->getTags() |
+ executor::NetworkInterface::kMessagingPortKeepOpen);
}
}
appendReplicationInfo(txn, result, 0);
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index 1b96dac313f..f591d64c11b 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -59,6 +59,8 @@
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -726,17 +728,17 @@ public:
/* we want to keep heartbeat connections open when relinquishing primary.
tag them here. */
- AbstractMessagingPort* mp = txn->getClient()->port();
- unsigned originalTag = 0;
- if (mp) {
- originalTag = mp->getTag();
- mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen);
+ transport::Session::TagMask originalTag = 0;
+ transport::Session* session = txn->getClient()->session();
+ if (session) {
+ originalTag = session->getTags();
+ session->replaceTags(originalTag | transport::Session::kKeepOpen);
}
// Unset the tag on block exit
- ON_BLOCK_EXIT([mp, originalTag]() {
- if (mp) {
- mp->setTag(originalTag);
+ ON_BLOCK_EXIT([session, originalTag]() {
+ if (session) {
+ session->replaceTags(originalTag);
}
});
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 013c135cc84..359cb3b4fc1 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -50,7 +50,7 @@ class NoChunkFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -318,7 +318,7 @@ class SingleChunkFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -574,7 +574,7 @@ class SingleChunkMinMaxCompoundKeyFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -642,7 +642,7 @@ class TwoChunksWithGapCompoundKeyFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -839,7 +839,7 @@ class ThreeChunkWithRangeGapFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp
index c96b77a1590..e33e3142d1a 100644
--- a/src/mongo/db/s/metadata_loader_test.cpp
+++ b/src/mongo/db/s/metadata_loader_test.cpp
@@ -51,7 +51,7 @@ public:
protected:
void setUp() override {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
_maxCollVersion = ChunkVersion(1, 0, OID::gen());
_loader.reset(new MetadataLoader);
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index 03dc9bcaf8d..648fd546721 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -34,6 +34,9 @@
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/transport_layer_manager.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/system_clock_source.h"
@@ -115,7 +118,8 @@ Status validateStorageOptions(
}
ServiceContext::ServiceContext()
- : _tickSource(stdx::make_unique<SystemTickSource>()),
+ : _transportLayerManager(stdx::make_unique<transport::TransportLayerManager>()),
+ _tickSource(stdx::make_unique<SystemTickSource>()),
_fastClockSource(stdx::make_unique<SystemClockSource>()),
_preciseClockSource(stdx::make_unique<SystemClockSource>()) {}
@@ -125,8 +129,8 @@ ServiceContext::~ServiceContext() {
}
ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc,
- AbstractMessagingPort* p) {
- std::unique_ptr<Client> client(new Client(std::move(desc), this, p));
+ transport::Session* session) {
+ std::unique_ptr<Client> client(new Client(std::move(desc), this, session));
auto observer = _clientObservers.cbegin();
try {
for (; observer != _clientObservers.cend(); ++observer) {
@@ -150,6 +154,14 @@ ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc,
return UniqueClient(client.release());
}
+transport::TransportLayer* ServiceContext::getTransportLayer() const {
+ return _transportLayerManager.get();
+}
+
+Status ServiceContext::addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl) {
+ return _transportLayerManager->addAndStartTransportLayer(std::move(tl));
+}
+
TickSource* ServiceContext::getTickSource() const {
return _tickSource.get();
}
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index fe6a4432d70..174a52ebe52 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -48,6 +48,12 @@ class Client;
class OperationContext;
class OpObserver;
+namespace transport {
+class Session;
+class TransportLayer;
+class TransportLayerManager;
+} // namespace transport
+
/**
* Classes that implement this interface can receive notification on killOp.
*
@@ -207,9 +213,9 @@ public:
*
* The "desc" string is used to set a descriptive name for the client, used in logging.
*
- * If supplied, "p" is the communication channel used for communicating with the client.
+ * If supplied, "session" is the transport::Session used for communicating with the client.
*/
- UniqueClient makeClient(std::string desc, AbstractMessagingPort* p = nullptr);
+ UniqueClient makeClient(std::string desc, transport::Session* session = nullptr);
/**
* Creates a new OperationContext on "client".
@@ -299,6 +305,26 @@ public:
void registerKillOpListener(KillOpListenerInterface* listener);
//
+ // Transport.
+ //
+
+ /**
+ * Get the master TransportLayer. Routes to all other TransportLayers that
+ * may be in use within this service.
+ *
+ * See TransportLayerManager for more details.
+ */
+ transport::TransportLayer* getTransportLayer() const;
+
+ /**
+ * Add a new TransportLayer to this service context. The new TransportLayer will
+ * be added to the TransportLayerManager accessible via getTransportLayer().
+ *
+ * It additionally calls start() on the TransportLayer after adding it.
+ */
+ Status addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
+
+ //
// Global OpObserver.
//
@@ -370,6 +396,11 @@ private:
/**
+ * The TransportLayerManager.
+ */
+ std::unique_ptr<transport::TransportLayerManager> _transportLayerManager;
+
+ /**
* Vector of registered observers.
*/
std::vector<std::unique_ptr<ClientObserver>> _clientObservers;
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
new file mode 100644
index 00000000000..86f782a0bad
--- /dev/null
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -0,0 +1,153 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/service_entry_point_mongod.h"
+
+#include <vector>
+
+#include "mongo/db/client.h"
+#include "mongo/db/dbmessage.h"
+#include "mongo/db/instance.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/thread_idle_callback.h"
+#include "mongo/util/quick_exit.h"
+
+namespace mongo {
+namespace {
+
+// Set up proper headers for formatting an exhaust request, if we need to
+bool setExhaustMessage(Message* m, const DbResponse& dbresponse) {
+ MsgData::View header = dbresponse.response.header();
+ QueryResult::View qr = header.view2ptr();
+ long long cursorid = qr.getCursorId();
+
+ if (!cursorid) {
+ return false;
+ }
+
+ verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
+
+ auto ns = dbresponse.exhaustNS; // reset() will free this
+
+ m->reset();
+
+ BufBuilder b(512);
+ b.appendNum(static_cast<int>(0) /* size set later in appendData() */);
+ b.appendNum(header.getId());
+ b.appendNum(header.getResponseToMsgId());
+ b.appendNum(static_cast<int>(dbGetMore));
+ b.appendNum(static_cast<int>(0));
+ b.appendStr(ns);
+ b.appendNum(static_cast<int>(0)); // ntoreturn
+ b.appendNum(cursorid);
+
+ MsgData::View(b.buf()).setLen(b.len());
+ m->setData(b.release());
+
+ return true;
+}
+
+} // namespace
+
+using transport::Session;
+using transport::TransportLayer;
+
+ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {}
+
+void ServiceEntryPointMongod::startSession(Session&& session) {
+ launchWrappedServiceEntryWorkerThread(std::move(session),
+ [this](Session* session) { _sessionLoop(session); });
+}
+
+void ServiceEntryPointMongod::_sessionLoop(Session* session) {
+ Message inMessage;
+ bool inExhaust = false;
+ int64_t counter = 0;
+
+ while (true) {
+ // 1. Source a Message from the client (unless we are exhausting)
+ if (!inExhaust) {
+ inMessage.reset();
+ auto status = session->sourceMessage(&inMessage).wait();
+
+ if (ErrorCodes::isInterruption(status.code())) {
+ break;
+ }
+
+ uassertStatusOK(status);
+ }
+
+ // 2. Pass sourced Message up to mongod
+ DbResponse dbresponse;
+ {
+ auto opCtx = cc().makeOperationContext();
+ assembleResponse(opCtx.get(), inMessage, dbresponse, session->remote());
+
+ // opCtx must go out of scope here so that the operation cannot show
+ // up in currentOp results after the response reaches the client
+ }
+
+ // 3. Format our response, if we have one
+ Message& toSink = dbresponse.response;
+ if (!toSink.empty()) {
+ toSink.header().setId(nextMessageId());
+ toSink.header().setResponseToMsgId(inMessage.header().getId());
+
+ // If this is an exhaust cursor, don't source more Messages
+ if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) {
+ log() << "we are in exhaust";
+ inExhaust = true;
+ } else {
+ inExhaust = false;
+ }
+
+ // 4. Sink our response to the client
+ uassertStatusOK(session->sinkMessage(toSink).wait());
+ } else {
+ inExhaust = false;
+ }
+
+ if ((counter++ & 0xf) == 0) {
+ markThreadIdle();
+ }
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h
new file mode 100644
index 00000000000..31ed05098c8
--- /dev/null
+++ b/src/mongo/db/service_entry_point_mongod.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/service_entry_point.h"
+
+namespace mongo {
+
+namespace transport {
+class Session;
+class TransportLayer;
+} // namespace transport
+
+/**
+ * The entry point from the TransportLayer into Mongod. startSession() spawns and
+ * detaches a new thread for each incoming connection (transport::Session).
+ */
+class ServiceEntryPointMongod final : public ServiceEntryPoint {
+ MONGO_DISALLOW_COPYING(ServiceEntryPointMongod);
+
+public:
+ explicit ServiceEntryPointMongod(transport::TransportLayer* tl);
+
+ virtual ~ServiceEntryPointMongod() = default;
+
+ void startSession(transport::Session&& session) override;
+
+private:
+ void _sessionLoop(transport::Session* session);
+
+ transport::TransportLayer* _tl;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 1f0411b13cf..8b0b8321aaa 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -98,6 +98,7 @@ env.Library(
'$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_manager_impl',
'$BUILD_DIR/mongo/s/coreshard',
+ '$BUILD_DIR/mongo/transport/transport_layer_mock',
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
'mongoscore',
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
index 53aa481fdbc..ac7d4dd759e 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
@@ -78,8 +78,7 @@ protected:
*/
void setUp() override {
ShardingCatalogTestFixture::setUp();
-
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setConnectionStringReturnValue(_configConnStr);
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp
index b81a5bbc68f..f5609adef83 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp
@@ -58,8 +58,8 @@ class DropColl2ShardTest : public ShardingCatalogTestFixture {
public:
void setUp() override {
ShardingCatalogTestFixture::setUp();
+ setRemote(_clientHost);
- getMessagingPort()->setRemote(_clientHost);
configTargeter()->setFindHostReturnValue(_configHost);
configTargeter()->setConnectionStringReturnValue(_configCS);
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp
index 59cf084d98d..36fcb6eba63 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp
@@ -124,7 +124,7 @@ TEST_F(RemoveShardTest, RemoveShardCantRemoveLastShard) {
TEST_F(RemoveShardTest, RemoveShardStartDraining) {
string shardName = "shardToRemove";
const HostAndPort clientHost{"client1:12345"};
- getMessagingPort()->setRemote(clientHost);
+ setRemote(clientHost);
auto future = launchAsync([&] {
auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName));
@@ -288,7 +288,7 @@ TEST_F(RemoveShardTest, RemoveShardStillDrainingDatabasesRemaining) {
TEST_F(RemoveShardTest, RemoveShardCompletion) {
string shardName = "shardToRemove";
const HostAndPort clientHost{"client1:12345"};
- getMessagingPort()->setRemote(clientHost);
+ setRemote(clientHost);
auto future = launchAsync([&] {
auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName));
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp
index d1d36756984..57e35290ced 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp
@@ -87,7 +87,7 @@ public:
ShardingCatalogTestFixture::setUp();
configTargeter()->setFindHostReturnValue(configHost);
configTargeter()->setConnectionStringReturnValue(configCS);
- getMessagingPort()->setRemote(clientHost);
+ setRemote(clientHost);
}
void expectGetDatabase(const DatabaseType& expectedDb) {
diff --git a/src/mongo/s/chunk_manager_tests.cpp b/src/mongo/s/chunk_manager_tests.cpp
index 280b9921d69..5fe831b50a9 100644
--- a/src/mongo/s/chunk_manager_tests.cpp
+++ b/src/mongo/s/chunk_manager_tests.cpp
@@ -68,7 +68,7 @@ class ChunkManagerFixture : public ShardingCatalogTestFixture {
public:
void setUp() override {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
}
diff --git a/src/mongo/s/commands/request.cpp b/src/mongo/s/commands/request.cpp
index 050483d2969..6605db10424 100644
--- a/src/mongo/s/commands/request.cpp
+++ b/src/mongo/s/commands/request.cpp
@@ -42,6 +42,7 @@
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
+#include "mongo/transport/session.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
@@ -49,8 +50,8 @@ namespace mongo {
using std::string;
-Request::Request(Message& m, AbstractMessagingPort* p)
- : _clientInfo(&cc()), _m(m), _d(m), _p(p), _id(_m.header().getId()), _didInit(false) {
+Request::Request(Message& m)
+ : _clientInfo(&cc()), _m(m), _d(m), _id(_m.header().getId()), _didInit(false) {
ClusterLastErrorInfo::get(_clientInfo).newRequest();
}
@@ -122,4 +123,8 @@ void Request::process(OperationContext* txn, int attempt) {
<< " op: " << op << " attempt: " << attempt;
}
+transport::Session* Request::session() const {
+ return _clientInfo->session();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/commands/request.h b/src/mongo/s/commands/request.h
index 5664474e845..ace3e77a720 100644
--- a/src/mongo/s/commands/request.h
+++ b/src/mongo/s/commands/request.h
@@ -37,11 +37,15 @@ namespace mongo {
class Client;
class OperationContext;
+namespace transport {
+class Session;
+} // namespace transport
+
class Request {
MONGO_DISALLOW_COPYING(Request);
public:
- Request(Message& m, AbstractMessagingPort* p);
+ Request(Message& m);
const char* getns() const {
return _d.getns();
@@ -71,9 +75,8 @@ public:
DbMessage& d() {
return _d;
}
- AbstractMessagingPort* p() const {
- return _p;
- }
+
+ transport::Session* session() const;
void process(OperationContext* txn, int attempt = 0);
@@ -84,7 +87,6 @@ private:
Message& _m;
DbMessage _d;
- AbstractMessagingPort* const _p;
int32_t _id;
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index fcba8883592..69d02a3e0e9 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -176,7 +176,7 @@ void Strategy::queryOp(OperationContext* txn, Request& request) {
BSONObj explainObj = explainBuilder.done();
replyToQuery(0, // query result flags
- request.p(),
+ request.session(),
request.m(),
static_cast<const void*>(explainObj.objdata()),
explainObj.objsize(),
@@ -202,7 +202,7 @@ void Strategy::queryOp(OperationContext* txn, Request& request) {
obj.appendSelfToBufBuilder(reply.bufBuilderForResults());
numResults++;
}
- reply.send(request.p(),
+ reply.send(request.session(),
0, // query result flags
request.m(),
numResults,
@@ -265,7 +265,7 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
BSONObjBuilder builder(reply.bufBuilderForResults());
runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions);
}
- reply.sendCommandReply(request.p(), request.m());
+ reply.sendCommandReply(request.session(), request.m());
return;
} catch (const StaleConfigException& e) {
if (loops <= 0)
@@ -288,7 +288,7 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
BSONObjBuilder builder(reply.bufBuilderForResults());
Command::appendCommandStatus(builder, e.toStatus());
}
- reply.sendCommandReply(request.p(), request.m());
+ reply.sendCommandReply(request.session(), request.m());
return;
}
}
@@ -325,7 +325,7 @@ bool Strategy::handleSpecialNamespaces(OperationContext* txn, Request& request,
}
BSONObj x = reply.done();
- replyToQuery(0, request.p(), request.m(), x);
+ replyToQuery(0, request.session(), request.m(), x);
return true;
}
@@ -369,7 +369,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) {
const NamespaceString nss(ns);
auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (statusGetDb == ErrorCodes::NamespaceNotFound) {
- replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0);
+ replyToQuery(ResultFlag_CursorNotFound, request.session(), request.m(), 0, 0, 0);
return;
}
@@ -384,7 +384,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) {
auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest);
if (cursorResponse == ErrorCodes::CursorNotFound) {
- replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0);
+ replyToQuery(ResultFlag_CursorNotFound, request.session(), request.m(), 0, 0, 0);
return;
}
uassertStatusOK(cursorResponse.getStatus());
@@ -399,7 +399,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) {
}
replyToQuery(0,
- request.p(),
+ request.session(),
request.m(),
buffer.buf(),
buffer.len(),
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index b6893c076ea..e2957554ad2 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -67,8 +67,8 @@ public:
void setUp() override {
ShardingTestFixture::setUp();
+ setRemote(HostAndPort("ClientHost", 12345));
- getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345));
configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
std::vector<ShardType> shards;
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index d4912eead77..f4fda4103ee 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -77,12 +77,14 @@
#include "mongo/s/mongos_options.h"
#include "mongo/s/query/cluster_cursor_cleanup_job.h"
#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/service_entry_point_mongos.h"
#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/s/sharding_uptime_reporter.h"
#include "mongo/s/version_mongos.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/util/admin_access.h"
#include "mongo/util/cmdline_utils/censor_cmdline.h"
#include "mongo/util/concurrency/thread_name.h"
@@ -92,7 +94,6 @@
#include "mongo/util/log.h"
#include "mongo/util/net/hostname_canonicalization_worker.h"
#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_server.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/ntservice.h"
@@ -160,58 +161,6 @@ static BSONObj buildErrReply(const DBException& ex) {
return errB.obj();
}
-class ShardedMessageHandler : public MessageHandler {
-public:
- virtual ~ShardedMessageHandler() {}
-
- virtual void connected(AbstractMessagingPort* p) {
- Client::initThread("conn", getGlobalServiceContext(), p);
- }
-
- virtual void process(Message& m, AbstractMessagingPort* p) {
- verify(p);
- Request r(m, p);
- auto txn = cc().makeOperationContext();
-
- try {
- r.init(txn.get());
- r.process(txn.get());
- } catch (const AssertionException& ex) {
- LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed"
- << " while processing "
- << networkOpToString(m.operation()) << " op"
- << " for " << r.getnsIfPresent() << causedBy(ex);
-
- if (r.expectResponse()) {
- m.header().setId(r.id());
- replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex));
- }
-
- // We *always* populate the last error for now
- LastError::get(cc()).setLastError(ex.getCode(), ex.what());
- } catch (const DBException& ex) {
- log() << "Exception thrown"
- << " while processing " << networkOpToString(m.operation()) << " op"
- << " for " << r.getnsIfPresent() << causedBy(ex);
-
- if (r.expectResponse()) {
- m.header().setId(r.id());
- replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex));
- }
-
- // We *always* populate the last error for now
- LastError::get(cc()).setLastError(ex.getCode(), ex.what());
- }
-
- // Release connections back to pool, if any still cached
- ShardConnection::releaseMyConnections();
- }
-
- virtual void close() {
- Client::destroy();
- }
-};
-
} // namespace mongo
using namespace mongo;
@@ -278,6 +227,19 @@ static ExitCode runMongosServer() {
_initWireSpec();
+ transport::TransportLayerLegacy::Options opts;
+ opts.port = serverGlobalParams.port;
+ opts.ipList = serverGlobalParams.bind_ip;
+
+ auto sep =
+ std::make_shared<ServiceEntryPointMongos>(getGlobalServiceContext()->getTransportLayer());
+
+ auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(opts, sep);
+ auto res = transportLayer->setup();
+ if (!res.isOK()) {
+ return EXIT_NET_ERROR;
+ }
+
// Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks
globalConnPool.addHook(new ShardingConnectionHookForMongos(false));
shardConnectionPool.addHook(new ShardingConnectionHookForMongos(true));
@@ -352,19 +314,13 @@ static ExitCode runMongosServer() {
PeriodicTask::startRunningPeriodicTasks();
- MessageServer::Options opts;
- opts.port = serverGlobalParams.port;
- opts.ipList = serverGlobalParams.bind_ip;
-
- auto handler = std::make_shared<ShardedMessageHandler>();
- MessageServer* server = createServer(opts, std::move(handler), getGlobalServiceContext());
- if (!server->setupSockets()) {
+ auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer));
+ if (!start.isOK()) {
return EXIT_NET_ERROR;
}
- server->run();
- // MessageServer::run will return when exit code closes its socket
- return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR;
+ // Block until shutdown.
+ return waitForShutdown();
}
MONGO_INITIALIZER_GENERAL(ForkServer, ("EndStartupOptionHandling"), ("default"))
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
new file mode 100644
index 00000000000..fe7d92fd1ad
--- /dev/null
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/service_entry_point_mongos.h"
+
+#include <vector>
+
+#include "mongo/db/lasterror.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/service_context.h"
+#include "mongo/s/client/shard_connection.h"
+#include "mongo/s/commands/request.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/thread_idle_callback.h"
+#include "mongo/util/quick_exit.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+namespace {
+
+BSONObj buildErrReply(const DBException& ex) {
+ BSONObjBuilder errB;
+ errB.append("$err", ex.what());
+ errB.append("code", ex.getCode());
+ if (!ex._shard.empty()) {
+ errB.append("shard", ex._shard);
+ }
+ return errB.obj();
+}
+
+} // namespace
+
+using transport::Session;
+using transport::TransportLayer;
+
+ServiceEntryPointMongos::ServiceEntryPointMongos(TransportLayer* tl) : _tl(tl) {}
+
+void ServiceEntryPointMongos::startSession(Session&& session) {
+ launchWrappedServiceEntryWorkerThread(std::move(session),
+ [this](Session* session) { _sessionLoop(session); });
+}
+
+void ServiceEntryPointMongos::_sessionLoop(Session* session) {
+ Message message;
+ int64_t counter = 0;
+
+ while (true) {
+ // Release any cached egress connections for client back to pool before destroying
+ auto guard = MakeGuard(ShardConnection::releaseMyConnections);
+
+ message.reset();
+
+ // 1. Source a Message from the client
+ {
+ auto status = session->sourceMessage(&message).wait();
+
+ if (ErrorCodes::isInterruption(status.code())) {
+ break;
+ }
+
+ uassertStatusOK(status);
+ }
+
+ // 2. Build a sharding request
+ Request r(message);
+ auto txn = cc().makeOperationContext();
+
+ try {
+ r.init(txn.get());
+ r.process(txn.get());
+ } catch (const AssertionException& ex) {
+ LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed"
+ << " while processing "
+ << networkOpToString(message.operation()) << " op"
+ << " for " << r.getnsIfPresent() << causedBy(ex);
+ if (r.expectResponse()) {
+ message.header().setId(r.id());
+ replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex));
+ }
+
+ // We *always* populate the last error for now
+ LastError::get(cc()).setLastError(ex.getCode(), ex.what());
+ } catch (const DBException& ex) {
+ log() << "Exception thrown"
+ << " while processing " << networkOpToString(message.operation()) << " op"
+ << " for " << r.getnsIfPresent() << causedBy(ex);
+
+ if (r.expectResponse()) {
+ message.header().setId(r.id());
+ replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex));
+ }
+
+ // We *always* populate the last error for now
+ LastError::get(cc()).setLastError(ex.getCode(), ex.what());
+ }
+
+ if ((counter++ & 0xf) == 0) {
+ markThreadIdle();
+ }
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h
new file mode 100644
index 00000000000..d96540b2b1e
--- /dev/null
+++ b/src/mongo/s/service_entry_point_mongos.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/service_entry_point.h"
+
+namespace mongo {
+
+namespace transport {
+class Session;
+class TransportLayer;
+} // namespace transport
+
+/**
+ * The entry point from the TransportLayer into Mongos. startSession() spawns and
+ * detaches a new thread for each incoming connection (transport::Session).
+ */
+class ServiceEntryPointMongos final : public ServiceEntryPoint {
+ MONGO_DISALLOW_COPYING(ServiceEntryPointMongos);
+
+public:
+ ServiceEntryPointMongos(transport::TransportLayer* tl);
+
+ virtual ~ServiceEntryPointMongos() = default;
+
+ void startSession(transport::Session&& session) override;
+
+private:
+ void _sessionLoop(transport::Session* session);
+
+ transport::TransportLayer* _tl;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index 5d8c3e2285e..f099ac2e67b 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -65,6 +65,8 @@
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/transport_layer_mock.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/tick_source_mock.h"
@@ -92,8 +94,12 @@ void ShardingTestFixture::setUp() {
_service->setFastClockSource(stdx::make_unique<ClockSourceMock>());
_service->setPreciseClockSource(stdx::make_unique<ClockSourceMock>());
_service->setTickSource(stdx::make_unique<TickSourceMock>());
- _messagePort = stdx::make_unique<MessagingPortMock>();
- _client = _service->makeClient("ShardingTestFixture", _messagePort.get());
+ auto tlMock = stdx::make_unique<transport::TransportLayerMock>();
+ _transportLayer = tlMock.get();
+ _service->addAndStartTransportLayer(std::move(tlMock));
+ _transportSession =
+ stdx::make_unique<transport::Session>(HostAndPort{}, HostAndPort{}, _transportLayer);
+ _client = _service->makeClient("ShardingTestFixture", _transportSession.get());
_opCtx = _client->makeOperationContext();
// Set up executor pool used for most operations.
@@ -184,6 +190,7 @@ void ShardingTestFixture::tearDown() {
grid.catalogClient(_opCtx.get())->shutDown(_opCtx.get());
grid.clearForUnitTests();
+ _transportSession.reset();
_opCtx.reset();
_client.reset();
_service.reset();
@@ -238,10 +245,6 @@ executor::TaskExecutor* ShardingTestFixture::executor() const {
return _executor;
}
-MessagingPortMock* ShardingTestFixture::getMessagingPort() const {
- return _messagePort.get();
-}
-
DistLockManagerMock* ShardingTestFixture::distLock() const {
invariant(_distLockManager);
return _distLockManager;
@@ -508,6 +511,10 @@ void ShardingTestFixture::expectCount(const HostAndPort& configHost,
});
}
+void ShardingTestFixture::setRemote(const HostAndPort& remote) {
+ *_transportSession = transport::Session{remote, HostAndPort{}, _transportLayer};
+}
+
void ShardingTestFixture::checkReadConcern(const BSONObj& cmdObj,
const Timestamp& expectedTS,
long long expectedTerm) const {
diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h
index 2a930bbefc4..9365a18dd66 100644
--- a/src/mongo/s/sharding_test_fixture.h
+++ b/src/mongo/s/sharding_test_fixture.h
@@ -60,6 +60,10 @@ class NetworkInterfaceMock;
class TaskExecutor;
} // namespace executor
+namespace transport {
+class TransportLayerMock;
+} // namepsace transport
+
/**
* Sets up the mocked out objects for testing the replica-set backed catalog manager and catalog
* client.
@@ -97,7 +101,7 @@ protected:
executor::TaskExecutor* executor() const;
- MessagingPortMock* getMessagingPort() const;
+ transport::Session* getTransportSession() const;
DistLockManagerMock* distLock() const;
@@ -196,6 +200,8 @@ protected:
void shutdownExecutor();
+ void setRemote(const HostAndPort& remote);
+
/**
* Checks that the given command has the expected settings for read after opTime.
*/
@@ -207,7 +213,8 @@ private:
std::unique_ptr<ServiceContext> _service;
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
- std::unique_ptr<MessagingPortMock> _messagePort;
+ transport::TransportLayerMock* _transportLayer;
+ std::unique_ptr<transport::Session> _transportSession;
RemoteCommandTargeterFactoryMock* _targeterFactory;
RemoteCommandTargeterMock* _configTargeter;
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index dea991b80ef..d79e45a1fa9 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -66,7 +66,7 @@ public:
void setUp() override {
ShardingTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345));
+ setRemote(HostAndPort("ClientHost", 12345));
// Set up the RemoteCommandTargeter for the config shard.
configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 7109c174a99..d37751fe537 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -320,17 +320,17 @@ public:
log() << "Setting random seed: " << mongoBridgeGlobalParams.seed;
}
- void accepted(AbstractMessagingPort* mp) override final {
+ void accepted(std::unique_ptr<AbstractMessagingPort> mp) override final {
{
stdx::lock_guard<stdx::mutex> lk(_portsMutex);
if (_inShutdown.load()) {
mp->shutdown();
return;
}
- _ports.insert(mp);
+ _ports.insert(mp.get());
}
- Forwarder f(mp, &_settingsMutex, &_settings, _seedSource.nextInt64());
+ Forwarder f(mp.release(), &_settingsMutex, &_settings, _seedSource.nextInt64());
stdx::thread t(f);
t.detach();
}
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 82045d5c0e8..adedade05a3 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -13,13 +13,10 @@ env.CppUnitTest(
)
env.Library(
- target='service_entry_point_test_suite',
+ target='transport_layer_common',
source=[
- 'service_entry_point_test_suite.cpp',
- ## move these somewhere better:
'session.cpp',
'ticket.cpp',
- 'transport_layer.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/unittest/unittest',
@@ -28,10 +25,63 @@ env.Library(
],
)
+env.Library(
+ target='transport_layer_manager',
+ source=[
+ 'transport_layer_manager.cpp',
+ ],
+ LIBDEPS=[
+ 'transport_layer_common',
+ ],
+)
+
+env.Library(
+ target='transport_layer_mock',
+ source=[
+ 'transport_layer_mock.cpp',
+ ],
+ LIBDEPS=[
+ 'transport_layer_common',
+ ],
+)
+
+env.Library(
+ target='transport_layer_legacy',
+ source=[
+ 'transport_layer_legacy.cpp',
+ ],
+ LIBDEPS=[
+ 'transport_layer_common',
+ '$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/db/stats/counters',
+ ],
+)
+
+env.Library(
+ target='service_entry_point_test_suite',
+ source=[
+ 'service_entry_point_test_suite.cpp',
+ ],
+ LIBDEPS=[
+ 'transport_layer_common',
+ '$BUILD_DIR/mongo/unittest/unittest',
+ ],
+)
+
+env.Library(
+ target='service_entry_point_utils',
+ source=[
+ 'service_entry_point_utils.cpp',
+ ],
+ LIBDEPS=[
+ 'transport_layer_common',
+ ],
+)
+
env.CppUnitTest(
target='service_entry_point_mock_test',
source=[
- 'service_entry_point_mock.cpp', ## move?
+ 'service_entry_point_mock.cpp',
'service_entry_point_mock_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp
index 6cfe3606043..8d4530d7f13 100644
--- a/src/mongo/transport/service_entry_point_mock.cpp
+++ b/src/mongo/transport/service_entry_point_mock.cpp
@@ -100,16 +100,12 @@ void ServiceEntryPointMock::run(transport::Session&& session) {
}
// sourceMessage()
- auto sourceTicket = _tl->sourceMessage(session, &inMessage);
- auto sourceRes = _tl->wait(std::move(sourceTicket));
- if (!sourceRes.isOK()) {
+ if (!session.sourceMessage(&inMessage).wait().isOK()) {
break;
}
// sinkMessage()
- auto sinkTicket = _tl->sinkMessage(session, _outMessage);
- auto sinkRes = _tl->wait(std::move(sinkTicket));
- if (!sinkRes.isOK()) {
+ if (!session.sinkMessage(_outMessage).wait().isOK()) {
break;
}
}
diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp
index e4f93ae3344..e2a9c7f336c 100644
--- a/src/mongo/transport/service_entry_point_test_suite.cpp
+++ b/src/mongo/transport/service_entry_point_test_suite.cpp
@@ -57,7 +57,6 @@ namespace mongo {
using namespace transport;
using namespace stdx::placeholders;
-using SessionId = Session::SessionId;
using TicketCallback = TransportLayer::TicketCallback;
namespace {
@@ -107,7 +106,7 @@ ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session,
ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session, Date_t expiration)
: _sessionId(session.id()), _expiration(expiration) {}
-SessionId ServiceEntryPointTestSuite::MockTicket::sessionId() const {
+Session::Id ServiceEntryPointTestSuite::MockTicket::sessionId() const {
return _sessionId;
}
@@ -140,20 +139,35 @@ Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const Session& ses
return _sinkMessage(session, message, expiration);
}
-Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket ticket) {
+Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket&& ticket) {
return _wait(std::move(ticket));
}
-void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket ticket, TicketCallback callback) {
+void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket,
+ TicketCallback callback) {
return _asyncWait(std::move(ticket), std::move(callback));
}
+std::string ServiceEntryPointTestSuite::MockTLHarness::getX509SubjectName(const Session& session) {
+ return "mock";
+}
+
+void ServiceEntryPointTestSuite::MockTLHarness::registerTags(const Session& session) {}
+
+TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() {
+ return Stats();
+}
+
void ServiceEntryPointTestSuite::MockTLHarness::end(const Session& session) {
return _end(session);
}
-void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions() {
- return _endAllSessions();
+void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions(Session::TagMask tags) {
+ return _endAllSessions(tags);
+}
+
+Status ServiceEntryPointTestSuite::MockTLHarness::start() {
+ return _start();
}
void ServiceEntryPointTestSuite::MockTLHarness::shutdown() {
@@ -180,13 +194,13 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::
Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const Session& s,
Message* m,
Date_t d) {
- return Ticket(stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d));
+ return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d));
}
Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const Session& s,
const Message&,
Date_t d) {
- return Ticket(stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d));
+ return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d));
}
Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const Session& s,
@@ -308,6 +322,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() {
auto resumeAFuture = resumeA.get_future();
stdx::promise<void> testComplete;
+
auto testFuture = testComplete.get_future();
_tl->_resetHooks();
@@ -376,10 +391,11 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions,
Milliseconds delay) {
AtomicWord<int> ended{0};
stdx::promise<void> allSessionsComplete;
+
auto allCompleteFuture = allSessionsComplete.get_future();
stdx::mutex cyclesLock;
- std::unordered_map<SessionId, int> completedCycles;
+ std::unordered_map<Session::Id, int> completedCycles;
_tl->_resetHooks();
diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h
index a1999efd873..828fee5fe3c 100644
--- a/src/mongo/transport/service_entry_point_test_suite.h
+++ b/src/mongo/transport/service_entry_point_test_suite.h
@@ -99,7 +99,7 @@ private:
MockTicket(MockTicket&&) = default;
MockTicket& operator=(MockTicket&&) = default;
- transport::Session::SessionId sessionId() const override;
+ transport::Session::Id sessionId() const override;
Date_t expiration() const override;
@@ -107,7 +107,7 @@ private:
private:
boost::optional<Message*> _message;
- SessionId _sessionId;
+ transport::Session::Id _sessionId;
Date_t _expiration;
};
@@ -127,10 +127,15 @@ private:
const transport::Session& session,
const Message& message,
Date_t expiration = transport::Ticket::kNoExpirationDate) override;
- Status wait(transport::Ticket ticket) override;
- void asyncWait(transport::Ticket ticket, TicketCallback callback) override;
+ Status wait(transport::Ticket&& ticket) override;
+ void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override;
+ std::string getX509SubjectName(const transport::Session& session) override;
+ void registerTags(const transport::Session& session) override;
+ Stats sessionStats() override;
void end(const transport::Session& session) override;
- void endAllSessions() override;
+ void endAllSessions(
+ transport::Session::TagMask tags = transport::Session::kEmptyTagMask) override;
+ Status start() override;
void shutdown() override;
ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket);
@@ -143,7 +148,9 @@ private:
stdx::function<Status(transport::Ticket)> _wait;
stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait;
stdx::function<void(const transport::Session&)> _end;
- stdx::function<void(void)> _endAllSessions = [] {};
+ stdx::function<void(transport::Session::TagMask tags)> _endAllSessions =
+ [](transport::Session::TagMask tags) {};
+ stdx::function<Status(void)> _start = [] { return Status::OK(); };
stdx::function<void(void)> _shutdown = [] {};
// Pre-set hook methods
diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_entry_point_utils.cpp
new file mode 100644
index 00000000000..efe934967fb
--- /dev/null
+++ b/src/mongo/transport/service_entry_point_utils.cpp
@@ -0,0 +1,150 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_entry_point_utils.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/server_options.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/debug_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/quick_exit.h"
+
+#ifdef __linux__ // TODO: consider making this ifndef _WIN32
+#include <sys/resource.h>
+#endif
+
+#if !defined(__has_feature)
+#define __has_feature(x) 0
+#endif
+
+namespace mongo {
+
+namespace {
+
+struct Context {
+ Context(transport::Session session, stdx::function<void(transport::Session*)> task)
+ : session(std::move(session)), task(std::move(task)) {}
+
+ transport::Session session;
+ stdx::function<void(transport::Session*)> task;
+};
+
+void* runFunc(void* ptr) {
+ std::unique_ptr<Context> ctx(static_cast<Context*>(ptr));
+
+ auto tl = ctx->session.getTransportLayer();
+ Client::initThread("conn", &ctx->session);
+ setThreadName(std::string(str::stream() << "conn" << ctx->session.id()));
+
+ try {
+ ctx->task(&ctx->session);
+ } catch (const AssertionException& e) {
+ log() << "AssertionException handling request, closing client connection: " << e;
+ } catch (const SocketException& e) {
+ log() << "SocketException handling request, closing client connection: " << e;
+ } catch (const DBException& e) {
+ // must be right above std::exception to avoid catching subclasses
+ log() << "DBException handling request, closing client connection: " << e;
+ } catch (const std::exception& e) {
+ error() << "Uncaught std::exception: " << e.what() << ", terminating";
+ quickExit(EXIT_UNCAUGHT);
+ }
+
+ tl->end(ctx->session);
+
+ if (!serverGlobalParams.quiet) {
+ auto conns = tl->sessionStats().numOpenSessions;
+ const char* word = (conns == 1 ? " connection" : " connections");
+ log() << "end connection " << ctx->session.remote() << " (" << conns << word
+ << " now open)";
+ }
+
+ Client::destroy();
+
+ return nullptr;
+}
+} // namespace
+
+void launchWrappedServiceEntryWorkerThread(transport::Session&& session,
+ stdx::function<void(transport::Session*)> task) {
+ auto ctx = stdx::make_unique<Context>(std::move(session), std::move(task));
+
+ try {
+#ifndef __linux__ // TODO: consider making this ifdef _WIN32
+ stdx::thread(stdx::bind(runFunc, ctx.get())).detach();
+ ctx.release();
+#else
+ pthread_attr_t attrs;
+ pthread_attr_init(&attrs);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ static const size_t STACK_SIZE =
+ 1024 * 1024; // if we change this we need to update the warning
+
+ struct rlimit limits;
+ invariant(getrlimit(RLIMIT_STACK, &limits) == 0);
+ if (limits.rlim_cur > STACK_SIZE) {
+ size_t stackSizeToSet = STACK_SIZE;
+#if !__has_feature(address_sanitizer)
+ if (kDebugBuild)
+ stackSizeToSet /= 2;
+#endif
+ pthread_attr_setstacksize(&attrs, stackSizeToSet);
+ } else if (limits.rlim_cur < 1024 * 1024) {
+ warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB";
+ }
+
+
+ pthread_t thread;
+ int failed = pthread_create(&thread, &attrs, runFunc, ctx.get());
+
+ pthread_attr_destroy(&attrs);
+
+ if (failed) {
+ log() << "pthread_create failed: " << errnoWithDescription(failed);
+ throw std::system_error(
+ std::make_error_code(std::errc::resource_unavailable_try_again));
+ }
+ ctx.release();
+#endif // __linux__
+
+ } catch (...) {
+ log() << "failed to create service entry worker thread for " << ctx->session.remote();
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/service_entry_point_utils.h
index 20030e64661..42be4079f9c 100644
--- a/src/mongo/transport/transport_layer.cpp
+++ b/src/mongo/transport/service_entry_point_utils.h
@@ -26,19 +26,17 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
+#pragma once
-#include "mongo/transport/transport_layer.h"
+#include "mongo/stdx/functional.h"
namespace mongo {
-namespace transport {
-TransportLayer::TransportLayer() = default;
-TransportLayer::~TransportLayer() = default;
+namespace transport {
+class Session;
+} // namespace transport
-TicketImpl* TransportLayer::getTicketImpl(const Ticket& ticket) {
- return ticket.impl();
-}
+void launchWrappedServiceEntryWorkerThread(transport::Session&& session,
+ stdx::function<void(transport::Session*)> task);
-} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp
index b9f382a6965..ebf237f8bd4 100644
--- a/src/mongo/transport/session.cpp
+++ b/src/mongo/transport/session.cpp
@@ -43,11 +43,14 @@ AtomicUInt64 sessionIdCounter(0);
} // namespace
Session::Session(HostAndPort remote, HostAndPort local, TransportLayer* tl)
- : _id(sessionIdCounter.addAndFetch(1)), _remote(remote), _local(local), _tl(tl) {}
+ : _id(sessionIdCounter.addAndFetch(1)),
+ _remote(std::move(remote)),
+ _local(std::move(local)),
+ _tags(kEmptyTagMask),
+ _tl(tl) {}
Session::~Session() {
if (_tl != nullptr) {
- invariant(_tl);
_tl->end(*this);
}
}
@@ -75,16 +78,21 @@ Session& Session::operator=(Session&& other) {
return *this;
}
-Session::SessionId Session::id() const {
- return _id;
+void Session::replaceTags(TagMask tags) {
+ _tags = tags;
+ _tl->registerTags(*this);
}
-const HostAndPort& Session::remote() const {
- return _remote;
+Ticket Session::sourceMessage(Message* message, Date_t expiration) {
+ return _tl->sourceMessage(*this, message, expiration);
}
-const HostAndPort& Session::local() const {
- return _local;
+Ticket Session::sinkMessage(const Message& message, Date_t expiration) {
+ return _tl->sinkMessage(*this, message, expiration);
+}
+
+std::string Session::getX509SubjectName() const {
+ return _tl->getX509SubjectName(*this);
}
} // namespace transport
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 4692f22437d..b340b0fecf8 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -29,7 +29,10 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/session_id.h"
+#include "mongo/transport/ticket.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/net/message.h"
namespace mongo {
namespace transport {
@@ -47,7 +50,14 @@ public:
/**
* Type to indicate the internal id for this session.
*/
- using SessionId = uint64_t;
+ using Id = SessionId;
+
+ /**
+ * Tags for groups of connections.
+ */
+ using TagMask = uint32_t;
+ static constexpr TagMask kEmptyTagMask = 0;
+ static constexpr TagMask kKeepOpen = 1;
/**
* Construct a new session.
@@ -68,24 +78,72 @@ public:
/**
* Return the id for this session.
*/
- SessionId id() const;
+ Id id() const {
+ return _id;
+ }
/**
* Return the remote host for this session.
*/
- const HostAndPort& remote() const;
+ const HostAndPort& remote() const {
+ return _remote;
+ }
/**
* Return the local host information for this session.
*/
- const HostAndPort& local() const;
+ const HostAndPort& local() const {
+ return _local;
+ }
+
+ /**
+ * Return the X509 subject name for this connection (SSL only).
+ */
+ std::string getX509SubjectName() const;
+
+ /**
+ * Set this session's tags. This Session will register
+ * its new tags with its TransportLayer.
+ */
+ void replaceTags(TagMask tags);
+
+ /**
+ * Get this session's tags.
+ */
+ TagMask getTags() const {
+ return _tags;
+ }
+
+ /**
+ * Source (receive) a new Message for this Session.
+ *
+ * This method will forward to sourceMessage on this Session's transport layer.
+ */
+ Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate);
+
+ /**
+ * Sink (send) a new Message for this Session. This method should be used
+ * to send replies to a given host.
+ *
+ * This method will forward to sinkMessage on this Session's transport layer.
+ */
+ Ticket sinkMessage(const Message& message, Date_t expiration = Ticket::kNoExpirationDate);
+
+ /**
+ * The TransportLayer for this Session.
+ */
+ TransportLayer* getTransportLayer() const {
+ return _tl;
+ }
private:
- SessionId _id;
+ Id _id;
HostAndPort _remote;
HostAndPort _local;
+ TagMask _tags;
+
TransportLayer* _tl;
};
diff --git a/src/mongo/transport/session_id.h b/src/mongo/transport/session_id.h
new file mode 100644
index 00000000000..b1395ef1429
--- /dev/null
+++ b/src/mongo/transport/session_id.h
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace mongo {
+namespace transport {
+
+using SessionId = uint64_t;
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/ticket.cpp b/src/mongo/transport/ticket.cpp
index 76bce1dc51e..77a290087dd 100644
--- a/src/mongo/transport/ticket.cpp
+++ b/src/mongo/transport/ticket.cpp
@@ -30,30 +30,27 @@
#include "mongo/transport/ticket.h"
#include "mongo/transport/ticket_impl.h"
+#include "mongo/transport/transport_layer.h"
namespace mongo {
namespace transport {
const Date_t Ticket::kNoExpirationDate{Date_t::max()};
-Ticket::Ticket(std::unique_ptr<TicketImpl> ticket) : _ticket(std::move(ticket)) {}
+Ticket::Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket)
+ : _tl(tl), _ticket(std::move(ticket)) {}
Ticket::~Ticket() = default;
Ticket::Ticket(Ticket&&) = default;
Ticket& Ticket::operator=(Ticket&&) = default;
-Session::SessionId Ticket::sessionId() const {
- return _ticket->sessionId();
+Status Ticket::wait()&& {
+ return _tl->wait(std::move(*this));
}
-Date_t Ticket::expiration() const {
- return _ticket->expiration();
-}
-
-// TODO should this actually be const?
-TicketImpl* Ticket::impl() const {
- return _ticket.get();
+void Ticket::asyncWait(TicketCallback cb)&& {
+ return _tl->asyncWait(std::move(*this), std::move(cb));
}
} // namespace transport
diff --git a/src/mongo/transport/ticket.h b/src/mongo/transport/ticket.h
index 875a66b3602..dd8927085aa 100644
--- a/src/mongo/transport/ticket.h
+++ b/src/mongo/transport/ticket.h
@@ -28,14 +28,18 @@
#pragma once
+#include <memory>
+
#include "mongo/base/disallow_copying.h"
-#include "mongo/transport/session.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/transport/session_id.h"
+#include "mongo/transport/ticket_impl.h"
#include "mongo/util/time_support.h"
namespace mongo {
namespace transport {
-class TicketImpl;
+class TransportLayer;
/**
* A Ticket represents some work to be done within the TransportLayer.
@@ -46,16 +50,16 @@ class Ticket {
MONGO_DISALLOW_COPYING(Ticket);
public:
- friend class TransportLayer;
+ using TicketCallback = stdx::function<void(Status)>;
- using SessionId = Session::SessionId;
+ friend class TransportLayer;
/**
* Indicates that there is no expiration time by when a ticket needs to complete.
*/
static const Date_t kNoExpirationDate;
- Ticket(std::unique_ptr<TicketImpl> ticket);
+ Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket);
~Ticket();
/**
@@ -67,20 +71,41 @@ public:
/**
* Return this ticket's session id.
*/
- SessionId sessionId() const;
+ SessionId sessionId() const {
+ return _ticket->sessionId();
+ }
/**
* Return this ticket's expiration date.
*/
- Date_t expiration() const;
+ Date_t expiration() const {
+ return _ticket->expiration();
+ }
+
+ /**
+ * Wait for this ticket to be filled.
+ *
+ * This is this-rvalue qualified because it consumes the ticket
+ */
+ Status wait() &&;
+
+ /**
+ * Asynchronously wait for this ticket to be filled.
+ *
+ * This is this-rvalue qualified because it consumes the ticket
+ */
+ void asyncWait(TicketCallback cb) &&;
protected:
/**
* Return a non-owning pointer to the underlying TicketImpl type
*/
- TicketImpl* impl() const;
+ TicketImpl* impl() const {
+ return _ticket.get();
+ }
private:
+ TransportLayer* _tl;
std::unique_ptr<TicketImpl> _ticket;
};
diff --git a/src/mongo/transport/ticket_impl.h b/src/mongo/transport/ticket_impl.h
index 378f9fb2999..4cbbc0a22d0 100644
--- a/src/mongo/transport/ticket_impl.h
+++ b/src/mongo/transport/ticket_impl.h
@@ -28,7 +28,8 @@
#pragma once
-#include "mongo/transport/session.h"
+#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/session_id.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -43,8 +44,6 @@ class TicketImpl {
MONGO_DISALLOW_COPYING(TicketImpl);
public:
- using SessionId = Session::SessionId;
-
virtual ~TicketImpl() = default;
TicketImpl(TicketImpl&&) = default;
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 3028506c6f0..dde6248eaf5 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -57,7 +57,30 @@ class TransportLayer {
MONGO_DISALLOW_COPYING(TransportLayer);
public:
- virtual ~TransportLayer();
+ /**
+ * Stats for sessions open in the Transport Layer.
+ */
+ struct Stats {
+ /**
+ * Returns the number of sessions currently open in the transport layer.
+ */
+ size_t numOpenSessions = 0;
+
+ /**
+ * Returns the total number of sessions that have ever been created by this TransportLayer.
+ */
+ size_t numCreatedSessions = 0;
+
+ /**
+ * Returns the number of available sessions we could still open. Only relevant
+ * when we are operating under a transport::Session limit (for example, in the
+ * legacy implementation, we respect a maximum number of connections). If there
+ * is no session limit, returns std::numeric_limits<int>::max().
+ */
+ size_t numAvailableSessions = 0;
+ };
+
+ virtual ~TransportLayer() = default;
/**
* Source (receive) a new Message for this Session.
@@ -101,7 +124,7 @@ public:
* This thread may be used by the TransportLayer to run other Tickets that were
* enqueued prior to this call.
*/
- virtual Status wait(Ticket ticket) = 0;
+ virtual Status wait(Ticket&& ticket) = 0;
/**
* Callback for Tickets that are run via asyncWait().
@@ -115,7 +138,27 @@ public:
* This thread will not be used by the TransportLayer to perform work. The callback
* passed to asyncWait() may be run on any thread.
*/
- virtual void asyncWait(Ticket ticket, TicketCallback callback) = 0;
+ virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0;
+
+ /**
+ * Tag this Session within the TransportLayer with the tags currently assigned to the
+ * Session. If endAllSessions() is called with a matching
+ * Session::TagMask, this Session will not be ended.
+ *
+ * Before calling this method, use Session::replaceTags() to set the desired TagMask.
+ */
+ virtual void registerTags(const Session& session) = 0;
+
+ /**
+ * Return the stored X509 subject name for this session. If the session does not
+ * exist in this TransportLayer, returns "".
+ */
+ virtual std::string getX509SubjectName(const Session& session) = 0;
+
+ /**
+ * Returns the number of sessions currently open in the transport layer.
+ */
+ virtual Stats sessionStats() = 0;
/**
* End the given Session. Tickets for this Session that have already been
@@ -131,11 +174,20 @@ public:
virtual void end(const Session& session) = 0;
/**
- * End all active sessions in the TransportLayer. Tickets that have already been
- * started via wait() or asyncWait() will complete, but may return a failed Status.
- * This method is synchronous and will not return until all sessions have ended.
+ * End all active sessions in the TransportLayer. Tickets that have already been started via
+ * wait() or asyncWait() will complete, but may return a failed Status. This method is
+ * asynchronous and will return after all sessions have been notified to end.
+ *
+ * If a TagMask is provided, endAllSessions() will skip over sessions with matching
+ * tags and leave them open.
+ */
+ virtual void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) = 0;
+
+ /**
+ * Start the TransportLayer. After this point, the TransportLayer will begin accepting active
+ * sessions from new transport::Endpoints.
*/
- virtual void endAllSessions() = 0;
+ virtual Status start() = 0;
/**
* Shut the TransportLayer down. After this point, the TransportLayer will
@@ -147,12 +199,21 @@ public:
virtual void shutdown() = 0;
protected:
- TransportLayer();
+ TransportLayer() = default;
/**
* Return the implementation of this Ticket.
*/
- TicketImpl* getTicketImpl(const Ticket& ticket);
+ TicketImpl* getTicketImpl(const Ticket& ticket) {
+ return ticket.impl();
+ }
+
+ /**
+ * Return the transport layer of this Ticket.
+ */
+ TransportLayer* getTicketTransportLayer(const Ticket& ticket) {
+ return ticket._tl;
+ }
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp
new file mode 100644
index 00000000000..b45d6a6ad07
--- /dev/null
+++ b/src/mongo/transport/transport_layer_legacy.cpp
@@ -0,0 +1,307 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/transport_layer_legacy.h"
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/config.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/abstract_message_port.h"
+#include "mongo/util/net/socket_exception.h"
+
+namespace mongo {
+namespace transport {
+
+TransportLayerLegacy::ListenerLegacy::ListenerLegacy(const TransportLayerLegacy::Options& opts,
+ NewConnectionCb callback)
+ : Listener("", opts.ipList, opts.port, getGlobalServiceContext(), true),
+ _accepted(std::move(callback)) {}
+
+void TransportLayerLegacy::ListenerLegacy::accepted(std::unique_ptr<AbstractMessagingPort> mp) {
+ _accepted(std::move(mp));
+}
+
+TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts,
+ std::shared_ptr<ServiceEntryPoint> sep)
+ : _sep(sep),
+ _listener(stdx::make_unique<ListenerLegacy>(
+ opts,
+ stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))),
+ _running(false),
+ _options(opts) {}
+
+TransportLayerLegacy::LegacyTicket::LegacyTicket(const Session& session,
+ Date_t expiration,
+ WorkHandle work)
+ : _sessionId(session.id()), _expiration(expiration), _fill(std::move(work)) {}
+
+Session::Id TransportLayerLegacy::LegacyTicket::sessionId() const {
+ return _sessionId;
+}
+
+Date_t TransportLayerLegacy::LegacyTicket::expiration() const {
+ return _expiration;
+}
+
+Status TransportLayerLegacy::setup() {
+ if (!_listener->setupSockets()) {
+ error() << "Failed to set up sockets during startup.";
+ return {ErrorCodes::InternalError, "Failed to set up sockets"};
+ }
+
+ return Status::OK();
+}
+
+Status TransportLayerLegacy::start() {
+ if (_running.swap(true)) {
+ return {ErrorCodes::InternalError, "TransportLayer is already running"};
+ }
+
+ _listenerThread = stdx::thread([this]() { _listener->initAndListen(); });
+
+ return Status::OK();
+}
+
+TransportLayerLegacy::~TransportLayerLegacy() = default;
+
+Ticket TransportLayerLegacy::sourceMessage(const Session& session,
+ Message* message,
+ Date_t expiration) {
+ auto sourceCb = [message](AbstractMessagingPort* amp) -> Status {
+ if (!amp->recv(*message)) {
+ return {ErrorCodes::HostUnreachable, "Recv failed"};
+ }
+ return Status::OK();
+ };
+
+ return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sourceCb)));
+}
+
+std::string TransportLayerLegacy::getX509SubjectName(const Session& session) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ auto conn = _connections.find(session.id());
+ if (conn == _connections.end()) {
+ // Return empty string if the session is not found
+ return "";
+ }
+
+ return conn->second.x509SubjectName.value_or("");
+ }
+}
+
+TransportLayer::Stats TransportLayerLegacy::sessionStats() {
+ Stats stats;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ stats.numOpenSessions = _connections.size();
+ }
+
+ stats.numAvailableSessions = Listener::globalTicketHolder.available();
+ stats.numCreatedSessions = Listener::globalConnectionNumber.load();
+
+ return stats;
+}
+
+Ticket TransportLayerLegacy::sinkMessage(const Session& session,
+ const Message& message,
+ Date_t expiration) {
+ auto sinkCb = [&message](AbstractMessagingPort* amp) -> Status {
+ try {
+ amp->say(message);
+ return Status::OK();
+ } catch (const SocketException& e) {
+ return {ErrorCodes::HostUnreachable, e.what()};
+ }
+ };
+
+ return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sinkCb)));
+}
+
+Status TransportLayerLegacy::wait(Ticket&& ticket) {
+ return _runTicket(std::move(ticket));
+}
+
+void TransportLayerLegacy::asyncWait(Ticket&& ticket, TicketCallback callback) {
+ // Left unimplemented because there is no reasonable way to offer general async waiting besides
+ // offering a background thread that can handle waits for multiple tickets. We may never
+ // implement this for the legacy TL.
+ MONGO_UNREACHABLE;
+}
+
+void TransportLayerLegacy::end(const Session& session) {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ auto conn = _connections.find(session.id());
+ if (conn != _connections.end()) {
+ _endSession_inlock(conn);
+ }
+}
+
+void TransportLayerLegacy::registerTags(const Session& session) {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ auto conn = _connections.find(session.id());
+ if (conn != _connections.end()) {
+ conn->second.tags = session.getTags();
+ }
+}
+
+void TransportLayerLegacy::_endSession_inlock(
+ decltype(TransportLayerLegacy::_connections.begin()) conn) {
+ conn->second.amp->shutdown();
+
+ // If the amp is in use it means that we're in the middle of fulfilling the ticket in _runTicket
+ // and can't erase it immediately.
+ //
+ // In that case we'll rely on _runTicket to do the removal for us later
+ if (conn->second.inUse) {
+ conn->second.ended = true;
+ } else {
+ Listener::globalTicketHolder.release();
+ _connections.erase(conn);
+ }
+}
+
+void TransportLayerLegacy::endAllSessions(Session::TagMask tags) {
+ log() << "legacy transport layer ending all sessions";
+ {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ auto&& conn = _connections.begin();
+ while (conn != _connections.end()) {
+ // If we erase this connection below, we invalidate our iterator, use a placeholder.
+ auto placeholder = conn;
+ placeholder++;
+
+ if (conn->second.tags & tags) {
+ log() << "Skip closing connection for connection # " << conn->second.connectionId;
+ } else {
+ _endSession_inlock(conn);
+ }
+
+ conn = placeholder;
+ }
+ }
+}
+
+void TransportLayerLegacy::shutdown() {
+ _running.store(false);
+ _listener->shutdown();
+ _listenerThread.join();
+ endAllSessions();
+}
+
+Status TransportLayerLegacy::_runTicket(Ticket ticket) {
+ if (!_running.load()) {
+ return {ErrorCodes::ShutdownInProgress, "TransportLayer in shutdown"};
+ }
+
+ if (ticket.expiration() < Date_t::now()) {
+ return {ErrorCodes::ExceededTimeLimit, "Ticket has expired"};
+ }
+
+ AbstractMessagingPort* amp;
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+
+ auto conn = _connections.find(ticket.sessionId());
+ if (conn == _connections.end()) {
+ return {ErrorCodes::TransportSessionNotFound, "No such session in TransportLayer"};
+ }
+
+ // "check out" the port
+ conn->second.inUse = true;
+ amp = conn->second.amp.get();
+ }
+
+ amp->clearCounters();
+
+ auto legacyTicket = checked_cast<LegacyTicket*>(getTicketImpl(ticket));
+ auto res = legacyTicket->_fill(amp);
+
+ networkCounter.hit(amp->getBytesIn(), amp->getBytesOut());
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+
+ auto conn = _connections.find(ticket.sessionId());
+ invariant(conn != _connections.end());
+
+#ifdef MONGO_CONFIG_SSL
+ // If we didn't have an X509 subject name, see if we have one now
+ if (!conn->second.x509SubjectName) {
+ auto name = amp->getX509SubjectName();
+ if (name != "") {
+ conn->second.x509SubjectName = name;
+ }
+ }
+#endif
+ conn->second.inUse = false;
+
+ if (conn->second.ended) {
+ Listener::globalTicketHolder.release();
+ _connections.erase(conn);
+ }
+ }
+
+ return res;
+}
+
+void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) {
+ if (!Listener::globalTicketHolder.tryAcquire()) {
+ log() << "connection refused because too many open connections: "
+ << Listener::globalTicketHolder.used();
+ amp->shutdown();
+ return;
+ }
+
+ Session session(amp->remote(), HostAndPort(amp->localAddr().toString(true)), this);
+
+ amp->setLogLevel(logger::LogSeverity::Debug(1));
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ _connections.emplace(std::piecewise_construct,
+ std::forward_as_tuple(session.id()),
+ std::forward_as_tuple(std::move(amp), false, session.getTags()));
+ }
+
+ invariant(_sep);
+ _sep->startSession(std::move(session));
+}
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h
new file mode 100644
index 00000000000..ec17c76ca1e
--- /dev/null
+++ b/src/mongo/transport/transport_layer_legacy.h
@@ -0,0 +1,174 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <unordered_map>
+
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/ticket_impl.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/net/listen.h"
+#include "mongo/util/net/sock.h"
+
+namespace mongo {
+
+class AbstractMessagingPort;
+class ServiceEntryPoint;
+
+namespace transport {
+
+/**
+ * A TransportLayer implementation based on legacy networking primitives (the Listener,
+ * AbstractMessagingPort).
+ */
+class TransportLayerLegacy final : public TransportLayer {
+ MONGO_DISALLOW_COPYING(TransportLayerLegacy);
+
+public:
+ struct Options {
+ int port; // port to bind to
+ std::string ipList; // addresses to bind to
+
+ Options() : port(0), ipList("") {}
+ };
+
+ TransportLayerLegacy(const Options& opts, std::shared_ptr<ServiceEntryPoint> sep);
+
+ ~TransportLayerLegacy();
+
+ Status setup();
+ Status start() override;
+
+ Ticket sourceMessage(const Session& session,
+ Message* message,
+ Date_t expiration = Ticket::kNoExpirationDate) override;
+
+ Ticket sinkMessage(const Session& session,
+ const Message& message,
+ Date_t expiration = Ticket::kNoExpirationDate) override;
+
+ Status wait(Ticket&& ticket) override;
+ void asyncWait(Ticket&& ticket, TicketCallback callback) override;
+
+ void registerTags(const Session& session) override;
+ std::string getX509SubjectName(const Session& session) override;
+
+ Stats sessionStats() override;
+
+ void end(const Session& session) override;
+ void endAllSessions(transport::Session::TagMask tags = Session::kKeepOpen) override;
+ void shutdown() override;
+
+private:
+ void _handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp);
+
+ Status _runTicket(Ticket ticket);
+
+ using NewConnectionCb = stdx::function<void(std::unique_ptr<AbstractMessagingPort>)>;
+ using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>;
+
+ /**
+ * A TicketImpl implementation for this TransportLayer. WorkHandle is a callable that
+ * can be invoked to fill this ticket.
+ */
+ class LegacyTicket : public TicketImpl {
+ MONGO_DISALLOW_COPYING(LegacyTicket);
+
+ public:
+ LegacyTicket(const Session& session, Date_t expiration, WorkHandle work);
+
+ SessionId sessionId() const override;
+ Date_t expiration() const override;
+
+ SessionId _sessionId;
+ Date_t _expiration;
+
+ WorkHandle _fill;
+ };
+
+ /**
+ * This Listener wraps the interface in listen.h so that we may implement our own
+ * version of accepted().
+ */
+ class ListenerLegacy : public Listener {
+ public:
+ ListenerLegacy(const TransportLayerLegacy::Options& opts, NewConnectionCb callback);
+
+ void runListener();
+
+ void accepted(std::unique_ptr<AbstractMessagingPort> mp) override;
+
+ bool useUnixSockets() const override {
+ return true;
+ }
+
+ private:
+ NewConnectionCb _accepted;
+ };
+
+ /**
+ * Connection object, to associate Session ids with AbstractMessagingPorts.
+ */
+ struct Connection {
+ Connection(std::unique_ptr<AbstractMessagingPort> port, bool ended, Session::TagMask tags)
+ : amp(std::move(port)),
+ connectionId(amp->connectionId()),
+ tags(tags),
+ inUse(false),
+ ended(false) {}
+
+ std::unique_ptr<AbstractMessagingPort> amp;
+
+ const long long connectionId;
+
+ boost::optional<std::string> x509SubjectName;
+ Session::TagMask tags;
+ bool inUse;
+ bool ended;
+ };
+
+ std::shared_ptr<ServiceEntryPoint> _sep;
+
+ std::unique_ptr<Listener> _listener;
+ stdx::thread _listenerThread;
+
+ stdx::mutex _connectionsMutex;
+ std::unordered_map<Session::Id, Connection> _connections;
+
+ void _endSession_inlock(decltype(_connections.begin()) conn);
+
+ AtomicWord<bool> _running;
+
+ Options _options;
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
new file mode 100644
index 00000000000..c64023b2067
--- /dev/null
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -0,0 +1,129 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/transport_layer_manager.h"
+
+#include "mongo/base/status.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/transport/session.h"
+#include "mongo/util/time_support.h"
+#include <limits>
+
+#include <iostream>
+
+namespace mongo {
+namespace transport {
+
+TransportLayerManager::TransportLayerManager() = default;
+
+Ticket TransportLayerManager::sourceMessage(const Session& session,
+ Message* message,
+ Date_t expiration) {
+ return session.getTransportLayer()->sourceMessage(session, message, expiration);
+}
+
+Ticket TransportLayerManager::sinkMessage(const Session& session,
+ const Message& message,
+ Date_t expiration) {
+ return session.getTransportLayer()->sinkMessage(session, message, expiration);
+}
+
+Status TransportLayerManager::wait(Ticket&& ticket) {
+ return getTicketTransportLayer(ticket)->wait(std::move(ticket));
+}
+
+void TransportLayerManager::asyncWait(Ticket&& ticket, TicketCallback callback) {
+ return getTicketTransportLayer(ticket)->asyncWait(std::move(ticket), std::move(callback));
+}
+
+std::string TransportLayerManager::getX509SubjectName(const Session& session) {
+ return session.getX509SubjectName();
+}
+
+template <typename Callable>
+void TransportLayerManager::_foreach(Callable&& cb) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+ for (auto&& tl : _tls) {
+ cb(tl.get());
+ }
+ }
+}
+
+TransportLayer::Stats TransportLayerManager::sessionStats() {
+ Stats stats;
+
+ _foreach([&](TransportLayer* tl) {
+ Stats s = tl->sessionStats();
+
+ stats.numOpenSessions += s.numOpenSessions;
+ stats.numCreatedSessions += s.numCreatedSessions;
+ if (std::numeric_limits<size_t>::max() - stats.numAvailableSessions <
+ s.numAvailableSessions) {
+ stats.numAvailableSessions = std::numeric_limits<size_t>::max();
+ } else {
+ stats.numAvailableSessions += s.numAvailableSessions;
+ }
+ });
+
+ return stats;
+}
+
+void TransportLayerManager::registerTags(const Session& session) {
+ session.getTransportLayer()->registerTags(session);
+}
+
+void TransportLayerManager::end(const Session& session) {
+ session.getTransportLayer()->end(session);
+}
+
+void TransportLayerManager::endAllSessions(Session::TagMask tags) {
+ _foreach([&tags](TransportLayer* tl) { tl->endAllSessions(tags); });
+}
+
+Status TransportLayerManager::start() {
+ return Status::OK();
+}
+
+void TransportLayerManager::shutdown() {
+ _foreach([](TransportLayer* tl) { tl->shutdown(); });
+}
+
+Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl) {
+ auto ptr = tl.get();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+ _tls.emplace_back(std::move(tl));
+ }
+ return ptr->start();
+}
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
new file mode 100644
index 00000000000..391bdf4ebea
--- /dev/null
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/status.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket.h"
+#include "mongo/transport/ticket_impl.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace transport {
+
+/**
+ * This TransportLayerManager is a TransportLayer implementation that holds other
+ * TransportLayers. Mongod and Mongos can treat this like the "only" TransportLayer
+ * and not be concerned with which other TransportLayer implementations it holds
+ * underneath.
+ */
+class TransportLayerManager final : public TransportLayer {
+ MONGO_DISALLOW_COPYING(TransportLayerManager);
+
+public:
+ TransportLayerManager();
+
+ Ticket sourceMessage(const Session& session,
+ Message* message,
+ Date_t expiration = Ticket::kNoExpirationDate) override;
+ Ticket sinkMessage(const Session& session,
+ const Message& message,
+ Date_t expiration = Ticket::kNoExpirationDate) override;
+
+ Status wait(Ticket&& ticket) override;
+ void asyncWait(Ticket&& ticket, TicketCallback callback) override;
+
+ std::string getX509SubjectName(const Session& session) override;
+ void registerTags(const Session& session) override;
+
+ Stats sessionStats() override;
+
+ void end(const Session& session) override;
+ void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) override;
+
+ Status start() override;
+
+ void shutdown() override;
+
+ Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl);
+
+private:
+ template <typename Callable>
+ void _foreach(Callable&& cb);
+
+ stdx::mutex _tlsMutex;
+ std::vector<std::unique_ptr<TransportLayer>> _tls;
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
new file mode 100644
index 00000000000..c550570b401
--- /dev/null
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/base/status.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket.h"
+#include "mongo/transport/ticket_impl.h"
+#include "mongo/transport/transport_layer_mock.h"
+#include "mongo/util/time_support.h"
+
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace transport {
+
+Session::Id TransportLayerMock::MockTicket::sessionId() const {
+ return Session::Id{};
+}
+
+Date_t TransportLayerMock::MockTicket::expiration() const {
+ return Date_t::now();
+}
+
+Ticket TransportLayerMock::sourceMessage(const Session& session,
+ Message* message,
+ Date_t expiration) {
+ return Ticket(this, stdx::make_unique<MockTicket>());
+}
+
+Ticket TransportLayerMock::sinkMessage(const Session& session,
+ const Message& message,
+ Date_t expiration) {
+ return Ticket(this, stdx::make_unique<MockTicket>());
+}
+
+Status TransportLayerMock::wait(Ticket&& ticket) {
+ return Status::OK();
+}
+
+void TransportLayerMock::asyncWait(Ticket&& ticket, TicketCallback callback) {
+ callback(Status::OK());
+}
+
+std::string TransportLayerMock::getX509SubjectName(const Session& session) {
+ return session.getX509SubjectName();
+}
+
+TransportLayer::Stats TransportLayerMock::sessionStats() {
+ return Stats();
+}
+
+void TransportLayerMock::registerTags(const Session& session) {}
+
+void TransportLayerMock::end(const Session& session) {}
+
+void TransportLayerMock::endAllSessions(Session::TagMask tags) {}
+
+Status TransportLayerMock::start() {
+ return Status::OK();
+}
+
+void TransportLayerMock::shutdown() {}
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
new file mode 100644
index 00000000000..400d6b02258
--- /dev/null
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket.h"
+#include "mongo/transport/ticket_impl.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace transport {
+
+/**
+ * This TransportLayerMock is a noop TransportLayer implementation.
+ */
+class TransportLayerMock : public TransportLayer {
+ MONGO_DISALLOW_COPYING(TransportLayerMock);
+
+public:
+ TransportLayerMock() = default;
+
+ Ticket sourceMessage(const Session& session,
+ Message* message,
+ Date_t expiration = Ticket::kNoExpirationDate) override;
+ Ticket sinkMessage(const Session& session,
+ const Message& message,
+ Date_t expiration = Ticket::kNoExpirationDate) override;
+
+ Status wait(Ticket&& ticket) override;
+ void asyncWait(Ticket&& ticket, TicketCallback callback) override;
+
+ std::string getX509SubjectName(const Session& session) override;
+ void registerTags(const Session& session) override;
+
+ Stats sessionStats() override;
+
+ void end(const Session& session) override;
+ void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) override;
+
+ Status start() override;
+ void shutdown() override;
+
+private:
+ /**
+ * A class for Tickets issued from the TransportLayerMock. These will
+ * route to the appropriate TransportLayer when run with wait() or
+ * asyncWait().
+ */
+ class MockTicket : public TicketImpl {
+ MONGO_DISALLOW_COPYING(MockTicket);
+
+ public:
+ MockTicket() = default;
+
+ Session::Id sessionId() const override;
+ Date_t expiration() const override;
+ };
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/util/exit.cpp b/src/mongo/util/exit.cpp
index 44cdf07f68b..7955a3483c6 100644
--- a/src/mongo/util/exit.cpp
+++ b/src/mongo/util/exit.cpp
@@ -32,6 +32,7 @@
#include "mongo/util/exit.h"
+#include <boost/optional.hpp>
#include <stack>
#include "mongo/stdx/condition_variable.h"
@@ -47,6 +48,7 @@ namespace {
stdx::mutex shutdownMutex;
stdx::condition_variable shutdownTasksComplete;
+boost::optional<ExitCode> shutdownExitCode;
bool shutdownTasksInProgress = false;
AtomicUInt32 shutdownFlag;
std::stack<stdx::function<void()>> shutdownTasks;
@@ -69,6 +71,10 @@ MONGO_COMPILER_NORETURN void logAndQuickExit(ExitCode code) {
quickExit(code);
}
+void setShutdownFlag() {
+ shutdownFlag.fetchAndAdd(1);
+}
+
} // namespace
bool inShutdown() {
@@ -79,6 +85,13 @@ bool inShutdownStrict() {
return shutdownFlag.load() != 0;
}
+ExitCode waitForShutdown() {
+ stdx::unique_lock<stdx::mutex> lk(shutdownMutex);
+ shutdownTasksComplete.wait(lk, [] { return static_cast<bool>(shutdownExitCode); });
+
+ return shutdownExitCode.get();
+}
+
void registerShutdownTask(stdx::function<void()> task) {
stdx::lock_guard<stdx::mutex> lock(shutdownMutex);
invariant(!inShutdown());
@@ -105,7 +118,7 @@ void shutdown(ExitCode code) {
logAndQuickExit(code);
}
- shutdownFlag.fetchAndAdd(1);
+ setShutdownFlag();
shutdownTasksInProgress = true;
shutdownTasksThreadId = stdx::this_thread::get_id();
@@ -117,6 +130,7 @@ void shutdown(ExitCode code) {
{
stdx::lock_guard<stdx::mutex> lock(shutdownMutex);
shutdownTasksInProgress = false;
+ shutdownExitCode.emplace(code);
}
shutdownTasksComplete.notify_all();
@@ -133,7 +147,7 @@ void shutdownNoTerminate() {
if (inShutdown())
return;
- shutdownFlag.fetchAndAdd(1);
+ setShutdownFlag();
shutdownTasksInProgress = true;
shutdownTasksThreadId = stdx::this_thread::get_id();
diff --git a/src/mongo/util/exit.h b/src/mongo/util/exit.h
index fa6a8bbb85b..1aed81117a8 100644
--- a/src/mongo/util/exit.h
+++ b/src/mongo/util/exit.h
@@ -46,6 +46,11 @@ bool inShutdown();
bool inShutdownStrict();
/**
+ * Does not return until all shutdown tasks have run.
+ */
+ExitCode waitForShutdown();
+
+/**
* Registers a new shutdown task to be called when shutdown or
* shutdownNoTerminate is called. If this function is invoked after
* shutdown or shutdownNoTerminate has been called, std::terminate is
diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript
index fa8899561e9..34105547ab6 100644
--- a/src/mongo/util/net/SConscript
+++ b/src/mongo/util/net/SConscript
@@ -95,17 +95,6 @@ env.Library(
)
env.Library(
- target="message_server_port",
- source=[
- "message_server_port.cpp",
- ],
- LIBDEPS=[
- 'network',
- '$BUILD_DIR/mongo/db/stats/counters',
- ],
-)
-
-env.Library(
target='miniwebserver',
source=[
'miniwebserver.cpp',
diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h
index c4ff5503f41..e4884e75133 100644
--- a/src/mongo/util/net/abstract_message_port.h
+++ b/src/mongo/util/net/abstract_message_port.h
@@ -32,6 +32,7 @@
#include "mongo/config.h"
#include "mongo/logger/log_severity.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/sockaddr.h"
#include "mongo/util/time_support.h"
@@ -88,6 +89,11 @@ public:
virtual void say(Message& toSend, int responseTo = 0) = 0;
/**
+ * Sends the message (does not set headers).
+ */
+ virtual void say(const Message& toSend) = 0;
+
+ /**
* Sends the data over the socket.
*/
virtual void send(const char* data, int len, const char* context) = 0;
diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp
index 88e37feb2d8..16fab7ea8c1 100644
--- a/src/mongo/util/net/asio_message_port.cpp
+++ b/src/mongo/util/net/asio_message_port.cpp
@@ -54,40 +54,6 @@ const char kGET[] = "GET";
const int kHeaderLen = sizeof(MSGHEADER::Value);
const int kInitialMessageSize = 1024;
-class ASIOPorts {
-public:
- void closeAll(AbstractMessagingPort::Tag skipMask) {
- stdx::lock_guard<stdx::mutex> lock_guard(mutex);
- for (auto&& port : _portSet) {
- if (port->getTag() & skipMask) {
- LOG(3) << "Skip closing connection # " << port->connectionId();
- continue;
- }
- LOG(3) << "Closing connection # " << port->connectionId();
- port->shutdown();
- }
- }
-
- void insert(ASIOMessagingPort* p) {
- stdx::lock_guard<stdx::mutex> lock_guard(mutex);
- _portSet.insert(p);
- }
-
- void erase(ASIOMessagingPort* p) {
- stdx::lock_guard<stdx::mutex> lock_guard(mutex);
- _portSet.erase(p);
- }
-
-private:
- stdx::mutex mutex;
- std::unordered_set<ASIOMessagingPort*> _portSet;
-};
-
-// We "new" this so it will still be around when other automatic global vars are being destructed
-// during termination. TODO: This is an artifact from MessagingPort and should be removed when we
-// decide where to put networking global state.
-ASIOPorts& asioPorts = *(new ASIOPorts());
-
#ifdef MONGO_CONFIG_SSL
struct ASIOSSLContextPair {
ASIOSSLContext server;
@@ -109,10 +75,6 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(ASIOSSLContextSetup, ("SSLManager"))(Initia
} // namespace
-void ASIOMessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) {
- asioPorts.closeAll(skipMask);
-}
-
ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd)
: _service(1),
_timer(_service),
@@ -145,7 +107,6 @@ ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd)
farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP),
fd) {
#endif // MONGO_CONFIG_SSL
- asioPorts.insert(this);
_getSocket().non_blocking(true);
_remote = HostAndPort(farEnd.getAddr(), farEnd.getPort());
_timer.expires_at(decltype(_timer)::time_point::max());
@@ -177,7 +138,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l
#else
_sock(_service) {
#endif // MONGO_CONFIG_SSL
- asioPorts.insert(this);
if (*_timeout == Milliseconds(0)) {
_timeout = boost::none;
}
@@ -187,7 +147,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l
ASIOMessagingPort::~ASIOMessagingPort() {
shutdown();
- asioPorts.erase(this);
}
void ASIOMessagingPort::setTimeout(Milliseconds millis) {
@@ -502,6 +461,11 @@ void ASIOMessagingPort::say(Message& toSend, int responseTo) {
invariant(!toSend.empty());
toSend.header().setId(nextMessageId());
toSend.header().setResponseToMsgId(responseTo);
+ return say(const_cast<const Message&>(toSend));
+}
+
+void ASIOMessagingPort::say(const Message& toSend) {
+ invariant(!toSend.empty());
auto buf = toSend.buf();
if (buf) {
send(buf, MsgData::ConstView(buf).getLen(), nullptr);
diff --git a/src/mongo/util/net/asio_message_port.h b/src/mongo/util/net/asio_message_port.h
index d474b03ef29..d3e5fc33b1c 100644
--- a/src/mongo/util/net/asio_message_port.h
+++ b/src/mongo/util/net/asio_message_port.h
@@ -78,6 +78,7 @@ public:
void reply(Message& received, Message& response) override;
void say(Message& toSend, int responseTo = 0) override;
+ void say(const Message& toSend) override;
void send(const char* data, int len, const char*) override;
void send(const std::vector<std::pair<char*, int>>& data, const char*) override;
@@ -118,8 +119,6 @@ public:
bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override;
- static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask);
-
private:
void _setTimerCallback();
asio::error_code _read(char* buf, std::size_t size);
diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp
index deb30887e5c..65cec09e45a 100644
--- a/src/mongo/util/net/listen.cpp
+++ b/src/mongo/util/net/listen.cpp
@@ -280,7 +280,9 @@ void Listener::initAndListen() {
}
struct timeval maxSelectTime;
- while (!inShutdown()) {
+ // The check against _finished allows us to actually stop the listener by signalling it through
+ // the _finished flag.
+ while (!inShutdown() && !_finished.load()) {
fd_set fds[1];
FD_ZERO(fds);
@@ -601,7 +603,7 @@ void Listener::_accepted(const std::shared_ptr<Socket>& psocket, long long conne
port = stdx::make_unique<MessagingPort>(psocket);
}
port->setConnectionId(connectionId);
- accepted(port.release());
+ accepted(std::move(port));
}
// ----- ListeningSockets -------
@@ -648,9 +650,8 @@ void Listener::checkTicketNumbers() {
globalTicketHolder.resize(want);
}
-void Listener::closeMessagingPorts(AbstractMessagingPort::Tag skipMask) {
- ASIOMessagingPort::closeSockets(skipMask);
- MessagingPort::closeSockets(skipMask);
+void Listener::shutdown() {
+ _finished.store(true);
}
TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN);
diff --git a/src/mongo/util/net/listen.h b/src/mongo/util/net/listen.h
index 0bbd3ad8a0b..f0c66a41d60 100644
--- a/src/mongo/util/net/listen.h
+++ b/src/mongo/util/net/listen.h
@@ -66,7 +66,7 @@ public:
void initAndListen(); // never returns unless error (start a thread)
/* spawn a thread, etc., then return */
- virtual void accepted(AbstractMessagingPort* mp) = 0;
+ virtual void accepted(std::unique_ptr<AbstractMessagingPort> mp) = 0;
const int _port;
@@ -83,6 +83,8 @@ public:
*/
void waitUntilListening() const;
+ void shutdown();
+
private:
std::vector<SockAddr> _mine;
std::vector<SOCKET> _socks;
@@ -94,6 +96,7 @@ private:
mutable stdx::condition_variable _readyCondition; // Used to wait for changes to _ready
// Boolean that indicates whether this Listener is ready to accept incoming network requests
bool _ready;
+ AtomicBool _finished{false};
ServiceContext* _ctx;
bool _setAsServiceCtxDecoration;
@@ -119,13 +122,6 @@ public:
/** makes sure user input is sane */
static void checkTicketNumbers();
-
- /**
- * This will close implementations of AbstractMessagingPort, skipping any that have tags
- * matching `skipMask`.
- */
- static void closeMessagingPorts(
- AbstractMessagingPort::Tag skipMask = AbstractMessagingPort::kSkipAllMask);
};
class ListeningSockets {
diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h
index 37a6c0bb66b..601d02b9e12 100644
--- a/src/mongo/util/net/message.h
+++ b/src/mongo/util/net/message.h
@@ -458,11 +458,16 @@ public:
return _buf.get();
}
+ const char* buf() const {
+ return _buf.get();
+ }
+
std::string toString() const;
SharedBuffer sharedBuffer() {
return _buf;
}
+
ConstSharedBuffer sharedBuffer() const {
return _buf;
}
diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp
index 5051962ed5d..137d494c4ac 100644
--- a/src/mongo/util/net/message_port.cpp
+++ b/src/mongo/util/net/message_port.cpp
@@ -63,41 +63,6 @@ using std::string;
/* messagingport -------------------------------------------------------------- */
-class Ports {
- std::set<MessagingPort*> ports;
- stdx::mutex m;
-
-public:
- Ports() : ports() {}
- void closeAll(AbstractMessagingPort::Tag skip_mask) {
- stdx::lock_guard<stdx::mutex> bl(m);
- for (std::set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++) {
- if ((*i)->getTag() & skip_mask) {
- LOG(3) << "Skip closing connection # " << (*i)->connectionId();
- continue;
- }
- LOG(3) << "Closing connection # " << (*i)->connectionId();
- (*i)->shutdown();
- }
- }
- void insert(MessagingPort* p) {
- stdx::lock_guard<stdx::mutex> bl(m);
- ports.insert(p);
- }
- void erase(MessagingPort* p) {
- stdx::lock_guard<stdx::mutex> bl(m);
- ports.erase(p);
- }
-};
-
-// we "new" this so it is still be around when other automatic global vars
-// are being destructed during termination.
-Ports& ports = *(new Ports());
-
-void MessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) {
- ports.closeAll(skipMask);
-}
-
MessagingPort::MessagingPort(int fd, const SockAddr& remote)
: MessagingPort(std::make_shared<Socket>(fd, remote)) {}
@@ -108,7 +73,6 @@ MessagingPort::MessagingPort(std::shared_ptr<Socket> sock)
: _x509SubjectName(), _connectionId(), _tag(), _psock(std::move(sock)) {
SockAddr sa = _psock->remoteAddr();
_remoteParsed = HostAndPort(sa.getAddr(), sa.getPort());
- ports.insert(this);
}
void MessagingPort::setTimeout(Milliseconds millis) {
@@ -122,7 +86,6 @@ void MessagingPort::shutdown() {
MessagingPort::~MessagingPort() {
shutdown();
- ports.erase(this);
}
bool MessagingPort::recv(Message& m) {
@@ -216,6 +179,7 @@ bool MessagingPort::call(Message& toSend, Message& response) {
say(toSend);
bool success = recv(response);
if (success) {
+ invariant(!response.empty());
if (response.header().getResponseToMsgId() != toSend.header().getId()) {
response.reset();
uasserted(40134, "Response ID did not match the sent message ID.");
@@ -225,9 +189,15 @@ bool MessagingPort::call(Message& toSend, Message& response) {
}
void MessagingPort::say(Message& toSend, int responseTo) {
- verify(!toSend.empty());
+ invariant(!toSend.empty());
toSend.header().setId(nextMessageId());
toSend.header().setResponseToMsgId(responseTo);
+
+ return say(const_cast<const Message&>(toSend));
+}
+
+void MessagingPort::say(const Message& toSend) {
+ invariant(!toSend.empty());
auto buf = toSend.buf();
if (buf) {
send(buf, MsgData::ConstView(buf).getLen(), "say");
diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h
index 18931e20750..41ecc2fce3b 100644
--- a/src/mongo/util/net/message_port.h
+++ b/src/mongo/util/net/message_port.h
@@ -66,6 +66,7 @@ public:
bool call(Message& toSend, Message& response) override;
void say(Message& toSend, int responseTo = 0) override;
+ void say(const Message& toSend) override;
unsigned remotePort() const override {
return _psock->remotePort();
@@ -142,11 +143,6 @@ private:
long long _connectionId;
AbstractMessagingPort::Tag _tag;
std::shared_ptr<Socket> _psock;
-
-
-public:
- static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask);
};
-
} // namespace mongo
diff --git a/src/mongo/util/net/message_port_mock.cpp b/src/mongo/util/net/message_port_mock.cpp
index 6869723129f..de07ba032ae 100644
--- a/src/mongo/util/net/message_port_mock.cpp
+++ b/src/mongo/util/net/message_port_mock.cpp
@@ -55,6 +55,7 @@ void MessagingPortMock::reply(Message& received, Message& response, int32_t resp
void MessagingPortMock::reply(Message& received, Message& response) {}
void MessagingPortMock::say(Message& toSend, int responseTo) {}
+void MessagingPortMock::say(const Message& toSend) {}
bool MessagingPortMock::connect(SockAddr& farEnd) {
return true;
diff --git a/src/mongo/util/net/message_port_mock.h b/src/mongo/util/net/message_port_mock.h
index 61ab5be8958..721998f81f3 100644
--- a/src/mongo/util/net/message_port_mock.h
+++ b/src/mongo/util/net/message_port_mock.h
@@ -55,6 +55,7 @@ public:
void reply(Message& received, Message& response) override;
void say(Message& toSend, int responseTo = 0) override;
+ void say(const Message& toSend) override;
bool connect(SockAddr& farEnd) override;
diff --git a/src/mongo/util/net/message_server.h b/src/mongo/util/net/message_server.h
deleted file mode 100644
index 544c859c6f7..00000000000
--- a/src/mongo/util/net/message_server.h
+++ /dev/null
@@ -1,83 +0,0 @@
-// message_server.h
-
-/* Copyright 2009 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-/*
- abstract database server
- async io core, worker thread system
- */
-
-#pragma once
-
-#include "mongo/platform/basic.h"
-
-namespace mongo {
-
-class ServiceContext;
-
-class MessageHandler {
-public:
- virtual ~MessageHandler() {}
-
- /**
- * Called once when a socket is connected.
- */
- virtual void connected(AbstractMessagingPort* p) = 0;
-
- /**
- * Called every time a message comes in. Handler is responsible for responding to client.
- */
- virtual void process(Message& m, AbstractMessagingPort* p) = 0;
-
- /**
- * Called once, either when the client disconnects or when the process is shutting down. After
- * close() is called, this handler's AbstractMessagingPort pointer (passed in via the
- * connected() method) is no longer valid.
- */
- virtual void close() = 0;
-};
-
-class MessageServer {
-public:
- struct Options {
- int port; // port to bind to
- std::string ipList; // addresses to bind to
-
- Options() : port(0), ipList("") {}
- };
-
- virtual ~MessageServer() {}
- virtual void run() = 0;
- virtual bool setupSockets() = 0;
-};
-
-// TODO use a factory here to decide between port and asio variations
-MessageServer* createServer(const MessageServer::Options& opts,
- std::shared_ptr<MessageHandler> handler,
- ServiceContext* ctx);
-}
diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp
deleted file mode 100644
index e8432f8c510..00000000000
--- a/src/mongo/util/net/message_server_port.cpp
+++ /dev/null
@@ -1,244 +0,0 @@
-// message_server_port.cpp
-
-/* Copyright 2009 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-#include <system_error>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/config.h"
-#include "mongo/db/lasterror.h"
-#include "mongo/db/server_options.h"
-#include "mongo/db/stats/counters.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/concurrency/thread_name.h"
-#include "mongo/util/concurrency/ticketholder.h"
-#include "mongo/util/debug_util.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/net/abstract_message_port.h"
-#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_server.h"
-#include "mongo/util/net/socket_exception.h"
-#include "mongo/util/net/ssl_manager.h"
-#include "mongo/util/net/thread_idle_callback.h"
-#include "mongo/util/quick_exit.h"
-#include "mongo/util/scopeguard.h"
-
-#ifdef __linux__ // TODO: consider making this ifndef _WIN32
-#include <sys/resource.h>
-#endif
-
-#if !defined(__has_feature)
-#define __has_feature(x) 0
-#endif
-
-namespace mongo {
-
-using std::unique_ptr;
-using std::endl;
-
-namespace {
-
-using MessagingPortWithHandler = std::pair<AbstractMessagingPort*, std::shared_ptr<MessageHandler>>;
-
-} // namespace
-
-class PortMessageServer : public MessageServer, public Listener {
-public:
- /**
- * Creates a new message server.
- *
- * @param opts
- * @param handler the handler to use.
- */
- PortMessageServer(const MessageServer::Options& opts,
- std::shared_ptr<MessageHandler> handler,
- ServiceContext* ctx)
- : Listener("", opts.ipList, opts.port, ctx, true), _handler(std::move(handler)) {}
-
- virtual void accepted(AbstractMessagingPort* mp) {
- ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2);
- auto portWithHandler = stdx::make_unique<MessagingPortWithHandler>(mp, _handler);
-
- if (!Listener::globalTicketHolder.tryAcquire()) {
- log() << "connection refused because too many open connections: "
- << Listener::globalTicketHolder.used() << endl;
- return;
- }
-
- try {
-#ifndef __linux__ // TODO: consider making this ifdef _WIN32
- {
- stdx::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get()));
- thr.detach();
- }
-#else
- pthread_attr_t attrs;
- pthread_attr_init(&attrs);
- pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
-
- static const size_t STACK_SIZE =
- 1024 * 1024; // if we change this we need to update the warning
-
- struct rlimit limits;
- verify(getrlimit(RLIMIT_STACK, &limits) == 0);
- if (limits.rlim_cur > STACK_SIZE) {
- size_t stackSizeToSet = STACK_SIZE;
-#if !__has_feature(address_sanitizer)
- if (kDebugBuild)
- stackSizeToSet /= 2;
-#endif
- pthread_attr_setstacksize(&attrs, stackSizeToSet);
- } else if (limits.rlim_cur < 1024 * 1024) {
- warning() << "Stack size set to " << (limits.rlim_cur / 1024)
- << "KB. We suggest 1MB" << endl;
- }
-
-
- pthread_t thread;
- int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, portWithHandler.get());
-
- pthread_attr_destroy(&attrs);
-
- if (failed) {
- log() << "pthread_create failed: " << errnoWithDescription(failed) << endl;
- throw std::system_error(
- std::make_error_code(std::errc::resource_unavailable_try_again));
- }
-#endif // __linux__
-
- portWithHandler.release();
- sleepAfterClosingPort.Dismiss();
- } catch (...) {
- Listener::globalTicketHolder.release();
- log() << "failed to create thread after accepting new connection, closing connection";
- }
- }
-
- virtual bool setupSockets() {
- return Listener::setupSockets();
- }
-
- void run() {
- initAndListen();
- }
-
- virtual bool useUnixSockets() const {
- return true;
- }
-
-private:
- const std::shared_ptr<MessageHandler> _handler;
-
- /**
- * Handles incoming messages from a given socket.
- *
- * Terminating conditions:
- * 1. Assertions while handling the request.
- * 2. Socket is closed.
- * 3. Server is shutting down (based on inShutdown)
- *
- * @param arg this method is in charge of cleaning up the arg object.
- *
- * @return NULL
- */
- static void* handleIncomingMsg(void* arg) {
- TicketHolderReleaser connTicketReleaser(&Listener::globalTicketHolder);
-
- invariant(arg);
- unique_ptr<MessagingPortWithHandler> portWithHandler(
- static_cast<MessagingPortWithHandler*>(arg));
- auto mp = portWithHandler->first;
- auto handler = portWithHandler->second;
-
- setThreadName(std::string(str::stream() << "conn" << mp->connectionId()));
- mp->setLogLevel(logger::LogSeverity::Debug(1));
-
- Message m;
- int64_t counter = 0;
- try {
- handler->connected(mp);
- ON_BLOCK_EXIT([handler]() { handler->close(); });
-
- while (!inShutdown()) {
- m.reset();
- mp->clearCounters();
-
- if (!mp->recv(m)) {
- if (!serverGlobalParams.quiet) {
- int conns = Listener::globalTicketHolder.used() - 1;
- const char* word = (conns == 1 ? " connection" : " connections");
- log() << "end connection " << mp->remote().toString() << " (" << conns
- << word << " now open)" << endl;
- }
- break;
- }
-
- handler->process(m, mp);
- networkCounter.hit(mp->getBytesIn(), mp->getBytesOut());
-
- // Occasionally we want to see if we're using too much memory.
- if ((counter++ & 0xf) == 0) {
- markThreadIdle();
- }
- }
- } catch (AssertionException& e) {
- log() << "AssertionException handling request, closing client connection: " << e
- << endl;
- } catch (SocketException& e) {
- log() << "SocketException handling request, closing client connection: " << e << endl;
- } catch (const DBException&
- e) { // must be right above std::exception to avoid catching subclasses
- log() << "DBException handling request, closing client connection: " << e << endl;
- } catch (std::exception& e) {
- error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl;
- quickExit(EXIT_UNCAUGHT);
- }
- mp->shutdown();
-
- return NULL;
- }
-};
-
-
-MessageServer* createServer(const MessageServer::Options& opts,
- std::shared_ptr<MessageHandler> handler,
- ServiceContext* ctx) {
- return new PortMessageServer(opts, std::move(handler), ctx);
-}
-
-} // namespace mongo
diff --git a/src/mongo/util/net/miniwebserver.cpp b/src/mongo/util/net/miniwebserver.cpp
index ec64f174560..78f81f86134 100644
--- a/src/mongo/util/net/miniwebserver.cpp
+++ b/src/mongo/util/net/miniwebserver.cpp
@@ -204,7 +204,7 @@ void MiniWebServer::_accepted(const std::shared_ptr<Socket>& psock, long long co
}
}
-void MiniWebServer::accepted(AbstractMessagingPort* mp) {
+void MiniWebServer::accepted(std::unique_ptr<AbstractMessagingPort> mp) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/util/net/miniwebserver.h b/src/mongo/util/net/miniwebserver.h
index a2d0690225e..122281555fc 100644
--- a/src/mongo/util/net/miniwebserver.h
+++ b/src/mongo/util/net/miniwebserver.h
@@ -69,7 +69,7 @@ public:
}
// This is not currently used for the MiniWebServer. See SERVER-24200
- void accepted(AbstractMessagingPort* mp) override;
+ void accepted(std::unique_ptr<AbstractMessagingPort> mp) override;
private:
void _accepted(const std::shared_ptr<Socket>& psocket, long long connectionId) override;
diff --git a/src/mongo/util/net/sockaddr.cpp b/src/mongo/util/net/sockaddr.cpp
index 00960f2fd97..5236a76892f 100644
--- a/src/mongo/util/net/sockaddr.cpp
+++ b/src/mongo/util/net/sockaddr.cpp
@@ -46,8 +46,8 @@
#endif
#endif
+#include "mongo/bson/util/builder.h"
#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/sock.h"
namespace mongo {
@@ -150,10 +150,19 @@ bool SockAddr::isLocalHost() const {
}
std::string SockAddr::toString(bool includePort) const {
- std::string out = getAddr();
- if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC)
- out += mongoutils::str::stream() << ':' << getPort();
- return out;
+ if (includePort && (getType() != AF_UNIX) && (getType() != AF_UNSPEC)) {
+ StringBuilder ss;
+
+ if (getType() == AF_INET6) {
+ ss << '[' << getAddr() << "]:" << getPort();
+ } else {
+ ss << getAddr() << ':' << getPort();
+ }
+
+ return ss.str();
+ } else {
+ return getAddr();
+ }
}
sa_family_t SockAddr::getType() const {
diff --git a/src/mongo/util/ntservice_test.cpp b/src/mongo/util/ntservice_test.cpp
index 0a0fa6bde64..ad5b7e7ed2a 100644
--- a/src/mongo/util/ntservice_test.cpp
+++ b/src/mongo/util/ntservice_test.cpp
@@ -34,7 +34,6 @@
#include <string>
#include <vector>
-
#include "mongo/db/client.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/ntservice.h"
diff --git a/src/mongo/util/tcmalloc_server_status_section.cpp b/src/mongo/util/tcmalloc_server_status_section.cpp
index 7a8eb49e25b..303076ba784 100644
--- a/src/mongo/util/tcmalloc_server_status_section.cpp
+++ b/src/mongo/util/tcmalloc_server_status_section.cpp
@@ -38,6 +38,8 @@
#include "mongo/base/init.h"
#include "mongo/db/commands/server_status.h"
+#include "mongo/db/service_context.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/log.h"
#include "mongo/util/net/listen.h"
#include "mongo/util/net/thread_idle_callback.h"
@@ -58,7 +60,8 @@ stdx::mutex tcmallocCleanupLock;
* favorable times. Ideally would do some milder cleanup or scavenge...
*/
void threadStateChange() {
- if (Listener::globalTicketHolder.used() <= kManyClients)
+ if (getGlobalServiceContext()->getTransportLayer()->sessionStats().numOpenSessions <=
+ kManyClients)
return;
#if MONGO_HAVE_GPERFTOOLS_GET_THREAD_CACHE_SIZE