diff options
author | Samantha Ritter <samantha.ritter@10gen.com> | 2016-05-31 14:05:17 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2016-07-12 18:38:37 -0400 |
commit | c263ce1f95586f8652058e6202015a77f9becc49 (patch) | |
tree | d623fb9da9fd5da3cc4e20cac0653f1fa4af00eb /src/mongo | |
parent | dead3cf8b4b3cb5528ad1abb9eeb722b395e3632 (diff) | |
download | mongo-c263ce1f95586f8652058e6202015a77f9becc49.tar.gz |
SERVER-24162 Integrate TransportLayer
Expand the transport layer as needed to replace uses of abstract message port for ingress
networking.
Diffstat (limited to 'src/mongo')
84 files changed, 2186 insertions, 819 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index a13e463dd2b..a1a1a609e46 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -254,6 +254,8 @@ mongodLibDeps = [ "db/repl/storage_interface_impl", "executor/network_interface_factory", 's/commands/shared_cluster_commands', + "transport/transport_layer_legacy", + "transport/service_entry_point_utils", "util/clock_sources", "util/ntservice", ] @@ -266,6 +268,7 @@ mongod = env.Program( source=[ "db/db.cpp", "db/mongod_options_init.cpp", + "db/service_entry_point_mongod.cpp", ], LIBDEPS=mongodLibDeps, ) @@ -297,6 +300,7 @@ env.Install( 's/client/sharding_connection_hook_for_mongos.cpp', 's/mongos_options_init.cpp', 's/server.cpp', + 's/service_entry_point_mongos.cpp', ], LIBDEPS=[ 'db/conn_pool_options', @@ -309,6 +313,8 @@ env.Install( 's/coreshard', 's/mongoscore', 's/sharding_initialization', + 'transport/service_entry_point_utils', + 'transport/transport_layer_legacy', 'util/clock_sources', 'util/ntservice', 'util/options_parser/options_parser_init', diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index fa413cfd6dd..ab0fbc2c2f7 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -170,6 +170,7 @@ error_code("OperatorNotSupportedOnView", 168) error_code("CommandOnShardedViewNotSupportedOnMongos", 169) error_code("TooManyMatchingDocuments", 170) error_code("CannotIndexParallelArrays", 171) +error_code("TransportSessionNotFound", 172) # Non-sequential error codes (for compatibility only) error_code("SocketException", 9001) diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 78601d0b1b6..3a0d63c4320 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -256,7 +256,7 @@ env.CppUnitTest( 'clientdriver', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', - '$BUILD_DIR/mongo/util/net/message_server_port', + '$BUILD_DIR/mongo/transport/transport_layer_legacy', '$BUILD_DIR/mongo/util/net/network', ], ) diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index 9ee72a63953..ba54e4ec733 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -41,11 +41,14 @@ #include "mongo/rpc/request_interface.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/unittest/unittest.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/quick_exit.h" #include "mongo/util/time_support.h" @@ -72,12 +75,30 @@ namespace { const string TARGET_HOST = "localhost:27017"; const int TARGET_PORT = 27017; -class DummyMessageHandler final : public MessageHandler { +class DummyServiceEntryPoint : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(DummyServiceEntryPoint); + public: - virtual void connected(AbstractMessagingPort* p) {} + DummyServiceEntryPoint() {} + + virtual ~DummyServiceEntryPoint() { + for (auto& t : _threads) { + t.join(); + } + } + + void startSession(transport::Session&& session) override { + _threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session)); + } + +private: + void run(transport::Session&& session) { + Message inMessage; + if (!session.sourceMessage(&inMessage).wait().isOK()) { + return; + } - virtual void process(Message& m, AbstractMessagingPort* port) { - auto request = rpc::makeRequest(&m); + auto request = rpc::makeRequest(&inMessage); auto reply = rpc::makeReplyBuilder(request->getProtocol()); BSONObjBuilder commandResponse; @@ -92,10 +113,14 @@ public: .setMetadata(rpc::makeEmptyMetadata()) .done(); - port->reply(m, response); + response.header().setResponseToMsgId(inMessage.header().getId()); + + if (!session.sinkMessage(response).wait().isOK()) { + return; + } } - virtual void close() {} + std::vector<stdx::thread> _threads; }; // TODO: Take this out and make it as a reusable class in a header file. The only @@ -130,15 +155,15 @@ public: * @param messageHandler the message handler to use for this server. Ownership * of this object is passed to this server. */ - void run(std::shared_ptr<MessageHandler> messsageHandler) { - if (_server != NULL) { + void run(std::shared_ptr<ServiceEntryPoint> serviceEntryPoint) { + if (_server) { return; } - MessageServer::Options options; + transport::TransportLayerLegacy::Options options; options.port = _port; - _server.reset(createServer(options, std::move(messsageHandler), getGlobalServiceContext())); + _server = stdx::make_unique<transport::TransportLayerLegacy>(options, serviceEntryPoint); _serverThread = stdx::thread(runServer, _server.get()); } @@ -164,23 +189,23 @@ public: sleepmillis(500); connCount = Listener::globalTicketHolder.used(); } - + _server->shutdown(); _server.reset(); } /** * Helper method for running the server on a separate thread. */ - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); + static void runServer(transport::TransportLayerLegacy* server) { + server->setup(); + server->start(); } private: const int _port; stdx::thread _serverThread; - unique_ptr<MessageServer> _server; + unique_ptr<transport::TransportLayerLegacy> _server; }; /** @@ -190,9 +215,9 @@ class DummyServerFixture : public unittest::Test { public: void setUp() { _maxPoolSizePerHost = globalConnPool.getMaxPoolSize(); - _dummyServer = new DummyServer(TARGET_PORT); + _dummyServer = stdx::make_unique<DummyServer>(TARGET_PORT); - auto dummyHandler = std::make_shared<DummyMessageHandler>(); + auto dummyHandler = std::make_shared<DummyServiceEntryPoint>(); _dummyServer->run(std::move(dummyHandler)); DBClientConnection conn; Timer timer; @@ -212,7 +237,7 @@ public: void tearDown() { ScopedDbConnection::clearPool(); - delete _dummyServer; + _dummyServer.reset(); globalConnPool.setMaxPoolSize(_maxPoolSizePerHost); } @@ -272,12 +297,12 @@ protected: } private: - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); + static void runServer(transport::TransportLayerLegacy* server) { + server->setup(); + server->start(); } - DummyServer* _dummyServer; + std::unique_ptr<DummyServer> _dummyServer; uint32_t _maxPoolSizePerHost; }; diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 92f97280964..ba8c1e855bd 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -100,6 +100,7 @@ env.Library( "dbmessage.cpp", ], LIBDEPS=[ + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/net/network', ] ) @@ -402,7 +403,6 @@ env.Library( "$BUILD_DIR/mongo/rpc/command_reply", "$BUILD_DIR/mongo/rpc/command_request", "$BUILD_DIR/mongo/rpc/metadata", - "$BUILD_DIR/mongo/util/net/message_server_port", "$BUILD_DIR/mongo/util/net/miniwebserver", "$BUILD_DIR/mongo/util/processinfo", "$BUILD_DIR/mongo/util/signal_handlers", @@ -473,6 +473,8 @@ env.Library( '$BUILD_DIR/mongo/util/decorable', '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/net/hostandport', + '$BUILD_DIR/mongo/transport/transport_layer_common', + '$BUILD_DIR/mongo/transport/transport_layer_manager', ], ) diff --git a/src/mongo/db/auth/sasl_commands.cpp b/src/mongo/db/auth/sasl_commands.cpp index f006cded7d0..0fefd3b6cc9 100644 --- a/src/mongo/db/auth/sasl_commands.cpp +++ b/src/mongo/db/auth/sasl_commands.cpp @@ -187,10 +187,10 @@ Status doSaslStep(const ClientBasic* client, status = session->step(payload, &responsePayload); if (!status.isOK()) { - const SockAddr clientAddr = client->port()->remoteAddr(); log() << session->getMechanism() << " authentication failed for " << session->getPrincipalId() << " on " << session->getAuthenticationDatabase() - << " from client " << clientAddr.getAddr() << " ; " << status.toString() << std::endl; + << " from client " << client->getRemote().toString() << " ; " << status.toString() + << std::endl; sleepmillis(saslGlobalParams.authFailedDelay); // All the client needs to know is that authentication has failed. diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 807a7650aa5..2a665268811 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -44,6 +44,7 @@ #include "mongo/db/lasterror.h" #include "mongo/db/service_context.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/session.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/exit.h" #include "mongo/util/mongoutils/str.h" @@ -63,16 +64,16 @@ void Client::initThreadIfNotAlready() { initThreadIfNotAlready(getThreadName().c_str()); } -void Client::initThread(const char* desc, AbstractMessagingPort* mp) { - initThread(desc, getGlobalServiceContext(), mp); +void Client::initThread(const char* desc, transport::Session* session) { + initThread(desc, getGlobalServiceContext(), session); } -void Client::initThread(const char* desc, ServiceContext* service, AbstractMessagingPort* mp) { +void Client::initThread(const char* desc, ServiceContext* service, transport::Session* session) { invariant(currentClient.getMake()->get() == nullptr); std::string fullDesc; - if (mp != NULL) { - fullDesc = str::stream() << desc << mp->connectionId(); + if (session) { + fullDesc = str::stream() << desc << session->id(); } else { fullDesc = desc; } @@ -80,7 +81,7 @@ void Client::initThread(const char* desc, ServiceContext* service, AbstractMessa setThreadName(fullDesc.c_str()); // Create the client obj, attach to thread - *currentClient.get() = service->makeClient(fullDesc, mp); + *currentClient.get() = service->makeClient(fullDesc, session); } void Client::destroy() { @@ -98,11 +99,11 @@ int64_t generateSeed(const std::string& desc) { } } // namespace -Client::Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p) - : ClientBasic(serviceContext, p), +Client::Client(std::string desc, ServiceContext* serviceContext, transport::Session* session) + : ClientBasic(serviceContext, session), _desc(std::move(desc)), _threadId(stdx::this_thread::get_id()), - _connectionId(p ? p->connectionId() : 0), + _connectionId(session ? session->id() : 0), _prng(generateSeed(_desc)) {} void Client::reportState(BSONObjBuilder& builder) { diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 407a0bac868..a7523981e47 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -51,6 +51,10 @@ class AbstractMessagingPort; class Collection; class OperationContext; +namespace transport { +class Session; +} // namespace transport + typedef long long ConnectionId; /** the database's concept of an outside "client" */ @@ -59,16 +63,16 @@ public: /** * Creates a Client object and stores it in TLS for the current thread. * - * An unowned pointer to a AbstractMessagingPort may optionally be provided. If 'mp' is - * non-null, then it will be used to augment the thread name, and for reporting purposes. + * An unowned pointer to a transport::Session may optionally be provided. If 'session' + * is non-null, then it will be used to augment the thread name, and for reporting purposes. * - * If provided, 'mp' must outlive the newly-created Client object. Client::destroy() may be used - * to help enforce that the Client does not outlive 'mp'. + * If provided, 'session' must outlive the newly-created Client object. Client::destroy() may + * be used to help enforce that the Client does not outlive 'session.' */ - static void initThread(const char* desc, AbstractMessagingPort* mp = 0); + static void initThread(const char* desc, transport::Session* session = nullptr); static void initThread(const char* desc, ServiceContext* serviceContext, - AbstractMessagingPort* mp); + transport::Session* session); /** * Inits a thread if that thread has not already been init'd, setting the thread name to @@ -159,7 +163,7 @@ public: private: friend class ServiceContext; - Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p = 0); + Client(std::string desc, ServiceContext* serviceContext, transport::Session* session = nullptr); // Description for the client (e.g. conn8) diff --git a/src/mongo/db/client_basic.cpp b/src/mongo/db/client_basic.cpp index 06e619597c3..578ebdf668d 100644 --- a/src/mongo/db/client_basic.cpp +++ b/src/mongo/db/client_basic.cpp @@ -32,8 +32,8 @@ namespace mongo { -ClientBasic::ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort) - : _serviceContext(serviceContext), _messagingPort(messagingPort) {} +ClientBasic::ClientBasic(ServiceContext* serviceContext, transport::Session* session) + : _serviceContext(serviceContext), _session(session) {} ClientBasic::~ClientBasic() = default; diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h index 88d7ef4afe8..09f31b5da42 100644 --- a/src/mongo/db/client_basic.h +++ b/src/mongo/db/client_basic.h @@ -31,6 +31,7 @@ #include <memory> #include "mongo/base/disallow_copying.h" +#include "mongo/transport/session.h" #include "mongo/util/decorable.h" #include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/hostandport.h" @@ -58,11 +59,12 @@ public: } bool hasRemote() const { - return _messagingPort; + return _session; } + HostAndPort getRemote() const { - verify(_messagingPort); - return _messagingPort->remote(); + verify(_session); + return _session->remote(); } /** @@ -73,20 +75,20 @@ public: } /** - * Returns the AbstractMessagePort to which this client session is bound, if any. + * Returns the Session to which this client is bound, if any. */ - AbstractMessagingPort* port() const { - return _messagingPort; + transport::Session* session() const { + return _session; } static ClientBasic* getCurrent(); protected: - ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort); + ClientBasic(ServiceContext* serviceContext, transport::Session* session); ~ClientBasic(); private: ServiceContext* const _serviceContext; - AbstractMessagingPort* const _messagingPort; + transport::Session* const _session; }; } diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 919510c4b79..87c472dc519 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -71,6 +71,7 @@ env.Library( '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/write_ops/batch_write_types', '$BUILD_DIR/mongo/scripting/scripting_common', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils', '$BUILD_DIR/mongo/util/foundation', '$BUILD_DIR/mongo/util/ntservice', diff --git a/src/mongo/db/commands/authentication_commands.cpp b/src/mongo/db/commands/authentication_commands.cpp index 6fc6268a807..2f05971ef56 100644 --- a/src/mongo/db/commands/authentication_commands.cpp +++ b/src/mongo/db/commands/authentication_commands.cpp @@ -56,6 +56,7 @@ #include "mongo/db/server_options.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/log.h" #include "mongo/util/md5.hpp" @@ -314,17 +315,17 @@ Status CmdAuthenticate::_authenticateX509(OperationContext* txn, ClientBasic* client = ClientBasic::getCurrent(); AuthorizationSession* authorizationSession = AuthorizationSession::get(client); - std::string clientSubjectName = client->port()->getX509SubjectName(); + auto clientName = client->session()->getX509SubjectName(); if (!getSSLManager()->getSSLConfiguration().hasCA) { return Status(ErrorCodes::AuthenticationFailed, "Unable to verify x.509 certificate, as no CA has been provided."); - } else if (user.getUser() != clientSubjectName) { + } else if (user.getUser() != clientName) { return Status(ErrorCodes::AuthenticationFailed, "There is no x.509 client certificate matching the user."); } else { // Handle internal cluster member auth, only applies to server-server connections - if (getSSLManager()->getSSLConfiguration().isClusterMember(clientSubjectName)) { + if (getSSLManager()->getSSLConfiguration().isClusterMember(clientName)) { int clusterAuthMode = serverGlobalParams.clusterAuthMode.load(); if (clusterAuthMode == ServerGlobalParams::ClusterAuthMode_undefined || clusterAuthMode == ServerGlobalParams::ClusterAuthMode_keyFile) { diff --git a/src/mongo/db/commands/server_status.cpp b/src/mongo/db/commands/server_status.cpp index bbe7e9930a7..e567a15b18b 100644 --- a/src/mongo/db/commands/server_status.cpp +++ b/src/mongo/db/commands/server_status.cpp @@ -47,9 +47,9 @@ #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/platform/process_id.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" -#include "mongo/util/net/listen.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/processinfo.h" #include "mongo/util/ramlog.h" @@ -228,9 +228,10 @@ public: BSONObj generateSection(OperationContext* txn, const BSONElement& configElement) const { BSONObjBuilder bb; - bb.append("current", Listener::globalTicketHolder.used()); - bb.append("available", Listener::globalTicketHolder.available()); - bb.append("totalCreated", Listener::globalConnectionNumber.load()); + auto stats = txn->getServiceContext()->getTransportLayer()->sessionStats(); + bb.append("current", static_cast<int>(stats.numOpenSessions)); + bb.append("available", static_cast<int>(stats.numAvailableSessions)); + bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions)); return bb.obj(); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index a10b0be5a39..56dff7cecaf 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -96,6 +96,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_d.h" #include "mongo/db/service_context_d.h" +#include "mongo/db/service_entry_point_mongod.h" #include "mongo/db/startup_warnings_mongod.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/snapshots.h" @@ -114,6 +115,7 @@ #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/util/assert_util.h" #include "mongo/util/cmdline_utils/censor_cmdline.h" #include "mongo/util/concurrency/task.h" @@ -124,7 +126,6 @@ #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/ntservice.h" #include "mongo/util/options_parser/startup_options.h" @@ -169,65 +170,6 @@ ntservice::NtServiceDefaultStrings defaultServiceStrings = { Timer startupSrandTimer; -class MyMessageHandler : public MessageHandler { -public: - virtual void connected(AbstractMessagingPort* p) { - Client::initThread("conn", p); - } - - virtual void process(Message& m, AbstractMessagingPort* port) { - while (true) { - if (inShutdown()) { - log() << "got request after shutdown()" << endl; - break; - } - - DbResponse dbresponse; - { - auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc()); - assembleResponse(opCtx.get(), m, dbresponse, port->remote()); - - // opCtx must go out of scope here so that the operation cannot show up in currentOp - // results after the response reaches the client - } - - if (!dbresponse.response.empty()) { - port->reply(m, dbresponse.response, dbresponse.responseToMsgId); - if (dbresponse.exhaustNS.size() > 0) { - MsgData::View header = dbresponse.response.header(); - QueryResult::View qr = header.view2ptr(); - long long cursorid = qr.getCursorId(); - if (cursorid) { - verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); - string ns = dbresponse.exhaustNS; // before reset() free's it... - m.reset(); - BufBuilder b(512); - b.appendNum((int)0 /*size set later*/); - b.appendNum(header.getId()); - b.appendNum(header.getResponseToMsgId()); - b.appendNum((int)dbGetMore); - b.appendNum((int)0); - b.appendStr(ns); - b.appendNum((int)0); // ntoreturn - b.appendNum(cursorid); - - MsgData::View header = b.buf(); - header.setLen(b.len()); - m.setData(b.release()); - DEV log() << "exhaust=true sending more"; - continue; // this goes back to top loop - } - } - } - break; - } - } - - virtual void close() { - Client::destroy(); - } -}; - static void logStartup(OperationContext* txn) { BSONObjBuilder toLog; stringstream id; @@ -530,7 +472,7 @@ static void _initWireSpec() { } -static void _initAndListen(int listenPort) { +static ExitCode _initAndListen(int listenPort) { Client::initThread("initandlisten"); _initWireSpec(); @@ -570,20 +512,18 @@ static void _initAndListen(int listenPort) { checked_cast<ServiceContextMongoD*>(getGlobalServiceContext())->createLockFile(); - // Due to SERVER-15389, we must setupSockets first thing at startup in order to avoid - // obtaining too high a file descriptor for our calls to select(). - MessageServer::Options options; + transport::TransportLayerLegacy::Options options; options.port = listenPort; options.ipList = serverGlobalParams.bind_ip; - auto handler = std::make_shared<MyMessageHandler>(); - MessageServer* server = createServer(options, std::move(handler), getGlobalServiceContext()); - - // This is what actually creates the sockets, but does not yet listen on them because we - // do not want connections to just hang if recovery takes a very long time. - if (!server->setupSockets()) { - error() << "Failed to set up sockets during startup."; - return; + // Create, start, and attach the TL + auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>( + options, + std::make_shared<ServiceEntryPointMongod>(getGlobalServiceContext()->getTransportLayer())); + auto res = transportLayer->setup(); + if (!res.isOK()) { + error() << "Failed to set up listener: " << res.toString(); + return EXIT_NET_ERROR; } std::shared_ptr<DbWebServer> dbWebServer; @@ -594,7 +534,7 @@ static void _initAndListen(int listenPort) { new RestAdminAccess())); if (!dbWebServer->setupSockets()) { error() << "Failed to set up sockets for HTTP interface during startup."; - return; + return EXIT_NET_ERROR; } } @@ -674,7 +614,7 @@ static void _initAndListen(int listenPort) { } if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalRecoverOnly) - return; + return EXIT_NET_ERROR; if (mongodGlobalParams.scriptingEnabled) { ScriptEngine::setup(); @@ -799,14 +739,19 @@ static void _initAndListen(int listenPort) { // MessageServer::run will return when exit code closes its socket and we don't need the // operation context anymore startupOpCtx.reset(); - server->run(); + + auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer)); + if (!start.isOK()) { + error() << "Failed to start the listener: " << start.toString(); + return EXIT_NET_ERROR; + } + + return waitForShutdown(); } ExitCode initAndListen(int listenPort) { try { - _initAndListen(listenPort); - - return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR; + return _initAndListen(listenPort); } catch (DBException& e) { log() << "exception in initAndListen: " << e.toString() << ", terminating" << endl; return EXIT_UNCAUGHT; @@ -1001,7 +946,6 @@ static void reportEventToSystemImpl(const char* msg) { // registerShutdownTask is called below. It must not depend on the // prior execution of mongo initializers or the existence of threads. static void shutdownTask() { - auto serviceContext = getGlobalServiceContext(); Client::initThreadIfNotAlready(); @@ -1014,6 +958,7 @@ static void shutdownTask() { txn = uniqueTxn.get(); } + getGlobalServiceContext()->getTransportLayer()->shutdown(); log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl; ListeningSockets::get()->closeAll(); @@ -1037,12 +982,6 @@ static void shutdownTask() { // will do it for us when the process terminates. stdx::packaged_task<void()> dryOutTask([] { - - // Walk all open sockets and close them. This should unblock any that are sitting in - // recv. Other sockets on which there are active operations should see the inShutdown flag - // as true when they complete the operation. - Listener::closeMessagingPorts(0); - // There isn't currently a way to wait on the TicketHolder to have all its tickets back, // unfortunately. So, busy wait in this detached thread. while (true) { diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index 67a5548dbfb..f16ec2b6276 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -30,7 +30,9 @@ #include "mongo/platform/basic.h" #include "mongo/db/dbmessage.h" +#include "mongo/db/operation_context.h" #include "mongo/platform/strnlen.h" +#include "mongo/transport/session.h" namespace mongo { @@ -173,20 +175,23 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) { _buffer.skip(sizeof(QueryResult::Value)); } -void OpQueryReplyBuilder::send(AbstractMessagingPort* destination, +void OpQueryReplyBuilder::send(transport::Session* session, int queryResultFlags, - Message& requestMsg, + const Message& requestMsg, int nReturned, int startingFrom, long long cursorId) { Message response; putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId); - destination->reply(requestMsg, response, requestMsg.header().getId()); + + response.header().setId(nextMessageId()); + response.header().setResponseToMsgId(requestMsg.header().getId()); + + uassertStatusOK(session->sinkMessage(response).wait()); } -void OpQueryReplyBuilder::sendCommandReply(AbstractMessagingPort* destination, - Message& requestMsg) { - send(destination, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); +void OpQueryReplyBuilder::sendCommandReply(transport::Session* session, const Message& requestMsg) { + send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); } void OpQueryReplyBuilder::putInMessage( @@ -202,7 +207,7 @@ void OpQueryReplyBuilder::putInMessage( } void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const void* data, int size, @@ -211,15 +216,19 @@ void replyToQuery(int queryResultFlags, long long cursorId) { OpQueryReplyBuilder reply; reply.bufBuilderForResults().appendBuf(data, size); - reply.send(p, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); + reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); } void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const BSONObj& responseObj) { - replyToQuery( - queryResultFlags, p, requestMsg, (void*)responseObj.objdata(), responseObj.objsize(), 1); + replyToQuery(queryResultFlags, + session, + requestMsg, + (void*)responseObj.objdata(), + responseObj.objsize(), + 1); } void replyToQuery(int queryResultFlags, Message& m, DbResponse& dbresponse, BSONObj obj) { diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 7f1a3a2d0f2..7fabf117421 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -39,6 +39,12 @@ namespace mongo { +class OperationContext; + +namespace transport { +class Session; +} // namespace transport + /* db response format Query or GetMore: // see struct QueryResult @@ -352,9 +358,9 @@ public: /** * Finishes the reply and sends the message out to 'destination'. */ - void send(AbstractMessagingPort* destination, + void send(transport::Session* session, int queryResultFlags, - Message& requestMsg, // should be const but MessagePort::reply takes non-const. + const Message& requestMsg, int nReturned, int startingFrom = 0, long long cursorId = 0); @@ -362,14 +368,14 @@ public: /** * Similar to send() but used for replying to a command. */ - void sendCommandReply(AbstractMessagingPort* destination, Message& requestMsg); + void sendCommandReply(transport::Session* session, const Message& requestMsg); private: BufBuilder _buffer; }; void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const void* data, int size, @@ -377,10 +383,9 @@ void replyToQuery(int queryResultFlags, int startingFrom = 0, long long cursorId = 0); - /* object reply helper. */ void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const BSONObj& responseObj); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b6a2981dde0..b92ec300cf8 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -398,6 +398,7 @@ env.Library('repl_coordinator_impl', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/rpc/metadata', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/fail_point', 'collection_cloner', 'data_replicator', @@ -627,6 +628,7 @@ env.Library('replica_set_messages', '$BUILD_DIR/mongo/client/connection_string', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/server_options_core', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/net/hostandport', 'optime', 'read_concern_args', diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp index 4b34c351769..c01628466c0 100644 --- a/src/mongo/db/repl/repl_set_request_votes.cpp +++ b/src/mongo/db/repl/repl_set_request_votes.cpp @@ -35,6 +35,8 @@ #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/executor/network_interface.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -64,16 +66,17 @@ private: // We want to keep request vote connection open when relinquishing primary. // Tag it here. - unsigned originalTag = 0; - AbstractMessagingPort* mp = txn->getClient()->port(); - if (mp) { - originalTag = mp->getTag(); - mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); + transport::Session::TagMask originalTag = 0; + transport::Session* session = txn->getClient()->session(); + if (session) { + originalTag = session->getTags(); + session->replaceTags(originalTag | transport::Session::kKeepOpen); } + // Untag the connection on exit. - ON_BLOCK_EXIT([mp, originalTag]() { - if (mp) { - mp->setTag(originalTag); + ON_BLOCK_EXIT([session, originalTag]() { + if (session) { + session->replaceTags(originalTag); } }); diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 582b8f45970..245b37cca7f 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -192,8 +192,8 @@ public: virtual HostAndPort getClientHostAndPort(const OperationContext* txn) = 0; /** - * Closes all connections except those marked with the keepOpen property, which should - * just be connections used for heartbeating. + * Closes all connections in the given TransportLayer except those marked with the + * keepOpen property, which should just be connections used for heartbeating. * This is used during stepdown, and transition out of primary. */ virtual void closeConnections() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 81ed9c16bf3..f140580fbcc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -76,6 +76,8 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -449,7 +451,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - Listener::closeMessagingPorts(executor::NetworkInterface::kMessagingPortKeepOpen); + getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index cc844eaf35c..fd36395c017 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -234,9 +234,10 @@ public: // Tag connections to avoid closing them on stepdown. auto hangUpElement = cmdObj["hangUpOnStepDown"]; if (!hangUpElement.eoo() && !hangUpElement.trueValue()) { - AbstractMessagingPort* mp = txn->getClient()->port(); - if (mp) { - mp->setTag(mp->getTag() | executor::NetworkInterface::kMessagingPortKeepOpen); + auto session = txn->getClient()->session(); + if (session) { + session->replaceTags(session->getTags() | + executor::NetworkInterface::kMessagingPortKeepOpen); } } appendReplicationInfo(txn, result, 0); diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 1b96dac313f..f591d64c11b 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -59,6 +59,8 @@ #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -726,17 +728,17 @@ public: /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ - AbstractMessagingPort* mp = txn->getClient()->port(); - unsigned originalTag = 0; - if (mp) { - originalTag = mp->getTag(); - mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); + transport::Session::TagMask originalTag = 0; + transport::Session* session = txn->getClient()->session(); + if (session) { + originalTag = session->getTags(); + session->replaceTags(originalTag | transport::Session::kKeepOpen); } // Unset the tag on block exit - ON_BLOCK_EXIT([mp, originalTag]() { - if (mp) { - mp->setTag(originalTag); + ON_BLOCK_EXIT([session, originalTag]() { + if (session) { + session->replaceTags(originalTag); } }); diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index 013c135cc84..359cb3b4fc1 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -50,7 +50,7 @@ class NoChunkFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -318,7 +318,7 @@ class SingleChunkFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -574,7 +574,7 @@ class SingleChunkMinMaxCompoundKeyFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -642,7 +642,7 @@ class TwoChunksWithGapCompoundKeyFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -839,7 +839,7 @@ class ThreeChunkWithRangeGapFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp index c96b77a1590..e33e3142d1a 100644 --- a/src/mongo/db/s/metadata_loader_test.cpp +++ b/src/mongo/db/s/metadata_loader_test.cpp @@ -51,7 +51,7 @@ public: protected: void setUp() override { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); _maxCollVersion = ChunkVersion(1, 0, OID::gen()); _loader.reset(new MetadataLoader); diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index 03dc9bcaf8d..648fd546721 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -34,6 +34,9 @@ #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_manager.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/system_clock_source.h" @@ -115,7 +118,8 @@ Status validateStorageOptions( } ServiceContext::ServiceContext() - : _tickSource(stdx::make_unique<SystemTickSource>()), + : _transportLayerManager(stdx::make_unique<transport::TransportLayerManager>()), + _tickSource(stdx::make_unique<SystemTickSource>()), _fastClockSource(stdx::make_unique<SystemClockSource>()), _preciseClockSource(stdx::make_unique<SystemClockSource>()) {} @@ -125,8 +129,8 @@ ServiceContext::~ServiceContext() { } ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, - AbstractMessagingPort* p) { - std::unique_ptr<Client> client(new Client(std::move(desc), this, p)); + transport::Session* session) { + std::unique_ptr<Client> client(new Client(std::move(desc), this, session)); auto observer = _clientObservers.cbegin(); try { for (; observer != _clientObservers.cend(); ++observer) { @@ -150,6 +154,14 @@ ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, return UniqueClient(client.release()); } +transport::TransportLayer* ServiceContext::getTransportLayer() const { + return _transportLayerManager.get(); +} + +Status ServiceContext::addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl) { + return _transportLayerManager->addAndStartTransportLayer(std::move(tl)); +} + TickSource* ServiceContext::getTickSource() const { return _tickSource.get(); } diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index fe6a4432d70..174a52ebe52 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -48,6 +48,12 @@ class Client; class OperationContext; class OpObserver; +namespace transport { +class Session; +class TransportLayer; +class TransportLayerManager; +} // namespace transport + /** * Classes that implement this interface can receive notification on killOp. * @@ -207,9 +213,9 @@ public: * * The "desc" string is used to set a descriptive name for the client, used in logging. * - * If supplied, "p" is the communication channel used for communicating with the client. + * If supplied, "session" is the transport::Session used for communicating with the client. */ - UniqueClient makeClient(std::string desc, AbstractMessagingPort* p = nullptr); + UniqueClient makeClient(std::string desc, transport::Session* session = nullptr); /** * Creates a new OperationContext on "client". @@ -299,6 +305,26 @@ public: void registerKillOpListener(KillOpListenerInterface* listener); // + // Transport. + // + + /** + * Get the master TransportLayer. Routes to all other TransportLayers that + * may be in use within this service. + * + * See TransportLayerManager for more details. + */ + transport::TransportLayer* getTransportLayer() const; + + /** + * Add a new TransportLayer to this service context. The new TransportLayer will + * be added to the TransportLayerManager accessible via getTransportLayer(). + * + * It additionally calls start() on the TransportLayer after adding it. + */ + Status addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl); + + // // Global OpObserver. // @@ -370,6 +396,11 @@ private: /** + * The TransportLayerManager. + */ + std::unique_ptr<transport::TransportLayerManager> _transportLayerManager; + + /** * Vector of registered observers. */ std::vector<std::unique_ptr<ClientObserver>> _clientObservers; diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp new file mode 100644 index 00000000000..86f782a0bad --- /dev/null +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/db/service_entry_point_mongod.h" + +#include <vector> + +#include "mongo/db/client.h" +#include "mongo/db/dbmessage.h" +#include "mongo/db/instance.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/quick_exit.h" + +namespace mongo { +namespace { + +// Set up proper headers for formatting an exhaust request, if we need to +bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { + MsgData::View header = dbresponse.response.header(); + QueryResult::View qr = header.view2ptr(); + long long cursorid = qr.getCursorId(); + + if (!cursorid) { + return false; + } + + verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); + + auto ns = dbresponse.exhaustNS; // reset() will free this + + m->reset(); + + BufBuilder b(512); + b.appendNum(static_cast<int>(0) /* size set later in appendData() */); + b.appendNum(header.getId()); + b.appendNum(header.getResponseToMsgId()); + b.appendNum(static_cast<int>(dbGetMore)); + b.appendNum(static_cast<int>(0)); + b.appendStr(ns); + b.appendNum(static_cast<int>(0)); // ntoreturn + b.appendNum(cursorid); + + MsgData::View(b.buf()).setLen(b.len()); + m->setData(b.release()); + + return true; +} + +} // namespace + +using transport::Session; +using transport::TransportLayer; + +ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {} + +void ServiceEntryPointMongod::startSession(Session&& session) { + launchWrappedServiceEntryWorkerThread(std::move(session), + [this](Session* session) { _sessionLoop(session); }); +} + +void ServiceEntryPointMongod::_sessionLoop(Session* session) { + Message inMessage; + bool inExhaust = false; + int64_t counter = 0; + + while (true) { + // 1. Source a Message from the client (unless we are exhausting) + if (!inExhaust) { + inMessage.reset(); + auto status = session->sourceMessage(&inMessage).wait(); + + if (ErrorCodes::isInterruption(status.code())) { + break; + } + + uassertStatusOK(status); + } + + // 2. Pass sourced Message up to mongod + DbResponse dbresponse; + { + auto opCtx = cc().makeOperationContext(); + assembleResponse(opCtx.get(), inMessage, dbresponse, session->remote()); + + // opCtx must go out of scope here so that the operation cannot show + // up in currentOp results after the response reaches the client + } + + // 3. Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(inMessage.header().getId()); + + // If this is an exhaust cursor, don't source more Messages + if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) { + log() << "we are in exhaust"; + inExhaust = true; + } else { + inExhaust = false; + } + + // 4. Sink our response to the client + uassertStatusOK(session->sinkMessage(toSink).wait()); + } else { + inExhaust = false; + } + + if ((counter++ & 0xf) == 0) { + markThreadIdle(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h new file mode 100644 index 00000000000..31ed05098c8 --- /dev/null +++ b/src/mongo/db/service_entry_point_mongod.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/service_entry_point.h" + +namespace mongo { + +namespace transport { +class Session; +class TransportLayer; +} // namespace transport + +/** + * The entry point from the TransportLayer into Mongod. startSession() spawns and + * detaches a new thread for each incoming connection (transport::Session). + */ +class ServiceEntryPointMongod final : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(ServiceEntryPointMongod); + +public: + explicit ServiceEntryPointMongod(transport::TransportLayer* tl); + + virtual ~ServiceEntryPointMongod() = default; + + void startSession(transport::Session&& session) override; + +private: + void _sessionLoop(transport::Session* session); + + transport::TransportLayer* _tl; +}; + +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 1f0411b13cf..8b0b8321aaa 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -98,6 +98,7 @@ env.Library( '$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_manager_impl', '$BUILD_DIR/mongo/s/coreshard', + '$BUILD_DIR/mongo/transport/transport_layer_mock', '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', 'mongoscore', diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index 53aa481fdbc..ac7d4dd759e 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -78,8 +78,7 @@ protected: */ void setUp() override { ShardingCatalogTestFixture::setUp(); - - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setConnectionStringReturnValue(_configConnStr); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp index b81a5bbc68f..f5609adef83 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_drop_coll_test.cpp @@ -58,8 +58,8 @@ class DropColl2ShardTest : public ShardingCatalogTestFixture { public: void setUp() override { ShardingCatalogTestFixture::setUp(); + setRemote(_clientHost); - getMessagingPort()->setRemote(_clientHost); configTargeter()->setFindHostReturnValue(_configHost); configTargeter()->setConnectionStringReturnValue(_configCS); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp index 59cf084d98d..36fcb6eba63 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_remove_shard_test.cpp @@ -124,7 +124,7 @@ TEST_F(RemoveShardTest, RemoveShardCantRemoveLastShard) { TEST_F(RemoveShardTest, RemoveShardStartDraining) { string shardName = "shardToRemove"; const HostAndPort clientHost{"client1:12345"}; - getMessagingPort()->setRemote(clientHost); + setRemote(clientHost); auto future = launchAsync([&] { auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName)); @@ -288,7 +288,7 @@ TEST_F(RemoveShardTest, RemoveShardStillDrainingDatabasesRemaining) { TEST_F(RemoveShardTest, RemoveShardCompletion) { string shardName = "shardToRemove"; const HostAndPort clientHost{"client1:12345"}; - getMessagingPort()->setRemote(clientHost); + setRemote(clientHost); auto future = launchAsync([&] { auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName)); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp index d1d36756984..57e35290ced 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_shard_collection_test.cpp @@ -87,7 +87,7 @@ public: ShardingCatalogTestFixture::setUp(); configTargeter()->setFindHostReturnValue(configHost); configTargeter()->setConnectionStringReturnValue(configCS); - getMessagingPort()->setRemote(clientHost); + setRemote(clientHost); } void expectGetDatabase(const DatabaseType& expectedDb) { diff --git a/src/mongo/s/chunk_manager_tests.cpp b/src/mongo/s/chunk_manager_tests.cpp index 280b9921d69..5fe831b50a9 100644 --- a/src/mongo/s/chunk_manager_tests.cpp +++ b/src/mongo/s/chunk_manager_tests.cpp @@ -68,7 +68,7 @@ class ChunkManagerFixture : public ShardingCatalogTestFixture { public: void setUp() override { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); } diff --git a/src/mongo/s/commands/request.cpp b/src/mongo/s/commands/request.cpp index 050483d2969..6605db10424 100644 --- a/src/mongo/s/commands/request.cpp +++ b/src/mongo/s/commands/request.cpp @@ -42,6 +42,7 @@ #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" +#include "mongo/transport/session.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -49,8 +50,8 @@ namespace mongo { using std::string; -Request::Request(Message& m, AbstractMessagingPort* p) - : _clientInfo(&cc()), _m(m), _d(m), _p(p), _id(_m.header().getId()), _didInit(false) { +Request::Request(Message& m) + : _clientInfo(&cc()), _m(m), _d(m), _id(_m.header().getId()), _didInit(false) { ClusterLastErrorInfo::get(_clientInfo).newRequest(); } @@ -122,4 +123,8 @@ void Request::process(OperationContext* txn, int attempt) { << " op: " << op << " attempt: " << attempt; } +transport::Session* Request::session() const { + return _clientInfo->session(); +} + } // namespace mongo diff --git a/src/mongo/s/commands/request.h b/src/mongo/s/commands/request.h index 5664474e845..ace3e77a720 100644 --- a/src/mongo/s/commands/request.h +++ b/src/mongo/s/commands/request.h @@ -37,11 +37,15 @@ namespace mongo { class Client; class OperationContext; +namespace transport { +class Session; +} // namespace transport + class Request { MONGO_DISALLOW_COPYING(Request); public: - Request(Message& m, AbstractMessagingPort* p); + Request(Message& m); const char* getns() const { return _d.getns(); @@ -71,9 +75,8 @@ public: DbMessage& d() { return _d; } - AbstractMessagingPort* p() const { - return _p; - } + + transport::Session* session() const; void process(OperationContext* txn, int attempt = 0); @@ -84,7 +87,6 @@ private: Message& _m; DbMessage _d; - AbstractMessagingPort* const _p; int32_t _id; diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index fcba8883592..69d02a3e0e9 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -176,7 +176,7 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { BSONObj explainObj = explainBuilder.done(); replyToQuery(0, // query result flags - request.p(), + request.session(), request.m(), static_cast<const void*>(explainObj.objdata()), explainObj.objsize(), @@ -202,7 +202,7 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { obj.appendSelfToBufBuilder(reply.bufBuilderForResults()); numResults++; } - reply.send(request.p(), + reply.send(request.session(), 0, // query result flags request.m(), numResults, @@ -265,7 +265,7 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { BSONObjBuilder builder(reply.bufBuilderForResults()); runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions); } - reply.sendCommandReply(request.p(), request.m()); + reply.sendCommandReply(request.session(), request.m()); return; } catch (const StaleConfigException& e) { if (loops <= 0) @@ -288,7 +288,7 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { BSONObjBuilder builder(reply.bufBuilderForResults()); Command::appendCommandStatus(builder, e.toStatus()); } - reply.sendCommandReply(request.p(), request.m()); + reply.sendCommandReply(request.session(), request.m()); return; } } @@ -325,7 +325,7 @@ bool Strategy::handleSpecialNamespaces(OperationContext* txn, Request& request, } BSONObj x = reply.done(); - replyToQuery(0, request.p(), request.m(), x); + replyToQuery(0, request.session(), request.m(), x); return true; } @@ -369,7 +369,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) { const NamespaceString nss(ns); auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (statusGetDb == ErrorCodes::NamespaceNotFound) { - replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0); + replyToQuery(ResultFlag_CursorNotFound, request.session(), request.m(), 0, 0, 0); return; } @@ -384,7 +384,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) { auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest); if (cursorResponse == ErrorCodes::CursorNotFound) { - replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0); + replyToQuery(ResultFlag_CursorNotFound, request.session(), request.m(), 0, 0, 0); return; } uassertStatusOK(cursorResponse.getStatus()); @@ -399,7 +399,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) { } replyToQuery(0, - request.p(), + request.session(), request.m(), buffer.buf(), buffer.len(), diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index b6893c076ea..e2957554ad2 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -67,8 +67,8 @@ public: void setUp() override { ShardingTestFixture::setUp(); + setRemote(HostAndPort("ClientHost", 12345)); - getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345)); configTargeter()->setFindHostReturnValue(kTestConfigShardHost); std::vector<ShardType> shards; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index d4912eead77..f4fda4103ee 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -77,12 +77,14 @@ #include "mongo/s/mongos_options.h" #include "mongo/s/query/cluster_cursor_cleanup_job.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/service_entry_point_mongos.h" #include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" #include "mongo/s/sharding_initialization.h" #include "mongo/s/sharding_uptime_reporter.h" #include "mongo/s/version_mongos.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/util/admin_access.h" #include "mongo/util/cmdline_utils/censor_cmdline.h" #include "mongo/util/concurrency/thread_name.h" @@ -92,7 +94,6 @@ #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/ntservice.h" @@ -160,58 +161,6 @@ static BSONObj buildErrReply(const DBException& ex) { return errB.obj(); } -class ShardedMessageHandler : public MessageHandler { -public: - virtual ~ShardedMessageHandler() {} - - virtual void connected(AbstractMessagingPort* p) { - Client::initThread("conn", getGlobalServiceContext(), p); - } - - virtual void process(Message& m, AbstractMessagingPort* p) { - verify(p); - Request r(m, p); - auto txn = cc().makeOperationContext(); - - try { - r.init(txn.get()); - r.process(txn.get()); - } catch (const AssertionException& ex) { - LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed" - << " while processing " - << networkOpToString(m.operation()) << " op" - << " for " << r.getnsIfPresent() << causedBy(ex); - - if (r.expectResponse()) { - m.header().setId(r.id()); - replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex)); - } - - // We *always* populate the last error for now - LastError::get(cc()).setLastError(ex.getCode(), ex.what()); - } catch (const DBException& ex) { - log() << "Exception thrown" - << " while processing " << networkOpToString(m.operation()) << " op" - << " for " << r.getnsIfPresent() << causedBy(ex); - - if (r.expectResponse()) { - m.header().setId(r.id()); - replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex)); - } - - // We *always* populate the last error for now - LastError::get(cc()).setLastError(ex.getCode(), ex.what()); - } - - // Release connections back to pool, if any still cached - ShardConnection::releaseMyConnections(); - } - - virtual void close() { - Client::destroy(); - } -}; - } // namespace mongo using namespace mongo; @@ -278,6 +227,19 @@ static ExitCode runMongosServer() { _initWireSpec(); + transport::TransportLayerLegacy::Options opts; + opts.port = serverGlobalParams.port; + opts.ipList = serverGlobalParams.bind_ip; + + auto sep = + std::make_shared<ServiceEntryPointMongos>(getGlobalServiceContext()->getTransportLayer()); + + auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(opts, sep); + auto res = transportLayer->setup(); + if (!res.isOK()) { + return EXIT_NET_ERROR; + } + // Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks globalConnPool.addHook(new ShardingConnectionHookForMongos(false)); shardConnectionPool.addHook(new ShardingConnectionHookForMongos(true)); @@ -352,19 +314,13 @@ static ExitCode runMongosServer() { PeriodicTask::startRunningPeriodicTasks(); - MessageServer::Options opts; - opts.port = serverGlobalParams.port; - opts.ipList = serverGlobalParams.bind_ip; - - auto handler = std::make_shared<ShardedMessageHandler>(); - MessageServer* server = createServer(opts, std::move(handler), getGlobalServiceContext()); - if (!server->setupSockets()) { + auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer)); + if (!start.isOK()) { return EXIT_NET_ERROR; } - server->run(); - // MessageServer::run will return when exit code closes its socket - return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR; + // Block until shutdown. + return waitForShutdown(); } MONGO_INITIALIZER_GENERAL(ForkServer, ("EndStartupOptionHandling"), ("default")) diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp new file mode 100644 index 00000000000..fe7d92fd1ad --- /dev/null +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/s/service_entry_point_mongos.h" + +#include <vector> + +#include "mongo/db/lasterror.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_options.h" +#include "mongo/db/service_context.h" +#include "mongo/s/client/shard_connection.h" +#include "mongo/s/commands/request.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/quick_exit.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +namespace { + +BSONObj buildErrReply(const DBException& ex) { + BSONObjBuilder errB; + errB.append("$err", ex.what()); + errB.append("code", ex.getCode()); + if (!ex._shard.empty()) { + errB.append("shard", ex._shard); + } + return errB.obj(); +} + +} // namespace + +using transport::Session; +using transport::TransportLayer; + +ServiceEntryPointMongos::ServiceEntryPointMongos(TransportLayer* tl) : _tl(tl) {} + +void ServiceEntryPointMongos::startSession(Session&& session) { + launchWrappedServiceEntryWorkerThread(std::move(session), + [this](Session* session) { _sessionLoop(session); }); +} + +void ServiceEntryPointMongos::_sessionLoop(Session* session) { + Message message; + int64_t counter = 0; + + while (true) { + // Release any cached egress connections for client back to pool before destroying + auto guard = MakeGuard(ShardConnection::releaseMyConnections); + + message.reset(); + + // 1. Source a Message from the client + { + auto status = session->sourceMessage(&message).wait(); + + if (ErrorCodes::isInterruption(status.code())) { + break; + } + + uassertStatusOK(status); + } + + // 2. Build a sharding request + Request r(message); + auto txn = cc().makeOperationContext(); + + try { + r.init(txn.get()); + r.process(txn.get()); + } catch (const AssertionException& ex) { + LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed" + << " while processing " + << networkOpToString(message.operation()) << " op" + << " for " << r.getnsIfPresent() << causedBy(ex); + if (r.expectResponse()) { + message.header().setId(r.id()); + replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex)); + } + + // We *always* populate the last error for now + LastError::get(cc()).setLastError(ex.getCode(), ex.what()); + } catch (const DBException& ex) { + log() << "Exception thrown" + << " while processing " << networkOpToString(message.operation()) << " op" + << " for " << r.getnsIfPresent() << causedBy(ex); + + if (r.expectResponse()) { + message.header().setId(r.id()); + replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex)); + } + + // We *always* populate the last error for now + LastError::get(cc()).setLastError(ex.getCode(), ex.what()); + } + + if ((counter++ & 0xf) == 0) { + markThreadIdle(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h new file mode 100644 index 00000000000..d96540b2b1e --- /dev/null +++ b/src/mongo/s/service_entry_point_mongos.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/service_entry_point.h" + +namespace mongo { + +namespace transport { +class Session; +class TransportLayer; +} // namespace transport + +/** + * The entry point from the TransportLayer into Mongos. startSession() spawns and + * detaches a new thread for each incoming connection (transport::Session). + */ +class ServiceEntryPointMongos final : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(ServiceEntryPointMongos); + +public: + ServiceEntryPointMongos(transport::TransportLayer* tl); + + virtual ~ServiceEntryPointMongos() = default; + + void startSession(transport::Session&& session) override; + +private: + void _sessionLoop(transport::Session* session); + + transport::TransportLayer* _tl; +}; + +} // namespace mongo diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 5d8c3e2285e..f099ac2e67b 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -65,6 +65,8 @@ #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_mock.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/tick_source_mock.h" @@ -92,8 +94,12 @@ void ShardingTestFixture::setUp() { _service->setFastClockSource(stdx::make_unique<ClockSourceMock>()); _service->setPreciseClockSource(stdx::make_unique<ClockSourceMock>()); _service->setTickSource(stdx::make_unique<TickSourceMock>()); - _messagePort = stdx::make_unique<MessagingPortMock>(); - _client = _service->makeClient("ShardingTestFixture", _messagePort.get()); + auto tlMock = stdx::make_unique<transport::TransportLayerMock>(); + _transportLayer = tlMock.get(); + _service->addAndStartTransportLayer(std::move(tlMock)); + _transportSession = + stdx::make_unique<transport::Session>(HostAndPort{}, HostAndPort{}, _transportLayer); + _client = _service->makeClient("ShardingTestFixture", _transportSession.get()); _opCtx = _client->makeOperationContext(); // Set up executor pool used for most operations. @@ -184,6 +190,7 @@ void ShardingTestFixture::tearDown() { grid.catalogClient(_opCtx.get())->shutDown(_opCtx.get()); grid.clearForUnitTests(); + _transportSession.reset(); _opCtx.reset(); _client.reset(); _service.reset(); @@ -238,10 +245,6 @@ executor::TaskExecutor* ShardingTestFixture::executor() const { return _executor; } -MessagingPortMock* ShardingTestFixture::getMessagingPort() const { - return _messagePort.get(); -} - DistLockManagerMock* ShardingTestFixture::distLock() const { invariant(_distLockManager); return _distLockManager; @@ -508,6 +511,10 @@ void ShardingTestFixture::expectCount(const HostAndPort& configHost, }); } +void ShardingTestFixture::setRemote(const HostAndPort& remote) { + *_transportSession = transport::Session{remote, HostAndPort{}, _transportLayer}; +} + void ShardingTestFixture::checkReadConcern(const BSONObj& cmdObj, const Timestamp& expectedTS, long long expectedTerm) const { diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h index 2a930bbefc4..9365a18dd66 100644 --- a/src/mongo/s/sharding_test_fixture.h +++ b/src/mongo/s/sharding_test_fixture.h @@ -60,6 +60,10 @@ class NetworkInterfaceMock; class TaskExecutor; } // namespace executor +namespace transport { +class TransportLayerMock; +} // namepsace transport + /** * Sets up the mocked out objects for testing the replica-set backed catalog manager and catalog * client. @@ -97,7 +101,7 @@ protected: executor::TaskExecutor* executor() const; - MessagingPortMock* getMessagingPort() const; + transport::Session* getTransportSession() const; DistLockManagerMock* distLock() const; @@ -196,6 +200,8 @@ protected: void shutdownExecutor(); + void setRemote(const HostAndPort& remote); + /** * Checks that the given command has the expected settings for read after opTime. */ @@ -207,7 +213,8 @@ private: std::unique_ptr<ServiceContext> _service; ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; - std::unique_ptr<MessagingPortMock> _messagePort; + transport::TransportLayerMock* _transportLayer; + std::unique_ptr<transport::Session> _transportSession; RemoteCommandTargeterFactoryMock* _targeterFactory; RemoteCommandTargeterMock* _configTargeter; diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index dea991b80ef..d79e45a1fa9 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -66,7 +66,7 @@ public: void setUp() override { ShardingTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345)); + setRemote(HostAndPort("ClientHost", 12345)); // Set up the RemoteCommandTargeter for the config shard. configTargeter()->setFindHostReturnValue(kTestConfigShardHost); diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index 7109c174a99..d37751fe537 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -320,17 +320,17 @@ public: log() << "Setting random seed: " << mongoBridgeGlobalParams.seed; } - void accepted(AbstractMessagingPort* mp) override final { + void accepted(std::unique_ptr<AbstractMessagingPort> mp) override final { { stdx::lock_guard<stdx::mutex> lk(_portsMutex); if (_inShutdown.load()) { mp->shutdown(); return; } - _ports.insert(mp); + _ports.insert(mp.get()); } - Forwarder f(mp, &_settingsMutex, &_settings, _seedSource.nextInt64()); + Forwarder f(mp.release(), &_settingsMutex, &_settings, _seedSource.nextInt64()); stdx::thread t(f); t.detach(); } diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 82045d5c0e8..adedade05a3 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -13,13 +13,10 @@ env.CppUnitTest( ) env.Library( - target='service_entry_point_test_suite', + target='transport_layer_common', source=[ - 'service_entry_point_test_suite.cpp', - ## move these somewhere better: 'session.cpp', 'ticket.cpp', - 'transport_layer.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/unittest/unittest', @@ -28,10 +25,63 @@ env.Library( ], ) +env.Library( + target='transport_layer_manager', + source=[ + 'transport_layer_manager.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + ], +) + +env.Library( + target='transport_layer_mock', + source=[ + 'transport_layer_mock.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + ], +) + +env.Library( + target='transport_layer_legacy', + source=[ + 'transport_layer_legacy.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/stats/counters', + ], +) + +env.Library( + target='service_entry_point_test_suite', + source=[ + 'service_entry_point_test_suite.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + '$BUILD_DIR/mongo/unittest/unittest', + ], +) + +env.Library( + target='service_entry_point_utils', + source=[ + 'service_entry_point_utils.cpp', + ], + LIBDEPS=[ + 'transport_layer_common', + ], +) + env.CppUnitTest( target='service_entry_point_mock_test', source=[ - 'service_entry_point_mock.cpp', ## move? + 'service_entry_point_mock.cpp', 'service_entry_point_mock_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 6cfe3606043..8d4530d7f13 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -100,16 +100,12 @@ void ServiceEntryPointMock::run(transport::Session&& session) { } // sourceMessage() - auto sourceTicket = _tl->sourceMessage(session, &inMessage); - auto sourceRes = _tl->wait(std::move(sourceTicket)); - if (!sourceRes.isOK()) { + if (!session.sourceMessage(&inMessage).wait().isOK()) { break; } // sinkMessage() - auto sinkTicket = _tl->sinkMessage(session, _outMessage); - auto sinkRes = _tl->wait(std::move(sinkTicket)); - if (!sinkRes.isOK()) { + if (!session.sinkMessage(_outMessage).wait().isOK()) { break; } } diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index e4f93ae3344..e2a9c7f336c 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -57,7 +57,6 @@ namespace mongo { using namespace transport; using namespace stdx::placeholders; -using SessionId = Session::SessionId; using TicketCallback = TransportLayer::TicketCallback; namespace { @@ -107,7 +106,7 @@ ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session, ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session, Date_t expiration) : _sessionId(session.id()), _expiration(expiration) {} -SessionId ServiceEntryPointTestSuite::MockTicket::sessionId() const { +Session::Id ServiceEntryPointTestSuite::MockTicket::sessionId() const { return _sessionId; } @@ -140,20 +139,35 @@ Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const Session& ses return _sinkMessage(session, message, expiration); } -Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket ticket) { +Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket&& ticket) { return _wait(std::move(ticket)); } -void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket ticket, TicketCallback callback) { +void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket, + TicketCallback callback) { return _asyncWait(std::move(ticket), std::move(callback)); } +std::string ServiceEntryPointTestSuite::MockTLHarness::getX509SubjectName(const Session& session) { + return "mock"; +} + +void ServiceEntryPointTestSuite::MockTLHarness::registerTags(const Session& session) {} + +TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() { + return Stats(); +} + void ServiceEntryPointTestSuite::MockTLHarness::end(const Session& session) { return _end(session); } -void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions() { - return _endAllSessions(); +void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions(Session::TagMask tags) { + return _endAllSessions(tags); +} + +Status ServiceEntryPointTestSuite::MockTLHarness::start() { + return _start(); } void ServiceEntryPointTestSuite::MockTLHarness::shutdown() { @@ -180,13 +194,13 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport:: Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const Session& s, Message* m, Date_t d) { - return Ticket(stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d)); + return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d)); } Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const Session& s, const Message&, Date_t d) { - return Ticket(stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d)); + return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d)); } Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const Session& s, @@ -308,6 +322,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() { auto resumeAFuture = resumeA.get_future(); stdx::promise<void> testComplete; + auto testFuture = testComplete.get_future(); _tl->_resetHooks(); @@ -376,10 +391,11 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions, Milliseconds delay) { AtomicWord<int> ended{0}; stdx::promise<void> allSessionsComplete; + auto allCompleteFuture = allSessionsComplete.get_future(); stdx::mutex cyclesLock; - std::unordered_map<SessionId, int> completedCycles; + std::unordered_map<Session::Id, int> completedCycles; _tl->_resetHooks(); diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index a1999efd873..828fee5fe3c 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -99,7 +99,7 @@ private: MockTicket(MockTicket&&) = default; MockTicket& operator=(MockTicket&&) = default; - transport::Session::SessionId sessionId() const override; + transport::Session::Id sessionId() const override; Date_t expiration() const override; @@ -107,7 +107,7 @@ private: private: boost::optional<Message*> _message; - SessionId _sessionId; + transport::Session::Id _sessionId; Date_t _expiration; }; @@ -127,10 +127,15 @@ private: const transport::Session& session, const Message& message, Date_t expiration = transport::Ticket::kNoExpirationDate) override; - Status wait(transport::Ticket ticket) override; - void asyncWait(transport::Ticket ticket, TicketCallback callback) override; + Status wait(transport::Ticket&& ticket) override; + void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override; + std::string getX509SubjectName(const transport::Session& session) override; + void registerTags(const transport::Session& session) override; + Stats sessionStats() override; void end(const transport::Session& session) override; - void endAllSessions() override; + void endAllSessions( + transport::Session::TagMask tags = transport::Session::kEmptyTagMask) override; + Status start() override; void shutdown() override; ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket); @@ -143,7 +148,9 @@ private: stdx::function<Status(transport::Ticket)> _wait; stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait; stdx::function<void(const transport::Session&)> _end; - stdx::function<void(void)> _endAllSessions = [] {}; + stdx::function<void(transport::Session::TagMask tags)> _endAllSessions = + [](transport::Session::TagMask tags) {}; + stdx::function<Status(void)> _start = [] { return Status::OK(); }; stdx::function<void(void)> _shutdown = [] {}; // Pre-set hook methods diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_entry_point_utils.cpp new file mode 100644 index 00000000000..efe934967fb --- /dev/null +++ b/src/mongo/transport/service_entry_point_utils.cpp @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_entry_point_utils.h" + +#include "mongo/db/client.h" +#include "mongo/db/server_options.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/debug_util.h" +#include "mongo/util/log.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/quick_exit.h" + +#ifdef __linux__ // TODO: consider making this ifndef _WIN32 +#include <sys/resource.h> +#endif + +#if !defined(__has_feature) +#define __has_feature(x) 0 +#endif + +namespace mongo { + +namespace { + +struct Context { + Context(transport::Session session, stdx::function<void(transport::Session*)> task) + : session(std::move(session)), task(std::move(task)) {} + + transport::Session session; + stdx::function<void(transport::Session*)> task; +}; + +void* runFunc(void* ptr) { + std::unique_ptr<Context> ctx(static_cast<Context*>(ptr)); + + auto tl = ctx->session.getTransportLayer(); + Client::initThread("conn", &ctx->session); + setThreadName(std::string(str::stream() << "conn" << ctx->session.id())); + + try { + ctx->task(&ctx->session); + } catch (const AssertionException& e) { + log() << "AssertionException handling request, closing client connection: " << e; + } catch (const SocketException& e) { + log() << "SocketException handling request, closing client connection: " << e; + } catch (const DBException& e) { + // must be right above std::exception to avoid catching subclasses + log() << "DBException handling request, closing client connection: " << e; + } catch (const std::exception& e) { + error() << "Uncaught std::exception: " << e.what() << ", terminating"; + quickExit(EXIT_UNCAUGHT); + } + + tl->end(ctx->session); + + if (!serverGlobalParams.quiet) { + auto conns = tl->sessionStats().numOpenSessions; + const char* word = (conns == 1 ? " connection" : " connections"); + log() << "end connection " << ctx->session.remote() << " (" << conns << word + << " now open)"; + } + + Client::destroy(); + + return nullptr; +} +} // namespace + +void launchWrappedServiceEntryWorkerThread(transport::Session&& session, + stdx::function<void(transport::Session*)> task) { + auto ctx = stdx::make_unique<Context>(std::move(session), std::move(task)); + + try { +#ifndef __linux__ // TODO: consider making this ifdef _WIN32 + stdx::thread(stdx::bind(runFunc, ctx.get())).detach(); + ctx.release(); +#else + pthread_attr_t attrs; + pthread_attr_init(&attrs); + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); + + static const size_t STACK_SIZE = + 1024 * 1024; // if we change this we need to update the warning + + struct rlimit limits; + invariant(getrlimit(RLIMIT_STACK, &limits) == 0); + if (limits.rlim_cur > STACK_SIZE) { + size_t stackSizeToSet = STACK_SIZE; +#if !__has_feature(address_sanitizer) + if (kDebugBuild) + stackSizeToSet /= 2; +#endif + pthread_attr_setstacksize(&attrs, stackSizeToSet); + } else if (limits.rlim_cur < 1024 * 1024) { + warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB"; + } + + + pthread_t thread; + int failed = pthread_create(&thread, &attrs, runFunc, ctx.get()); + + pthread_attr_destroy(&attrs); + + if (failed) { + log() << "pthread_create failed: " << errnoWithDescription(failed); + throw std::system_error( + std::make_error_code(std::errc::resource_unavailable_try_again)); + } + ctx.release(); +#endif // __linux__ + + } catch (...) { + log() << "failed to create service entry worker thread for " << ctx->session.remote(); + } +} + +} // namespace mongo diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/service_entry_point_utils.h index 20030e64661..42be4079f9c 100644 --- a/src/mongo/transport/transport_layer.cpp +++ b/src/mongo/transport/service_entry_point_utils.h @@ -26,19 +26,17 @@ * it in the license file. */ -#include "mongo/platform/basic.h" +#pragma once -#include "mongo/transport/transport_layer.h" +#include "mongo/stdx/functional.h" namespace mongo { -namespace transport { -TransportLayer::TransportLayer() = default; -TransportLayer::~TransportLayer() = default; +namespace transport { +class Session; +} // namespace transport -TicketImpl* TransportLayer::getTicketImpl(const Ticket& ticket) { - return ticket.impl(); -} +void launchWrappedServiceEntryWorkerThread(transport::Session&& session, + stdx::function<void(transport::Session*)> task); -} // namespace transport } // namespace mongo diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp index b9f382a6965..ebf237f8bd4 100644 --- a/src/mongo/transport/session.cpp +++ b/src/mongo/transport/session.cpp @@ -43,11 +43,14 @@ AtomicUInt64 sessionIdCounter(0); } // namespace Session::Session(HostAndPort remote, HostAndPort local, TransportLayer* tl) - : _id(sessionIdCounter.addAndFetch(1)), _remote(remote), _local(local), _tl(tl) {} + : _id(sessionIdCounter.addAndFetch(1)), + _remote(std::move(remote)), + _local(std::move(local)), + _tags(kEmptyTagMask), + _tl(tl) {} Session::~Session() { if (_tl != nullptr) { - invariant(_tl); _tl->end(*this); } } @@ -75,16 +78,21 @@ Session& Session::operator=(Session&& other) { return *this; } -Session::SessionId Session::id() const { - return _id; +void Session::replaceTags(TagMask tags) { + _tags = tags; + _tl->registerTags(*this); } -const HostAndPort& Session::remote() const { - return _remote; +Ticket Session::sourceMessage(Message* message, Date_t expiration) { + return _tl->sourceMessage(*this, message, expiration); } -const HostAndPort& Session::local() const { - return _local; +Ticket Session::sinkMessage(const Message& message, Date_t expiration) { + return _tl->sinkMessage(*this, message, expiration); +} + +std::string Session::getX509SubjectName() const { + return _tl->getX509SubjectName(*this); } } // namespace transport diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 4692f22437d..b340b0fecf8 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -29,7 +29,10 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/transport/session_id.h" +#include "mongo/transport/ticket.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/net/message.h" namespace mongo { namespace transport { @@ -47,7 +50,14 @@ public: /** * Type to indicate the internal id for this session. */ - using SessionId = uint64_t; + using Id = SessionId; + + /** + * Tags for groups of connections. + */ + using TagMask = uint32_t; + static constexpr TagMask kEmptyTagMask = 0; + static constexpr TagMask kKeepOpen = 1; /** * Construct a new session. @@ -68,24 +78,72 @@ public: /** * Return the id for this session. */ - SessionId id() const; + Id id() const { + return _id; + } /** * Return the remote host for this session. */ - const HostAndPort& remote() const; + const HostAndPort& remote() const { + return _remote; + } /** * Return the local host information for this session. */ - const HostAndPort& local() const; + const HostAndPort& local() const { + return _local; + } + + /** + * Return the X509 subject name for this connection (SSL only). + */ + std::string getX509SubjectName() const; + + /** + * Set this session's tags. This Session will register + * its new tags with its TransportLayer. + */ + void replaceTags(TagMask tags); + + /** + * Get this session's tags. + */ + TagMask getTags() const { + return _tags; + } + + /** + * Source (receive) a new Message for this Session. + * + * This method will forward to sourceMessage on this Session's transport layer. + */ + Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate); + + /** + * Sink (send) a new Message for this Session. This method should be used + * to send replies to a given host. + * + * This method will forward to sinkMessage on this Session's transport layer. + */ + Ticket sinkMessage(const Message& message, Date_t expiration = Ticket::kNoExpirationDate); + + /** + * The TransportLayer for this Session. + */ + TransportLayer* getTransportLayer() const { + return _tl; + } private: - SessionId _id; + Id _id; HostAndPort _remote; HostAndPort _local; + TagMask _tags; + TransportLayer* _tl; }; diff --git a/src/mongo/transport/session_id.h b/src/mongo/transport/session_id.h new file mode 100644 index 00000000000..b1395ef1429 --- /dev/null +++ b/src/mongo/transport/session_id.h @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <cstdint> + +namespace mongo { +namespace transport { + +using SessionId = uint64_t; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/ticket.cpp b/src/mongo/transport/ticket.cpp index 76bce1dc51e..77a290087dd 100644 --- a/src/mongo/transport/ticket.cpp +++ b/src/mongo/transport/ticket.cpp @@ -30,30 +30,27 @@ #include "mongo/transport/ticket.h" #include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" namespace mongo { namespace transport { const Date_t Ticket::kNoExpirationDate{Date_t::max()}; -Ticket::Ticket(std::unique_ptr<TicketImpl> ticket) : _ticket(std::move(ticket)) {} +Ticket::Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket) + : _tl(tl), _ticket(std::move(ticket)) {} Ticket::~Ticket() = default; Ticket::Ticket(Ticket&&) = default; Ticket& Ticket::operator=(Ticket&&) = default; -Session::SessionId Ticket::sessionId() const { - return _ticket->sessionId(); +Status Ticket::wait()&& { + return _tl->wait(std::move(*this)); } -Date_t Ticket::expiration() const { - return _ticket->expiration(); -} - -// TODO should this actually be const? -TicketImpl* Ticket::impl() const { - return _ticket.get(); +void Ticket::asyncWait(TicketCallback cb)&& { + return _tl->asyncWait(std::move(*this), std::move(cb)); } } // namespace transport diff --git a/src/mongo/transport/ticket.h b/src/mongo/transport/ticket.h index 875a66b3602..dd8927085aa 100644 --- a/src/mongo/transport/ticket.h +++ b/src/mongo/transport/ticket.h @@ -28,14 +28,18 @@ #pragma once +#include <memory> + #include "mongo/base/disallow_copying.h" -#include "mongo/transport/session.h" +#include "mongo/stdx/functional.h" +#include "mongo/transport/session_id.h" +#include "mongo/transport/ticket_impl.h" #include "mongo/util/time_support.h" namespace mongo { namespace transport { -class TicketImpl; +class TransportLayer; /** * A Ticket represents some work to be done within the TransportLayer. @@ -46,16 +50,16 @@ class Ticket { MONGO_DISALLOW_COPYING(Ticket); public: - friend class TransportLayer; + using TicketCallback = stdx::function<void(Status)>; - using SessionId = Session::SessionId; + friend class TransportLayer; /** * Indicates that there is no expiration time by when a ticket needs to complete. */ static const Date_t kNoExpirationDate; - Ticket(std::unique_ptr<TicketImpl> ticket); + Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket); ~Ticket(); /** @@ -67,20 +71,41 @@ public: /** * Return this ticket's session id. */ - SessionId sessionId() const; + SessionId sessionId() const { + return _ticket->sessionId(); + } /** * Return this ticket's expiration date. */ - Date_t expiration() const; + Date_t expiration() const { + return _ticket->expiration(); + } + + /** + * Wait for this ticket to be filled. + * + * This is this-rvalue qualified because it consumes the ticket + */ + Status wait() &&; + + /** + * Asynchronously wait for this ticket to be filled. + * + * This is this-rvalue qualified because it consumes the ticket + */ + void asyncWait(TicketCallback cb) &&; protected: /** * Return a non-owning pointer to the underlying TicketImpl type */ - TicketImpl* impl() const; + TicketImpl* impl() const { + return _ticket.get(); + } private: + TransportLayer* _tl; std::unique_ptr<TicketImpl> _ticket; }; diff --git a/src/mongo/transport/ticket_impl.h b/src/mongo/transport/ticket_impl.h index 378f9fb2999..4cbbc0a22d0 100644 --- a/src/mongo/transport/ticket_impl.h +++ b/src/mongo/transport/ticket_impl.h @@ -28,7 +28,8 @@ #pragma once -#include "mongo/transport/session.h" +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/session_id.h" #include "mongo/util/time_support.h" namespace mongo { @@ -43,8 +44,6 @@ class TicketImpl { MONGO_DISALLOW_COPYING(TicketImpl); public: - using SessionId = Session::SessionId; - virtual ~TicketImpl() = default; TicketImpl(TicketImpl&&) = default; diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 3028506c6f0..dde6248eaf5 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -57,7 +57,30 @@ class TransportLayer { MONGO_DISALLOW_COPYING(TransportLayer); public: - virtual ~TransportLayer(); + /** + * Stats for sessions open in the Transport Layer. + */ + struct Stats { + /** + * Returns the number of sessions currently open in the transport layer. + */ + size_t numOpenSessions = 0; + + /** + * Returns the total number of sessions that have ever been created by this TransportLayer. + */ + size_t numCreatedSessions = 0; + + /** + * Returns the number of available sessions we could still open. Only relevant + * when we are operating under a transport::Session limit (for example, in the + * legacy implementation, we respect a maximum number of connections). If there + * is no session limit, returns std::numeric_limits<int>::max(). + */ + size_t numAvailableSessions = 0; + }; + + virtual ~TransportLayer() = default; /** * Source (receive) a new Message for this Session. @@ -101,7 +124,7 @@ public: * This thread may be used by the TransportLayer to run other Tickets that were * enqueued prior to this call. */ - virtual Status wait(Ticket ticket) = 0; + virtual Status wait(Ticket&& ticket) = 0; /** * Callback for Tickets that are run via asyncWait(). @@ -115,7 +138,27 @@ public: * This thread will not be used by the TransportLayer to perform work. The callback * passed to asyncWait() may be run on any thread. */ - virtual void asyncWait(Ticket ticket, TicketCallback callback) = 0; + virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0; + + /** + * Tag this Session within the TransportLayer with the tags currently assigned to the + * Session. If endAllSessions() is called with a matching + * Session::TagMask, this Session will not be ended. + * + * Before calling this method, use Session::replaceTags() to set the desired TagMask. + */ + virtual void registerTags(const Session& session) = 0; + + /** + * Return the stored X509 subject name for this session. If the session does not + * exist in this TransportLayer, returns "". + */ + virtual std::string getX509SubjectName(const Session& session) = 0; + + /** + * Returns the number of sessions currently open in the transport layer. + */ + virtual Stats sessionStats() = 0; /** * End the given Session. Tickets for this Session that have already been @@ -131,11 +174,20 @@ public: virtual void end(const Session& session) = 0; /** - * End all active sessions in the TransportLayer. Tickets that have already been - * started via wait() or asyncWait() will complete, but may return a failed Status. - * This method is synchronous and will not return until all sessions have ended. + * End all active sessions in the TransportLayer. Tickets that have already been started via + * wait() or asyncWait() will complete, but may return a failed Status. This method is + * asynchronous and will return after all sessions have been notified to end. + * + * If a TagMask is provided, endAllSessions() will skip over sessions with matching + * tags and leave them open. + */ + virtual void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) = 0; + + /** + * Start the TransportLayer. After this point, the TransportLayer will begin accepting active + * sessions from new transport::Endpoints. */ - virtual void endAllSessions() = 0; + virtual Status start() = 0; /** * Shut the TransportLayer down. After this point, the TransportLayer will @@ -147,12 +199,21 @@ public: virtual void shutdown() = 0; protected: - TransportLayer(); + TransportLayer() = default; /** * Return the implementation of this Ticket. */ - TicketImpl* getTicketImpl(const Ticket& ticket); + TicketImpl* getTicketImpl(const Ticket& ticket) { + return ticket.impl(); + } + + /** + * Return the transport layer of this Ticket. + */ + TransportLayer* getTicketTransportLayer(const Ticket& ticket) { + return ticket._tl; + } }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp new file mode 100644 index 00000000000..b45d6a6ad07 --- /dev/null +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -0,0 +1,307 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/transport/transport_layer_legacy.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/config.h" +#include "mongo/db/service_context.h" +#include "mongo/db/stats/counters.h" +#include "mongo/stdx/functional.h" +#include "mongo/transport/service_entry_point.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/net/abstract_message_port.h" +#include "mongo/util/net/socket_exception.h" + +namespace mongo { +namespace transport { + +TransportLayerLegacy::ListenerLegacy::ListenerLegacy(const TransportLayerLegacy::Options& opts, + NewConnectionCb callback) + : Listener("", opts.ipList, opts.port, getGlobalServiceContext(), true), + _accepted(std::move(callback)) {} + +void TransportLayerLegacy::ListenerLegacy::accepted(std::unique_ptr<AbstractMessagingPort> mp) { + _accepted(std::move(mp)); +} + +TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts, + std::shared_ptr<ServiceEntryPoint> sep) + : _sep(sep), + _listener(stdx::make_unique<ListenerLegacy>( + opts, + stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))), + _running(false), + _options(opts) {} + +TransportLayerLegacy::LegacyTicket::LegacyTicket(const Session& session, + Date_t expiration, + WorkHandle work) + : _sessionId(session.id()), _expiration(expiration), _fill(std::move(work)) {} + +Session::Id TransportLayerLegacy::LegacyTicket::sessionId() const { + return _sessionId; +} + +Date_t TransportLayerLegacy::LegacyTicket::expiration() const { + return _expiration; +} + +Status TransportLayerLegacy::setup() { + if (!_listener->setupSockets()) { + error() << "Failed to set up sockets during startup."; + return {ErrorCodes::InternalError, "Failed to set up sockets"}; + } + + return Status::OK(); +} + +Status TransportLayerLegacy::start() { + if (_running.swap(true)) { + return {ErrorCodes::InternalError, "TransportLayer is already running"}; + } + + _listenerThread = stdx::thread([this]() { _listener->initAndListen(); }); + + return Status::OK(); +} + +TransportLayerLegacy::~TransportLayerLegacy() = default; + +Ticket TransportLayerLegacy::sourceMessage(const Session& session, + Message* message, + Date_t expiration) { + auto sourceCb = [message](AbstractMessagingPort* amp) -> Status { + if (!amp->recv(*message)) { + return {ErrorCodes::HostUnreachable, "Recv failed"}; + } + return Status::OK(); + }; + + return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sourceCb))); +} + +std::string TransportLayerLegacy::getX509SubjectName(const Session& session) { + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto conn = _connections.find(session.id()); + if (conn == _connections.end()) { + // Return empty string if the session is not found + return ""; + } + + return conn->second.x509SubjectName.value_or(""); + } +} + +TransportLayer::Stats TransportLayerLegacy::sessionStats() { + Stats stats; + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + stats.numOpenSessions = _connections.size(); + } + + stats.numAvailableSessions = Listener::globalTicketHolder.available(); + stats.numCreatedSessions = Listener::globalConnectionNumber.load(); + + return stats; +} + +Ticket TransportLayerLegacy::sinkMessage(const Session& session, + const Message& message, + Date_t expiration) { + auto sinkCb = [&message](AbstractMessagingPort* amp) -> Status { + try { + amp->say(message); + return Status::OK(); + } catch (const SocketException& e) { + return {ErrorCodes::HostUnreachable, e.what()}; + } + }; + + return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sinkCb))); +} + +Status TransportLayerLegacy::wait(Ticket&& ticket) { + return _runTicket(std::move(ticket)); +} + +void TransportLayerLegacy::asyncWait(Ticket&& ticket, TicketCallback callback) { + // Left unimplemented because there is no reasonable way to offer general async waiting besides + // offering a background thread that can handle waits for multiple tickets. We may never + // implement this for the legacy TL. + MONGO_UNREACHABLE; +} + +void TransportLayerLegacy::end(const Session& session) { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto conn = _connections.find(session.id()); + if (conn != _connections.end()) { + _endSession_inlock(conn); + } +} + +void TransportLayerLegacy::registerTags(const Session& session) { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto conn = _connections.find(session.id()); + if (conn != _connections.end()) { + conn->second.tags = session.getTags(); + } +} + +void TransportLayerLegacy::_endSession_inlock( + decltype(TransportLayerLegacy::_connections.begin()) conn) { + conn->second.amp->shutdown(); + + // If the amp is in use it means that we're in the middle of fulfilling the ticket in _runTicket + // and can't erase it immediately. + // + // In that case we'll rely on _runTicket to do the removal for us later + if (conn->second.inUse) { + conn->second.ended = true; + } else { + Listener::globalTicketHolder.release(); + _connections.erase(conn); + } +} + +void TransportLayerLegacy::endAllSessions(Session::TagMask tags) { + log() << "legacy transport layer ending all sessions"; + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto&& conn = _connections.begin(); + while (conn != _connections.end()) { + // If we erase this connection below, we invalidate our iterator, use a placeholder. + auto placeholder = conn; + placeholder++; + + if (conn->second.tags & tags) { + log() << "Skip closing connection for connection # " << conn->second.connectionId; + } else { + _endSession_inlock(conn); + } + + conn = placeholder; + } + } +} + +void TransportLayerLegacy::shutdown() { + _running.store(false); + _listener->shutdown(); + _listenerThread.join(); + endAllSessions(); +} + +Status TransportLayerLegacy::_runTicket(Ticket ticket) { + if (!_running.load()) { + return {ErrorCodes::ShutdownInProgress, "TransportLayer in shutdown"}; + } + + if (ticket.expiration() < Date_t::now()) { + return {ErrorCodes::ExceededTimeLimit, "Ticket has expired"}; + } + + AbstractMessagingPort* amp; + + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + + auto conn = _connections.find(ticket.sessionId()); + if (conn == _connections.end()) { + return {ErrorCodes::TransportSessionNotFound, "No such session in TransportLayer"}; + } + + // "check out" the port + conn->second.inUse = true; + amp = conn->second.amp.get(); + } + + amp->clearCounters(); + + auto legacyTicket = checked_cast<LegacyTicket*>(getTicketImpl(ticket)); + auto res = legacyTicket->_fill(amp); + + networkCounter.hit(amp->getBytesIn(), amp->getBytesOut()); + + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + + auto conn = _connections.find(ticket.sessionId()); + invariant(conn != _connections.end()); + +#ifdef MONGO_CONFIG_SSL + // If we didn't have an X509 subject name, see if we have one now + if (!conn->second.x509SubjectName) { + auto name = amp->getX509SubjectName(); + if (name != "") { + conn->second.x509SubjectName = name; + } + } +#endif + conn->second.inUse = false; + + if (conn->second.ended) { + Listener::globalTicketHolder.release(); + _connections.erase(conn); + } + } + + return res; +} + +void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) { + if (!Listener::globalTicketHolder.tryAcquire()) { + log() << "connection refused because too many open connections: " + << Listener::globalTicketHolder.used(); + amp->shutdown(); + return; + } + + Session session(amp->remote(), HostAndPort(amp->localAddr().toString(true)), this); + + amp->setLogLevel(logger::LogSeverity::Debug(1)); + + { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + _connections.emplace(std::piecewise_construct, + std::forward_as_tuple(session.id()), + std::forward_as_tuple(std::move(amp), false, session.getTags())); + } + + invariant(_sep); + _sep->startSession(std::move(session)); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h new file mode 100644 index 00000000000..ec17c76ca1e --- /dev/null +++ b/src/mongo/transport/transport_layer_legacy.h @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <unordered_map> + +#include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/net/listen.h" +#include "mongo/util/net/sock.h" + +namespace mongo { + +class AbstractMessagingPort; +class ServiceEntryPoint; + +namespace transport { + +/** + * A TransportLayer implementation based on legacy networking primitives (the Listener, + * AbstractMessagingPort). + */ +class TransportLayerLegacy final : public TransportLayer { + MONGO_DISALLOW_COPYING(TransportLayerLegacy); + +public: + struct Options { + int port; // port to bind to + std::string ipList; // addresses to bind to + + Options() : port(0), ipList("") {} + }; + + TransportLayerLegacy(const Options& opts, std::shared_ptr<ServiceEntryPoint> sep); + + ~TransportLayerLegacy(); + + Status setup(); + Status start() override; + + Ticket sourceMessage(const Session& session, + Message* message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Ticket sinkMessage(const Session& session, + const Message& message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Status wait(Ticket&& ticket) override; + void asyncWait(Ticket&& ticket, TicketCallback callback) override; + + void registerTags(const Session& session) override; + std::string getX509SubjectName(const Session& session) override; + + Stats sessionStats() override; + + void end(const Session& session) override; + void endAllSessions(transport::Session::TagMask tags = Session::kKeepOpen) override; + void shutdown() override; + +private: + void _handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp); + + Status _runTicket(Ticket ticket); + + using NewConnectionCb = stdx::function<void(std::unique_ptr<AbstractMessagingPort>)>; + using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>; + + /** + * A TicketImpl implementation for this TransportLayer. WorkHandle is a callable that + * can be invoked to fill this ticket. + */ + class LegacyTicket : public TicketImpl { + MONGO_DISALLOW_COPYING(LegacyTicket); + + public: + LegacyTicket(const Session& session, Date_t expiration, WorkHandle work); + + SessionId sessionId() const override; + Date_t expiration() const override; + + SessionId _sessionId; + Date_t _expiration; + + WorkHandle _fill; + }; + + /** + * This Listener wraps the interface in listen.h so that we may implement our own + * version of accepted(). + */ + class ListenerLegacy : public Listener { + public: + ListenerLegacy(const TransportLayerLegacy::Options& opts, NewConnectionCb callback); + + void runListener(); + + void accepted(std::unique_ptr<AbstractMessagingPort> mp) override; + + bool useUnixSockets() const override { + return true; + } + + private: + NewConnectionCb _accepted; + }; + + /** + * Connection object, to associate Session ids with AbstractMessagingPorts. + */ + struct Connection { + Connection(std::unique_ptr<AbstractMessagingPort> port, bool ended, Session::TagMask tags) + : amp(std::move(port)), + connectionId(amp->connectionId()), + tags(tags), + inUse(false), + ended(false) {} + + std::unique_ptr<AbstractMessagingPort> amp; + + const long long connectionId; + + boost::optional<std::string> x509SubjectName; + Session::TagMask tags; + bool inUse; + bool ended; + }; + + std::shared_ptr<ServiceEntryPoint> _sep; + + std::unique_ptr<Listener> _listener; + stdx::thread _listenerThread; + + stdx::mutex _connectionsMutex; + std::unordered_map<Session::Id, Connection> _connections; + + void _endSession_inlock(decltype(_connections.begin()) conn); + + AtomicWord<bool> _running; + + Options _options; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp new file mode 100644 index 00000000000..c64023b2067 --- /dev/null +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/transport/transport_layer_manager.h" + +#include "mongo/base/status.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/util/time_support.h" +#include <limits> + +#include <iostream> + +namespace mongo { +namespace transport { + +TransportLayerManager::TransportLayerManager() = default; + +Ticket TransportLayerManager::sourceMessage(const Session& session, + Message* message, + Date_t expiration) { + return session.getTransportLayer()->sourceMessage(session, message, expiration); +} + +Ticket TransportLayerManager::sinkMessage(const Session& session, + const Message& message, + Date_t expiration) { + return session.getTransportLayer()->sinkMessage(session, message, expiration); +} + +Status TransportLayerManager::wait(Ticket&& ticket) { + return getTicketTransportLayer(ticket)->wait(std::move(ticket)); +} + +void TransportLayerManager::asyncWait(Ticket&& ticket, TicketCallback callback) { + return getTicketTransportLayer(ticket)->asyncWait(std::move(ticket), std::move(callback)); +} + +std::string TransportLayerManager::getX509SubjectName(const Session& session) { + return session.getX509SubjectName(); +} + +template <typename Callable> +void TransportLayerManager::_foreach(Callable&& cb) { + { + stdx::lock_guard<stdx::mutex> lk(_tlsMutex); + for (auto&& tl : _tls) { + cb(tl.get()); + } + } +} + +TransportLayer::Stats TransportLayerManager::sessionStats() { + Stats stats; + + _foreach([&](TransportLayer* tl) { + Stats s = tl->sessionStats(); + + stats.numOpenSessions += s.numOpenSessions; + stats.numCreatedSessions += s.numCreatedSessions; + if (std::numeric_limits<size_t>::max() - stats.numAvailableSessions < + s.numAvailableSessions) { + stats.numAvailableSessions = std::numeric_limits<size_t>::max(); + } else { + stats.numAvailableSessions += s.numAvailableSessions; + } + }); + + return stats; +} + +void TransportLayerManager::registerTags(const Session& session) { + session.getTransportLayer()->registerTags(session); +} + +void TransportLayerManager::end(const Session& session) { + session.getTransportLayer()->end(session); +} + +void TransportLayerManager::endAllSessions(Session::TagMask tags) { + _foreach([&tags](TransportLayer* tl) { tl->endAllSessions(tags); }); +} + +Status TransportLayerManager::start() { + return Status::OK(); +} + +void TransportLayerManager::shutdown() { + _foreach([](TransportLayer* tl) { tl->shutdown(); }); +} + +Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl) { + auto ptr = tl.get(); + { + stdx::lock_guard<stdx::mutex> lk(_tlsMutex); + _tls.emplace_back(std::move(tl)); + } + return ptr->start(); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h new file mode 100644 index 00000000000..391bdf4ebea --- /dev/null +++ b/src/mongo/transport/transport_layer_manager.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/status.h" +#include "mongo/stdx/mutex.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/net/message.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace transport { + +/** + * This TransportLayerManager is a TransportLayer implementation that holds other + * TransportLayers. Mongod and Mongos can treat this like the "only" TransportLayer + * and not be concerned with which other TransportLayer implementations it holds + * underneath. + */ +class TransportLayerManager final : public TransportLayer { + MONGO_DISALLOW_COPYING(TransportLayerManager); + +public: + TransportLayerManager(); + + Ticket sourceMessage(const Session& session, + Message* message, + Date_t expiration = Ticket::kNoExpirationDate) override; + Ticket sinkMessage(const Session& session, + const Message& message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Status wait(Ticket&& ticket) override; + void asyncWait(Ticket&& ticket, TicketCallback callback) override; + + std::string getX509SubjectName(const Session& session) override; + void registerTags(const Session& session) override; + + Stats sessionStats() override; + + void end(const Session& session) override; + void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) override; + + Status start() override; + + void shutdown() override; + + Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl); + +private: + template <typename Callable> + void _foreach(Callable&& cb); + + stdx::mutex _tlsMutex; + std::vector<std::unique_ptr<TransportLayer>> _tls; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp new file mode 100644 index 00000000000..c550570b401 --- /dev/null +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/base/status.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer_mock.h" +#include "mongo/util/time_support.h" + +#include "mongo/stdx/memory.h" + +namespace mongo { +namespace transport { + +Session::Id TransportLayerMock::MockTicket::sessionId() const { + return Session::Id{}; +} + +Date_t TransportLayerMock::MockTicket::expiration() const { + return Date_t::now(); +} + +Ticket TransportLayerMock::sourceMessage(const Session& session, + Message* message, + Date_t expiration) { + return Ticket(this, stdx::make_unique<MockTicket>()); +} + +Ticket TransportLayerMock::sinkMessage(const Session& session, + const Message& message, + Date_t expiration) { + return Ticket(this, stdx::make_unique<MockTicket>()); +} + +Status TransportLayerMock::wait(Ticket&& ticket) { + return Status::OK(); +} + +void TransportLayerMock::asyncWait(Ticket&& ticket, TicketCallback callback) { + callback(Status::OK()); +} + +std::string TransportLayerMock::getX509SubjectName(const Session& session) { + return session.getX509SubjectName(); +} + +TransportLayer::Stats TransportLayerMock::sessionStats() { + return Stats(); +} + +void TransportLayerMock::registerTags(const Session& session) {} + +void TransportLayerMock::end(const Session& session) {} + +void TransportLayerMock::endAllSessions(Session::TagMask tags) {} + +Status TransportLayerMock::start() { + return Status::OK(); +} + +void TransportLayerMock::shutdown() {} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h new file mode 100644 index 00000000000..400d6b02258 --- /dev/null +++ b/src/mongo/transport/transport_layer_mock.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/net/message.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace transport { + +/** + * This TransportLayerMock is a noop TransportLayer implementation. + */ +class TransportLayerMock : public TransportLayer { + MONGO_DISALLOW_COPYING(TransportLayerMock); + +public: + TransportLayerMock() = default; + + Ticket sourceMessage(const Session& session, + Message* message, + Date_t expiration = Ticket::kNoExpirationDate) override; + Ticket sinkMessage(const Session& session, + const Message& message, + Date_t expiration = Ticket::kNoExpirationDate) override; + + Status wait(Ticket&& ticket) override; + void asyncWait(Ticket&& ticket, TicketCallback callback) override; + + std::string getX509SubjectName(const Session& session) override; + void registerTags(const Session& session) override; + + Stats sessionStats() override; + + void end(const Session& session) override; + void endAllSessions(Session::TagMask tags = Session::kEmptyTagMask) override; + + Status start() override; + void shutdown() override; + +private: + /** + * A class for Tickets issued from the TransportLayerMock. These will + * route to the appropriate TransportLayer when run with wait() or + * asyncWait(). + */ + class MockTicket : public TicketImpl { + MONGO_DISALLOW_COPYING(MockTicket); + + public: + MockTicket() = default; + + Session::Id sessionId() const override; + Date_t expiration() const override; + }; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/util/exit.cpp b/src/mongo/util/exit.cpp index 44cdf07f68b..7955a3483c6 100644 --- a/src/mongo/util/exit.cpp +++ b/src/mongo/util/exit.cpp @@ -32,6 +32,7 @@ #include "mongo/util/exit.h" +#include <boost/optional.hpp> #include <stack> #include "mongo/stdx/condition_variable.h" @@ -47,6 +48,7 @@ namespace { stdx::mutex shutdownMutex; stdx::condition_variable shutdownTasksComplete; +boost::optional<ExitCode> shutdownExitCode; bool shutdownTasksInProgress = false; AtomicUInt32 shutdownFlag; std::stack<stdx::function<void()>> shutdownTasks; @@ -69,6 +71,10 @@ MONGO_COMPILER_NORETURN void logAndQuickExit(ExitCode code) { quickExit(code); } +void setShutdownFlag() { + shutdownFlag.fetchAndAdd(1); +} + } // namespace bool inShutdown() { @@ -79,6 +85,13 @@ bool inShutdownStrict() { return shutdownFlag.load() != 0; } +ExitCode waitForShutdown() { + stdx::unique_lock<stdx::mutex> lk(shutdownMutex); + shutdownTasksComplete.wait(lk, [] { return static_cast<bool>(shutdownExitCode); }); + + return shutdownExitCode.get(); +} + void registerShutdownTask(stdx::function<void()> task) { stdx::lock_guard<stdx::mutex> lock(shutdownMutex); invariant(!inShutdown()); @@ -105,7 +118,7 @@ void shutdown(ExitCode code) { logAndQuickExit(code); } - shutdownFlag.fetchAndAdd(1); + setShutdownFlag(); shutdownTasksInProgress = true; shutdownTasksThreadId = stdx::this_thread::get_id(); @@ -117,6 +130,7 @@ void shutdown(ExitCode code) { { stdx::lock_guard<stdx::mutex> lock(shutdownMutex); shutdownTasksInProgress = false; + shutdownExitCode.emplace(code); } shutdownTasksComplete.notify_all(); @@ -133,7 +147,7 @@ void shutdownNoTerminate() { if (inShutdown()) return; - shutdownFlag.fetchAndAdd(1); + setShutdownFlag(); shutdownTasksInProgress = true; shutdownTasksThreadId = stdx::this_thread::get_id(); diff --git a/src/mongo/util/exit.h b/src/mongo/util/exit.h index fa6a8bbb85b..1aed81117a8 100644 --- a/src/mongo/util/exit.h +++ b/src/mongo/util/exit.h @@ -46,6 +46,11 @@ bool inShutdown(); bool inShutdownStrict(); /** + * Does not return until all shutdown tasks have run. + */ +ExitCode waitForShutdown(); + +/** * Registers a new shutdown task to be called when shutdown or * shutdownNoTerminate is called. If this function is invoked after * shutdown or shutdownNoTerminate has been called, std::terminate is diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index fa8899561e9..34105547ab6 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -95,17 +95,6 @@ env.Library( ) env.Library( - target="message_server_port", - source=[ - "message_server_port.cpp", - ], - LIBDEPS=[ - 'network', - '$BUILD_DIR/mongo/db/stats/counters', - ], -) - -env.Library( target='miniwebserver', source=[ 'miniwebserver.cpp', diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h index c4ff5503f41..e4884e75133 100644 --- a/src/mongo/util/net/abstract_message_port.h +++ b/src/mongo/util/net/abstract_message_port.h @@ -32,6 +32,7 @@ #include "mongo/config.h" #include "mongo/logger/log_severity.h" +#include "mongo/stdx/functional.h" #include "mongo/util/net/message.h" #include "mongo/util/net/sockaddr.h" #include "mongo/util/time_support.h" @@ -88,6 +89,11 @@ public: virtual void say(Message& toSend, int responseTo = 0) = 0; /** + * Sends the message (does not set headers). + */ + virtual void say(const Message& toSend) = 0; + + /** * Sends the data over the socket. */ virtual void send(const char* data, int len, const char* context) = 0; diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp index 88e37feb2d8..16fab7ea8c1 100644 --- a/src/mongo/util/net/asio_message_port.cpp +++ b/src/mongo/util/net/asio_message_port.cpp @@ -54,40 +54,6 @@ const char kGET[] = "GET"; const int kHeaderLen = sizeof(MSGHEADER::Value); const int kInitialMessageSize = 1024; -class ASIOPorts { -public: - void closeAll(AbstractMessagingPort::Tag skipMask) { - stdx::lock_guard<stdx::mutex> lock_guard(mutex); - for (auto&& port : _portSet) { - if (port->getTag() & skipMask) { - LOG(3) << "Skip closing connection # " << port->connectionId(); - continue; - } - LOG(3) << "Closing connection # " << port->connectionId(); - port->shutdown(); - } - } - - void insert(ASIOMessagingPort* p) { - stdx::lock_guard<stdx::mutex> lock_guard(mutex); - _portSet.insert(p); - } - - void erase(ASIOMessagingPort* p) { - stdx::lock_guard<stdx::mutex> lock_guard(mutex); - _portSet.erase(p); - } - -private: - stdx::mutex mutex; - std::unordered_set<ASIOMessagingPort*> _portSet; -}; - -// We "new" this so it will still be around when other automatic global vars are being destructed -// during termination. TODO: This is an artifact from MessagingPort and should be removed when we -// decide where to put networking global state. -ASIOPorts& asioPorts = *(new ASIOPorts()); - #ifdef MONGO_CONFIG_SSL struct ASIOSSLContextPair { ASIOSSLContext server; @@ -109,10 +75,6 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(ASIOSSLContextSetup, ("SSLManager"))(Initia } // namespace -void ASIOMessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) { - asioPorts.closeAll(skipMask); -} - ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd) : _service(1), _timer(_service), @@ -145,7 +107,6 @@ ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd) farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP), fd) { #endif // MONGO_CONFIG_SSL - asioPorts.insert(this); _getSocket().non_blocking(true); _remote = HostAndPort(farEnd.getAddr(), farEnd.getPort()); _timer.expires_at(decltype(_timer)::time_point::max()); @@ -177,7 +138,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l #else _sock(_service) { #endif // MONGO_CONFIG_SSL - asioPorts.insert(this); if (*_timeout == Milliseconds(0)) { _timeout = boost::none; } @@ -187,7 +147,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l ASIOMessagingPort::~ASIOMessagingPort() { shutdown(); - asioPorts.erase(this); } void ASIOMessagingPort::setTimeout(Milliseconds millis) { @@ -502,6 +461,11 @@ void ASIOMessagingPort::say(Message& toSend, int responseTo) { invariant(!toSend.empty()); toSend.header().setId(nextMessageId()); toSend.header().setResponseToMsgId(responseTo); + return say(const_cast<const Message&>(toSend)); +} + +void ASIOMessagingPort::say(const Message& toSend) { + invariant(!toSend.empty()); auto buf = toSend.buf(); if (buf) { send(buf, MsgData::ConstView(buf).getLen(), nullptr); diff --git a/src/mongo/util/net/asio_message_port.h b/src/mongo/util/net/asio_message_port.h index d474b03ef29..d3e5fc33b1c 100644 --- a/src/mongo/util/net/asio_message_port.h +++ b/src/mongo/util/net/asio_message_port.h @@ -78,6 +78,7 @@ public: void reply(Message& received, Message& response) override; void say(Message& toSend, int responseTo = 0) override; + void say(const Message& toSend) override; void send(const char* data, int len, const char*) override; void send(const std::vector<std::pair<char*, int>>& data, const char*) override; @@ -118,8 +119,6 @@ public: bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override; - static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask); - private: void _setTimerCallback(); asio::error_code _read(char* buf, std::size_t size); diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp index deb30887e5c..65cec09e45a 100644 --- a/src/mongo/util/net/listen.cpp +++ b/src/mongo/util/net/listen.cpp @@ -280,7 +280,9 @@ void Listener::initAndListen() { } struct timeval maxSelectTime; - while (!inShutdown()) { + // The check against _finished allows us to actually stop the listener by signalling it through + // the _finished flag. + while (!inShutdown() && !_finished.load()) { fd_set fds[1]; FD_ZERO(fds); @@ -601,7 +603,7 @@ void Listener::_accepted(const std::shared_ptr<Socket>& psocket, long long conne port = stdx::make_unique<MessagingPort>(psocket); } port->setConnectionId(connectionId); - accepted(port.release()); + accepted(std::move(port)); } // ----- ListeningSockets ------- @@ -648,9 +650,8 @@ void Listener::checkTicketNumbers() { globalTicketHolder.resize(want); } -void Listener::closeMessagingPorts(AbstractMessagingPort::Tag skipMask) { - ASIOMessagingPort::closeSockets(skipMask); - MessagingPort::closeSockets(skipMask); +void Listener::shutdown() { + _finished.store(true); } TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN); diff --git a/src/mongo/util/net/listen.h b/src/mongo/util/net/listen.h index 0bbd3ad8a0b..f0c66a41d60 100644 --- a/src/mongo/util/net/listen.h +++ b/src/mongo/util/net/listen.h @@ -66,7 +66,7 @@ public: void initAndListen(); // never returns unless error (start a thread) /* spawn a thread, etc., then return */ - virtual void accepted(AbstractMessagingPort* mp) = 0; + virtual void accepted(std::unique_ptr<AbstractMessagingPort> mp) = 0; const int _port; @@ -83,6 +83,8 @@ public: */ void waitUntilListening() const; + void shutdown(); + private: std::vector<SockAddr> _mine; std::vector<SOCKET> _socks; @@ -94,6 +96,7 @@ private: mutable stdx::condition_variable _readyCondition; // Used to wait for changes to _ready // Boolean that indicates whether this Listener is ready to accept incoming network requests bool _ready; + AtomicBool _finished{false}; ServiceContext* _ctx; bool _setAsServiceCtxDecoration; @@ -119,13 +122,6 @@ public: /** makes sure user input is sane */ static void checkTicketNumbers(); - - /** - * This will close implementations of AbstractMessagingPort, skipping any that have tags - * matching `skipMask`. - */ - static void closeMessagingPorts( - AbstractMessagingPort::Tag skipMask = AbstractMessagingPort::kSkipAllMask); }; class ListeningSockets { diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h index 37a6c0bb66b..601d02b9e12 100644 --- a/src/mongo/util/net/message.h +++ b/src/mongo/util/net/message.h @@ -458,11 +458,16 @@ public: return _buf.get(); } + const char* buf() const { + return _buf.get(); + } + std::string toString() const; SharedBuffer sharedBuffer() { return _buf; } + ConstSharedBuffer sharedBuffer() const { return _buf; } diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp index 5051962ed5d..137d494c4ac 100644 --- a/src/mongo/util/net/message_port.cpp +++ b/src/mongo/util/net/message_port.cpp @@ -63,41 +63,6 @@ using std::string; /* messagingport -------------------------------------------------------------- */ -class Ports { - std::set<MessagingPort*> ports; - stdx::mutex m; - -public: - Ports() : ports() {} - void closeAll(AbstractMessagingPort::Tag skip_mask) { - stdx::lock_guard<stdx::mutex> bl(m); - for (std::set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++) { - if ((*i)->getTag() & skip_mask) { - LOG(3) << "Skip closing connection # " << (*i)->connectionId(); - continue; - } - LOG(3) << "Closing connection # " << (*i)->connectionId(); - (*i)->shutdown(); - } - } - void insert(MessagingPort* p) { - stdx::lock_guard<stdx::mutex> bl(m); - ports.insert(p); - } - void erase(MessagingPort* p) { - stdx::lock_guard<stdx::mutex> bl(m); - ports.erase(p); - } -}; - -// we "new" this so it is still be around when other automatic global vars -// are being destructed during termination. -Ports& ports = *(new Ports()); - -void MessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) { - ports.closeAll(skipMask); -} - MessagingPort::MessagingPort(int fd, const SockAddr& remote) : MessagingPort(std::make_shared<Socket>(fd, remote)) {} @@ -108,7 +73,6 @@ MessagingPort::MessagingPort(std::shared_ptr<Socket> sock) : _x509SubjectName(), _connectionId(), _tag(), _psock(std::move(sock)) { SockAddr sa = _psock->remoteAddr(); _remoteParsed = HostAndPort(sa.getAddr(), sa.getPort()); - ports.insert(this); } void MessagingPort::setTimeout(Milliseconds millis) { @@ -122,7 +86,6 @@ void MessagingPort::shutdown() { MessagingPort::~MessagingPort() { shutdown(); - ports.erase(this); } bool MessagingPort::recv(Message& m) { @@ -216,6 +179,7 @@ bool MessagingPort::call(Message& toSend, Message& response) { say(toSend); bool success = recv(response); if (success) { + invariant(!response.empty()); if (response.header().getResponseToMsgId() != toSend.header().getId()) { response.reset(); uasserted(40134, "Response ID did not match the sent message ID."); @@ -225,9 +189,15 @@ bool MessagingPort::call(Message& toSend, Message& response) { } void MessagingPort::say(Message& toSend, int responseTo) { - verify(!toSend.empty()); + invariant(!toSend.empty()); toSend.header().setId(nextMessageId()); toSend.header().setResponseToMsgId(responseTo); + + return say(const_cast<const Message&>(toSend)); +} + +void MessagingPort::say(const Message& toSend) { + invariant(!toSend.empty()); auto buf = toSend.buf(); if (buf) { send(buf, MsgData::ConstView(buf).getLen(), "say"); diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h index 18931e20750..41ecc2fce3b 100644 --- a/src/mongo/util/net/message_port.h +++ b/src/mongo/util/net/message_port.h @@ -66,6 +66,7 @@ public: bool call(Message& toSend, Message& response) override; void say(Message& toSend, int responseTo = 0) override; + void say(const Message& toSend) override; unsigned remotePort() const override { return _psock->remotePort(); @@ -142,11 +143,6 @@ private: long long _connectionId; AbstractMessagingPort::Tag _tag; std::shared_ptr<Socket> _psock; - - -public: - static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask); }; - } // namespace mongo diff --git a/src/mongo/util/net/message_port_mock.cpp b/src/mongo/util/net/message_port_mock.cpp index 6869723129f..de07ba032ae 100644 --- a/src/mongo/util/net/message_port_mock.cpp +++ b/src/mongo/util/net/message_port_mock.cpp @@ -55,6 +55,7 @@ void MessagingPortMock::reply(Message& received, Message& response, int32_t resp void MessagingPortMock::reply(Message& received, Message& response) {} void MessagingPortMock::say(Message& toSend, int responseTo) {} +void MessagingPortMock::say(const Message& toSend) {} bool MessagingPortMock::connect(SockAddr& farEnd) { return true; diff --git a/src/mongo/util/net/message_port_mock.h b/src/mongo/util/net/message_port_mock.h index 61ab5be8958..721998f81f3 100644 --- a/src/mongo/util/net/message_port_mock.h +++ b/src/mongo/util/net/message_port_mock.h @@ -55,6 +55,7 @@ public: void reply(Message& received, Message& response) override; void say(Message& toSend, int responseTo = 0) override; + void say(const Message& toSend) override; bool connect(SockAddr& farEnd) override; diff --git a/src/mongo/util/net/message_server.h b/src/mongo/util/net/message_server.h deleted file mode 100644 index 544c859c6f7..00000000000 --- a/src/mongo/util/net/message_server.h +++ /dev/null @@ -1,83 +0,0 @@ -// message_server.h - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -/* - abstract database server - async io core, worker thread system - */ - -#pragma once - -#include "mongo/platform/basic.h" - -namespace mongo { - -class ServiceContext; - -class MessageHandler { -public: - virtual ~MessageHandler() {} - - /** - * Called once when a socket is connected. - */ - virtual void connected(AbstractMessagingPort* p) = 0; - - /** - * Called every time a message comes in. Handler is responsible for responding to client. - */ - virtual void process(Message& m, AbstractMessagingPort* p) = 0; - - /** - * Called once, either when the client disconnects or when the process is shutting down. After - * close() is called, this handler's AbstractMessagingPort pointer (passed in via the - * connected() method) is no longer valid. - */ - virtual void close() = 0; -}; - -class MessageServer { -public: - struct Options { - int port; // port to bind to - std::string ipList; // addresses to bind to - - Options() : port(0), ipList("") {} - }; - - virtual ~MessageServer() {} - virtual void run() = 0; - virtual bool setupSockets() = 0; -}; - -// TODO use a factory here to decide between port and asio variations -MessageServer* createServer(const MessageServer::Options& opts, - std::shared_ptr<MessageHandler> handler, - ServiceContext* ctx); -} diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp deleted file mode 100644 index e8432f8c510..00000000000 --- a/src/mongo/util/net/message_server_port.cpp +++ /dev/null @@ -1,244 +0,0 @@ -// message_server_port.cpp - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include <memory> -#include <system_error> - -#include "mongo/base/disallow_copying.h" -#include "mongo/config.h" -#include "mongo/db/lasterror.h" -#include "mongo/db/server_options.h" -#include "mongo/db/stats/counters.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/concurrency/thread_name.h" -#include "mongo/util/concurrency/ticketholder.h" -#include "mongo/util/debug_util.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/abstract_message_port.h" -#include "mongo/util/net/listen.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/message_server.h" -#include "mongo/util/net/socket_exception.h" -#include "mongo/util/net/ssl_manager.h" -#include "mongo/util/net/thread_idle_callback.h" -#include "mongo/util/quick_exit.h" -#include "mongo/util/scopeguard.h" - -#ifdef __linux__ // TODO: consider making this ifndef _WIN32 -#include <sys/resource.h> -#endif - -#if !defined(__has_feature) -#define __has_feature(x) 0 -#endif - -namespace mongo { - -using std::unique_ptr; -using std::endl; - -namespace { - -using MessagingPortWithHandler = std::pair<AbstractMessagingPort*, std::shared_ptr<MessageHandler>>; - -} // namespace - -class PortMessageServer : public MessageServer, public Listener { -public: - /** - * Creates a new message server. - * - * @param opts - * @param handler the handler to use. - */ - PortMessageServer(const MessageServer::Options& opts, - std::shared_ptr<MessageHandler> handler, - ServiceContext* ctx) - : Listener("", opts.ipList, opts.port, ctx, true), _handler(std::move(handler)) {} - - virtual void accepted(AbstractMessagingPort* mp) { - ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2); - auto portWithHandler = stdx::make_unique<MessagingPortWithHandler>(mp, _handler); - - if (!Listener::globalTicketHolder.tryAcquire()) { - log() << "connection refused because too many open connections: " - << Listener::globalTicketHolder.used() << endl; - return; - } - - try { -#ifndef __linux__ // TODO: consider making this ifdef _WIN32 - { - stdx::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get())); - thr.detach(); - } -#else - pthread_attr_t attrs; - pthread_attr_init(&attrs); - pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); - - static const size_t STACK_SIZE = - 1024 * 1024; // if we change this we need to update the warning - - struct rlimit limits; - verify(getrlimit(RLIMIT_STACK, &limits) == 0); - if (limits.rlim_cur > STACK_SIZE) { - size_t stackSizeToSet = STACK_SIZE; -#if !__has_feature(address_sanitizer) - if (kDebugBuild) - stackSizeToSet /= 2; -#endif - pthread_attr_setstacksize(&attrs, stackSizeToSet); - } else if (limits.rlim_cur < 1024 * 1024) { - warning() << "Stack size set to " << (limits.rlim_cur / 1024) - << "KB. We suggest 1MB" << endl; - } - - - pthread_t thread; - int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, portWithHandler.get()); - - pthread_attr_destroy(&attrs); - - if (failed) { - log() << "pthread_create failed: " << errnoWithDescription(failed) << endl; - throw std::system_error( - std::make_error_code(std::errc::resource_unavailable_try_again)); - } -#endif // __linux__ - - portWithHandler.release(); - sleepAfterClosingPort.Dismiss(); - } catch (...) { - Listener::globalTicketHolder.release(); - log() << "failed to create thread after accepting new connection, closing connection"; - } - } - - virtual bool setupSockets() { - return Listener::setupSockets(); - } - - void run() { - initAndListen(); - } - - virtual bool useUnixSockets() const { - return true; - } - -private: - const std::shared_ptr<MessageHandler> _handler; - - /** - * Handles incoming messages from a given socket. - * - * Terminating conditions: - * 1. Assertions while handling the request. - * 2. Socket is closed. - * 3. Server is shutting down (based on inShutdown) - * - * @param arg this method is in charge of cleaning up the arg object. - * - * @return NULL - */ - static void* handleIncomingMsg(void* arg) { - TicketHolderReleaser connTicketReleaser(&Listener::globalTicketHolder); - - invariant(arg); - unique_ptr<MessagingPortWithHandler> portWithHandler( - static_cast<MessagingPortWithHandler*>(arg)); - auto mp = portWithHandler->first; - auto handler = portWithHandler->second; - - setThreadName(std::string(str::stream() << "conn" << mp->connectionId())); - mp->setLogLevel(logger::LogSeverity::Debug(1)); - - Message m; - int64_t counter = 0; - try { - handler->connected(mp); - ON_BLOCK_EXIT([handler]() { handler->close(); }); - - while (!inShutdown()) { - m.reset(); - mp->clearCounters(); - - if (!mp->recv(m)) { - if (!serverGlobalParams.quiet) { - int conns = Listener::globalTicketHolder.used() - 1; - const char* word = (conns == 1 ? " connection" : " connections"); - log() << "end connection " << mp->remote().toString() << " (" << conns - << word << " now open)" << endl; - } - break; - } - - handler->process(m, mp); - networkCounter.hit(mp->getBytesIn(), mp->getBytesOut()); - - // Occasionally we want to see if we're using too much memory. - if ((counter++ & 0xf) == 0) { - markThreadIdle(); - } - } - } catch (AssertionException& e) { - log() << "AssertionException handling request, closing client connection: " << e - << endl; - } catch (SocketException& e) { - log() << "SocketException handling request, closing client connection: " << e << endl; - } catch (const DBException& - e) { // must be right above std::exception to avoid catching subclasses - log() << "DBException handling request, closing client connection: " << e << endl; - } catch (std::exception& e) { - error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; - quickExit(EXIT_UNCAUGHT); - } - mp->shutdown(); - - return NULL; - } -}; - - -MessageServer* createServer(const MessageServer::Options& opts, - std::shared_ptr<MessageHandler> handler, - ServiceContext* ctx) { - return new PortMessageServer(opts, std::move(handler), ctx); -} - -} // namespace mongo diff --git a/src/mongo/util/net/miniwebserver.cpp b/src/mongo/util/net/miniwebserver.cpp index ec64f174560..78f81f86134 100644 --- a/src/mongo/util/net/miniwebserver.cpp +++ b/src/mongo/util/net/miniwebserver.cpp @@ -204,7 +204,7 @@ void MiniWebServer::_accepted(const std::shared_ptr<Socket>& psock, long long co } } -void MiniWebServer::accepted(AbstractMessagingPort* mp) { +void MiniWebServer::accepted(std::unique_ptr<AbstractMessagingPort> mp) { MONGO_UNREACHABLE; } diff --git a/src/mongo/util/net/miniwebserver.h b/src/mongo/util/net/miniwebserver.h index a2d0690225e..122281555fc 100644 --- a/src/mongo/util/net/miniwebserver.h +++ b/src/mongo/util/net/miniwebserver.h @@ -69,7 +69,7 @@ public: } // This is not currently used for the MiniWebServer. See SERVER-24200 - void accepted(AbstractMessagingPort* mp) override; + void accepted(std::unique_ptr<AbstractMessagingPort> mp) override; private: void _accepted(const std::shared_ptr<Socket>& psocket, long long connectionId) override; diff --git a/src/mongo/util/net/sockaddr.cpp b/src/mongo/util/net/sockaddr.cpp index 00960f2fd97..5236a76892f 100644 --- a/src/mongo/util/net/sockaddr.cpp +++ b/src/mongo/util/net/sockaddr.cpp @@ -46,8 +46,8 @@ #endif #endif +#include "mongo/bson/util/builder.h" #include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" #include "mongo/util/net/sock.h" namespace mongo { @@ -150,10 +150,19 @@ bool SockAddr::isLocalHost() const { } std::string SockAddr::toString(bool includePort) const { - std::string out = getAddr(); - if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC) - out += mongoutils::str::stream() << ':' << getPort(); - return out; + if (includePort && (getType() != AF_UNIX) && (getType() != AF_UNSPEC)) { + StringBuilder ss; + + if (getType() == AF_INET6) { + ss << '[' << getAddr() << "]:" << getPort(); + } else { + ss << getAddr() << ':' << getPort(); + } + + return ss.str(); + } else { + return getAddr(); + } } sa_family_t SockAddr::getType() const { diff --git a/src/mongo/util/ntservice_test.cpp b/src/mongo/util/ntservice_test.cpp index 0a0fa6bde64..ad5b7e7ed2a 100644 --- a/src/mongo/util/ntservice_test.cpp +++ b/src/mongo/util/ntservice_test.cpp @@ -34,7 +34,6 @@ #include <string> #include <vector> - #include "mongo/db/client.h" #include "mongo/unittest/unittest.h" #include "mongo/util/ntservice.h" diff --git a/src/mongo/util/tcmalloc_server_status_section.cpp b/src/mongo/util/tcmalloc_server_status_section.cpp index 7a8eb49e25b..303076ba784 100644 --- a/src/mongo/util/tcmalloc_server_status_section.cpp +++ b/src/mongo/util/tcmalloc_server_status_section.cpp @@ -38,6 +38,8 @@ #include "mongo/base/init.h" #include "mongo/db/commands/server_status.h" +#include "mongo/db/service_context.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/log.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/thread_idle_callback.h" @@ -58,7 +60,8 @@ stdx::mutex tcmallocCleanupLock; * favorable times. Ideally would do some milder cleanup or scavenge... */ void threadStateChange() { - if (Listener::globalTicketHolder.used() <= kManyClients) + if (getGlobalServiceContext()->getTransportLayer()->sessionStats().numOpenSessions <= + kManyClients) return; #if MONGO_HAVE_GPERFTOOLS_GET_THREAD_CACHE_SIZE |