diff options
author | Samantha Ritter <samantha.ritter@10gen.com> | 2016-05-31 14:05:17 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2016-07-12 18:38:37 -0400 |
commit | c263ce1f95586f8652058e6202015a77f9becc49 (patch) | |
tree | d623fb9da9fd5da3cc4e20cac0653f1fa4af00eb /src | |
parent | dead3cf8b4b3cb5528ad1abb9eeb722b395e3632 (diff) | |
download | mongo-c263ce1f95586f8652058e6202015a77f9becc49.tar.gz |
SERVER-24162 Integrate TransportLayer
Expand the transport layer as needed to replace uses of abstract message port for ingress
networking.
Diffstat (limited to 'src')
84 files changed, 2186 insertions, 819 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index a13e463dd2b..a1a1a609e46 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -254,6 +254,8 @@ mongodLibDeps = [ "db/repl/storage_interface_impl", "executor/network_interface_factory", 's/commands/shared_cluster_commands', + "transport/transport_layer_legacy", + "transport/service_entry_point_utils", "util/clock_sources", "util/ntservice", ] @@ -266,6 +268,7 @@ mongod = env.Program( source=[ "db/db.cpp", "db/mongod_options_init.cpp", + "db/service_entry_point_mongod.cpp", ], LIBDEPS=mongodLibDeps, ) @@ -297,6 +300,7 @@ env.Install( 's/client/sharding_connection_hook_for_mongos.cpp', 's/mongos_options_init.cpp', 's/server.cpp', + 's/service_entry_point_mongos.cpp', ], LIBDEPS=[ 'db/conn_pool_options', @@ -309,6 +313,8 @@ env.Install( 's/coreshard', 's/mongoscore', 's/sharding_initialization', + 'transport/service_entry_point_utils', + 'transport/transport_layer_legacy', 'util/clock_sources', 'util/ntservice', 'util/options_parser/options_parser_init', diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index fa413cfd6dd..ab0fbc2c2f7 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -170,6 +170,7 @@ error_code("OperatorNotSupportedOnView", 168) error_code("CommandOnShardedViewNotSupportedOnMongos", 169) error_code("TooManyMatchingDocuments", 170) error_code("CannotIndexParallelArrays", 171) +error_code("TransportSessionNotFound", 172) # Non-sequential error codes (for compatibility only) error_code("SocketException", 9001) diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 78601d0b1b6..3a0d63c4320 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -256,7 +256,7 @@ env.CppUnitTest( 'clientdriver', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', - '$BUILD_DIR/mongo/util/net/message_server_port', + '$BUILD_DIR/mongo/transport/transport_layer_legacy', '$BUILD_DIR/mongo/util/net/network', ], ) diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index 9ee72a63953..ba54e4ec733 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -41,11 +41,14 @@ #include "mongo/rpc/request_interface.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/unittest/unittest.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/quick_exit.h" #include "mongo/util/time_support.h" @@ -72,12 +75,30 @@ namespace { const string TARGET_HOST = "localhost:27017"; const int TARGET_PORT = 27017; -class DummyMessageHandler final : public MessageHandler { +class DummyServiceEntryPoint : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(DummyServiceEntryPoint); + public: - virtual void connected(AbstractMessagingPort* p) {} + DummyServiceEntryPoint() {} + + virtual ~DummyServiceEntryPoint() { + for (auto& t : _threads) { + t.join(); + } + } + + void startSession(transport::Session&& session) override { + _threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session)); + } + +private: + void run(transport::Session&& session) { + Message inMessage; + if (!session.sourceMessage(&inMessage).wait().isOK()) { + return; + } - virtual void process(Message& m, AbstractMessagingPort* port) { - auto request = rpc::makeRequest(&m); + auto request = rpc::makeRequest(&inMessage); auto reply = rpc::makeReplyBuilder(request->getProtocol()); BSONObjBuilder commandResponse; @@ -92,10 +113,14 @@ public: .setMetadata(rpc::makeEmptyMetadata()) .done(); - port->reply(m, response); + response.header().setResponseToMsgId(inMessage.header().getId()); + + if (!session.sinkMessage(response).wait().isOK()) { + return; + } } - virtual void close() {} + std::vector<stdx::thread> _threads; }; // TODO: Take this out and make it as a reusable class in a header file. The only @@ -130,15 +155,15 @@ public: * @param messageHandler the message handler to use for this server. Ownership * of this object is passed to this server. */ - void run(std::shared_ptr<MessageHandler> messsageHandler) { - if (_server != NULL) { + void run(std::shared_ptr<ServiceEntryPoint> serviceEntryPoint) { + if (_server) { return; } - MessageServer::Options options; + transport::TransportLayerLegacy::Options options; options.port = _port; - _server.reset(createServer(options, std::move(messsageHandler), getGlobalServiceContext())); + _server = stdx::make_unique<transport::TransportLayerLegacy>(options, serviceEntryPoint); _serverThread = stdx::thread(runServer, _server.get()); } @@ -164,23 +189,23 @@ public: sleepmillis(500); connCount = Listener::globalTicketHolder.used(); } - + _server->shutdown(); _server.reset(); } /** * Helper method for running the server on a separate thread. */ - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); + static void runServer(transport::TransportLayerLegacy* server) { + server->setup(); + server->start(); } private: const int _port; stdx::thread _serverThread; - unique_ptr<MessageServer> _server; + unique_ptr<transport::TransportLayerLegacy> _server; }; /** @@ -190,9 +215,9 @@ class DummyServerFixture : public unittest::Test { public: void setUp() { _maxPoolSizePerHost = globalConnPool.getMaxPoolSize(); - _dummyServer = new DummyServer(TARGET_PORT); + _dummyServer = stdx::make_unique<DummyServer>(TARGET_PORT); - auto dummyHandler = std::make_shared<DummyMessageHandler>(); + auto dummyHandler = std::make_shared<DummyServiceEntryPoint>(); _dummyServer->run(std::move(dummyHandler)); DBClientConnection conn; Timer timer; @@ -212,7 +237,7 @@ public: void tearDown() { ScopedDbConnection::clearPool(); - delete _dummyServer; + _dummyServer.reset(); globalConnPool.setMaxPoolSize(_maxPoolSizePerHost); } @@ -272,12 +297,12 @@ protected: } private: - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); + static void runServer(transport::TransportLayerLegacy* server) { + server->setup(); + server->start(); } - DummyServer* _dummyServer; + std::unique_ptr<DummyServer> _dummyServer; uint32_t _maxPoolSizePerHost; }; diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 92f97280964..ba8c1e855bd 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -100,6 +100,7 @@ env.Library( "dbmessage.cpp", ], LIBDEPS=[ + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/net/network', ] ) @@ -402,7 +403,6 @@ env.Library( "$BUILD_DIR/mongo/rpc/command_reply", "$BUILD_DIR/mongo/rpc/command_request", "$BUILD_DIR/mongo/rpc/metadata", - "$BUILD_DIR/mongo/util/net/message_server_port", "$BUILD_DIR/mongo/util/net/miniwebserver", "$BUILD_DIR/mongo/util/processinfo", "$BUILD_DIR/mongo/util/signal_handlers", @@ -473,6 +473,8 @@ env.Library( '$BUILD_DIR/mongo/util/decorable', '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/net/hostandport', + '$BUILD_DIR/mongo/transport/transport_layer_common', + '$BUILD_DIR/mongo/transport/transport_layer_manager', ], ) diff --git a/src/mongo/db/auth/sasl_commands.cpp b/src/mongo/db/auth/sasl_commands.cpp index f006cded7d0..0fefd3b6cc9 100644 --- a/src/mongo/db/auth/sasl_commands.cpp +++ b/src/mongo/db/auth/sasl_commands.cpp @@ -187,10 +187,10 @@ Status doSaslStep(const ClientBasic* client, status = session->step(payload, &responsePayload); if (!status.isOK()) { - const SockAddr clientAddr = client->port()->remoteAddr(); log() << session->getMechanism() << " authentication failed for " << session->getPrincipalId() << " on " << session->getAuthenticationDatabase() - << " from client " << clientAddr.getAddr() << " ; " << status.toString() << std::endl; + << " from client " << client->getRemote().toString() << " ; " << status.toString() + << std::endl; sleepmillis(saslGlobalParams.authFailedDelay); // All the client needs to know is that authentication has failed. diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 807a7650aa5..2a665268811 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -44,6 +44,7 @@ #include "mongo/db/lasterror.h" #include "mongo/db/service_context.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/session.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/exit.h" #include "mongo/util/mongoutils/str.h" @@ -63,16 +64,16 @@ void Client::initThreadIfNotAlready() { initThreadIfNotAlready(getThreadName().c_str()); } -void Client::initThread(const char* desc, AbstractMessagingPort* mp) { - initThread(desc, getGlobalServiceContext(), mp); +void Client::initThread(const char* desc, transport::Session* session) { + initThread(desc, getGlobalServiceContext(), session); } -void Client::initThread(const char* desc, ServiceContext* service, AbstractMessagingPort* mp) { +void Client::initThread(const char* desc, ServiceContext* service, transport::Session* session) { invariant(currentClient.getMake()->get() == nullptr); std::string fullDesc; - if (mp != NULL) { - fullDesc = str::stream() << desc << mp->connectionId(); + if (session) { + fullDesc = str::stream() << desc << session->id(); } else { fullDesc = desc; } @@ -80,7 +81,7 @@ void Client::initThread(const char* desc, ServiceContext* service, AbstractMessa setThreadName(fullDesc.c_str()); // Create the client obj, attach to thread - *currentClient.get() = service->makeClient(fullDesc, mp); + *currentClient.get() = service->makeClient(fullDesc, session); } void Client::destroy() { @@ -98,11 +99,11 @@ int64_t generateSeed(const std::string& desc) { } } // namespace -Client::Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p) - : ClientBasic(serviceContext, p), +Client::Client(std::string desc, ServiceContext* serviceContext, transport::Session* session) + : ClientBasic(serviceContext, session), _desc(std::move(desc)), _threadId(stdx::this_thread::get_id()), - _connectionId(p ? p->connectionId() : 0), + _connectionId(session ? session->id() : 0), _prng(generateSeed(_desc)) {} void Client::reportState(BSONObjBuilder& builder) { diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 407a0bac868..a7523981e47 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -51,6 +51,10 @@ class AbstractMessagingPort; class Collection; class OperationContext; +namespace transport { +class Session; +} // namespace transport + typedef long long ConnectionId; /** the database's concept of an outside "client" */ @@ -59,16 +63,16 @@ public: /** * Creates a Client object and stores it in TLS for the current thread. * - * An unowned pointer to a AbstractMessagingPort may optionally be provided. If 'mp' is - * non-null, then it will be used to augment the thread name, and for reporting purposes. + * An unowned pointer to a transport::Session may optionally be provided. If 'session' + * is non-null, then it will be used to augment the thread name, and for reporting purposes. * - * If provided, 'mp' must outlive the newly-created Client object. Client::destroy() may be used - * to help enforce that the Client does not outlive 'mp'. + * If provided, 'session' must outlive the newly-created Client object. Client::destroy() may + * be used to help enforce that the Client does not outlive 'session.' */ - static void initThread(const char* desc, AbstractMessagingPort* mp = 0); + static void initThread(const char* desc, transport::Session* session = nullptr); static void initThread(const char* desc, ServiceContext* serviceContext, - AbstractMessagingPort* mp); + transport::Session* session); /** * Inits a thread if that thread has not already been init'd, setting the thread name to @@ -159,7 +163,7 @@ public: private: friend class ServiceContext; - Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p = 0); + Client(std::string desc, ServiceContext* serviceContext, transport::Session* session = nullptr); // Description for the client (e.g. conn8) diff --git a/src/mongo/db/client_basic.cpp b/src/mongo/db/client_basic.cpp index 06e619597c3..578ebdf668d 100644 --- a/src/mongo/db/client_basic.cpp +++ b/src/mongo/db/client_basic.cpp @@ -32,8 +32,8 @@ namespace mongo { -ClientBasic::ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort) - : _serviceContext(serviceContext), _messagingPort(messagingPort) {} +ClientBasic::ClientBasic(ServiceContext* serviceContext, transport::Session* session) + : _serviceContext(serviceContext), _session(session) {} ClientBasic::~ClientBasic() = default; diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h index 88d7ef4afe8..09f31b5da42 100644 --- a/src/mongo/db/client_basic.h +++ b/src/mongo/db/client_basic.h @@ -31,6 +31,7 @@ #include <memory> #include "mongo/base/disallow_copying.h" +#include "mongo/transport/session.h" #include "mongo/util/decorable.h" #include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/hostandport.h" @@ -58,11 +59,12 @@ public: } bool hasRemote() const { - return _messagingPort; + return _session; } + HostAndPort getRemote() const { - verify(_messagingPort); - return _messagingPort->remote(); + verify(_session); + return _session->remote(); } /** @@ -73,20 +75,20 @@ public: } /** - * Returns the AbstractMessagePort to which this client session is bound, if any. + * Returns the Session to which this client is bound, if any. */ - AbstractMessagingPort* port() const { - return _messagingPort; + transport::Session* session() const { + return _session; } static ClientBasic* getCurrent(); protected: - ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort); + ClientBasic(ServiceContext* serviceContext, transport::Session* session); ~ClientBasic(); private: ServiceContext* const _serviceContext; - AbstractMessagingPort* const _messagingPort; + transport::Session* const _session; }; } diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 919510c4b79..87c472dc519 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -71,6 +71,7 @@ env.Library( '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/write_ops/batch_write_types', '$BUILD_DIR/mongo/scripting/scripting_common', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils', '$BUILD_DIR/mongo/util/foundation', '$BUILD_DIR/mongo/util/ntservice', diff --git a/src/mongo/db/commands/authentication_commands.cpp b/src/mongo/db/commands/authentication_commands.cpp index 6fc6268a807..2f05971ef56 100644 --- a/src/mongo/db/commands/authentication_commands.cpp +++ b/src/mongo/db/commands/authentication_commands.cpp @@ -56,6 +56,7 @@ #include "mongo/db/server_options.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/log.h" #include "mongo/util/md5.hpp" @@ -314,17 +315,17 @@ Status CmdAuthenticate::_authenticateX509(OperationContext* txn, ClientBasic* client = ClientBasic::getCurrent(); AuthorizationSession* authorizationSession = AuthorizationSession::get(client); - std::string clientSubjectName = client->port()->getX509SubjectName(); + auto clientName = client->session()->getX509SubjectName(); if (!getSSLManager()->getSSLConfiguration().hasCA) { return Status(ErrorCodes::AuthenticationFailed, "Unable to verify x.509 certificate, as no CA has been provided."); - } else if (user.getUser() != clientSubjectName) { + } else if (user.getUser() != clientName) { return Status(ErrorCodes::AuthenticationFailed, "There is no x.509 client certificate matching the user."); } else { // Handle internal cluster member auth, only applies to server-server connections - if (getSSLManager()->getSSLConfiguration().isClusterMember(clientSubjectName)) { + if (getSSLManager()->getSSLConfiguration().isClusterMember(clientName)) { int clusterAuthMode = serverGlobalParams.clusterAuthMode.load(); if (clusterAuthMode == ServerGlobalParams::ClusterAuthMode_undefined || clusterAuthMode == ServerGlobalParams::ClusterAuthMode_keyFile) { diff --git a/src/mongo/db/commands/server_status.cpp b/src/mongo/db/commands/server_status.cpp index bbe7e9930a7..e567a15b18b 100644 --- a/src/mongo/db/commands/server_status.cpp +++ b/src/mongo/db/commands/server_status.cpp @@ -47,9 +47,9 @@ #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/platform/process_id.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" -#include "mongo/util/net/listen.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/processinfo.h" #include "mongo/util/ramlog.h" @@ -228,9 +228,10 @@ public: BSONObj generateSection(OperationContext* txn, const BSONElement& configElement) const { BSONObjBuilder bb; - bb.append("current", Listener::globalTicketHolder.used()); - bb.append("available", Listener::globalTicketHolder.available()); - bb.append("totalCreated", Listener::globalConnectionNumber.load()); + auto stats = txn->getServiceContext()->getTransportLayer()->sessionStats(); + bb.append("current", static_cast<int>(stats.numOpenSessions)); + bb.append("available", static_cast<int>(stats.numAvailableSessions)); + bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions)); return bb.obj(); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index a10b0be5a39..56dff7cecaf 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -96,6 +96,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_d.h" #include "mongo/db/service_context_d.h" +#include "mongo/db/service_entry_point_mongod.h" #include "mongo/db/startup_warnings_mongod.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/snapshots.h" @@ -114,6 +115,7 @@ #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/util/assert_util.h" #include "mongo/util/cmdline_utils/censor_cmdline.h" #include "mongo/util/concurrency/task.h" @@ -124,7 +126,6 @@ #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/ntservice.h" #include "mongo/util/options_parser/startup_options.h" @@ -169,65 +170,6 @@ ntservice::NtServiceDefaultStrings defaultServiceStrings = { Timer startupSrandTimer; -class MyMessageHandler : public MessageHandler { -public: - virtual void connected(AbstractMessagingPort* p) { - Client::initThread("conn", p); - } - - virtual void process(Message& m, AbstractMessagingPort* port) { - while (true) { - if (inShutdown()) { - log() << "got request after shutdown()" << endl; - break; - } - - DbResponse dbresponse; - { - auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc()); - assembleResponse(opCtx.get(), m, dbresponse, port->remote()); - - // opCtx must go out of scope here so that the operation cannot show up in currentOp - // results after the response reaches the client - } - - if (!dbresponse.response.empty()) { - port->reply(m, dbresponse.response, dbresponse.responseToMsgId); - if (dbresponse.exhaustNS.size() > 0) { - MsgData::View header = dbresponse.response.header(); - QueryResult::View qr = header.view2ptr(); - long long cursorid = qr.getCursorId(); - if (cursorid) { - verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); - string ns = dbresponse.exhaustNS; // before reset() free's it... - m.reset(); - BufBuilder b(512); - b.appendNum((int)0 /*size set later*/); - b.appendNum(header.getId()); - b.appendNum(header.getResponseToMsgId()); - b.appendNum((int)dbGetMore); - b.appendNum((int)0); - b.appendStr(ns); - b.appendNum((int)0); // ntoreturn - b.appendNum(cursorid); - - MsgData::View header = b.buf(); - header.setLen(b.len()); - m.setData(b.release()); - DEV log() << "exhaust=true sending more"; - continue; // this goes back to top loop - } - } - } - break; - } - } - - virtual void close() { - Client::destroy(); - } -}; - static void logStartup(OperationContext* txn) { BSONObjBuilder toLog; stringstream id; @@ -530,7 +472,7 @@ static void _initWireSpec() { } -static void _initAndListen(int listenPort) { +static ExitCode _initAndListen(int listenPort) { Client::initThread("initandlisten"); _initWireSpec(); @@ -570,20 +512,18 @@ static void _initAndListen(int listenPort) { checked_cast<ServiceContextMongoD*>(getGlobalServiceContext())->createLockFile(); - // Due to SERVER-15389, we must setupSockets first thing at startup in order to avoid - // obtaining too high a file descriptor for our calls to select(). - MessageServer::Options options; + transport::TransportLayerLegacy::Options options; options.port = listenPort; options.ipList = serverGlobalParams.bind_ip; - auto handler = std::make_shared<MyMessageHandler>(); - MessageServer* server = createServer(options, std::move(handler), getGlobalServiceContext()); - - // This is what actually creates the sockets, but does not yet listen on them because we - // do not want connections to just hang if recovery takes a very long time. - if (!server->setupSockets()) { - error() << "Failed to set up sockets during startup."; - return; + // Create, start, and attach the TL + auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>( + options, + std::make_shared<ServiceEntryPointMongod>(getGlobalServiceContext()->getTransportLayer())); + auto res = transportLayer->setup(); + if (!res.isOK()) { + error() << "Failed to set up listener: " << res.toString(); + return EXIT_NET_ERROR; } std::shared_ptr<DbWebServer> dbWebServer; @@ -594,7 +534,7 @@ static void _initAndListen(int listenPort) { new RestAdminAccess())); if (!dbWebServer->setupSockets()) { error() << "Failed to set up sockets for HTTP interface during startup."; - return; + return EXIT_NET_ERROR; } } @@ -674,7 +614,7 @@ static void _initAndListen(int listenPort) { } if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalRecoverOnly) - return; + return EXIT_NET_ERROR; if (mongodGlobalParams.scriptingEnabled) { ScriptEngine::setup(); @@ -799,14 +739,19 @@ static void _initAndListen(int listenPort) { // MessageServer::run will return when exit code closes its socket and we don't need the // operation context anymore startupOpCtx.reset(); - server->run(); + + auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer)); + if (!start.isOK()) { + error() << "Failed to start the listener: " << start.toString(); + return EXIT_NET_ERROR; + } + + return waitForShutdown(); } ExitCode initAndListen(int listenPort) { try { - _initAndListen(listenPort); - - return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR; + return _initAndListen(listenPort); } catch (DBException& e) { log() << "exception in initAndListen: " << e.toString() << ", terminating" << endl; return EXIT_UNCAUGHT; @@ -1001,7 +946,6 @@ static void reportEventToSystemImpl(const char* msg) { // registerShutdownTask is called below. It must not depend on the // prior execution of mongo initializers or the existence of threads. static void shutdownTask() { - auto serviceContext = getGlobalServiceContext(); Client::initThreadIfNotAlready(); @@ -1014,6 +958,7 @@ static void shutdownTask() { txn = uniqueTxn.get(); } + getGlobalServiceContext()->getTransportLayer()->shutdown(); log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl; ListeningSockets::get()->closeAll(); @@ -1037,12 +982,6 @@ static void shutdownTask() { // will do it for us when the process terminates. stdx::packaged_task<void()> dryOutTask([] { - - // Walk all open sockets and close them. This should unblock any that are sitting in - // recv. Other sockets on which there are active operations should see the inShutdown flag - // as true when they complete the operation. - Listener::closeMessagingPorts(0); - // There isn't currently a way to wait on the TicketHolder to have all its tickets back, // unfortunately. So, busy wait in this detached thread. while (true) { diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index 67a5548dbfb..f16ec2b6276 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -30,7 +30,9 @@ #include "mongo/platform/basic.h" #include "mongo/db/dbmessage.h" +#include "mongo/db/operation_context.h" #include "mongo/platform/strnlen.h" +#include "mongo/transport/session.h" namespace mongo { @@ -173,20 +175,23 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) { _buffer.skip(sizeof(QueryResult::Value)); } -void OpQueryReplyBuilder::send(AbstractMessagingPort* destination, +void OpQueryReplyBuilder::send(transport::Session* session, int queryResultFlags, - Message& requestMsg, + const Message& requestMsg, int nReturned, int startingFrom, long long cursorId) { Message response; putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId); - destination->reply(requestMsg, response, requestMsg.header().getId()); + + response.header().setId(nextMessageId()); + response.header().setResponseToMsgId(requestMsg.header().getId()); + + uassertStatusOK(session->sinkMessage(response).wait()); } -void OpQueryReplyBuilder::sendCommandReply(AbstractMessagingPort* destination, - Message& requestMsg) { - send(destination, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); +void OpQueryReplyBuilder::sendCommandReply(transport::Session* session, const Message& requestMsg) { + send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); } void OpQueryReplyBuilder::putInMessage( @@ -202,7 +207,7 @@ void OpQueryReplyBuilder::putInMessage( } void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const void* data, int size, @@ -211,15 +216,19 @@ void replyToQuery(int queryResultFlags, long long cursorId) { OpQueryReplyBuilder reply; reply.bufBuilderForResults().appendBuf(data, size); - reply.send(p, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); + reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); } void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const BSONObj& responseObj) { - replyToQuery( - queryResultFlags, p, requestMsg, (void*)responseObj.objdata(), responseObj.objsize(), 1); + replyToQuery(queryResultFlags, + session, + requestMsg, + (void*)responseObj.objdata(), + responseObj.objsize(), + 1); } void replyToQuery(int queryResultFlags, Message& m, DbResponse& dbresponse, BSONObj obj) { diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 7f1a3a2d0f2..7fabf117421 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -39,6 +39,12 @@ namespace mongo { +class OperationContext; + +namespace transport { +class Session; +} // namespace transport + /* db response format Query or GetMore: // see struct QueryResult @@ -352,9 +358,9 @@ public: /** * Finishes the reply and sends the message out to 'destination'. */ - void send(AbstractMessagingPort* destination, + void send(transport::Session* session, int queryResultFlags, - Message& requestMsg, // should be const but MessagePort::reply takes non-const. + const Message& requestMsg, int nReturned, int startingFrom = 0, long long cursorId = 0); @@ -362,14 +368,14 @@ public: /** * Similar to send() but used for replying to a command. */ - void sendCommandReply(AbstractMessagingPort* destination, Message& requestMsg); + void sendCommandReply(transport::Session* session, const Message& requestMsg); private: BufBuilder _buffer; }; void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const void* data, int size, @@ -377,10 +383,9 @@ void replyToQuery(int queryResultFlags, int startingFrom = 0, long long cursorId = 0); - /* object reply helper. */ void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const BSONObj& responseObj); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b6a2981dde0..b92ec300cf8 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -398,6 +398,7 @@ env.Library('repl_coordinator_impl', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/rpc/metadata', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/fail_point', 'collection_cloner', 'data_replicator', @@ -627,6 +628,7 @@ env.Library('replica_set_messages', '$BUILD_DIR/mongo/client/connection_string', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/server_options_core', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/net/hostandport', 'optime', 'read_concern_args', diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp index 4b34c351769..c01628466c0 100644 --- a/src/mongo/db/repl/repl_set_request_votes.cpp +++ b/src/mongo/db/repl/repl_set_request_votes.cpp @@ -35,6 +35,8 @@ #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/executor/network_interface.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -64,16 +66,17 @@ private: // We want to keep request vote connection open when relinquishing primary. // Tag it here. - unsigned originalTag = 0; - AbstractMessagingPort* mp = txn->getClient()->port(); - if (mp) { - originalTag = mp->getTag(); - mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); + transport::Session::TagMask originalTag = 0; + transport::Session* session = txn->getClient()->session(); + if (session) { + originalTag = session->getTags(); + session->replaceTags(originalTag | transport::Session::kKeepOpen); } + // Untag the connection on exit. - ON_BLOCK_EXIT([mp, originalTag]() { - if (mp) { - mp->setTag(originalTag); + ON_BLOCK_EXIT([session, originalTag]() { + if (session) { + session->replaceTags(originalTag); } }); diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 582b8f45970..245b37cca7f 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -192,8 +192,8 @@ public: virtual HostAndPort getClientHostAndPort(const OperationContext* txn) = 0; /** - * Closes all connections except those marked with the keepOpen property, which should - * just be connections used for heartbeating. + * Closes all connections in the given TransportLayer except those marked with the + * keepOpen property, which should just be connections used for heartbeating. * This is used during stepdown, and transition out of primary. */ virtual void closeConnections() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 81ed9c16bf3..f140580fbcc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -76,6 +76,8 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -449,7 +451,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - Listener::closeMessagingPorts(executor::NetworkInterface::kMessagingPortKeepOpen); + getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index cc844eaf35c..fd36395c017 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -234,9 +234,10 @@ public: // Tag connections to avoid closing them on stepdown. auto hangUpElement = cmdObj["hangUpOnStepDown"]; if (!hangUpElement.eoo() && !hangUpElement.trueValue()) { - AbstractMessagingPort* mp = txn->getClient()->port(); - if (mp) { - mp->setTag(mp->getTag() | executor::NetworkInterface::kMessagingPortKeepOpen); + auto session = txn->getClient()->session(); + if (session) { + session->replaceTags(session->getTags() | + executor::NetworkInterface::kMessagingPortKeepOpen); } } appendReplicationInfo(txn, result, 0); diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 1b96dac313f..f591d64c11b 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -59,6 +59,8 @@ #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -726,17 +728,17 @@ public: /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ - AbstractMessagingPort* mp = txn->getClient()->port(); - unsigned originalTag = 0; - if (mp) { - originalTag = mp->getTag(); - mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); + transport::Session::TagMask originalTag = 0; + transport::Session* session = txn->getClient()->session(); + if (session) { + originalTag = session->getTags(); + session->replaceTags(originalTag | transport::Session::kKeepOpen); } // Unset the tag on block exit - ON_BLOCK_EXIT([mp, originalTag]() { - if (mp) { - mp->setTag(originalTag); + ON_BLOCK_EXIT([session, originalTag]() { + if (session) { + session->replaceTags(originalTag); } }); diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index 013c135cc84..359cb3b4fc1 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -50,7 +50,7 @@ class NoChunkFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -318,7 +318,7 @@ class SingleChunkFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -574,7 +574,7 @@ class SingleChunkMinMaxCompoundKeyFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -642,7 +642,7 @@ class TwoChunksWithGapCompoundKeyFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -839,7 +839,7 @@ class ThreeChunkWithRangeGapFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp index c96b77a1590..e33e3142d1a 100644 --- a/src/mongo/db/s/metadata_loader_test.cpp +++ b/src/mongo/db/s/metadata_loader_test.cpp @@ -51,7 +51,7 @@ public: protected: void setUp() override { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); _maxCollVersion = ChunkVersion(1, 0, OID::gen()); _loader.reset(new MetadataLoader); diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index 03dc9bcaf8d..648fd546721 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -34,6 +34,9 @@ #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_manager.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/system_clock_source.h" @@ -115,7 +118,8 @@ Status validateStorageOptions( } ServiceContext::ServiceContext() - : _tickSource(stdx::make_unique<SystemTickSource>()), + : _transportLayerManager(stdx::make_unique<transport::TransportLayerManager>()), + _tickSource(stdx::make_unique<SystemTickSource>()), _fastClockSource(stdx::make_unique<SystemClockSource>()), _preciseClockSource(stdx::make_unique<SystemClockSource>()) {} @@ -125,8 +129,8 @@ ServiceContext::~ServiceContext() { } ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, - AbstractMessagingPort* p) { - std::unique_ptr<Client> client(new Client(std::move(desc), this, p)); + transport::Session* session) { + std::unique_ptr<Client> client(new Client(std::move(desc), this, session)); auto observer = _clientObservers.cbegin(); try { for (; observer != _clientObservers.cend(); ++observer) { @@ -150,6 +154,14 @@ ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, return UniqueClient(client.release()); } +transport::TransportLayer* ServiceContext::getTransportLayer() const { + return _transportLayerManager.get(); +} + +Status ServiceContext::addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl) { + return _transportLayerManager->addAndStartTransportLayer(std::move(tl)); +} + TickSource* ServiceContext::getTickSource() const { return _tickSource.get(); } diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index fe6a4432d70..174a52ebe52 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -48,6 +48,12 @@ class Client; class OperationContext; class OpObserver; +namespace transport { +class Session; +class TransportLayer; +class TransportLayerManager; +} // namespace transport + /** * Classes that implement this interface can receive notification on killOp. * @@ -207,9 +213,9 @@ public: * * The "desc" string is used to set a descriptive name for the client, used in logging. * - * If supplied, "p" is the communication channel used for communicating with the client. + * If supplied, "session" is the transport::Session used for communicating with the client. */ - UniqueClient makeClient(std::string desc, AbstractMessagingPort* p = nullptr); + UniqueClient makeClient(std::string desc, transport::Session* session = nullptr); /** * Creates a new OperationContext on "client". @@ -299,6 +305,26 @@ public: void registerKillOpListener(KillOpListenerInterface* listener); // + // Transport. + // + + /** + * Get the master TransportLayer. Routes to all other TransportLayers that + * may be in use within this service. + * + * See TransportLayerManager for more details. + */ + transport::TransportLayer* getTransportLayer() const; + + /** + * Add a new TransportLayer to this service context. The new TransportLayer will + * be added to the TransportLayerManager accessible via getTransportLayer(). + * + * It additionally calls start() on the TransportLayer after adding it. + */ + Status addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl); + + // // Global OpObserver. // @@ -370,6 +396,11 @@ private: /** + * The TransportLayerManager. + */ + std::unique_ptr<transport::TransportLayerManager> _transportLayerManager; + + /** * Vector of registered observers. */ std::vector<std::unique_ptr<ClientObserver>> _clientObservers; diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp new file mode 100644 index 00000000000..86f782a0bad --- /dev/null +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/db/service_entry_point_mongod.h" + +#include <vector> + +#include "mongo/db/client.h" +#include "mongo/db/dbmessage.h" +#include "mongo/db/instance.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/quick_exit.h" + +namespace mongo { +namespace { + +// Set up proper headers for formatting an exhaust request, if we need to +bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { + MsgData::View header = dbresponse.response.header(); + QueryResult::View qr = header.view2ptr(); + long long cursorid = qr.getCursorId(); + + if (!cursorid) { + return false; + } + + verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); + + auto ns = dbresponse.exhaustNS; // reset() will free this + + m->reset(); + + BufBuilder b(512); + b.appendNum(static_cast<int>(0) /* size set later in appendData() */); + b.appendNum(header.getId()); + b.appendNum(header.getResponseToMsgId()); + b.appendNum(static_cast<int>(dbGetMore)); + b.appendNum(static_cast<int>(0)); + b.appendStr(ns); + b.appendNum(static_cast<int>(0)); // ntoreturn + b.appendNum(cursorid); + + MsgData::View(b.buf()).setLen(b.len()); + m->setData(b.release()); + + return true; +} + +} // namespace + +using transport::Session; +using transport::TransportLayer; + +ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {} + +void ServiceEntryPointMongod::startSession(Session&& session) { + launchWrappedServiceEntryWorkerThread(std::move(session), + [this](Session* session) { _sessionLoop(session); }); +} + +void ServiceEntryPointMongod::_sessionLoop(Session* session) { + Message inMessage; + bool inExhaust = false; + int64_t counter = 0; + + while (true) { + // 1. Source a Message from the client (unless we are exhausting) + if (!inExhaust) { + inMessage.reset(); + auto status = session->sourceMessage(&inMessage).wait(); + + if (ErrorCodes::isInterruption(status.code())) { + break; + } + + uassertStatusOK(status); + } + + // 2. Pass sourced Message up to mongod + DbResponse dbresponse; + { + auto opCtx = cc().makeOperationContext(); + assembleResponse(opCtx.get(), inMessage, dbresponse, session->remote()); + + // opCtx must go out of scope here so that the operation cannot show + // up in currentOp results after the response reaches the client + } + + // 3. Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(inMessage.header().getId()); + + // If this is an exhaust cursor, don't source more Messages + if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) { + log() << "we are in exhaust"; + inExhaust = true; + } else { + inExhaust = false; + } + + // 4. Sink our response to the client + uassertStatusOK(session->sinkMessage(toSink).wait()); + } else { + inExhaust = false; + } + + if ((counter++ & 0xf) == 0) { + markThreadIdle(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h new file mode 100644 index 00000000000..31ed05098c8 --- /dev/null +++ b/src/mongo/db/service_entry_point_mongod.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/service_entry_point.h" + +namespace mongo { + +namespace transport { +class Session; +class TransportLayer; +} // namespace transport + +/** + * The entry point from the TransportLayer into Mongod. startSession() spawns and + * detaches a new thread for each incoming connection (transport::Session). + */ +class ServiceEntryPointMongod final : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(ServiceEntryPointMongod); + +public: + explicit ServiceEntryPointMongod(transport::TransportLayer* tl); + + virtual ~ServiceEntryPointMongod() = default; + + void startSession(transport::Session&& session) override; + +private: + void _sessionLoop(transport::Session* session); + + transport::TransportLayer* _tl; +}; + +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 1f0411b13cf..8b0b8321aaa 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -98,6 +98,7 @@ env.Library( '$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_manager_impl', '$BUILD_DIR/mongo/s/coreshard', + '$BUILD_DIR/mongo/transport/transport_layer_mock', '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', 'mongoscore', diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index 53aa481fdbc..ac7d4dd759e 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -78,8 +78,7 @@ protected: */ void setUp() override { ShardingCatalogTestFixture::setUp(); - - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setConnectionStringReturnValue(_configConnStr); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp index b81a5bbc68f..f5609adef83 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp @@ -58,8 +58,8 @@ class DropColl2ShardTest : public ShardingCatalogTestFixture { public: void setUp() override { ShardingCatalogTestFixture::setUp(); + setRemote(_clientHost); - getMessagingPort()->setRemote(_clientHost); configTargeter()->setFindHostReturnValue(_configHost); configTargeter()->setConnectionStringReturnValue(_configCS); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp index 59cf084d98d..36fcb6eba63 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp @@ -124,7 +124,7 @@ TEST_F(RemoveShardTest, RemoveShardCantRemoveLastShard) { TEST_F(RemoveShardTest, RemoveShardStartDraining) { string shardName = "shardToRemove"; const HostAndPort clientHost{"client1:12345"}; - getMessagingPort()->setRemote(clientHost); + setRemote(clientHost); auto future = launchAsync([&] { auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName)); @@ -288,7 +288,7 @@ TEST_F(RemoveShardTest, RemoveShardStillDrainingDatabasesRemaining) { TEST_F(RemoveShardTest, RemoveShardCompletion) { string shardName = "shardToRemove"; const HostAndPort clientHost{"client1:12345"}; - getMessagingPort()->setRemote(clientHost); + setRemote(clientHost); auto future = launchAsync([&] { auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName)); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp index d1d36756984..57e35290ced 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp @@ -87,7 +87,7 @@ public: ShardingCatalogTestFixture::setUp(); configTargeter()->setFindHostReturnValue(configHost); configTargeter()->setConnectionStringReturnValue(configCS); - getMessagingPort()->setRemote(clientHost); + setRemote(clientHost); } void expectGetDatabase(const DatabaseType& expectedDb) { diff --git a/src/mongo/s/chunk_manager_tests.cpp b/src/mongo/s/chunk_manager_tests.cpp index 280b9921d69..5fe831b50a9 100644 --- a/src/mongo/s/chunk_manager_tests.cpp +++ b/src/mongo/s/chunk_manager_tests.cpp @@ -68,7 +68,7 @@ class ChunkManagerFixture : public ShardingCatalogTestFixture { public: void setUp() override { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); } diff --git a/src/mongo/s/commands/request.cpp b/src/mongo/s/commands/request.cpp index 050483d2969..6605db10424 100644 --- a/src/mongo/s/commands/request.cpp +++ b/src/mongo/s/commands/request.cpp @@ -42,6 +42,7 @@ #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" +#include "mongo/transport/session.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -49,8 +50,8 @@ namespace mongo { using std::string; -Request::Request(Message& m, AbstractMessagingPort* p) - : _clientInfo(&cc()), _m(m), _d(m), _p(p), _id(_m.header().getId()), _didInit(false) { +Request::Request(Message& m) + : _clientInfo(&cc()), _m(m), _d(m), _id(_m.header().getId()), _didInit(false) { ClusterLastErrorInfo::get(_clientInfo).newRequest(); } @@ -122,4 +123,8 @@ void Request::process(OperationContext* txn, int attempt) { << " op: " << op << " attempt: " << attempt; } +transport::Session* Request::session() const { + return _clientInfo->session(); +} + } // namespace mongo diff --git a/src/mongo/s/commands/request.h b/src/mongo/s/commands/request.h index 5664474e845..ace3e77a720 100644 --- a/src/mongo/s/commands/request.h +++ b/src/mongo/s/commands/request.h @@ -37,11 +37,15 @@ namespace mongo { class Client; class OperationContext; +namespace transport { +class Session; +} // namespace transport + class Request { MONGO_DISALLOW_COPYING(Request); public: - Request(Message& m, AbstractMessagingPort* p); + Request(Message& m); const char* getns() const { return _d.getns(); @@ -71,9 +75,8 @@ public: DbMessage& d() { return _d; } - AbstractMessagingPort* p() const { - return _p; - } + + transport::Session* session() const; void process(OperationContext* txn, int attempt = 0); @@ -84,7 +87,6 @@ private: Message& _m; DbMessage _d; - AbstractMessagingPort* const _p; int32_t _id; diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index fcba8883592..69d02a3e0e9 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -176,7 +176,7 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { BSONObj explainObj = explainBuilder.done(); replyToQuery(0, // query result flags - request.p(), + request.session(), request.m(), static_cast<const void*>(explainObj.objdata()), explainObj.objsize(), @@ -202,7 +202,7 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { obj.appendSelfToBufBuilder(reply.bufBuilderForResults()); numResults++; } - reply.send(request.p(), + reply.send(request.session(), 0, // query result flags request.m(), numResults, @@ -265,7 +265,7 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { BSONObjBuilder builder(reply.bufBuilderForResults()); runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions); } - reply.sendCommandReply(request.p(), request.m()); + reply.sendCommandReply(request.session(), request.m()); return; } catch (const StaleConfigException& e) { if (loops <= 0) @@ -288,7 +288,7 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { BSONObjBuilder builder(reply.bufBuilderForResults()); Command::appendCommandStatus(builder, e.toStatus()); } - reply.sendCommandReply(request.p(), request.m()); + reply.sendCommandReply(request.session(), request.m()); return; } } @@ -325,7 +325,7 @@ bool Strategy::handleSpecialNamespaces(OperationContext* txn, Request& request, } BSONObj x = reply.done(); - replyToQuery(0, request.p(), request.m(), x); + replyToQuery(0, request.session(), request.m(), x); return true; } @@ -369,7 +369,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) { const NamespaceString nss(ns); auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (statusGetDb == ErrorCodes::NamespaceNotFound) { - replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0); + replyToQuery(ResultFlag_CursorNotFound, request.session(), request.m(), 0, 0, 0); return; } @@ -384,7 +384,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) { auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest); if (cursorResponse == ErrorCodes::CursorNotFound) { - replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0); + replyToQuery(ResultFlag_CursorNotFound, request.session(), request.m(), 0, 0, 0); return; } uassertStatusOK(cursorResponse.getStatus()); @@ -399,7 +399,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) { } replyToQuery(0, - request.p(), + request.session(), request.m(), buffer.buf(), buffer.len(), diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index b6893c076ea..e2957554ad2 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -67,8 +67,8 @@ public: void setUp() override { ShardingTestFixture::setUp(); + setRemote(HostAndPort("ClientHost", 12345)); - getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345)); configTargeter()->setFindHostReturnValue(kTestConfigShardHost); std::vector<ShardType> shards; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index d4912eead77..f4fda4103ee 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -77,12 +77,14 @@ #include "mongo/s/mongos_options.h" #include "mongo/s/query/cluster_cursor_cleanup_job.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/service_entry_point_mongos.h" #include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" #include "mongo/s/sharding_initialization.h" #include "mongo/s/sharding_uptime_reporter.h" #include "mongo/s/version_mongos.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/util/admin_access.h" #include "mongo/util/cmdline_utils/censor_cmdline.h" #include "mongo/util/concurrency/thread_name.h" @@ -92,7 +94,6 @@ #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/ntservice.h" @@ -160,58 +161,6 @@ static BSONObj buildErrReply(const DBException& ex) { return errB.obj(); } -class ShardedMessageHandler : public MessageHandler { -public: - virtual ~ShardedMessageHandler() {} - - virtual void connected(AbstractMessagingPort* p) { - Client::initThread("conn", getGlobalServiceContext(), p); - } - - virtual void process(Message& m, AbstractMessagingPort* p) { - verify(p); - Request r(m, p); - auto txn = cc().makeOperationContext(); - - try { - r.init(txn.get()); - r.process(txn.get()); - } catch (const AssertionException& ex) { - LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed" - << " while processing " - << networkOpToString(m.operation()) << " op" - << " for " << r.getnsIfPresent() << causedBy(ex); - - if (r.expectResponse()) { - m.header().setId(r.id()); - replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex)); - } - - // We *always* populate the last error for now - LastError::get(cc()).setLastError(ex.getCode(), ex.what()); - } catch (const DBException& ex) { - log() << "Exception thrown" - << " while processing " << networkOpToString(m.operation()) << " op" - << " for " << r.getnsIfPresent() << causedBy(ex); - - if (r.expectResponse()) { - m.header().setId(r.id()); - replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex)); - } - - // We *always* populate the last error for now - LastError::get(cc()).setLastError(ex.getCode(), ex.what()); - } - - // Release connections back to pool, if any still cached - ShardConnection::releaseMyConnections(); - } - - virtual void close() { - Client::destroy(); - } -}; - } // namespace mongo using namespace mongo; @@ -278,6 +227,19 @@ static ExitCode runMongosServer() { _initWireSpec(); + transport::TransportLayerLegacy::Options opts; + opts.port = serverGlobalParams.port; + opts.ipList = serverGlobalParams.bind_ip; + + auto sep = + std::make_shared<ServiceEntryPointMongos>(getGlobalServiceContext()->getTransportLayer()); + + auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(opts, sep); + auto res = transportLayer->setup(); + if (!res.isOK()) { + return EXIT_NET_ERROR; + } + // Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks globalConnPool.addHook(new ShardingConnectionHookForMongos(false)); shardConnectionPool.addHook(new ShardingConnectionHookForMongos(true)); @@ -352,19 +314,13 @@ static ExitCode runMongosServer() { PeriodicTask::startRunningPeriodicTasks(); - MessageServer::Options opts; - opts.port = serverGlobalParams.port; - opts.ipList = serverGlobalParams.bind_ip; - - auto handler = std::make_shared<ShardedMessageHandler>(); - MessageServer* server = createServer(opts, std::move(handler), getGlobalServiceContext()); - if (!server->setupSockets()) { + auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer)); + if (!start.isOK()) { return EXIT_NET_ERROR; } - server->run(); - // MessageServer::run will return when exit code closes its socket - return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR; + // Block until shutdown. + return waitForShutdown(); } MONGO_INITIALIZER_GENERAL(ForkServer, ("EndStartupOptionHandling"), ("default")) diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp new file mode 100644 index 00000000000..fe7d92fd1ad --- /dev/null +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/s/service_entry_point_mongos.h" + +#include <vector> + +#include "mongo/db/lasterror.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_options.h" +#include "mongo/db/service_context.h" +#include "mongo/s/client/shard_connection.h" +#include "mongo/s/commands/request.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/quick_exit.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +namespace { + +BSONObj buildErrReply(const DBException& ex) { + BSONObjBuilder errB; + errB.append("$err", ex.what()); + errB.append("code", ex.getCode()); + if (!ex._shard.empty()) { + errB.append("shard", ex._shard); + } + return errB.obj(); +} + +} // namespace + +using transport::Session; +using transport::TransportLayer; + +ServiceEntryPointMongos::ServiceEntryPointMongos(TransportLayer* tl) : _tl(tl) {} + +void ServiceEntryPointMongos::startSession(Session&& session) { + launchWrappedServiceEntryWorkerThread(std::move(session), + [this](Session* session) { _sessionLoop(session); }); +} + +void ServiceEntryPointMongos::_sessionLoop(Session* session) { + Message message; + int64_t counter = 0; + + while (true) { + // Release any cached egress connections for client back to pool before destroying + auto guard = MakeGuard(ShardConnection::releaseMyConnections); + + message.reset(); + + // 1. Source a Message from the client + { + auto status = session->sourceMessage(&message).wait(); + + if (ErrorCodes::isInterruption(status.code())) { + break; + } + + uassertStatusOK(status); + } + + // 2. Build a sharding request + Request r(message); + auto txn = cc().makeOperationContext(); + + try { + r.init(txn.get()); + r.process(txn.get()); + } catch (const AssertionException& ex) { + LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed" + << " while processing " + << networkOpToString(message.operation()) << " op" + << " for " << r.getnsIfPresent() << causedBy(ex); + if (r.expectResponse()) { + message.header().setId(r.id()); + replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex)); + } + + // We *always* populate the last error for now + LastError::get(cc()).setLastError(ex.getCode(), ex.what()); + } catch (const DBException& ex) { + log() << "Exception thrown" + << " while processing " << networkOpToString(message.operation()) << " op" + << " for " << r.getnsIfPresent() << causedBy(ex); + + if (r.expectResponse()) { + message.header().setId(r.id()); + replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex)); + } + + // We *always* populate the last error for now + LastError::get(cc()).setLastError(ex.getCode(), ex.what()); + } + + if ((counter++ & 0xf) == 0) { + markThreadIdle(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h new file mode 100644 index 00000000000..d96540b2b1e --- /dev/null +++ b/src/mongo/s/service_entry_point_mongos.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/service_entry_point.h" + +namespace mongo { + +namespace transport { +class Session; +class TransportLayer; +} // namespace transport + +/** + * The entry point from the TransportLayer into Mongos. startSession() spawns and + * detaches a new thread for each incoming connection (transport::Session). + */ +class ServiceEntryPointMongos final : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(ServiceEntryPointMongos); + +public: + ServiceEntryPointMongos(transport::TransportLayer* tl); + + virtual ~ServiceEntryPointMongos() = default; + + void startSession(transport::Session&& session) override; + +private: + void _sessionLoop(transport::Session* session); + + transport::TransportLayer* _tl; +}; + +} // namespace mongo diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 5d8c3e2285e..f099ac2e67b 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -65,6 +65,8 @@ #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_mock.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/tick_source_mock.h" @@ -92,8 +94,12 @@ void ShardingTestFixture::setUp() { _service->setFastClockSource(stdx::make_unique<ClockSourceMock>()); _service->setPreciseClockSource(stdx::make_unique<ClockSourceMock>()); _service->setTickSource(stdx::make_unique<TickSourceMock>()); - _messagePort = stdx::make_unique<MessagingPortMock>(); - _client = _service->makeClient("ShardingTestFixture", _messagePort.get()); + auto tlMock = stdx::make_unique<transport::TransportLayerMock>(); + _transportLayer = tlMock.get(); + _service->addAndStartTransportLayer(std::move(tlMock)); + _transportSession = + stdx::make_unique<transport::Session>(HostAndPort{}, HostAndPort{}, _transportLayer); + _client = _service->makeClient("ShardingTestFixture", _transportSession.get()); _opCtx = _client->makeOperationContext(); // Set up executor pool used for most operations. @@ -184,6 +190,7 @@ void ShardingTestFixture::tearDown() { grid.catalogClient(_opCtx.get())->shutDown(_opCtx.get()); grid.clearForUnitTests(); + _transportSession.reset(); _opCtx.reset(); _client.reset(); _service.reset(); @@ -238,10 +245,6 @@ executor::TaskExecutor* ShardingTestFixture::executor() const { return _executor; } -MessagingPortMock* ShardingTestFixture::getMessagingPort() const { - return _messagePort.get(); -} - DistLockManagerMock* ShardingTestFixture::distLock() const { invariant(_distLockManager); return _distLockManager; @@ -508,6 +511,10 @@ void ShardingTestFixture::expectCount(const HostAndPort& configHost, }); } +void ShardingTestFixture::setRemote(const HostAndPort& remote) { + *_transportSession = transport::Session{remote, HostAndPort{}, _transportLayer}; +} + void ShardingTestFixture::checkReadConcern(const BSONObj& cmdObj, const Timestamp& expectedTS, long long expectedTerm) const { diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h index 2a930bbefc4..9365a18dd66 100644 --- a/src/mongo/s/sharding_test_fixture.h +++ b/src/mongo/s/sharding_test_fixture.h @@ -60,6 +60,10 @@ class NetworkInterfaceMock; class TaskExecutor; } // namespace executor +namespace transport { +class TransportLayerMock; +} // namepsace transport + /** * Sets up the mocked out objects for testing the replica-set backed catalog manager and catalog * client. @@ -97,7 +101,7 @@ protected: executor::TaskExecutor* executor() const; - MessagingPortMock* getMessagingPort() const; + transport::Session* getTransportSession() const; DistLockManagerMock* distLock() const; @@ -196,6 +200,8 @@ protected: void shutdownExecutor(); + void setRemote(const HostAndPort& remote); + /** * Checks that the given command has the expected settings for read after opTime. */ @@ -207,7 +213,8 @@ private: std::unique_ptr<ServiceContext> _service; ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; - std::unique_ptr<MessagingPortMock> _messagePort; + transport::TransportLayerMock* _transportLayer; + std::unique_ptr<transport::Session> _transportSession; RemoteCommandTargeterFactoryMock* _targeterFactory; RemoteCommandTargeterMock* _configTargeter; diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index dea991b80ef..d79e45a1fa9 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -66,7 +66,7 @@ public: void setUp() override { ShardingTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345)); + setRemote(HostAndPort("ClientHost", 12345)); // Set up the RemoteCommandTargeter for the config shard. configTargeter()->setFindHostReturnValue(kTestConfigShardHost); diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index 7109c174a99..d37751fe537 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -320,17 +320,17 @@ public: log() << "Setting random seed: " << mongoBridgeGlobalParams.seed; } - void accepted(AbstractMessagingPort* mp) override final { + void accepted(std::unique_ptr<AbstractMessagingPort> mp) override final { { stdx::lock_guard<stdx::mutex> lk(_portsMutex); if (_inShutdown.load()) { mp->shutdown(); return; } - _ports.insert(mp); + _ports.insert(mp.get()); } - Forwarder f(mp, &_settingsMutex, &_settings, _seedSource.nextInt64()); + Forwarder f(mp.release(), &_settingsMutex, &_settings, _seedSource.nextInt64()); stdx::thread t(f); t.detach(); } diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 82045d5c0e8..adedade05a3 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -13,13 +13,10 @@ env.CppUnitTest( ) env.Library( - target='service_entry_point_test_suite', + target='transport_layer_common', source=[ - 'service_entry_point_test_suite.cpp', - ## move these somewhere better: 'session.cpp', 'ticket.cpp', - 'transport_layer.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/unittest/unittest', @@ -28,10 +25,63 @@ env.Library( ], ) +env.Library( + target='transport_layer_manager', + source=[ + 'transport_layer_manager.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + ], +) + +env.Library( + target='transport_layer_mock', + source=[ + 'transport_layer_mock.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + ], +) + +env.Library( + target='transport_layer_legacy', + source=[ + 'transport_layer_legacy.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/stats/counters', + ], +) + +env.Library( + target='service_entry_point_test_suite', + source=[ + 'service_entry_point_test_suite.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + '$BUILD_DIR/mongo/unittest/unittest', + ], +) + +env.Library( + target='service_entry_point_utils', + source=[ + 'service_entry_point_utils.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + ], +) + env.CppUnitTest( target='service_entry_point_mock_test', source=[ - 'service_entry_point_mock.cpp', ## move? + 'service_entry_point_mock.cpp', 'service_entry_point_mock_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 6cfe3606043..8d4530d7f13 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -100,16 +100,12 @@ void ServiceEntryPointMock::run(transport::Session&& session) { } // sourceMessage() - auto sourceTicket = _tl->sourceMessage(session, &inMessage); - auto sourceRes = _tl->wait(std::move(sourceTicket)); - if (!sourceRes.isOK()) { + if (!session.sourceMessage(&inMessage).wait().isOK()) { break; } // sinkMessage() - auto sinkTicket = _tl->sinkMessage(session, _outMessage); - auto sinkRes = _tl->wait(std::move(sinkTicket)); - if (!sinkRes.isOK()) { + if (!session.sinkMessage(_outMessage).wait().isOK()) { break; } } diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index e4f93ae3344..e2a9c7f336c 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -57,7 +57,6 @@ namespace mongo { using namespace transport; using namespace stdx::placeholders; -using SessionId = Session::SessionId; using TicketCallback = TransportLayer::TicketCallback; namespace { @@ -107,7 +106,7 @@ ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session, ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session, Date_t expiration) : _sessionId(session.id()), _expiration(expiration) {} -SessionId ServiceEntryPointTestSuite::MockTicket::sessionId() const { +Session::Id ServiceEntryPointTestSuite::MockTicket::sessionId() const { return _sessionId; } @@ -140,20 +139,35 @@ Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const Session& ses return _sinkMessage(session, message, expiration); } -Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket ticket) { +Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket&& ticket) { return _wait(std::move(ticket)); } -void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket ticket, TicketCallback callback) { +void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket, + TicketCallback callback) { return _asyncWait(std::move(ticket), std::move(callback)); } +std::string ServiceEntryPointTestSuite::MockTLHarness::getX509SubjectName(const Session& session) { + return "mock"; +} + +void ServiceEntryPointTestSuite::MockTLHarness::registerTags(const Session& session) {} + +TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() { + return Stats(); +} + void ServiceEntryPointTestSuite::MockTLHarness::end(const Session& session) { return _end(session); } -void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions() { - return _endAllSessions(); +void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions(Session::TagMask tags) { + return _endAllSessions(tags); +} + +Status ServiceEntryPointTestSuite::MockTLHarness::start() { + return _start(); } void ServiceEntryPointTestSuite::MockTLHarness::shutdown() { @@ -180,13 +194,13 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport:: Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const Session& s, Message* m, Date_t d) { - return Ticket(stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d)); + return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d)); } Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const Session& s, const Message&, Date_t d) { - return Ticket(stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d)); + return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d)); } Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const Session& s, @@ -308,6 +322,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() { auto resumeAFuture = resumeA.get_future(); stdx::promise<void> testComplete; + auto testFuture = testComplete.get_future(); _tl->_resetHooks(); @@ -376,10 +391,11 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions, Milliseconds delay) { AtomicWord<int> ended{0}; stdx::promise<void> allSessionsComplete; + auto allCompleteFuture = allSessionsComplete.get_future(); stdx::mutex cyclesLock; - std::unordered_map<SessionId, int> completedCycles; + std::unordered_map<Session::Id, int> completedCycles; _tl->_resetHooks(); diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index a1999efd873..828fee5fe3c 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -99,7 +99,7 @@ private: MockTicket(MockTicket&&) = default; MockTicket& operator=(MockTicket&&) = default; - transport::Session::SessionId sessionId() const override; + transport::Session::Id sessionId() const override; Date_t expiration() const override; @@ -107,7 +107,7 @@ private: private: boost::optional<Message*> _message; - SessionId _sessionId; + transport::Session::Id _sessionId; Date_t _expiration; }; @@ -127,10 +127,15 @@ private: const transport::Session& session, const Message& message, Date_t expiration = transport::Ticket::kNoExpirationDate) override; - Status wait(transport::Ticket ticket) override; - void asyncWait(transport::Ticket ticket, TicketCallback callback) override; + Status wait(transport::Ticket&& ticket) override; + void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override; + std::string getX509SubjectName(const transport::Session& session) override; + void registerTags(const transport::Session& session) override; + Stats sessionStats() override; void end(const transport::Session& session) override; - void endAllSessions() override; + void endAllSessions( + transport::Session::TagMask tags = transport::Session::kEmptyTagMask) override; + Status start() override; void shutdown() override; ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket); @@ -143,7 +148,9 @@ private: stdx::function<Status(transport::Ticket)> _wait; stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait; stdx::function<void(const transport::Session&)> _end; - stdx::function<void(void)> _endAllSessions = [] {}; + stdx::function<void(transport::Session::TagMask tags)> _endAllSessions = + [](transport::Session::TagMask tags) {}; + stdx::function<Status(void)> _start = [] { return Status::OK(); }; stdx::function<void(void)> _shutdown = [] {}; // Pre-set hook methods diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_entry_point_utils.cpp new file mode 100644 index 00000000000..efe934967fb --- /dev/null +++ b/src/mongo/transport/service_entry_point_utils.cpp @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_entry_point_utils.h" + +#include "mongo/db/client.h" +#include "mongo/db/server_options.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/debug_util.h" +#include "mongo/util/log.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/quick_exit.h" + +#ifdef __linux__ // TODO: consider making this ifndef _WIN32 +#include <sys/resource.h> +#endif + +#if !defined(__has_feature) +#define __has_feature(x) 0 +#endif + +namespace mongo { + +namespace { + +struct Context { + Context(transport::Session session, stdx::function<void(transport::Session*)> task) + : session(std::move(session)), task(std::move(task)) {} + + transport::Session session; + stdx::function<void(transport::Session*)> task; +}; + +void* runFunc(void* ptr) { + std::unique_ptr<Context> ctx(static_cast<Context*>(ptr)); + + auto tl = ctx->session.getTransportLayer(); + Client::initThread("conn", &ctx->session); + setThreadName(std::string(str::stream() << "conn" << ctx->session.id())); + + try { + ctx->task(&ctx->session); + } catch (const AssertionException& e) { + log() << "AssertionException handling request, closing client connection: " << e; + } catch (const SocketException& e) { + log() << "SocketException handling request, closing client connection: " << e; + } catch (const DBException& e) { + // must be right above std::exception to avoid catching subclasses + log() << "DBException handling request, closing client connection: " << e; + } catch (const std::exception& e) { + error() << "Uncaught std::exception: " << e.what() << ", terminating"; + quickExit(EXIT_UNCAUGHT); + } + + tl->end(ctx->session); + + if (!serverGlobalParams.quiet) { + auto conns = tl->sessionStats().numOpenSessions; + const char* word = (conns == 1 ? " connection" : " connections"); + log() << "end connection " << ctx->session.remote() << " (" << conns << word + << " now open)"; + } + + Client::destroy(); + + return nullptr; +} +} // namespace + +void launchWrappedServiceEntryWorkerThread(transport::Session&& session, + stdx::function<void(transport::Session*)> task) { + auto ctx = stdx::make_unique<Context>(std::move(session), std::move(task)); + + try { +#ifndef __linux__ // TODO: consider making this ifdef _WIN32 + stdx::thread(stdx::bind(runFunc, ctx.get())).detach(); + ctx.release(); +#else + pthread_attr_t attrs; + pthread_attr_init(&attrs); + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); + + static const size_t STACK_SIZE = + 1024 * 1024; // if we change this we need to update the warning + + struct rlimit limits; + invariant(getrlimit(RLIMIT_STACK, &limits) == 0); + if (limits.rlim_cur > STACK_SIZE) { + size_t stackSizeToSet = STACK_SIZE; +#if !__has_feature(address_sanitizer) + if (kDebugBuild) + stackSizeToSet /= 2; +#endif + pthread_attr_setstacksize(&attrs, stackSizeToSet); + } else if (limits.rlim_cur < 1024 * 1024) { + warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB"; + } + + + pthread_t thread; + int failed = pthread_create(&thread, &attrs, runFunc, ctx.get()); + + pthread_attr_destroy(&attrs); + + if (failed) { + log() << "pthread_create failed: " << errnoWithDescription(failed); + throw std::system_error( + std::make_error_code(std::errc::resource_unavailable_try_again)); + } + ctx.release(); +#endif // __linux__ + + } catch (...) { + log() << "failed to create service entry worker thread for " << ctx->session.remote(); + } +} + +} // namespace mongo diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/service_entry_point_utils.h index 20030e64661..42be4079f9c 100644 --- a/src/mongo/transport/transport_layer.cpp +++ b/src/mongo/transport/service_entry_point_utils.h @@ -26,19 +26,17 @@ * it in the license file. */ -#include "mongo/platform/basic.h" +#pragma once -#include "mongo/transport/transport_layer.h" +#include "mongo/stdx/functional.h" namespace mongo { -namespace transport { -TransportLayer::TransportLayer() = default; -TransportLayer::~TransportLayer() = default; +namespace transport { +class Session; +} // namespace transport -TicketImpl* TransportLayer::getTicketImpl(const Ticket& ticket) { - return ticket.impl(); -} +void launchWrappedServiceEntryWorkerThread(transport::Session&& session, + stdx::function<void(transport::Session*)> task); -} // namespace transport } // namespace mongo diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp index b9f382a6965..ebf237f8bd4 100644 --- a/src/mongo/transport/session.cpp +++ b/src/mongo/transport/session.cpp @@ -43,11 +43,14 @@ AtomicUInt64 sessionIdCounter(0); } // namespace Session::Session(HostAndPort remote, HostAndPort local, TransportLayer* tl) - : _id(sessionIdCounter.addAndFetch(1)), _remote(remote), _local(local), _tl(tl) {} + : _id(sessionIdCounter.addAndFetch(1)), + _remote(std::move(remote)), + _local(std::move(local)), + _tags(kEmptyTagMask), + _tl(tl) {} Session::~Session() { if (_tl != nullptr) { - invariant(_tl); _tl->end(*this); } } @@ -75,16 +78,21 @@ Session& Session::operator=(Session&& other) { return *this; } -Session::SessionId Session::id() const { - return _id; +void Session::replaceTags(TagMask tags) { + _tags = tags; + _tl->registerTags(*this); } -const HostAndPort& Session::remote() const { - return _remote; +Ticket Session::sourceMessage(Message* message, Date_t expiration) { + return _tl->sourceMessage(*this, message, expiration); } -const HostAndPort& Session::local() const { - return _local; +Ticket Session::sinkMessage(const Message& message, Date_t expiration) { + return _tl->sinkMessage(*this, message, expiration); +} + +std::string Session::getX509SubjectName() const { + return _tl->getX509SubjectName(*this); } } // namespace transport diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 4692f22437d..b340b0fecf8 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -29,7 +29,10 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/transport/session_id.h" +#include "mongo/transport/ticket.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/net/message.h" namespace mongo { namespace transport { @@ -47,7 +50,14 @@ public: /** * Type to indicate the internal id for this session. */ - using SessionId = uint64_t; + using Id = SessionId; + + /** + * Tags for groups of connections. + */ + using TagMask = uint32_t; + static constexpr TagMask kEmptyTagMask = 0; + static constexpr TagMask kKeepOpen = 1; /** * Construct a new session. @@ -68,24 +78,72 @@ public: /** * Return the id for this session. */ - SessionId id() const; + Id id() const { + return _id; + } /** * Return the remote host for this session. */ - const HostAndPort& remote() const; + const HostAndPort& remote() const { + return _remote; + } /** * Return the local host information for this session. */ - const HostAndPort& local() const; + const HostAndPort& local() const { + return _local; + } + + /** + * Return the X509 subject name for this connection (SSL only). + */ + std::string getX509SubjectName() const; + + /** + * Set this session's tags. This Session will register + * its new tags with its TransportLayer. + */ + void replaceTags(TagMask tags); + + /** + * Get this session's tags. + */ + TagMask getTags() const { + return _tags; + } + + /** + * Source (receive) a new Message for this Session. + * + * This method will forward to sourceMessage on this Session's transport layer. + */ + Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate); + + /** + * Sink (send) a new Message for this Session. This method should be used + * to send replies to a given host. + * + * This method will forward to sinkMessage on this Session's transport layer. + */ + Ticket sinkMessage(const Message& message, Date_t expiration = Ticket::kNoExpirationDate); + + /** + * The TransportLayer for this Session. + */ + TransportLayer* getTransportLayer() const { + return _tl; + } private: - SessionId _id; + Id _id; HostAndPort _remote; HostAndPort _local; + TagMask _tags; + TransportLayer* _tl; }; diff --git a/src/mongo/transport/session_id.h b/src/mongo/transport/session_id.h new file mode 100644 index 00000000000..b1395ef1429 --- /dev/null +++ b/src/mongo/transport/session_id.h @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <cstdint> + +namespace mongo { +namespace transport { + +using SessionId = uint64_t; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/ticket.cpp b/src/mongo/transport/ticket.cpp index 76bce1dc51e..77a290087dd 100644 --- a/src/mongo/transport/ticket.cpp +++ b/src/mongo/transport/ticket.cpp @@ -30,30 +30,27 @@ #include "mongo/transport/ticket.h" #include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" namespace mongo { namespace transport { const Date_t Ticket::kNoExpirationDate{Date_t::max()}; -Ticket::Ticket(std::unique_ptr<TicketImpl> ticket) : _ticket(std::move(ticket)) {} +Ticket::Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket) + : _tl(tl), _ticket(std::move(ticket)) {} Ticket::~Ticket() = default; Ticket::Ticket(Ticket&&) = default; Ticket& Ticket::operator=(Ticket&&) = default; -Session::SessionId Ticket::sessionId() const { - return _ticket->sessionId(); +Status Ticket::wait()&& { + return _tl->wait(std::move(*this)); } -Date_t Ticket::expiration() const { - return _ticket->expiration(); -} - -// TODO should this actually be const? -TicketImpl* Ticket::impl() const { - return _ticket.get(); +void Ticket::asyncWait(TicketCallback cb)&& { + return _tl->asyncWait(std::move(*this), std::move(cb)); } } // namespace transport diff --git a/src/mongo/transport/ticket.h b/src/mongo/transport/ticket.h index 875a66b3602..dd8927085aa 100644 --- a/src/mongo/transport/ticket.h +++ b/src/mongo/transport/ticket.h @@ -28,14 +28,18 @@ #pragma once +#include <memory> + #include "mongo/base/disallow_copying.h" -#include "mongo/transport/session.h" +#include "mongo/stdx/functional.h" +#include "mongo/transport/session_id.h" +#include "mongo/transport/ticket_impl.h" #include "mongo/util/time_support.h" namespace mongo { namespace transport { -class TicketImpl; +class TransportLayer; /** * A Ticket represents some work to be done within the TransportLayer. @@ -46,16 +50,16 @@ class Ticket { MONGO_DISALLOW_COPYING(Ticket); public: - friend class TransportLayer; + using TicketCallback = stdx::function<void(Status)>; - using SessionId = Session::SessionId; + friend class TransportLayer; /** * Indicates that there is no expiration time by when a ticket needs to complete. */ static const Date_t kNoExpirationDate; - Ticket(std::unique_ptr<TicketImpl> ticket); + Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket); ~Ticket(); /** @@ -67,20 +71,41 @@ public: /** * Return this ticket's session id. */ - SessionId sessionId() const; + SessionId sessionId() const { + return _ticket->sessionId(); + } /** * Return this ticket's expiration date. */ - Date_t expiration() const; + Date_t expiration() const { + return _ticket->expiration(); + } + + /** + * Wait for this ticket to be filled. + * + * This is this-rvalue qualified because it consumes the ticket + */ + Status wait() &&; + + /** + * Asynchronously wait for this ticket to be filled. + * + * This is this-rvalue qualified because it consumes the ticket + */ + void asyncWait(TicketCallback cb) &&; protected: /** * Return a non-owning pointer to the underlying TicketImpl type */ - TicketImpl* impl() const; + TicketImpl* impl() const { + return _ticket.get(); + } private: + TransportLayer* _tl; std::unique_ptr<TicketImpl> _ticket; }; diff --git a/src/mongo/transport/ticket_impl.h b/src/mongo/transport/ticket_impl.h index 378f9fb2999..4cbbc0a22d0 100644 --- a/src/mongo/transport/ticket_impl.h +++ b/src/mongo/transport/ticket_impl.h @@ -28,7 +28,8 @@ #pragma once -#include "mongo/transport/session.h" +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/session_id.h" #include "mongo/util/time_support.h" namespace mongo { @@ -43,8 +44,6 @@ class TicketImpl { MONGO_DISALLOW_COPYING(TicketImpl); public: - using SessionId = Session::SessionId; - virtual ~TicketImpl() = default; TicketImpl(TicketImpl&&) = default; diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 3028506c6f0..dde6248eaf5 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -57,7 +57,30 @@ class TransportLayer { MONGO_DISALLOW_COPYING(TransportLayer); public: - virtual ~TransportLayer(); + /** + * Stats for sessions open in the Transport Layer. + */ + struct Stats { + /** + * Returns the number of sessions currently open in the transport layer. + */ + size_t numOpenSessions = 0; + + /** + * Returns the total number of sessions that have ever been created by this TransportLayer. + */ + size_t numCreatedSessions = 0; + + /** + * Returns the number of available sessions we could still open. Only relevant + * when we are operating under a transport::Session limit (for example, in the + * legacy implementation, we respect a maximum number of connections). If there + * is no session limit, returns std::numeric_limits<int>::max(). + */ + size_t numAvailableSessions = 0; + }; + + virtual ~TransportLayer() = default; /** * Source (receive) a new Message for this Session. @@ -101,7 +124,7 @@ public: * This thread may be used by the TransportLayer to run other Tickets that were * enqueued prior to this call. */ - virtual Status wait(Ticket ticket) = 0; + virtual Status wait(Ticket&& ticket) = 0; /** * Callback for Tickets that are run via asyncWait(). @@ -115,7 +138,27 @@ public: * This thread will not be used by the TransportLayer to perform work. The callback * passed to asyncWait() may be run on any thread. */ - virtual void asyncWait(Ticket ticket, TicketCallback callback) = 0; + virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0; + + /** + * Tag this Session within the TransportLayer with the tags currently assigned to the + * Session. If endAllSessions() is called with a matching + * Session::TagMask, this Session will not be ended. + * + * Before calling this method, use Session::replaceTags() to set the desired TagMask. + */ + virtual void registerTags(const Session& session) = 0; + + /** + * Return the stored X509 subject name for this session. If the session does not + * exist in this TransportLayer, returns "". + */ + virtual std::string getX509SubjectName(const Session& session) = 0; + + /** + * Returns the number of sessions currently open in the transport layer. + */ + virtual Stats sessionStats() = 0; /** * End the given Session. Tickets for this Session that have already been @@ -131,11 +174,20 @@ public: virtual void end(const Session& session) = 0; /** - * End all active sessions in the TransportLayer. Tickets that have already been - * started via wait() or asyncWait() will complete, but may return a failed Status. - * This method is synchronous and will not return until all sessions have ended. + * End all active sessions in the TransportLayer. Tickets that have already been started via + * wait() or asyncWait() will complete, but may return a failed Status. This method is + * asynchronous and will return after all sessions have been notified to end. + * + * If a TagMask is provided, endAllSessions() will skip over sessions with matching + * tags and leave them open. + */ + virtual void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) = 0; + + /** + * Start the TransportLayer. After this point, the TransportLayer will begin accepting active + * sessions from new transport::Endpoints. */ - virtual void endAllSessions() = 0; + virtual Status start() = 0; /** * Shut the TransportLayer down. After this point, the TransportLayer will @@ -147,12 +199,21 @@ public: virtual void shutdown() = 0; protected: - TransportLayer(); + TransportLayer() = default; /** * Return the implementation of this Ticket. */ - TicketImpl* getTicketImpl(const Ticket& ticket); + TicketImpl* getTicketImpl(const Ticket& ticket) { + return ticket.impl(); + } + + /** + * Return the transport layer of this Ticket. + */ + TransportLayer* getTicketTransportLayer(const Ticket& ticket) { + return ticket._tl; + } }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp new file mode 100644 index 00000000000..b45d6a6ad07 --- /dev/null +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -0,0 +1,307 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/transport/transport_layer_legacy.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/config.h" +#include "mongo/db/service_context.h" +#include "mongo/db/stats/counters.h" +#include "mongo/stdx/functional.h" +#include "mongo/transport/service_entry_point.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/net/abstract_message_port.h" +#include "mongo/util/net/socket_exception.h" + +namespace mongo { +namespace transport { + +TransportLayerLegacy::ListenerLegacy::ListenerLegacy(const TransportLayerLegacy::Options& opts, + NewConnectionCb callback) + : Listener("", opts.ipList, opts.port, getGlobalServiceContext(), true), + _accepted(std::move(callback)) {} + +void TransportLayerLegacy::ListenerLegacy::accepted(std::unique_ptr<AbstractMessagingPort> mp) { + _accepted(std::move(mp)); +} + +TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts, + std::shared_ptr<ServiceEntryPoint> sep) + : _sep(sep), + _listener(stdx::make_unique<ListenerLegacy>( + opts, + stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))), + _running(false), + _options(opts) {} + +TransportLayerLegacy::LegacyTicket::LegacyTicket(const Session& session, + Date_t expiration, + WorkHandle work) + : _sessionId(session.id()), _expiration(expiration), _fill(std::move(work)) {} + +Session::Id TransportLayerLegacy::LegacyTicket::sessionId() const { + return _sessionId; +} + +Date_t TransportLayerLegacy::LegacyTicket::expiration() const { + return _expiration; +} + +Status TransportLayerLegacy::setup() { + if (!_listener->setupSockets()) { + error() << "Failed to set up sockets during startup."; + return {ErrorCodes::InternalError, "Failed to set up sockets"}; + } + + return Status::OK(); +} + +Status TransportLayerLegacy::start() { + if (_running.swap(true)) { + return {ErrorCodes::InternalError, "TransportLayer is already running"}; + } + + _listenerThread = stdx::thread([this]() { _listener->initAndListen(); }); + + return Status::OK(); +} + +TransportLayerLegacy::~TransportLayerLegacy() = default; + +Ticket TransportLayerLegacy::sourceMessage(const Session& session, + Message* message, + Date_t expiration) { + auto sourceCb = [message](AbstractMessagingPort* amp) -> Status { + if (!amp->recv(*message)) { + return {ErrorCodes::HostUnreachable, "Recv failed"}; + } + return Status::OK(); + }; + + return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sourceCb))); +} + +std::string TransportLayerLegacy::getX509SubjectName(const Session& session) { + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto conn = _connections.find(session.id()); + if (conn == _connections.end()) { + // Return empty string if the session is not found + return ""; + } + + return conn->second.x509SubjectName.value_or(""); + } +} + +TransportLayer::Stats TransportLayerLegacy::sessionStats() { + Stats stats; + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + stats.numOpenSessions = _connections.size(); + } + + stats.numAvailableSessions = Listener::globalTicketHolder.available(); + stats.numCreatedSessions = Listener::globalConnectionNumber.load(); + + return stats; +} + +Ticket TransportLayerLegacy::sinkMessage(const Session& session, + const Message& message, + Date_t expiration) { + auto sinkCb = [&message](AbstractMessagingPort* amp) -> Status { + try { + amp->say(message); + return Status::OK(); + } catch (const SocketException& e) { + return {ErrorCodes::HostUnreachable, e.what()}; + } + }; + + return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sinkCb))); +} + +Status TransportLayerLegacy::wait(Ticket&& ticket) { + return _runTicket(std::move(ticket)); +} + +void TransportLayerLegacy::asyncWait(Ticket&& ticket, TicketCallback callback) { + // Left unimplemented because there is no reasonable way to offer general async waiting besides + // offering a background thread that can handle waits for multiple tickets. We may never + // implement this for the legacy TL. + MONGO_UNREACHABLE; +} + +void TransportLayerLegacy::end(const Session& session) { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto conn = _connections.find(session.id()); + if (conn != _connections.end()) { + _endSession_inlock(conn); + } +} + +void TransportLayerLegacy::registerTags(const Session& session) { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto conn = _connections.find(session.id()); + if (conn != _connections.end()) { + conn->second.tags = session.getTags(); + } +} + +void TransportLayerLegacy::_endSession_inlock( + decltype(TransportLayerLegacy::_connections.begin()) conn) { + conn->second.amp->shutdown(); + + // If the amp is in use it means that we're in the middle of fulfilling the ticket in _runTicket + // and can't erase it immediately. + // + // In that case we'll rely on _runTicket to do the removal for us later + if (conn->second.inUse) { + conn->second.ended = true; + } else { + Listener::globalTicketHolder.release(); + _connections.erase(conn); + } +} + +void TransportLayerLegacy::endAllSessions(Session::TagMask tags) { + log() << "legacy transport layer ending all sessions"; + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto&& conn = _connections.begin(); + while (conn != _connections.end()) { + // If we erase this connection below, we invalidate our iterator, use a placeholder. + auto placeholder = conn; + placeholder++; + + if (conn->second.tags & tags) { + log() << "Skip closing connection for connection # " << conn->second.connectionId; + } else { + _endSession_inlock(conn); + } + + conn = placeholder; + } + } +} + +void TransportLayerLegacy::shutdown() { + _running.store(false); + _listener->shutdown(); + _listenerThread.join(); + endAllSessions(); +} + +Status TransportLayerLegacy::_runTicket(Ticket ticket) { + if (!_running.load()) { + return {ErrorCodes::ShutdownInProgress, "TransportLayer in shutdown"}; + } + + if (ticket.expiration() < Date_t::now()) { + return {ErrorCodes::ExceededTimeLimit, "Ticket has expired"}; + } + + AbstractMessagingPort* amp; + + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + + auto conn = _connections.find(ticket.sessionId()); + if (conn == _connections.end()) { + return {ErrorCodes::TransportSessionNotFound, "No such session in TransportLayer"}; + } + + // "check out" the port + conn->second.inUse = true; + amp = conn->second.amp.get(); + } + + amp->clearCounters(); + + auto legacyTicket = checked_cast<LegacyTicket*>(getTicketImpl(ticket)); + auto res = legacyTicket->_fill(amp); + + networkCounter.hit(amp->getBytesIn(), amp->getBytesOut()); + + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + + auto conn = _connections.find(ticket.sessionId()); + invariant(conn != _connections.end()); + +#ifdef MONGO_CONFIG_SSL + // If we didn't have an X509 subject name, see if we have one now + if (!conn->second.x509SubjectName) { + auto name = amp->getX509SubjectName(); + if (name != "") { + conn->second.x509SubjectName = name; + } + } +#endif + conn->second.inUse = false; + + if (conn->second.ended) { + Listener::globalTicketHolder.release(); + _connections.erase(conn); + } + } + + return res; +} + +void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) { + if (!Listener::globalTicketHolder.tryAcquire()) { + log() << "connection refused because too many open connections: " + << Listener::globalTicketHolder.used(); + amp->shutdown(); + return; + } + + Session session(amp->remote(), HostAndPort(amp->localAddr().toString(true)), this); + + amp->setLogLevel(logger::LogSeverity::Debug(1)); + + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + _connections.emplace(std::piecewise_construct, + std::forward_as_tuple(session.id()), + std::forward_as_tuple(std::move(amp), false, session.getTags())); + } + + invariant(_sep); + _sep->startSession(std::move(session)); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h new file mode 100644 index 00000000000..ec17c76ca1e --- /dev/null +++ b/src/mongo/transport/transport_layer_legacy.h @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <unordered_map> + +#include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/net/listen.h" +#include "mongo/util/net/sock.h" + +namespace mongo { + +class AbstractMessagingPort; +class ServiceEntryPoint; + +namespace transport { + +/** + * A TransportLayer implementation based on legacy networking primitives (the Listener, + * AbstractMessagingPort). + */ +class TransportLayerLegacy final : public TransportLayer { + MONGO_DISALLOW_COPYING(TransportLayerLegacy); + +public: + struct Options { + int port; // port to bind to + std::string ipList; // addresses to bind to + + Options() : port(0), ipList("") {} + }; + + TransportLayerLegacy(const Options& opts, std::shared_ptr<ServiceEntryPoint> sep); + + ~TransportLayerLegacy(); + + Status setup(); + Status start() override; + + Ticket sourceMessage(const Session& session, + Message* message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Ticket sinkMessage(const Session& session, + const Message& message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Status wait(Ticket&& ticket) override; + void asyncWait(Ticket&& ticket, TicketCallback callback) override; + + void registerTags(const Session& session) override; + std::string getX509SubjectName(const Session& session) override; + + Stats sessionStats() override; + + void end(const Session& session) override; + void endAllSessions(transport::Session::TagMask tags = Session::kKeepOpen) override; + void shutdown() override; + +private: + void _handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp); + + Status _runTicket(Ticket ticket); + + using NewConnectionCb = stdx::function<void(std::unique_ptr<AbstractMessagingPort>)>; + using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>; + + /** + * A TicketImpl implementation for this TransportLayer. WorkHandle is a callable that + * can be invoked to fill this ticket. + */ + class LegacyTicket : public TicketImpl { + MONGO_DISALLOW_COPYING(LegacyTicket); + + public: + LegacyTicket(const Session& session, Date_t expiration, WorkHandle work); + + SessionId sessionId() const override; + Date_t expiration() const override; + + SessionId _sessionId; + Date_t _expiration; + + WorkHandle _fill; + }; + + /** + * This Listener wraps the interface in listen.h so that we may implement our own + * version of accepted(). + */ + class ListenerLegacy : public Listener { + public: + ListenerLegacy(const TransportLayerLegacy::Options& opts, NewConnectionCb callback); + + void runListener(); + + void accepted(std::unique_ptr<AbstractMessagingPort> mp) override; + + bool useUnixSockets() const override { + return true; + } + + private: + NewConnectionCb _accepted; + }; + + /** + * Connection object, to associate Session ids with AbstractMessagingPorts. + */ + struct Connection { + Connection(std::unique_ptr<AbstractMessagingPort> port, bool ended, Session::TagMask tags) + : amp(std::move(port)), + connectionId(amp->connectionId()), + tags(tags), + inUse(false), + ended(false) {} + + std::unique_ptr<AbstractMessagingPort> amp; + + const long long connectionId; + + boost::optional<std::string> x509SubjectName; + Session::TagMask tags; + bool inUse; + bool ended; + }; + + std::shared_ptr<ServiceEntryPoint> _sep; + + std::unique_ptr<Listener> _listener; + stdx::thread _listenerThread; + + stdx::mutex _connectionsMutex; + std::unordered_map<Session::Id, Connection> _connections; + + void _endSession_inlock(decltype(_connections.begin()) conn); + + AtomicWord<bool> _running; + + Options _options; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp new file mode 100644 index 00000000000..c64023b2067 --- /dev/null +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/transport/transport_layer_manager.h" + +#include "mongo/base/status.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/util/time_support.h" +#include <limits> + +#include <iostream> + +namespace mongo { +namespace transport { + +TransportLayerManager::TransportLayerManager() = default; + +Ticket TransportLayerManager::sourceMessage(const Session& session, + Message* message, + Date_t expiration) { + return session.getTransportLayer()->sourceMessage(session, message, expiration); +} + +Ticket TransportLayerManager::sinkMessage(const Session& session, + const Message& message, + Date_t expiration) { + return session.getTransportLayer()->sinkMessage(session, message, expiration); +} + +Status TransportLayerManager::wait(Ticket&& ticket) { + return getTicketTransportLayer(ticket)->wait(std::move(ticket)); +} + +void TransportLayerManager::asyncWait(Ticket&& ticket, TicketCallback callback) { + return getTicketTransportLayer(ticket)->asyncWait(std::move(ticket), std::move(callback)); +} + +std::string TransportLayerManager::getX509SubjectName(const Session& session) { + return session.getX509SubjectName(); +} + +template <typename Callable> +void TransportLayerManager::_foreach(Callable&& cb) { + { + stdx::lock_guard<stdx::mutex> lk(_tlsMutex); + for (auto&& tl : _tls) { + cb(tl.get()); + } + } +} + +TransportLayer::Stats TransportLayerManager::sessionStats() { + Stats stats; + + _foreach([&](TransportLayer* tl) { + Stats s = tl->sessionStats(); + + stats.numOpenSessions += s.numOpenSessions; + stats.numCreatedSessions += s.numCreatedSessions; + if (std::numeric_limits<size_t>::max() - stats.numAvailableSessions < + s.numAvailableSessions) { + stats.numAvailableSessions = std::numeric_limits<size_t>::max(); + } else { + stats.numAvailableSessions += s.numAvailableSessions; + } + }); + + return stats; +} + +void TransportLayerManager::registerTags(const Session& session) { + session.getTransportLayer()->registerTags(session); +} + +void TransportLayerManager::end(const Session& session) { + session.getTransportLayer()->end(session); +} + +void TransportLayerManager::endAllSessions(Session::TagMask tags) { + _foreach([&tags](TransportLayer* tl) { tl->endAllSessions(tags); }); +} + +Status TransportLayerManager::start() { + return Status::OK(); +} + +void TransportLayerManager::shutdown() { + _foreach([](TransportLayer* tl) { tl->shutdown(); }); +} + +Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl) { + auto ptr = tl.get(); + { + stdx::lock_guard<stdx::mutex> lk(_tlsMutex); + _tls.emplace_back(std::move(tl)); + } + return ptr->start(); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h new file mode 100644 index 00000000000..391bdf4ebea --- /dev/null +++ b/src/mongo/transport/transport_layer_manager.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/status.h" +#include "mongo/stdx/mutex.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/net/message.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace transport { + +/** + * This TransportLayerManager is a TransportLayer implementation that holds other + * TransportLayers. Mongod and Mongos can treat this like the "only" TransportLayer + * and not be concerned with which other TransportLayer implementations it holds + * underneath. + */ +class TransportLayerManager final : public TransportLayer { + MONGO_DISALLOW_COPYING(TransportLayerManager); + +public: + TransportLayerManager(); + + Ticket sourceMessage(const Session& session, + Message* message, + Date_t expiration = Ticket::kNoExpirationDate) override; + Ticket sinkMessage(const Session& session, + const Message& message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Status wait(Ticket&& ticket) override; + void asyncWait(Ticket&& ticket, TicketCallback callback) override; + + std::string getX509SubjectName(const Session& session) override; + void registerTags(const Session& session) override; + + Stats sessionStats() override; + + void end(const Session& session) override; + void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) override; + + Status start() override; + + void shutdown() override; + + Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl); + +private: + template <typename Callable> + void _foreach(Callable&& cb); + + stdx::mutex _tlsMutex; + std::vector<std::unique_ptr<TransportLayer>> _tls; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp new file mode 100644 index 00000000000..c550570b401 --- /dev/null +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/base/status.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer_mock.h" +#include "mongo/util/time_support.h" + +#include "mongo/stdx/memory.h" + +namespace mongo { +namespace transport { + +Session::Id TransportLayerMock::MockTicket::sessionId() const { + return Session::Id{}; +} + +Date_t TransportLayerMock::MockTicket::expiration() const { + return Date_t::now(); +} + +Ticket TransportLayerMock::sourceMessage(const Session& session, + Message* message, + Date_t expiration) { + return Ticket(this, stdx::make_unique<MockTicket>()); +} + +Ticket TransportLayerMock::sinkMessage(const Session& session, + const Message& message, + Date_t expiration) { + return Ticket(this, stdx::make_unique<MockTicket>()); +} + +Status TransportLayerMock::wait(Ticket&& ticket) { + return Status::OK(); +} + +void TransportLayerMock::asyncWait(Ticket&& ticket, TicketCallback callback) { + callback(Status::OK()); +} + +std::string TransportLayerMock::getX509SubjectName(const Session& session) { + return session.getX509SubjectName(); +} + +TransportLayer::Stats TransportLayerMock::sessionStats() { + return Stats(); +} + +void TransportLayerMock::registerTags(const Session& session) {} + +void TransportLayerMock::end(const Session& session) {} + +void TransportLayerMock::endAllSessions(Session::TagMask tags) {} + +Status TransportLayerMock::start() { + return Status::OK(); +} + +void TransportLayerMock::shutdown() {} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h new file mode 100644 index 00000000000..400d6b02258 --- /dev/null +++ b/src/mongo/transport/transport_layer_mock.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/net/message.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace transport { + +/** + * This TransportLayerMock is a noop TransportLayer implementation. + */ +class TransportLayerMock : public TransportLayer { + MONGO_DISALLOW_COPYING(TransportLayerMock); + +public: + TransportLayerMock() = default; + + Ticket sourceMessage(const Session& session, + Message* message, + Date_t expiration = Ticket::kNoExpirationDate) override; + Ticket sinkMessage(const Session& session, + const Message& message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Status wait(Ticket&& ticket) override; + void asyncWait(Ticket&& ticket, TicketCallback callback) override; + + std::string getX509SubjectName(const Session& session) override; + void registerTags(const Session& session) override; + + Stats sessionStats() override; + + void end(const Session& session) override; + void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) override; + + Status start() override; + void shutdown() override; + +private: + /** + * A class for Tickets issued from the TransportLayerMock. These will + * route to the appropriate TransportLayer when run with wait() or + * asyncWait(). + */ + class MockTicket : public TicketImpl { + MONGO_DISALLOW_COPYING(MockTicket); + + public: + MockTicket() = default; + + Session::Id sessionId() const override; + Date_t expiration() const override; + }; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/util/exit.cpp b/src/mongo/util/exit.cpp index 44cdf07f68b..7955a3483c6 100644 --- a/src/mongo/util/exit.cpp +++ b/src/mongo/util/exit.cpp @@ -32,6 +32,7 @@ #include "mongo/util/exit.h" +#include <boost/optional.hpp> #include <stack> #include "mongo/stdx/condition_variable.h" @@ -47,6 +48,7 @@ namespace { stdx::mutex shutdownMutex; stdx::condition_variable shutdownTasksComplete; +boost::optional<ExitCode> shutdownExitCode; bool shutdownTasksInProgress = false; AtomicUInt32 shutdownFlag; std::stack<stdx::function<void()>> shutdownTasks; @@ -69,6 +71,10 @@ MONGO_COMPILER_NORETURN void logAndQuickExit(ExitCode code) { quickExit(code); } +void setShutdownFlag() { + shutdownFlag.fetchAndAdd(1); +} + } // namespace bool inShutdown() { @@ -79,6 +85,13 @@ bool inShutdownStrict() { return shutdownFlag.load() != 0; } +ExitCode waitForShutdown() { + stdx::unique_lock<stdx::mutex> lk(shutdownMutex); + shutdownTasksComplete.wait(lk, [] { return static_cast<bool>(shutdownExitCode); }); + + return shutdownExitCode.get(); +} + void registerShutdownTask(stdx::function<void()> task) { stdx::lock_guard<stdx::mutex> lock(shutdownMutex); invariant(!inShutdown()); @@ -105,7 +118,7 @@ void shutdown(ExitCode code) { logAndQuickExit(code); } - shutdownFlag.fetchAndAdd(1); + setShutdownFlag(); shutdownTasksInProgress = true; shutdownTasksThreadId = stdx::this_thread::get_id(); @@ -117,6 +130,7 @@ void shutdown(ExitCode code) { { stdx::lock_guard<stdx::mutex> lock(shutdownMutex); shutdownTasksInProgress = false; + shutdownExitCode.emplace(code); } shutdownTasksComplete.notify_all(); @@ -133,7 +147,7 @@ void shutdownNoTerminate() { if (inShutdown()) return; - shutdownFlag.fetchAndAdd(1); + setShutdownFlag(); shutdownTasksInProgress = true; shutdownTasksThreadId = stdx::this_thread::get_id(); diff --git a/src/mongo/util/exit.h b/src/mongo/util/exit.h index fa6a8bbb85b..1aed81117a8 100644 --- a/src/mongo/util/exit.h +++ b/src/mongo/util/exit.h @@ -46,6 +46,11 @@ bool inShutdown(); bool inShutdownStrict(); /** + * Does not return until all shutdown tasks have run. + */ +ExitCode waitForShutdown(); + +/** * Registers a new shutdown task to be called when shutdown or * shutdownNoTerminate is called. If this function is invoked after * shutdown or shutdownNoTerminate has been called, std::terminate is diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index fa8899561e9..34105547ab6 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -95,17 +95,6 @@ env.Library( ) env.Library( - target="message_server_port", - source=[ - "message_server_port.cpp", - ], - LIBDEPS=[ - 'network', - '$BUILD_DIR/mongo/db/stats/counters', - ], -) - -env.Library( target='miniwebserver', source=[ 'miniwebserver.cpp', diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h index c4ff5503f41..e4884e75133 100644 --- a/src/mongo/util/net/abstract_message_port.h +++ b/src/mongo/util/net/abstract_message_port.h @@ -32,6 +32,7 @@ #include "mongo/config.h" #include "mongo/logger/log_severity.h" +#include "mongo/stdx/functional.h" #include "mongo/util/net/message.h" #include "mongo/util/net/sockaddr.h" #include "mongo/util/time_support.h" @@ -88,6 +89,11 @@ public: virtual void say(Message& toSend, int responseTo = 0) = 0; /** + * Sends the message (does not set headers). + */ + virtual void say(const Message& toSend) = 0; + + /** * Sends the data over the socket. */ virtual void send(const char* data, int len, const char* context) = 0; diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp index 88e37feb2d8..16fab7ea8c1 100644 --- a/src/mongo/util/net/asio_message_port.cpp +++ b/src/mongo/util/net/asio_message_port.cpp @@ -54,40 +54,6 @@ const char kGET[] = "GET"; const int kHeaderLen = sizeof(MSGHEADER::Value); const int kInitialMessageSize = 1024; -class ASIOPorts { -public: - void closeAll(AbstractMessagingPort::Tag skipMask) { - stdx::lock_guard<stdx::mutex> lock_guard(mutex); - for (auto&& port : _portSet) { - if (port->getTag() & skipMask) { - LOG(3) << "Skip closing connection # " << port->connectionId(); - continue; - } - LOG(3) << "Closing connection # " << port->connectionId(); - port->shutdown(); - } - } - - void insert(ASIOMessagingPort* p) { - stdx::lock_guard<stdx::mutex> lock_guard(mutex); - _portSet.insert(p); - } - - void erase(ASIOMessagingPort* p) { - stdx::lock_guard<stdx::mutex> lock_guard(mutex); - _portSet.erase(p); - } - -private: - stdx::mutex mutex; - std::unordered_set<ASIOMessagingPort*> _portSet; -}; - -// We "new" this so it will still be around when other automatic global vars are being destructed -// during termination. TODO: This is an artifact from MessagingPort and should be removed when we -// decide where to put networking global state. -ASIOPorts& asioPorts = *(new ASIOPorts()); - #ifdef MONGO_CONFIG_SSL struct ASIOSSLContextPair { ASIOSSLContext server; @@ -109,10 +75,6 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(ASIOSSLContextSetup, ("SSLManager"))(Initia } // namespace -void ASIOMessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) { - asioPorts.closeAll(skipMask); -} - ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd) : _service(1), _timer(_service), @@ -145,7 +107,6 @@ ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd) farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP), fd) { #endif // MONGO_CONFIG_SSL - asioPorts.insert(this); _getSocket().non_blocking(true); _remote = HostAndPort(farEnd.getAddr(), farEnd.getPort()); _timer.expires_at(decltype(_timer)::time_point::max()); @@ -177,7 +138,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l #else _sock(_service) { #endif // MONGO_CONFIG_SSL - asioPorts.insert(this); if (*_timeout == Milliseconds(0)) { _timeout = boost::none; } @@ -187,7 +147,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l ASIOMessagingPort::~ASIOMessagingPort() { shutdown(); - asioPorts.erase(this); } void ASIOMessagingPort::setTimeout(Milliseconds millis) { @@ -502,6 +461,11 @@ void ASIOMessagingPort::say(Message& toSend, int responseTo) { invariant(!toSend.empty()); toSend.header().setId(nextMessageId()); toSend.header().setResponseToMsgId(responseTo); + return say(const_cast<const Message&>(toSend)); +} + +void ASIOMessagingPort::say(const Message& toSend) { + invariant(!toSend.empty()); auto buf = toSend.buf(); if (buf) { send(buf, MsgData::ConstView(buf).getLen(), nullptr); diff --git a/src/mongo/util/net/asio_message_port.h b/src/mongo/util/net/asio_message_port.h index d474b03ef29..d3e5fc33b1c 100644 --- a/src/mongo/util/net/asio_message_port.h +++ b/src/mongo/util/net/asio_message_port.h @@ -78,6 +78,7 @@ public: void reply(Message& received, Message& response) override; void say(Message& toSend, int responseTo = 0) override; + void say(const Message& toSend) override; void send(const char* data, int len, const char*) override; void send(const std::vector<std::pair<char*, int>>& data, const char*) override; @@ -118,8 +119,6 @@ public: bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override; - static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask); - private: void _setTimerCallback(); asio::error_code _read(char* buf, std::size_t size); diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp index deb30887e5c..65cec09e45a 100644 --- a/src/mongo/util/net/listen.cpp +++ b/src/mongo/util/net/listen.cpp @@ -280,7 +280,9 @@ void Listener::initAndListen() { } struct timeval maxSelectTime; - while (!inShutdown()) { + // The check against _finished allows us to actually stop the listener by signalling it through + // the _finished flag. + while (!inShutdown() && !_finished.load()) { fd_set fds[1]; FD_ZERO(fds); @@ -601,7 +603,7 @@ void Listener::_accepted(const std::shared_ptr<Socket>& psocket, long long conne port = stdx::make_unique<MessagingPort>(psocket); } port->setConnectionId(connectionId); - accepted(port.release()); + accepted(std::move(port)); } // ----- ListeningSockets ------- @@ -648,9 +650,8 @@ void Listener::checkTicketNumbers() { globalTicketHolder.resize(want); } -void Listener::closeMessagingPorts(AbstractMessagingPort::Tag skipMask) { - ASIOMessagingPort::closeSockets(skipMask); - MessagingPort::closeSockets(skipMask); +void Listener::shutdown() { + _finished.store(true); } TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN); diff --git a/src/mongo/util/net/listen.h b/src/mongo/util/net/listen.h index 0bbd3ad8a0b..f0c66a41d60 100644 --- a/src/mongo/util/net/listen.h +++ b/src/mongo/util/net/listen.h @@ -66,7 +66,7 @@ public: void initAndListen(); // never returns unless error (start a thread) /* spawn a thread, etc., then return */ - virtual void accepted(AbstractMessagingPort* mp) = 0; + virtual void accepted(std::unique_ptr<AbstractMessagingPort> mp) = 0; const int _port; @@ -83,6 +83,8 @@ public: */ void waitUntilListening() const; + void shutdown(); + private: std::vector<SockAddr> _mine; std::vector<SOCKET> _socks; @@ -94,6 +96,7 @@ private: mutable stdx::condition_variable _readyCondition; // Used to wait for changes to _ready // Boolean that indicates whether this Listener is ready to accept incoming network requests bool _ready; + AtomicBool _finished{false}; ServiceContext* _ctx; bool _setAsServiceCtxDecoration; @@ -119,13 +122,6 @@ public: /** makes sure user input is sane */ static void checkTicketNumbers(); - - /** - * This will close implementations of AbstractMessagingPort, skipping any that have tags - * matching `skipMask`. - */ - static void closeMessagingPorts( - AbstractMessagingPort::Tag skipMask = AbstractMessagingPort::kSkipAllMask); }; class ListeningSockets { diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h index 37a6c0bb66b..601d02b9e12 100644 --- a/src/mongo/util/net/message.h +++ b/src/mongo/util/net/message.h @@ -458,11 +458,16 @@ public: return _buf.get(); } + const char* buf() const { + return _buf.get(); + } + std::string toString() const; SharedBuffer sharedBuffer() { return _buf; } + ConstSharedBuffer sharedBuffer() const { return _buf; } diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp index 5051962ed5d..137d494c4ac 100644 --- a/src/mongo/util/net/message_port.cpp +++ b/src/mongo/util/net/message_port.cpp @@ -63,41 +63,6 @@ using std::string; /* messagingport -------------------------------------------------------------- */ -class Ports { - std::set<MessagingPort*> ports; - stdx::mutex m; - -public: - Ports() : ports() {} - void closeAll(AbstractMessagingPort::Tag skip_mask) { - stdx::lock_guard<stdx::mutex> bl(m); - for (std::set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++) { - if ((*i)->getTag() & skip_mask) { - LOG(3) << "Skip closing connection # " << (*i)->connectionId(); - continue; - } - LOG(3) << "Closing connection # " << (*i)->connectionId(); - (*i)->shutdown(); - } - } - void insert(MessagingPort* p) { - stdx::lock_guard<stdx::mutex> bl(m); - ports.insert(p); - } - void erase(MessagingPort* p) { - stdx::lock_guard<stdx::mutex> bl(m); - ports.erase(p); - } -}; - -// we "new" this so it is still be around when other automatic global vars -// are being destructed during termination. -Ports& ports = *(new Ports()); - -void MessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) { - ports.closeAll(skipMask); -} - MessagingPort::MessagingPort(int fd, const SockAddr& remote) : MessagingPort(std::make_shared<Socket>(fd, remote)) {} @@ -108,7 +73,6 @@ MessagingPort::MessagingPort(std::shared_ptr<Socket> sock) : _x509SubjectName(), _connectionId(), _tag(), _psock(std::move(sock)) { SockAddr sa = _psock->remoteAddr(); _remoteParsed = HostAndPort(sa.getAddr(), sa.getPort()); - ports.insert(this); } void MessagingPort::setTimeout(Milliseconds millis) { @@ -122,7 +86,6 @@ void MessagingPort::shutdown() { MessagingPort::~MessagingPort() { shutdown(); - ports.erase(this); } bool MessagingPort::recv(Message& m) { @@ -216,6 +179,7 @@ bool MessagingPort::call(Message& toSend, Message& response) { say(toSend); bool success = recv(response); if (success) { + invariant(!response.empty()); if (response.header().getResponseToMsgId() != toSend.header().getId()) { response.reset(); uasserted(40134, "Response ID did not match the sent message ID."); @@ -225,9 +189,15 @@ bool MessagingPort::call(Message& toSend, Message& response) { } void MessagingPort::say(Message& toSend, int responseTo) { - verify(!toSend.empty()); + invariant(!toSend.empty()); toSend.header().setId(nextMessageId()); toSend.header().setResponseToMsgId(responseTo); + + return say(const_cast<const Message&>(toSend)); +} + +void MessagingPort::say(const Message& toSend) { + invariant(!toSend.empty()); auto buf = toSend.buf(); if (buf) { send(buf, MsgData::ConstView(buf).getLen(), "say"); diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h index 18931e20750..41ecc2fce3b 100644 --- a/src/mongo/util/net/message_port.h +++ b/src/mongo/util/net/message_port.h @@ -66,6 +66,7 @@ public: bool call(Message& toSend, Message& response) override; void say(Message& toSend, int responseTo = 0) override; + void say(const Message& toSend) override; unsigned remotePort() const override { return _psock->remotePort(); @@ -142,11 +143,6 @@ private: long long _connectionId; AbstractMessagingPort::Tag _tag; std::shared_ptr<Socket> _psock; - - -public: - static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask); }; - } // namespace mongo diff --git a/src/mongo/util/net/message_port_mock.cpp b/src/mongo/util/net/message_port_mock.cpp index 6869723129f..de07ba032ae 100644 --- a/src/mongo/util/net/message_port_mock.cpp +++ b/src/mongo/util/net/message_port_mock.cpp @@ -55,6 +55,7 @@ void MessagingPortMock::reply(Message& received, Message& response, int32_t resp void MessagingPortMock::reply(Message& received, Message& response) {} void MessagingPortMock::say(Message& toSend, int responseTo) {} +void MessagingPortMock::say(const Message& toSend) {} bool MessagingPortMock::connect(SockAddr& farEnd) { return true; diff --git a/src/mongo/util/net/message_port_mock.h b/src/mongo/util/net/message_port_mock.h index 61ab5be8958..721998f81f3 100644 --- a/src/mongo/util/net/message_port_mock.h +++ b/src/mongo/util/net/message_port_mock.h @@ -55,6 +55,7 @@ public: void reply(Message& received, Message& response) override; void say(Message& toSend, int responseTo = 0) override; + void say(const Message& toSend) override; bool connect(SockAddr& farEnd) override; diff --git a/src/mongo/util/net/message_server.h b/src/mongo/util/net/message_server.h deleted file mode 100644 index 544c859c6f7..00000000000 --- a/src/mongo/util/net/message_server.h +++ /dev/null @@ -1,83 +0,0 @@ -// message_server.h - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -/* - abstract database server - async io core, worker thread system - */ - -#pragma once - -#include "mongo/platform/basic.h" - -namespace mongo { - -class ServiceContext; - -class MessageHandler { -public: - virtual ~MessageHandler() {} - - /** - * Called once when a socket is connected. - */ - virtual void connected(AbstractMessagingPort* p) = 0; - - /** - * Called every time a message comes in. Handler is responsible for responding to client. - */ - virtual void process(Message& m, AbstractMessagingPort* p) = 0; - - /** - * Called once, either when the client disconnects or when the process is shutting down. After - * close() is called, this handler's AbstractMessagingPort pointer (passed in via the - * connected() method) is no longer valid. - */ - virtual void close() = 0; -}; - -class MessageServer { -public: - struct Options { - int port; // port to bind to - std::string ipList; // addresses to bind to - - Options() : port(0), ipList("") {} - }; - - virtual ~MessageServer() {} - virtual void run() = 0; - virtual bool setupSockets() = 0; -}; - -// TODO use a factory here to decide between port and asio variations -MessageServer* createServer(const MessageServer::Options& opts, - std::shared_ptr<MessageHandler> handler, - ServiceContext* ctx); -} diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp deleted file mode 100644 index e8432f8c510..00000000000 --- a/src/mongo/util/net/message_server_port.cpp +++ /dev/null @@ -1,244 +0,0 @@ -// message_server_port.cpp - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include <memory> -#include <system_error> - -#include "mongo/base/disallow_copying.h" -#include "mongo/config.h" -#include "mongo/db/lasterror.h" -#include "mongo/db/server_options.h" -#include "mongo/db/stats/counters.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/concurrency/thread_name.h" -#include "mongo/util/concurrency/ticketholder.h" -#include "mongo/util/debug_util.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/abstract_message_port.h" -#include "mongo/util/net/listen.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/message_server.h" -#include "mongo/util/net/socket_exception.h" -#include "mongo/util/net/ssl_manager.h" -#include "mongo/util/net/thread_idle_callback.h" -#include "mongo/util/quick_exit.h" -#include "mongo/util/scopeguard.h" - -#ifdef __linux__ // TODO: consider making this ifndef _WIN32 -#include <sys/resource.h> -#endif - -#if !defined(__has_feature) -#define __has_feature(x) 0 -#endif - -namespace mongo { - -using std::unique_ptr; -using std::endl; - -namespace { - -using MessagingPortWithHandler = std::pair<AbstractMessagingPort*, std::shared_ptr<MessageHandler>>; - -} // namespace - -class PortMessageServer : public MessageServer, public Listener { -public: - /** - * Creates a new message server. - * - * @param opts - * @param handler the handler to use. - */ - PortMessageServer(const MessageServer::Options& opts, - std::shared_ptr<MessageHandler> handler, - ServiceContext* ctx) - : Listener("", opts.ipList, opts.port, ctx, true), _handler(std::move(handler)) {} - - virtual void accepted(AbstractMessagingPort* mp) { - ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2); - auto portWithHandler = stdx::make_unique<MessagingPortWithHandler>(mp, _handler); - - if (!Listener::globalTicketHolder.tryAcquire()) { - log() << "connection refused because too many open connections: " - << Listener::globalTicketHolder.used() << endl; - return; - } - - try { -#ifndef __linux__ // TODO: consider making this ifdef _WIN32 - { - stdx::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get())); - thr.detach(); - } -#else - pthread_attr_t attrs; - pthread_attr_init(&attrs); - pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); - - static const size_t STACK_SIZE = - 1024 * 1024; // if we change this we need to update the warning - - struct rlimit limits; - verify(getrlimit(RLIMIT_STACK, &limits) == 0); - if (limits.rlim_cur > STACK_SIZE) { - size_t stackSizeToSet = STACK_SIZE; -#if !__has_feature(address_sanitizer) - if (kDebugBuild) - stackSizeToSet /= 2; -#endif - pthread_attr_setstacksize(&attrs, stackSizeToSet); - } else if (limits.rlim_cur < 1024 * 1024) { - warning() << "Stack size set to " << (limits.rlim_cur / 1024) - << "KB. We suggest 1MB" << endl; - } - - - pthread_t thread; - int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, portWithHandler.get()); - - pthread_attr_destroy(&attrs); - - if (failed) { - log() << "pthread_create failed: " << errnoWithDescription(failed) << endl; - throw std::system_error( - std::make_error_code(std::errc::resource_unavailable_try_again)); - } -#endif // __linux__ - - portWithHandler.release(); - sleepAfterClosingPort.Dismiss(); - } catch (...) { - Listener::globalTicketHolder.release(); - log() << "failed to create thread after accepting new connection, closing connection"; - } - } - - virtual bool setupSockets() { - return Listener::setupSockets(); - } - - void run() { - initAndListen(); - } - - virtual bool useUnixSockets() const { - return true; - } - -private: - const std::shared_ptr<MessageHandler> _handler; - - /** - * Handles incoming messages from a given socket. - * - * Terminating conditions: - * 1. Assertions while handling the request. - * 2. Socket is closed. - * 3. Server is shutting down (based on inShutdown) - * - * @param arg this method is in charge of cleaning up the arg object. - * - * @return NULL - */ - static void* handleIncomingMsg(void* arg) { - TicketHolderReleaser connTicketReleaser(&Listener::globalTicketHolder); - - invariant(arg); - unique_ptr<MessagingPortWithHandler> portWithHandler( - static_cast<MessagingPortWithHandler*>(arg)); - auto mp = portWithHandler->first; - auto handler = portWithHandler->second; - - setThreadName(std::string(str::stream() << "conn" << mp->connectionId())); - mp->setLogLevel(logger::LogSeverity::Debug(1)); - - Message m; - int64_t counter = 0; - try { - handler->connected(mp); - ON_BLOCK_EXIT([handler]() { handler->close(); }); - - while (!inShutdown()) { - m.reset(); - mp->clearCounters(); - - if (!mp->recv(m)) { - if (!serverGlobalParams.quiet) { - int conns = Listener::globalTicketHolder.used() - 1; - const char* word = (conns == 1 ? " connection" : " connections"); - log() << "end connection " << mp->remote().toString() << " (" << conns - << word << " now open)" << endl; - } - break; - } - - handler->process(m, mp); - networkCounter.hit(mp->getBytesIn(), mp->getBytesOut()); - - // Occasionally we want to see if we're using too much memory. - if ((counter++ & 0xf) == 0) { - markThreadIdle(); - } - } - } catch (AssertionException& e) { - log() << "AssertionException handling request, closing client connection: " << e - << endl; - } catch (SocketException& e) { - log() << "SocketException handling request, closing client connection: " << e << endl; - } catch (const DBException& - e) { // must be right above std::exception to avoid catching subclasses - log() << "DBException handling request, closing client connection: " << e << endl; - } catch (std::exception& e) { - error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; - quickExit(EXIT_UNCAUGHT); - } - mp->shutdown(); - - return NULL; - } -}; - - -MessageServer* createServer(const MessageServer::Options& opts, - std::shared_ptr<MessageHandler> handler, - ServiceContext* ctx) { - return new PortMessageServer(opts, std::move(handler), ctx); -} - -} // namespace mongo diff --git a/src/mongo/util/net/miniwebserver.cpp b/src/mongo/util/net/miniwebserver.cpp index ec64f174560..78f81f86134 100644 --- a/src/mongo/util/net/miniwebserver.cpp +++ b/src/mongo/util/net/miniwebserver.cpp @@ -204,7 +204,7 @@ void MiniWebServer::_accepted(const std::shared_ptr<Socket>& psock, long long co } } -void MiniWebServer::accepted(AbstractMessagingPort* mp) { +void MiniWebServer::accepted(std::unique_ptr<AbstractMessagingPort> mp) { MONGO_UNREACHABLE; } diff --git a/src/mongo/util/net/miniwebserver.h b/src/mongo/util/net/miniwebserver.h index a2d0690225e..122281555fc 100644 --- a/src/mongo/util/net/miniwebserver.h +++ b/src/mongo/util/net/miniwebserver.h @@ -69,7 +69,7 @@ public: } // This is not currently used for the MiniWebServer. See SERVER-24200 - void accepted(AbstractMessagingPort* mp) override; + void accepted(std::unique_ptr<AbstractMessagingPort> mp) override; private: void _accepted(const std::shared_ptr<Socket>& psocket, long long connectionId) override; diff --git a/src/mongo/util/net/sockaddr.cpp b/src/mongo/util/net/sockaddr.cpp index 00960f2fd97..5236a76892f 100644 --- a/src/mongo/util/net/sockaddr.cpp +++ b/src/mongo/util/net/sockaddr.cpp @@ -46,8 +46,8 @@ #endif #endif +#include "mongo/bson/util/builder.h" #include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" #include "mongo/util/net/sock.h" namespace mongo { @@ -150,10 +150,19 @@ bool SockAddr::isLocalHost() const { } std::string SockAddr::toString(bool includePort) const { - std::string out = getAddr(); - if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC) - out += mongoutils::str::stream() << ':' << getPort(); - return out; + if (includePort && (getType() != AF_UNIX) && (getType() != AF_UNSPEC)) { + StringBuilder ss; + + if (getType() == AF_INET6) { + ss << '[' << getAddr() << "]:" << getPort(); + } else { + ss << getAddr() << ':' << getPort(); + } + + return ss.str(); + } else { + return getAddr(); + } } sa_family_t SockAddr::getType() const { diff --git a/src/mongo/util/ntservice_test.cpp b/src/mongo/util/ntservice_test.cpp index 0a0fa6bde64..ad5b7e7ed2a 100644 --- a/src/mongo/util/ntservice_test.cpp +++ b/src/mongo/util/ntservice_test.cpp @@ -34,7 +34,6 @@ #include <string> #include <vector> - #include "mongo/db/client.h" #include "mongo/unittest/unittest.h" #include "mongo/util/ntservice.h" diff --git a/src/mongo/util/tcmalloc_server_status_section.cpp b/src/mongo/util/tcmalloc_server_status_section.cpp index 7a8eb49e25b..303076ba784 100644 --- a/src/mongo/util/tcmalloc_server_status_section.cpp +++ b/src/mongo/util/tcmalloc_server_status_section.cpp @@ -38,6 +38,8 @@ #include "mongo/base/init.h" #include "mongo/db/commands/server_status.h" +#include "mongo/db/service_context.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/log.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/thread_idle_callback.h" @@ -58,7 +60,8 @@ stdx::mutex tcmallocCleanupLock; * favorable times. Ideally would do some milder cleanup or scavenge... */ void threadStateChange() { - if (Listener::globalTicketHolder.used() <= kManyClients) + if (getGlobalServiceContext()->getTransportLayer()->sessionStats().numOpenSessions <= + kManyClients) return; #if MONGO_HAVE_GPERFTOOLS_GET_THREAD_CACHE_SIZE |