diff options
Diffstat (limited to 'src/mongo/db')
24 files changed, 405 insertions, 171 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 92f97280964..ba8c1e855bd 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -100,6 +100,7 @@ env.Library( "dbmessage.cpp", ], LIBDEPS=[ + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/net/network', ] ) @@ -402,7 +403,6 @@ env.Library( "$BUILD_DIR/mongo/rpc/command_reply", "$BUILD_DIR/mongo/rpc/command_request", "$BUILD_DIR/mongo/rpc/metadata", - "$BUILD_DIR/mongo/util/net/message_server_port", "$BUILD_DIR/mongo/util/net/miniwebserver", "$BUILD_DIR/mongo/util/processinfo", "$BUILD_DIR/mongo/util/signal_handlers", @@ -473,6 +473,8 @@ env.Library( '$BUILD_DIR/mongo/util/decorable', '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/net/hostandport', + '$BUILD_DIR/mongo/transport/transport_layer_common', + '$BUILD_DIR/mongo/transport/transport_layer_manager', ], ) diff --git a/src/mongo/db/auth/sasl_commands.cpp b/src/mongo/db/auth/sasl_commands.cpp index f006cded7d0..0fefd3b6cc9 100644 --- a/src/mongo/db/auth/sasl_commands.cpp +++ b/src/mongo/db/auth/sasl_commands.cpp @@ -187,10 +187,10 @@ Status doSaslStep(const ClientBasic* client, status = session->step(payload, &responsePayload); if (!status.isOK()) { - const SockAddr clientAddr = client->port()->remoteAddr(); log() << session->getMechanism() << " authentication failed for " << session->getPrincipalId() << " on " << session->getAuthenticationDatabase() - << " from client " << clientAddr.getAddr() << " ; " << status.toString() << std::endl; + << " from client " << client->getRemote().toString() << " ; " << status.toString() + << std::endl; sleepmillis(saslGlobalParams.authFailedDelay); // All the client needs to know is that authentication has failed. diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 807a7650aa5..2a665268811 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -44,6 +44,7 @@ #include "mongo/db/lasterror.h" #include "mongo/db/service_context.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/session.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/exit.h" #include "mongo/util/mongoutils/str.h" @@ -63,16 +64,16 @@ void Client::initThreadIfNotAlready() { initThreadIfNotAlready(getThreadName().c_str()); } -void Client::initThread(const char* desc, AbstractMessagingPort* mp) { - initThread(desc, getGlobalServiceContext(), mp); +void Client::initThread(const char* desc, transport::Session* session) { + initThread(desc, getGlobalServiceContext(), session); } -void Client::initThread(const char* desc, ServiceContext* service, AbstractMessagingPort* mp) { +void Client::initThread(const char* desc, ServiceContext* service, transport::Session* session) { invariant(currentClient.getMake()->get() == nullptr); std::string fullDesc; - if (mp != NULL) { - fullDesc = str::stream() << desc << mp->connectionId(); + if (session) { + fullDesc = str::stream() << desc << session->id(); } else { fullDesc = desc; } @@ -80,7 +81,7 @@ void Client::initThread(const char* desc, ServiceContext* service, AbstractMessa setThreadName(fullDesc.c_str()); // Create the client obj, attach to thread - *currentClient.get() = service->makeClient(fullDesc, mp); + *currentClient.get() = service->makeClient(fullDesc, session); } void Client::destroy() { @@ -98,11 +99,11 @@ int64_t generateSeed(const std::string& desc) { } } // namespace -Client::Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p) - : ClientBasic(serviceContext, p), +Client::Client(std::string desc, ServiceContext* serviceContext, transport::Session* session) + : ClientBasic(serviceContext, session), _desc(std::move(desc)), _threadId(stdx::this_thread::get_id()), - _connectionId(p ? p->connectionId() : 0), + _connectionId(session ? session->id() : 0), _prng(generateSeed(_desc)) {} void Client::reportState(BSONObjBuilder& builder) { diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 407a0bac868..a7523981e47 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -51,6 +51,10 @@ class AbstractMessagingPort; class Collection; class OperationContext; +namespace transport { +class Session; +} // namespace transport + typedef long long ConnectionId; /** the database's concept of an outside "client" */ @@ -59,16 +63,16 @@ public: /** * Creates a Client object and stores it in TLS for the current thread. * - * An unowned pointer to a AbstractMessagingPort may optionally be provided. If 'mp' is - * non-null, then it will be used to augment the thread name, and for reporting purposes. + * An unowned pointer to a transport::Session may optionally be provided. If 'session' + * is non-null, then it will be used to augment the thread name, and for reporting purposes. * - * If provided, 'mp' must outlive the newly-created Client object. Client::destroy() may be used - * to help enforce that the Client does not outlive 'mp'. + * If provided, 'session' must outlive the newly-created Client object. Client::destroy() may + * be used to help enforce that the Client does not outlive 'session.' */ - static void initThread(const char* desc, AbstractMessagingPort* mp = 0); + static void initThread(const char* desc, transport::Session* session = nullptr); static void initThread(const char* desc, ServiceContext* serviceContext, - AbstractMessagingPort* mp); + transport::Session* session); /** * Inits a thread if that thread has not already been init'd, setting the thread name to @@ -159,7 +163,7 @@ public: private: friend class ServiceContext; - Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p = 0); + Client(std::string desc, ServiceContext* serviceContext, transport::Session* session = nullptr); // Description for the client (e.g. conn8) diff --git a/src/mongo/db/client_basic.cpp b/src/mongo/db/client_basic.cpp index 06e619597c3..578ebdf668d 100644 --- a/src/mongo/db/client_basic.cpp +++ b/src/mongo/db/client_basic.cpp @@ -32,8 +32,8 @@ namespace mongo { -ClientBasic::ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort) - : _serviceContext(serviceContext), _messagingPort(messagingPort) {} +ClientBasic::ClientBasic(ServiceContext* serviceContext, transport::Session* session) + : _serviceContext(serviceContext), _session(session) {} ClientBasic::~ClientBasic() = default; diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h index 88d7ef4afe8..09f31b5da42 100644 --- a/src/mongo/db/client_basic.h +++ b/src/mongo/db/client_basic.h @@ -31,6 +31,7 @@ #include <memory> #include "mongo/base/disallow_copying.h" +#include "mongo/transport/session.h" #include "mongo/util/decorable.h" #include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/hostandport.h" @@ -58,11 +59,12 @@ public: } bool hasRemote() const { - return _messagingPort; + return _session; } + HostAndPort getRemote() const { - verify(_messagingPort); - return _messagingPort->remote(); + verify(_session); + return _session->remote(); } /** @@ -73,20 +75,20 @@ public: } /** - * Returns the AbstractMessagePort to which this client session is bound, if any. + * Returns the Session to which this client is bound, if any. */ - AbstractMessagingPort* port() const { - return _messagingPort; + transport::Session* session() const { + return _session; } static ClientBasic* getCurrent(); protected: - ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort); + ClientBasic(ServiceContext* serviceContext, transport::Session* session); ~ClientBasic(); private: ServiceContext* const _serviceContext; - AbstractMessagingPort* const _messagingPort; + transport::Session* const _session; }; } diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 919510c4b79..87c472dc519 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -71,6 +71,7 @@ env.Library( '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/write_ops/batch_write_types', '$BUILD_DIR/mongo/scripting/scripting_common', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils', '$BUILD_DIR/mongo/util/foundation', '$BUILD_DIR/mongo/util/ntservice', diff --git a/src/mongo/db/commands/authentication_commands.cpp b/src/mongo/db/commands/authentication_commands.cpp index 6fc6268a807..2f05971ef56 100644 --- a/src/mongo/db/commands/authentication_commands.cpp +++ b/src/mongo/db/commands/authentication_commands.cpp @@ -56,6 +56,7 @@ #include "mongo/db/server_options.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/log.h" #include "mongo/util/md5.hpp" @@ -314,17 +315,17 @@ Status CmdAuthenticate::_authenticateX509(OperationContext* txn, ClientBasic* client = ClientBasic::getCurrent(); AuthorizationSession* authorizationSession = AuthorizationSession::get(client); - std::string clientSubjectName = client->port()->getX509SubjectName(); + auto clientName = client->session()->getX509SubjectName(); if (!getSSLManager()->getSSLConfiguration().hasCA) { return Status(ErrorCodes::AuthenticationFailed, "Unable to verify x.509 certificate, as no CA has been provided."); - } else if (user.getUser() != clientSubjectName) { + } else if (user.getUser() != clientName) { return Status(ErrorCodes::AuthenticationFailed, "There is no x.509 client certificate matching the user."); } else { // Handle internal cluster member auth, only applies to server-server connections - if (getSSLManager()->getSSLConfiguration().isClusterMember(clientSubjectName)) { + if (getSSLManager()->getSSLConfiguration().isClusterMember(clientName)) { int clusterAuthMode = serverGlobalParams.clusterAuthMode.load(); if (clusterAuthMode == ServerGlobalParams::ClusterAuthMode_undefined || clusterAuthMode == ServerGlobalParams::ClusterAuthMode_keyFile) { diff --git a/src/mongo/db/commands/server_status.cpp b/src/mongo/db/commands/server_status.cpp index bbe7e9930a7..e567a15b18b 100644 --- a/src/mongo/db/commands/server_status.cpp +++ b/src/mongo/db/commands/server_status.cpp @@ -47,9 +47,9 @@ #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/platform/process_id.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" -#include "mongo/util/net/listen.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/processinfo.h" #include "mongo/util/ramlog.h" @@ -228,9 +228,10 @@ public: BSONObj generateSection(OperationContext* txn, const BSONElement& configElement) const { BSONObjBuilder bb; - bb.append("current", Listener::globalTicketHolder.used()); - bb.append("available", Listener::globalTicketHolder.available()); - bb.append("totalCreated", Listener::globalConnectionNumber.load()); + auto stats = txn->getServiceContext()->getTransportLayer()->sessionStats(); + bb.append("current", static_cast<int>(stats.numOpenSessions)); + bb.append("available", static_cast<int>(stats.numAvailableSessions)); + bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions)); return bb.obj(); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index a10b0be5a39..56dff7cecaf 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -96,6 +96,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_d.h" #include "mongo/db/service_context_d.h" +#include "mongo/db/service_entry_point_mongod.h" #include "mongo/db/startup_warnings_mongod.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/snapshots.h" @@ -114,6 +115,7 @@ #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/util/assert_util.h" #include "mongo/util/cmdline_utils/censor_cmdline.h" #include "mongo/util/concurrency/task.h" @@ -124,7 +126,6 @@ #include "mongo/util/log.h" #include "mongo/util/net/hostname_canonicalization_worker.h" #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/ntservice.h" #include "mongo/util/options_parser/startup_options.h" @@ -169,65 +170,6 @@ ntservice::NtServiceDefaultStrings defaultServiceStrings = { Timer startupSrandTimer; -class MyMessageHandler : public MessageHandler { -public: - virtual void connected(AbstractMessagingPort* p) { - Client::initThread("conn", p); - } - - virtual void process(Message& m, AbstractMessagingPort* port) { - while (true) { - if (inShutdown()) { - log() << "got request after shutdown()" << endl; - break; - } - - DbResponse dbresponse; - { - auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc()); - assembleResponse(opCtx.get(), m, dbresponse, port->remote()); - - // opCtx must go out of scope here so that the operation cannot show up in currentOp - // results after the response reaches the client - } - - if (!dbresponse.response.empty()) { - port->reply(m, dbresponse.response, dbresponse.responseToMsgId); - if (dbresponse.exhaustNS.size() > 0) { - MsgData::View header = dbresponse.response.header(); - QueryResult::View qr = header.view2ptr(); - long long cursorid = qr.getCursorId(); - if (cursorid) { - verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); - string ns = dbresponse.exhaustNS; // before reset() free's it... - m.reset(); - BufBuilder b(512); - b.appendNum((int)0 /*size set later*/); - b.appendNum(header.getId()); - b.appendNum(header.getResponseToMsgId()); - b.appendNum((int)dbGetMore); - b.appendNum((int)0); - b.appendStr(ns); - b.appendNum((int)0); // ntoreturn - b.appendNum(cursorid); - - MsgData::View header = b.buf(); - header.setLen(b.len()); - m.setData(b.release()); - DEV log() << "exhaust=true sending more"; - continue; // this goes back to top loop - } - } - } - break; - } - } - - virtual void close() { - Client::destroy(); - } -}; - static void logStartup(OperationContext* txn) { BSONObjBuilder toLog; stringstream id; @@ -530,7 +472,7 @@ static void _initWireSpec() { } -static void _initAndListen(int listenPort) { +static ExitCode _initAndListen(int listenPort) { Client::initThread("initandlisten"); _initWireSpec(); @@ -570,20 +512,18 @@ static void _initAndListen(int listenPort) { checked_cast<ServiceContextMongoD*>(getGlobalServiceContext())->createLockFile(); - // Due to SERVER-15389, we must setupSockets first thing at startup in order to avoid - // obtaining too high a file descriptor for our calls to select(). - MessageServer::Options options; + transport::TransportLayerLegacy::Options options; options.port = listenPort; options.ipList = serverGlobalParams.bind_ip; - auto handler = std::make_shared<MyMessageHandler>(); - MessageServer* server = createServer(options, std::move(handler), getGlobalServiceContext()); - - // This is what actually creates the sockets, but does not yet listen on them because we - // do not want connections to just hang if recovery takes a very long time. - if (!server->setupSockets()) { - error() << "Failed to set up sockets during startup."; - return; + // Create, start, and attach the TL + auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>( + options, + std::make_shared<ServiceEntryPointMongod>(getGlobalServiceContext()->getTransportLayer())); + auto res = transportLayer->setup(); + if (!res.isOK()) { + error() << "Failed to set up listener: " << res.toString(); + return EXIT_NET_ERROR; } std::shared_ptr<DbWebServer> dbWebServer; @@ -594,7 +534,7 @@ static void _initAndListen(int listenPort) { new RestAdminAccess())); if (!dbWebServer->setupSockets()) { error() << "Failed to set up sockets for HTTP interface during startup."; - return; + return EXIT_NET_ERROR; } } @@ -674,7 +614,7 @@ static void _initAndListen(int listenPort) { } if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalRecoverOnly) - return; + return EXIT_NET_ERROR; if (mongodGlobalParams.scriptingEnabled) { ScriptEngine::setup(); @@ -799,14 +739,19 @@ static void _initAndListen(int listenPort) { // MessageServer::run will return when exit code closes its socket and we don't need the // operation context anymore startupOpCtx.reset(); - server->run(); + + auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer)); + if (!start.isOK()) { + error() << "Failed to start the listener: " << start.toString(); + return EXIT_NET_ERROR; + } + + return waitForShutdown(); } ExitCode initAndListen(int listenPort) { try { - _initAndListen(listenPort); - - return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR; + return _initAndListen(listenPort); } catch (DBException& e) { log() << "exception in initAndListen: " << e.toString() << ", terminating" << endl; return EXIT_UNCAUGHT; @@ -1001,7 +946,6 @@ static void reportEventToSystemImpl(const char* msg) { // registerShutdownTask is called below. It must not depend on the // prior execution of mongo initializers or the existence of threads. static void shutdownTask() { - auto serviceContext = getGlobalServiceContext(); Client::initThreadIfNotAlready(); @@ -1014,6 +958,7 @@ static void shutdownTask() { txn = uniqueTxn.get(); } + getGlobalServiceContext()->getTransportLayer()->shutdown(); log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl; ListeningSockets::get()->closeAll(); @@ -1037,12 +982,6 @@ static void shutdownTask() { // will do it for us when the process terminates. stdx::packaged_task<void()> dryOutTask([] { - - // Walk all open sockets and close them. This should unblock any that are sitting in - // recv. Other sockets on which there are active operations should see the inShutdown flag - // as true when they complete the operation. - Listener::closeMessagingPorts(0); - // There isn't currently a way to wait on the TicketHolder to have all its tickets back, // unfortunately. So, busy wait in this detached thread. while (true) { diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index 67a5548dbfb..f16ec2b6276 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -30,7 +30,9 @@ #include "mongo/platform/basic.h" #include "mongo/db/dbmessage.h" +#include "mongo/db/operation_context.h" #include "mongo/platform/strnlen.h" +#include "mongo/transport/session.h" namespace mongo { @@ -173,20 +175,23 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) { _buffer.skip(sizeof(QueryResult::Value)); } -void OpQueryReplyBuilder::send(AbstractMessagingPort* destination, +void OpQueryReplyBuilder::send(transport::Session* session, int queryResultFlags, - Message& requestMsg, + const Message& requestMsg, int nReturned, int startingFrom, long long cursorId) { Message response; putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId); - destination->reply(requestMsg, response, requestMsg.header().getId()); + + response.header().setId(nextMessageId()); + response.header().setResponseToMsgId(requestMsg.header().getId()); + + uassertStatusOK(session->sinkMessage(response).wait()); } -void OpQueryReplyBuilder::sendCommandReply(AbstractMessagingPort* destination, - Message& requestMsg) { - send(destination, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); +void OpQueryReplyBuilder::sendCommandReply(transport::Session* session, const Message& requestMsg) { + send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); } void OpQueryReplyBuilder::putInMessage( @@ -202,7 +207,7 @@ void OpQueryReplyBuilder::putInMessage( } void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const void* data, int size, @@ -211,15 +216,19 @@ void replyToQuery(int queryResultFlags, long long cursorId) { OpQueryReplyBuilder reply; reply.bufBuilderForResults().appendBuf(data, size); - reply.send(p, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); + reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); } void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const BSONObj& responseObj) { - replyToQuery( - queryResultFlags, p, requestMsg, (void*)responseObj.objdata(), responseObj.objsize(), 1); + replyToQuery(queryResultFlags, + session, + requestMsg, + (void*)responseObj.objdata(), + responseObj.objsize(), + 1); } void replyToQuery(int queryResultFlags, Message& m, DbResponse& dbresponse, BSONObj obj) { diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 7f1a3a2d0f2..7fabf117421 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -39,6 +39,12 @@ namespace mongo { +class OperationContext; + +namespace transport { +class Session; +} // namespace transport + /* db response format Query or GetMore: // see struct QueryResult @@ -352,9 +358,9 @@ public: /** * Finishes the reply and sends the message out to 'destination'. */ - void send(AbstractMessagingPort* destination, + void send(transport::Session* session, int queryResultFlags, - Message& requestMsg, // should be const but MessagePort::reply takes non-const. + const Message& requestMsg, int nReturned, int startingFrom = 0, long long cursorId = 0); @@ -362,14 +368,14 @@ public: /** * Similar to send() but used for replying to a command. */ - void sendCommandReply(AbstractMessagingPort* destination, Message& requestMsg); + void sendCommandReply(transport::Session* session, const Message& requestMsg); private: BufBuilder _buffer; }; void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const void* data, int size, @@ -377,10 +383,9 @@ void replyToQuery(int queryResultFlags, int startingFrom = 0, long long cursorId = 0); - /* object reply helper. */ void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, + transport::Session* session, Message& requestMsg, const BSONObj& responseObj); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b6a2981dde0..b92ec300cf8 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -398,6 +398,7 @@ env.Library('repl_coordinator_impl', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/rpc/metadata', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/fail_point', 'collection_cloner', 'data_replicator', @@ -627,6 +628,7 @@ env.Library('replica_set_messages', '$BUILD_DIR/mongo/client/connection_string', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/server_options_core', + '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/net/hostandport', 'optime', 'read_concern_args', diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp index 4b34c351769..c01628466c0 100644 --- a/src/mongo/db/repl/repl_set_request_votes.cpp +++ b/src/mongo/db/repl/repl_set_request_votes.cpp @@ -35,6 +35,8 @@ #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/executor/network_interface.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -64,16 +66,17 @@ private: // We want to keep request vote connection open when relinquishing primary. // Tag it here. - unsigned originalTag = 0; - AbstractMessagingPort* mp = txn->getClient()->port(); - if (mp) { - originalTag = mp->getTag(); - mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); + transport::Session::TagMask originalTag = 0; + transport::Session* session = txn->getClient()->session(); + if (session) { + originalTag = session->getTags(); + session->replaceTags(originalTag | transport::Session::kKeepOpen); } + // Untag the connection on exit. - ON_BLOCK_EXIT([mp, originalTag]() { - if (mp) { - mp->setTag(originalTag); + ON_BLOCK_EXIT([session, originalTag]() { + if (session) { + session->replaceTags(originalTag); } }); diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 582b8f45970..245b37cca7f 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -192,8 +192,8 @@ public: virtual HostAndPort getClientHostAndPort(const OperationContext* txn) = 0; /** - * Closes all connections except those marked with the keepOpen property, which should - * just be connections used for heartbeating. + * Closes all connections in the given TransportLayer except those marked with the + * keepOpen property, which should just be connections used for heartbeating. * This is used during stepdown, and transition out of primary. */ virtual void closeConnections() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 81ed9c16bf3..f140580fbcc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -76,6 +76,8 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -449,7 +451,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - Listener::closeMessagingPorts(executor::NetworkInterface::kMessagingPortKeepOpen); + getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index cc844eaf35c..fd36395c017 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -234,9 +234,10 @@ public: // Tag connections to avoid closing them on stepdown. auto hangUpElement = cmdObj["hangUpOnStepDown"]; if (!hangUpElement.eoo() && !hangUpElement.trueValue()) { - AbstractMessagingPort* mp = txn->getClient()->port(); - if (mp) { - mp->setTag(mp->getTag() | executor::NetworkInterface::kMessagingPortKeepOpen); + auto session = txn->getClient()->session(); + if (session) { + session->replaceTags(session->getTags() | + executor::NetworkInterface::kMessagingPortKeepOpen); } } appendReplicationInfo(txn, result, 0); diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 1b96dac313f..f591d64c11b 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -59,6 +59,8 @@ #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -726,17 +728,17 @@ public: /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ - AbstractMessagingPort* mp = txn->getClient()->port(); - unsigned originalTag = 0; - if (mp) { - originalTag = mp->getTag(); - mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen); + transport::Session::TagMask originalTag = 0; + transport::Session* session = txn->getClient()->session(); + if (session) { + originalTag = session->getTags(); + session->replaceTags(originalTag | transport::Session::kKeepOpen); } // Unset the tag on block exit - ON_BLOCK_EXIT([mp, originalTag]() { - if (mp) { - mp->setTag(originalTag); + ON_BLOCK_EXIT([session, originalTag]() { + if (session) { + session->replaceTags(originalTag); } }); diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index 013c135cc84..359cb3b4fc1 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -50,7 +50,7 @@ class NoChunkFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -318,7 +318,7 @@ class SingleChunkFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -574,7 +574,7 @@ class SingleChunkMinMaxCompoundKeyFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -642,7 +642,7 @@ class TwoChunksWithGapCompoundKeyFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); @@ -839,7 +839,7 @@ class ThreeChunkWithRangeGapFixture : public ShardingCatalogTestFixture { protected: void setUp() { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); OID epoch = OID::gen(); diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp index c96b77a1590..e33e3142d1a 100644 --- a/src/mongo/db/s/metadata_loader_test.cpp +++ b/src/mongo/db/s/metadata_loader_test.cpp @@ -51,7 +51,7 @@ public: protected: void setUp() override { ShardingCatalogTestFixture::setUp(); - getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); + setRemote(HostAndPort("FakeRemoteClient:34567")); configTargeter()->setFindHostReturnValue(configHost); _maxCollVersion = ChunkVersion(1, 0, OID::gen()); _loader.reset(new MetadataLoader); diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index 03dc9bcaf8d..648fd546721 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -34,6 +34,9 @@ #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_manager.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/system_clock_source.h" @@ -115,7 +118,8 @@ Status validateStorageOptions( } ServiceContext::ServiceContext() - : _tickSource(stdx::make_unique<SystemTickSource>()), + : _transportLayerManager(stdx::make_unique<transport::TransportLayerManager>()), + _tickSource(stdx::make_unique<SystemTickSource>()), _fastClockSource(stdx::make_unique<SystemClockSource>()), _preciseClockSource(stdx::make_unique<SystemClockSource>()) {} @@ -125,8 +129,8 @@ ServiceContext::~ServiceContext() { } ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, - AbstractMessagingPort* p) { - std::unique_ptr<Client> client(new Client(std::move(desc), this, p)); + transport::Session* session) { + std::unique_ptr<Client> client(new Client(std::move(desc), this, session)); auto observer = _clientObservers.cbegin(); try { for (; observer != _clientObservers.cend(); ++observer) { @@ -150,6 +154,14 @@ ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, return UniqueClient(client.release()); } +transport::TransportLayer* ServiceContext::getTransportLayer() const { + return _transportLayerManager.get(); +} + +Status ServiceContext::addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl) { + return _transportLayerManager->addAndStartTransportLayer(std::move(tl)); +} + TickSource* ServiceContext::getTickSource() const { return _tickSource.get(); } diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index fe6a4432d70..174a52ebe52 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -48,6 +48,12 @@ class Client; class OperationContext; class OpObserver; +namespace transport { +class Session; +class TransportLayer; +class TransportLayerManager; +} // namespace transport + /** * Classes that implement this interface can receive notification on killOp. * @@ -207,9 +213,9 @@ public: * * The "desc" string is used to set a descriptive name for the client, used in logging. * - * If supplied, "p" is the communication channel used for communicating with the client. + * If supplied, "session" is the transport::Session used for communicating with the client. */ - UniqueClient makeClient(std::string desc, AbstractMessagingPort* p = nullptr); + UniqueClient makeClient(std::string desc, transport::Session* session = nullptr); /** * Creates a new OperationContext on "client". @@ -299,6 +305,26 @@ public: void registerKillOpListener(KillOpListenerInterface* listener); // + // Transport. + // + + /** + * Get the master TransportLayer. Routes to all other TransportLayers that + * may be in use within this service. + * + * See TransportLayerManager for more details. + */ + transport::TransportLayer* getTransportLayer() const; + + /** + * Add a new TransportLayer to this service context. The new TransportLayer will + * be added to the TransportLayerManager accessible via getTransportLayer(). + * + * It additionally calls start() on the TransportLayer after adding it. + */ + Status addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl); + + // // Global OpObserver. // @@ -370,6 +396,11 @@ private: /** + * The TransportLayerManager. + */ + std::unique_ptr<transport::TransportLayerManager> _transportLayerManager; + + /** * Vector of registered observers. */ std::vector<std::unique_ptr<ClientObserver>> _clientObservers; diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp new file mode 100644 index 00000000000..86f782a0bad --- /dev/null +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/db/service_entry_point_mongod.h" + +#include <vector> + +#include "mongo/db/client.h" +#include "mongo/db/dbmessage.h" +#include "mongo/db/instance.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/quick_exit.h" + +namespace mongo { +namespace { + +// Set up proper headers for formatting an exhaust request, if we need to +bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { + MsgData::View header = dbresponse.response.header(); + QueryResult::View qr = header.view2ptr(); + long long cursorid = qr.getCursorId(); + + if (!cursorid) { + return false; + } + + verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); + + auto ns = dbresponse.exhaustNS; // reset() will free this + + m->reset(); + + BufBuilder b(512); + b.appendNum(static_cast<int>(0) /* size set later in appendData() */); + b.appendNum(header.getId()); + b.appendNum(header.getResponseToMsgId()); + b.appendNum(static_cast<int>(dbGetMore)); + b.appendNum(static_cast<int>(0)); + b.appendStr(ns); + b.appendNum(static_cast<int>(0)); // ntoreturn + b.appendNum(cursorid); + + MsgData::View(b.buf()).setLen(b.len()); + m->setData(b.release()); + + return true; +} + +} // namespace + +using transport::Session; +using transport::TransportLayer; + +ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {} + +void ServiceEntryPointMongod::startSession(Session&& session) { + launchWrappedServiceEntryWorkerThread(std::move(session), + [this](Session* session) { _sessionLoop(session); }); +} + +void ServiceEntryPointMongod::_sessionLoop(Session* session) { + Message inMessage; + bool inExhaust = false; + int64_t counter = 0; + + while (true) { + // 1. Source a Message from the client (unless we are exhausting) + if (!inExhaust) { + inMessage.reset(); + auto status = session->sourceMessage(&inMessage).wait(); + + if (ErrorCodes::isInterruption(status.code())) { + break; + } + + uassertStatusOK(status); + } + + // 2. Pass sourced Message up to mongod + DbResponse dbresponse; + { + auto opCtx = cc().makeOperationContext(); + assembleResponse(opCtx.get(), inMessage, dbresponse, session->remote()); + + // opCtx must go out of scope here so that the operation cannot show + // up in currentOp results after the response reaches the client + } + + // 3. Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(inMessage.header().getId()); + + // If this is an exhaust cursor, don't source more Messages + if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) { + log() << "we are in exhaust"; + inExhaust = true; + } else { + inExhaust = false; + } + + // 4. Sink our response to the client + uassertStatusOK(session->sinkMessage(toSink).wait()); + } else { + inExhaust = false; + } + + if ((counter++ & 0xf) == 0) { + markThreadIdle(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h new file mode 100644 index 00000000000..31ed05098c8 --- /dev/null +++ b/src/mongo/db/service_entry_point_mongod.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/service_entry_point.h" + +namespace mongo { + +namespace transport { +class Session; +class TransportLayer; +} // namespace transport + +/** + * The entry point from the TransportLayer into Mongod. startSession() spawns and + * detaches a new thread for each incoming connection (transport::Session). + */ +class ServiceEntryPointMongod final : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(ServiceEntryPointMongod); + +public: + explicit ServiceEntryPointMongod(transport::TransportLayer* tl); + + virtual ~ServiceEntryPointMongod() = default; + + void startSession(transport::Session&& session) override; + +private: + void _sessionLoop(transport::Session* session); + + transport::TransportLayer* _tl; +}; + +} // namespace mongo |