summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSamantha Ritter <samantha.ritter@10gen.com>2016-05-31 14:05:17 -0400
committerJason Carey <jcarey@argv.me>2016-07-12 18:38:37 -0400
commitc263ce1f95586f8652058e6202015a77f9becc49 (patch)
treed623fb9da9fd5da3cc4e20cac0653f1fa4af00eb /src/mongo/db
parentdead3cf8b4b3cb5528ad1abb9eeb722b395e3632 (diff)
downloadmongo-c263ce1f95586f8652058e6202015a77f9becc49.tar.gz
SERVER-24162 Integrate TransportLayer
Expand the transport layer as needed to replace uses of abstract message port for ingress networking.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript4
-rw-r--r--src/mongo/db/auth/sasl_commands.cpp4
-rw-r--r--src/mongo/db/client.cpp19
-rw-r--r--src/mongo/db/client.h18
-rw-r--r--src/mongo/db/client_basic.cpp4
-rw-r--r--src/mongo/db/client_basic.h18
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/authentication_commands.cpp7
-rw-r--r--src/mongo/db/commands/server_status.cpp9
-rw-r--r--src/mongo/db/db.cpp109
-rw-r--r--src/mongo/db/dbmessage.cpp31
-rw-r--r--src/mongo/db/dbmessage.h17
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/repl_set_request_votes.cpp19
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
-rw-r--r--src/mongo/db/repl/replication_info.cpp7
-rw-r--r--src/mongo/db/repl/replset_commands.cpp18
-rw-r--r--src/mongo/db/s/collection_metadata_test.cpp10
-rw-r--r--src/mongo/db/s/metadata_loader_test.cpp2
-rw-r--r--src/mongo/db/service_context.cpp18
-rw-r--r--src/mongo/db/service_context.h35
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp153
-rw-r--r--src/mongo/db/service_entry_point_mongod.h63
24 files changed, 405 insertions, 171 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 92f97280964..ba8c1e855bd 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -100,6 +100,7 @@ env.Library(
"dbmessage.cpp",
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/net/network',
]
)
@@ -402,7 +403,6 @@ env.Library(
"$BUILD_DIR/mongo/rpc/command_reply",
"$BUILD_DIR/mongo/rpc/command_request",
"$BUILD_DIR/mongo/rpc/metadata",
- "$BUILD_DIR/mongo/util/net/message_server_port",
"$BUILD_DIR/mongo/util/net/miniwebserver",
"$BUILD_DIR/mongo/util/processinfo",
"$BUILD_DIR/mongo/util/signal_handlers",
@@ -473,6 +473,8 @@ env.Library(
'$BUILD_DIR/mongo/util/decorable',
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/net/hostandport',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
+ '$BUILD_DIR/mongo/transport/transport_layer_manager',
],
)
diff --git a/src/mongo/db/auth/sasl_commands.cpp b/src/mongo/db/auth/sasl_commands.cpp
index f006cded7d0..0fefd3b6cc9 100644
--- a/src/mongo/db/auth/sasl_commands.cpp
+++ b/src/mongo/db/auth/sasl_commands.cpp
@@ -187,10 +187,10 @@ Status doSaslStep(const ClientBasic* client,
status = session->step(payload, &responsePayload);
if (!status.isOK()) {
- const SockAddr clientAddr = client->port()->remoteAddr();
log() << session->getMechanism() << " authentication failed for "
<< session->getPrincipalId() << " on " << session->getAuthenticationDatabase()
- << " from client " << clientAddr.getAddr() << " ; " << status.toString() << std::endl;
+ << " from client " << client->getRemote().toString() << " ; " << status.toString()
+ << std::endl;
sleepmillis(saslGlobalParams.authFailedDelay);
// All the client needs to know is that authentication has failed.
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 807a7650aa5..2a665268811 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/lasterror.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/session.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/exit.h"
#include "mongo/util/mongoutils/str.h"
@@ -63,16 +64,16 @@ void Client::initThreadIfNotAlready() {
initThreadIfNotAlready(getThreadName().c_str());
}
-void Client::initThread(const char* desc, AbstractMessagingPort* mp) {
- initThread(desc, getGlobalServiceContext(), mp);
+void Client::initThread(const char* desc, transport::Session* session) {
+ initThread(desc, getGlobalServiceContext(), session);
}
-void Client::initThread(const char* desc, ServiceContext* service, AbstractMessagingPort* mp) {
+void Client::initThread(const char* desc, ServiceContext* service, transport::Session* session) {
invariant(currentClient.getMake()->get() == nullptr);
std::string fullDesc;
- if (mp != NULL) {
- fullDesc = str::stream() << desc << mp->connectionId();
+ if (session) {
+ fullDesc = str::stream() << desc << session->id();
} else {
fullDesc = desc;
}
@@ -80,7 +81,7 @@ void Client::initThread(const char* desc, ServiceContext* service, AbstractMessa
setThreadName(fullDesc.c_str());
// Create the client obj, attach to thread
- *currentClient.get() = service->makeClient(fullDesc, mp);
+ *currentClient.get() = service->makeClient(fullDesc, session);
}
void Client::destroy() {
@@ -98,11 +99,11 @@ int64_t generateSeed(const std::string& desc) {
}
} // namespace
-Client::Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p)
- : ClientBasic(serviceContext, p),
+Client::Client(std::string desc, ServiceContext* serviceContext, transport::Session* session)
+ : ClientBasic(serviceContext, session),
_desc(std::move(desc)),
_threadId(stdx::this_thread::get_id()),
- _connectionId(p ? p->connectionId() : 0),
+ _connectionId(session ? session->id() : 0),
_prng(generateSeed(_desc)) {}
void Client::reportState(BSONObjBuilder& builder) {
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index 407a0bac868..a7523981e47 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -51,6 +51,10 @@ class AbstractMessagingPort;
class Collection;
class OperationContext;
+namespace transport {
+class Session;
+} // namespace transport
+
typedef long long ConnectionId;
/** the database's concept of an outside "client" */
@@ -59,16 +63,16 @@ public:
/**
* Creates a Client object and stores it in TLS for the current thread.
*
- * An unowned pointer to a AbstractMessagingPort may optionally be provided. If 'mp' is
- * non-null, then it will be used to augment the thread name, and for reporting purposes.
+ * An unowned pointer to a transport::Session may optionally be provided. If 'session'
+ * is non-null, then it will be used to augment the thread name, and for reporting purposes.
*
- * If provided, 'mp' must outlive the newly-created Client object. Client::destroy() may be used
- * to help enforce that the Client does not outlive 'mp'.
+ * If provided, 'session' must outlive the newly-created Client object. Client::destroy() may
+ * be used to help enforce that the Client does not outlive 'session.'
*/
- static void initThread(const char* desc, AbstractMessagingPort* mp = 0);
+ static void initThread(const char* desc, transport::Session* session = nullptr);
static void initThread(const char* desc,
ServiceContext* serviceContext,
- AbstractMessagingPort* mp);
+ transport::Session* session);
/**
* Inits a thread if that thread has not already been init'd, setting the thread name to
@@ -159,7 +163,7 @@ public:
private:
friend class ServiceContext;
- Client(std::string desc, ServiceContext* serviceContext, AbstractMessagingPort* p = 0);
+ Client(std::string desc, ServiceContext* serviceContext, transport::Session* session = nullptr);
// Description for the client (e.g. conn8)
diff --git a/src/mongo/db/client_basic.cpp b/src/mongo/db/client_basic.cpp
index 06e619597c3..578ebdf668d 100644
--- a/src/mongo/db/client_basic.cpp
+++ b/src/mongo/db/client_basic.cpp
@@ -32,8 +32,8 @@
namespace mongo {
-ClientBasic::ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort)
- : _serviceContext(serviceContext), _messagingPort(messagingPort) {}
+ClientBasic::ClientBasic(ServiceContext* serviceContext, transport::Session* session)
+ : _serviceContext(serviceContext), _session(session) {}
ClientBasic::~ClientBasic() = default;
diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h
index 88d7ef4afe8..09f31b5da42 100644
--- a/src/mongo/db/client_basic.h
+++ b/src/mongo/db/client_basic.h
@@ -31,6 +31,7 @@
#include <memory>
#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/session.h"
#include "mongo/util/decorable.h"
#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/hostandport.h"
@@ -58,11 +59,12 @@ public:
}
bool hasRemote() const {
- return _messagingPort;
+ return _session;
}
+
HostAndPort getRemote() const {
- verify(_messagingPort);
- return _messagingPort->remote();
+ verify(_session);
+ return _session->remote();
}
/**
@@ -73,20 +75,20 @@ public:
}
/**
- * Returns the AbstractMessagePort to which this client session is bound, if any.
+ * Returns the Session to which this client is bound, if any.
*/
- AbstractMessagingPort* port() const {
- return _messagingPort;
+ transport::Session* session() const {
+ return _session;
}
static ClientBasic* getCurrent();
protected:
- ClientBasic(ServiceContext* serviceContext, AbstractMessagingPort* messagingPort);
+ ClientBasic(ServiceContext* serviceContext, transport::Session* session);
~ClientBasic();
private:
ServiceContext* const _serviceContext;
- AbstractMessagingPort* const _messagingPort;
+ transport::Session* const _session;
};
}
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 919510c4b79..87c472dc519 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -71,6 +71,7 @@ env.Library(
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/write_ops/batch_write_types',
'$BUILD_DIR/mongo/scripting/scripting_common',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils',
'$BUILD_DIR/mongo/util/foundation',
'$BUILD_DIR/mongo/util/ntservice',
diff --git a/src/mongo/db/commands/authentication_commands.cpp b/src/mongo/db/commands/authentication_commands.cpp
index 6fc6268a807..2f05971ef56 100644
--- a/src/mongo/db/commands/authentication_commands.cpp
+++ b/src/mongo/db/commands/authentication_commands.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/server_options.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/session.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/log.h"
#include "mongo/util/md5.hpp"
@@ -314,17 +315,17 @@ Status CmdAuthenticate::_authenticateX509(OperationContext* txn,
ClientBasic* client = ClientBasic::getCurrent();
AuthorizationSession* authorizationSession = AuthorizationSession::get(client);
- std::string clientSubjectName = client->port()->getX509SubjectName();
+ auto clientName = client->session()->getX509SubjectName();
if (!getSSLManager()->getSSLConfiguration().hasCA) {
return Status(ErrorCodes::AuthenticationFailed,
"Unable to verify x.509 certificate, as no CA has been provided.");
- } else if (user.getUser() != clientSubjectName) {
+ } else if (user.getUser() != clientName) {
return Status(ErrorCodes::AuthenticationFailed,
"There is no x.509 client certificate matching the user.");
} else {
// Handle internal cluster member auth, only applies to server-server connections
- if (getSSLManager()->getSSLConfiguration().isClusterMember(clientSubjectName)) {
+ if (getSSLManager()->getSSLConfiguration().isClusterMember(clientName)) {
int clusterAuthMode = serverGlobalParams.clusterAuthMode.load();
if (clusterAuthMode == ServerGlobalParams::ClusterAuthMode_undefined ||
clusterAuthMode == ServerGlobalParams::ClusterAuthMode_keyFile) {
diff --git a/src/mongo/db/commands/server_status.cpp b/src/mongo/db/commands/server_status.cpp
index bbe7e9930a7..e567a15b18b 100644
--- a/src/mongo/db/commands/server_status.cpp
+++ b/src/mongo/db/commands/server_status.cpp
@@ -47,9 +47,9 @@
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/platform/process_id.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostname_canonicalization_worker.h"
-#include "mongo/util/net/listen.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/processinfo.h"
#include "mongo/util/ramlog.h"
@@ -228,9 +228,10 @@ public:
BSONObj generateSection(OperationContext* txn, const BSONElement& configElement) const {
BSONObjBuilder bb;
- bb.append("current", Listener::globalTicketHolder.used());
- bb.append("available", Listener::globalTicketHolder.available());
- bb.append("totalCreated", Listener::globalConnectionNumber.load());
+ auto stats = txn->getServiceContext()->getTransportLayer()->sessionStats();
+ bb.append("current", static_cast<int>(stats.numOpenSessions));
+ bb.append("available", static_cast<int>(stats.numAvailableSessions));
+ bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions));
return bb.obj();
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index a10b0be5a39..56dff7cecaf 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -96,6 +96,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d.h"
#include "mongo/db/service_context_d.h"
+#include "mongo/db/service_entry_point_mongod.h"
#include "mongo/db/startup_warnings_mongod.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/snapshots.h"
@@ -114,6 +115,7 @@
#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/cmdline_utils/censor_cmdline.h"
#include "mongo/util/concurrency/task.h"
@@ -124,7 +126,6 @@
#include "mongo/util/log.h"
#include "mongo/util/net/hostname_canonicalization_worker.h"
#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message_server.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/ntservice.h"
#include "mongo/util/options_parser/startup_options.h"
@@ -169,65 +170,6 @@ ntservice::NtServiceDefaultStrings defaultServiceStrings = {
Timer startupSrandTimer;
-class MyMessageHandler : public MessageHandler {
-public:
- virtual void connected(AbstractMessagingPort* p) {
- Client::initThread("conn", p);
- }
-
- virtual void process(Message& m, AbstractMessagingPort* port) {
- while (true) {
- if (inShutdown()) {
- log() << "got request after shutdown()" << endl;
- break;
- }
-
- DbResponse dbresponse;
- {
- auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
- assembleResponse(opCtx.get(), m, dbresponse, port->remote());
-
- // opCtx must go out of scope here so that the operation cannot show up in currentOp
- // results after the response reaches the client
- }
-
- if (!dbresponse.response.empty()) {
- port->reply(m, dbresponse.response, dbresponse.responseToMsgId);
- if (dbresponse.exhaustNS.size() > 0) {
- MsgData::View header = dbresponse.response.header();
- QueryResult::View qr = header.view2ptr();
- long long cursorid = qr.getCursorId();
- if (cursorid) {
- verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
- string ns = dbresponse.exhaustNS; // before reset() free's it...
- m.reset();
- BufBuilder b(512);
- b.appendNum((int)0 /*size set later*/);
- b.appendNum(header.getId());
- b.appendNum(header.getResponseToMsgId());
- b.appendNum((int)dbGetMore);
- b.appendNum((int)0);
- b.appendStr(ns);
- b.appendNum((int)0); // ntoreturn
- b.appendNum(cursorid);
-
- MsgData::View header = b.buf();
- header.setLen(b.len());
- m.setData(b.release());
- DEV log() << "exhaust=true sending more";
- continue; // this goes back to top loop
- }
- }
- }
- break;
- }
- }
-
- virtual void close() {
- Client::destroy();
- }
-};
-
static void logStartup(OperationContext* txn) {
BSONObjBuilder toLog;
stringstream id;
@@ -530,7 +472,7 @@ static void _initWireSpec() {
}
-static void _initAndListen(int listenPort) {
+static ExitCode _initAndListen(int listenPort) {
Client::initThread("initandlisten");
_initWireSpec();
@@ -570,20 +512,18 @@ static void _initAndListen(int listenPort) {
checked_cast<ServiceContextMongoD*>(getGlobalServiceContext())->createLockFile();
- // Due to SERVER-15389, we must setupSockets first thing at startup in order to avoid
- // obtaining too high a file descriptor for our calls to select().
- MessageServer::Options options;
+ transport::TransportLayerLegacy::Options options;
options.port = listenPort;
options.ipList = serverGlobalParams.bind_ip;
- auto handler = std::make_shared<MyMessageHandler>();
- MessageServer* server = createServer(options, std::move(handler), getGlobalServiceContext());
-
- // This is what actually creates the sockets, but does not yet listen on them because we
- // do not want connections to just hang if recovery takes a very long time.
- if (!server->setupSockets()) {
- error() << "Failed to set up sockets during startup.";
- return;
+ // Create, start, and attach the TL
+ auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(
+ options,
+ std::make_shared<ServiceEntryPointMongod>(getGlobalServiceContext()->getTransportLayer()));
+ auto res = transportLayer->setup();
+ if (!res.isOK()) {
+ error() << "Failed to set up listener: " << res.toString();
+ return EXIT_NET_ERROR;
}
std::shared_ptr<DbWebServer> dbWebServer;
@@ -594,7 +534,7 @@ static void _initAndListen(int listenPort) {
new RestAdminAccess()));
if (!dbWebServer->setupSockets()) {
error() << "Failed to set up sockets for HTTP interface during startup.";
- return;
+ return EXIT_NET_ERROR;
}
}
@@ -674,7 +614,7 @@ static void _initAndListen(int listenPort) {
}
if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalRecoverOnly)
- return;
+ return EXIT_NET_ERROR;
if (mongodGlobalParams.scriptingEnabled) {
ScriptEngine::setup();
@@ -799,14 +739,19 @@ static void _initAndListen(int listenPort) {
// MessageServer::run will return when exit code closes its socket and we don't need the
// operation context anymore
startupOpCtx.reset();
- server->run();
+
+ auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer));
+ if (!start.isOK()) {
+ error() << "Failed to start the listener: " << start.toString();
+ return EXIT_NET_ERROR;
+ }
+
+ return waitForShutdown();
}
ExitCode initAndListen(int listenPort) {
try {
- _initAndListen(listenPort);
-
- return inShutdown() ? EXIT_CLEAN : EXIT_NET_ERROR;
+ return _initAndListen(listenPort);
} catch (DBException& e) {
log() << "exception in initAndListen: " << e.toString() << ", terminating" << endl;
return EXIT_UNCAUGHT;
@@ -1001,7 +946,6 @@ static void reportEventToSystemImpl(const char* msg) {
// registerShutdownTask is called below. It must not depend on the
// prior execution of mongo initializers or the existence of threads.
static void shutdownTask() {
-
auto serviceContext = getGlobalServiceContext();
Client::initThreadIfNotAlready();
@@ -1014,6 +958,7 @@ static void shutdownTask() {
txn = uniqueTxn.get();
}
+ getGlobalServiceContext()->getTransportLayer()->shutdown();
log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl;
ListeningSockets::get()->closeAll();
@@ -1037,12 +982,6 @@ static void shutdownTask() {
// will do it for us when the process terminates.
stdx::packaged_task<void()> dryOutTask([] {
-
- // Walk all open sockets and close them. This should unblock any that are sitting in
- // recv. Other sockets on which there are active operations should see the inShutdown flag
- // as true when they complete the operation.
- Listener::closeMessagingPorts(0);
-
// There isn't currently a way to wait on the TicketHolder to have all its tickets back,
// unfortunately. So, busy wait in this detached thread.
while (true) {
diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp
index 67a5548dbfb..f16ec2b6276 100644
--- a/src/mongo/db/dbmessage.cpp
+++ b/src/mongo/db/dbmessage.cpp
@@ -30,7 +30,9 @@
#include "mongo/platform/basic.h"
#include "mongo/db/dbmessage.h"
+#include "mongo/db/operation_context.h"
#include "mongo/platform/strnlen.h"
+#include "mongo/transport/session.h"
namespace mongo {
@@ -173,20 +175,23 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) {
_buffer.skip(sizeof(QueryResult::Value));
}
-void OpQueryReplyBuilder::send(AbstractMessagingPort* destination,
+void OpQueryReplyBuilder::send(transport::Session* session,
int queryResultFlags,
- Message& requestMsg,
+ const Message& requestMsg,
int nReturned,
int startingFrom,
long long cursorId) {
Message response;
putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId);
- destination->reply(requestMsg, response, requestMsg.header().getId());
+
+ response.header().setId(nextMessageId());
+ response.header().setResponseToMsgId(requestMsg.header().getId());
+
+ uassertStatusOK(session->sinkMessage(response).wait());
}
-void OpQueryReplyBuilder::sendCommandReply(AbstractMessagingPort* destination,
- Message& requestMsg) {
- send(destination, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1);
+void OpQueryReplyBuilder::sendCommandReply(transport::Session* session, const Message& requestMsg) {
+ send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1);
}
void OpQueryReplyBuilder::putInMessage(
@@ -202,7 +207,7 @@ void OpQueryReplyBuilder::putInMessage(
}
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const void* data,
int size,
@@ -211,15 +216,19 @@ void replyToQuery(int queryResultFlags,
long long cursorId) {
OpQueryReplyBuilder reply;
reply.bufBuilderForResults().appendBuf(data, size);
- reply.send(p, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId);
+ reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId);
}
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const BSONObj& responseObj) {
- replyToQuery(
- queryResultFlags, p, requestMsg, (void*)responseObj.objdata(), responseObj.objsize(), 1);
+ replyToQuery(queryResultFlags,
+ session,
+ requestMsg,
+ (void*)responseObj.objdata(),
+ responseObj.objsize(),
+ 1);
}
void replyToQuery(int queryResultFlags, Message& m, DbResponse& dbresponse, BSONObj obj) {
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index 7f1a3a2d0f2..7fabf117421 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -39,6 +39,12 @@
namespace mongo {
+class OperationContext;
+
+namespace transport {
+class Session;
+} // namespace transport
+
/* db response format
Query or GetMore: // see struct QueryResult
@@ -352,9 +358,9 @@ public:
/**
* Finishes the reply and sends the message out to 'destination'.
*/
- void send(AbstractMessagingPort* destination,
+ void send(transport::Session* session,
int queryResultFlags,
- Message& requestMsg, // should be const but MessagePort::reply takes non-const.
+ const Message& requestMsg,
int nReturned,
int startingFrom = 0,
long long cursorId = 0);
@@ -362,14 +368,14 @@ public:
/**
* Similar to send() but used for replying to a command.
*/
- void sendCommandReply(AbstractMessagingPort* destination, Message& requestMsg);
+ void sendCommandReply(transport::Session* session, const Message& requestMsg);
private:
BufBuilder _buffer;
};
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const void* data,
int size,
@@ -377,10 +383,9 @@ void replyToQuery(int queryResultFlags,
int startingFrom = 0,
long long cursorId = 0);
-
/* object reply helper. */
void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p,
+ transport::Session* session,
Message& requestMsg,
const BSONObj& responseObj);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b6a2981dde0..b92ec300cf8 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -398,6 +398,7 @@ env.Library('repl_coordinator_impl',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/rpc/metadata',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/fail_point',
'collection_cloner',
'data_replicator',
@@ -627,6 +628,7 @@ env.Library('replica_set_messages',
'$BUILD_DIR/mongo/client/connection_string',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/server_options_core',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/net/hostandport',
'optime',
'read_concern_args',
diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp
index 4b34c351769..c01628466c0 100644
--- a/src/mongo/db/repl/repl_set_request_votes.cpp
+++ b/src/mongo/db/repl/repl_set_request_votes.cpp
@@ -35,6 +35,8 @@
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -64,16 +66,17 @@ private:
// We want to keep request vote connection open when relinquishing primary.
// Tag it here.
- unsigned originalTag = 0;
- AbstractMessagingPort* mp = txn->getClient()->port();
- if (mp) {
- originalTag = mp->getTag();
- mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen);
+ transport::Session::TagMask originalTag = 0;
+ transport::Session* session = txn->getClient()->session();
+ if (session) {
+ originalTag = session->getTags();
+ session->replaceTags(originalTag | transport::Session::kKeepOpen);
}
+
// Untag the connection on exit.
- ON_BLOCK_EXIT([mp, originalTag]() {
- if (mp) {
- mp->setTag(originalTag);
+ ON_BLOCK_EXIT([session, originalTag]() {
+ if (session) {
+ session->replaceTags(originalTag);
}
});
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 582b8f45970..245b37cca7f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -192,8 +192,8 @@ public:
virtual HostAndPort getClientHostAndPort(const OperationContext* txn) = 0;
/**
- * Closes all connections except those marked with the keepOpen property, which should
- * just be connections used for heartbeating.
+ * Closes all connections in the given TransportLayer except those marked with the
+ * keepOpen property, which should just be connections used for heartbeating.
* This is used during stepdown, and transition out of primary.
*/
virtual void closeConnections() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 81ed9c16bf3..f140580fbcc 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -76,6 +76,8 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
@@ -449,7 +451,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
}
void ReplicationCoordinatorExternalStateImpl::closeConnections() {
- Listener::closeMessagingPorts(executor::NetworkInterface::kMessagingPortKeepOpen);
+ getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen);
}
void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index cc844eaf35c..fd36395c017 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -234,9 +234,10 @@ public:
// Tag connections to avoid closing them on stepdown.
auto hangUpElement = cmdObj["hangUpOnStepDown"];
if (!hangUpElement.eoo() && !hangUpElement.trueValue()) {
- AbstractMessagingPort* mp = txn->getClient()->port();
- if (mp) {
- mp->setTag(mp->getTag() | executor::NetworkInterface::kMessagingPortKeepOpen);
+ auto session = txn->getClient()->session();
+ if (session) {
+ session->replaceTags(session->getTags() |
+ executor::NetworkInterface::kMessagingPortKeepOpen);
}
}
appendReplicationInfo(txn, result, 0);
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index 1b96dac313f..f591d64c11b 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -59,6 +59,8 @@
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -726,17 +728,17 @@ public:
/* we want to keep heartbeat connections open when relinquishing primary.
tag them here. */
- AbstractMessagingPort* mp = txn->getClient()->port();
- unsigned originalTag = 0;
- if (mp) {
- originalTag = mp->getTag();
- mp->setTag(originalTag | executor::NetworkInterface::kMessagingPortKeepOpen);
+ transport::Session::TagMask originalTag = 0;
+ transport::Session* session = txn->getClient()->session();
+ if (session) {
+ originalTag = session->getTags();
+ session->replaceTags(originalTag | transport::Session::kKeepOpen);
}
// Unset the tag on block exit
- ON_BLOCK_EXIT([mp, originalTag]() {
- if (mp) {
- mp->setTag(originalTag);
+ ON_BLOCK_EXIT([session, originalTag]() {
+ if (session) {
+ session->replaceTags(originalTag);
}
});
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 013c135cc84..359cb3b4fc1 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -50,7 +50,7 @@ class NoChunkFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -318,7 +318,7 @@ class SingleChunkFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -574,7 +574,7 @@ class SingleChunkMinMaxCompoundKeyFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -642,7 +642,7 @@ class TwoChunksWithGapCompoundKeyFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
@@ -839,7 +839,7 @@ class ThreeChunkWithRangeGapFixture : public ShardingCatalogTestFixture {
protected:
void setUp() {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
OID epoch = OID::gen();
diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp
index c96b77a1590..e33e3142d1a 100644
--- a/src/mongo/db/s/metadata_loader_test.cpp
+++ b/src/mongo/db/s/metadata_loader_test.cpp
@@ -51,7 +51,7 @@ public:
protected:
void setUp() override {
ShardingCatalogTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
configTargeter()->setFindHostReturnValue(configHost);
_maxCollVersion = ChunkVersion(1, 0, OID::gen());
_loader.reset(new MetadataLoader);
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index 03dc9bcaf8d..648fd546721 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -34,6 +34,9 @@
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/transport_layer_manager.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/system_clock_source.h"
@@ -115,7 +118,8 @@ Status validateStorageOptions(
}
ServiceContext::ServiceContext()
- : _tickSource(stdx::make_unique<SystemTickSource>()),
+ : _transportLayerManager(stdx::make_unique<transport::TransportLayerManager>()),
+ _tickSource(stdx::make_unique<SystemTickSource>()),
_fastClockSource(stdx::make_unique<SystemClockSource>()),
_preciseClockSource(stdx::make_unique<SystemClockSource>()) {}
@@ -125,8 +129,8 @@ ServiceContext::~ServiceContext() {
}
ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc,
- AbstractMessagingPort* p) {
- std::unique_ptr<Client> client(new Client(std::move(desc), this, p));
+ transport::Session* session) {
+ std::unique_ptr<Client> client(new Client(std::move(desc), this, session));
auto observer = _clientObservers.cbegin();
try {
for (; observer != _clientObservers.cend(); ++observer) {
@@ -150,6 +154,14 @@ ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc,
return UniqueClient(client.release());
}
+transport::TransportLayer* ServiceContext::getTransportLayer() const {
+ return _transportLayerManager.get();
+}
+
+Status ServiceContext::addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl) {
+ return _transportLayerManager->addAndStartTransportLayer(std::move(tl));
+}
+
TickSource* ServiceContext::getTickSource() const {
return _tickSource.get();
}
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index fe6a4432d70..174a52ebe52 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -48,6 +48,12 @@ class Client;
class OperationContext;
class OpObserver;
+namespace transport {
+class Session;
+class TransportLayer;
+class TransportLayerManager;
+} // namespace transport
+
/**
* Classes that implement this interface can receive notification on killOp.
*
@@ -207,9 +213,9 @@ public:
*
* The "desc" string is used to set a descriptive name for the client, used in logging.
*
- * If supplied, "p" is the communication channel used for communicating with the client.
+ * If supplied, "session" is the transport::Session used for communicating with the client.
*/
- UniqueClient makeClient(std::string desc, AbstractMessagingPort* p = nullptr);
+ UniqueClient makeClient(std::string desc, transport::Session* session = nullptr);
/**
* Creates a new OperationContext on "client".
@@ -299,6 +305,26 @@ public:
void registerKillOpListener(KillOpListenerInterface* listener);
//
+ // Transport.
+ //
+
+ /**
+ * Get the master TransportLayer. Routes to all other TransportLayers that
+ * may be in use within this service.
+ *
+ * See TransportLayerManager for more details.
+ */
+ transport::TransportLayer* getTransportLayer() const;
+
+ /**
+ * Add a new TransportLayer to this service context. The new TransportLayer will
+ * be added to the TransportLayerManager accessible via getTransportLayer().
+ *
+ * It additionally calls start() on the TransportLayer after adding it.
+ */
+ Status addAndStartTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
+
+ //
// Global OpObserver.
//
@@ -370,6 +396,11 @@ private:
/**
+ * The TransportLayerManager.
+ */
+ std::unique_ptr<transport::TransportLayerManager> _transportLayerManager;
+
+ /**
* Vector of registered observers.
*/
std::vector<std::unique_ptr<ClientObserver>> _clientObservers;
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
new file mode 100644
index 00000000000..86f782a0bad
--- /dev/null
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -0,0 +1,153 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/service_entry_point_mongod.h"
+
+#include <vector>
+
+#include "mongo/db/client.h"
+#include "mongo/db/dbmessage.h"
+#include "mongo/db/instance.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/thread_idle_callback.h"
+#include "mongo/util/quick_exit.h"
+
+namespace mongo {
+namespace {
+
+// Set up proper headers for formatting an exhaust request, if we need to
+bool setExhaustMessage(Message* m, const DbResponse& dbresponse) {
+ MsgData::View header = dbresponse.response.header();
+ QueryResult::View qr = header.view2ptr();
+ long long cursorid = qr.getCursorId();
+
+ if (!cursorid) {
+ return false;
+ }
+
+ verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
+
+ auto ns = dbresponse.exhaustNS; // reset() will free this
+
+ m->reset();
+
+ BufBuilder b(512);
+ b.appendNum(static_cast<int>(0) /* size set later in appendData() */);
+ b.appendNum(header.getId());
+ b.appendNum(header.getResponseToMsgId());
+ b.appendNum(static_cast<int>(dbGetMore));
+ b.appendNum(static_cast<int>(0));
+ b.appendStr(ns);
+ b.appendNum(static_cast<int>(0)); // ntoreturn
+ b.appendNum(cursorid);
+
+ MsgData::View(b.buf()).setLen(b.len());
+ m->setData(b.release());
+
+ return true;
+}
+
+} // namespace
+
+using transport::Session;
+using transport::TransportLayer;
+
+ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {}
+
+void ServiceEntryPointMongod::startSession(Session&& session) {
+ launchWrappedServiceEntryWorkerThread(std::move(session),
+ [this](Session* session) { _sessionLoop(session); });
+}
+
+void ServiceEntryPointMongod::_sessionLoop(Session* session) {
+ Message inMessage;
+ bool inExhaust = false;
+ int64_t counter = 0;
+
+ while (true) {
+ // 1. Source a Message from the client (unless we are exhausting)
+ if (!inExhaust) {
+ inMessage.reset();
+ auto status = session->sourceMessage(&inMessage).wait();
+
+ if (ErrorCodes::isInterruption(status.code())) {
+ break;
+ }
+
+ uassertStatusOK(status);
+ }
+
+ // 2. Pass sourced Message up to mongod
+ DbResponse dbresponse;
+ {
+ auto opCtx = cc().makeOperationContext();
+ assembleResponse(opCtx.get(), inMessage, dbresponse, session->remote());
+
+ // opCtx must go out of scope here so that the operation cannot show
+ // up in currentOp results after the response reaches the client
+ }
+
+ // 3. Format our response, if we have one
+ Message& toSink = dbresponse.response;
+ if (!toSink.empty()) {
+ toSink.header().setId(nextMessageId());
+ toSink.header().setResponseToMsgId(inMessage.header().getId());
+
+ // If this is an exhaust cursor, don't source more Messages
+ if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) {
+ log() << "we are in exhaust";
+ inExhaust = true;
+ } else {
+ inExhaust = false;
+ }
+
+ // 4. Sink our response to the client
+ uassertStatusOK(session->sinkMessage(toSink).wait());
+ } else {
+ inExhaust = false;
+ }
+
+ if ((counter++ & 0xf) == 0) {
+ markThreadIdle();
+ }
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h
new file mode 100644
index 00000000000..31ed05098c8
--- /dev/null
+++ b/src/mongo/db/service_entry_point_mongod.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/service_entry_point.h"
+
+namespace mongo {
+
+namespace transport {
+class Session;
+class TransportLayer;
+} // namespace transport
+
+/**
+ * The entry point from the TransportLayer into Mongod. startSession() spawns and
+ * detaches a new thread for each incoming connection (transport::Session).
+ */
+class ServiceEntryPointMongod final : public ServiceEntryPoint {
+ MONGO_DISALLOW_COPYING(ServiceEntryPointMongod);
+
+public:
+ explicit ServiceEntryPointMongod(transport::TransportLayer* tl);
+
+ virtual ~ServiceEntryPointMongod() = default;
+
+ void startSession(transport::Session&& session) override;
+
+private:
+ void _sessionLoop(transport::Session* session);
+
+ transport::TransportLayer* _tl;
+};
+
+} // namespace mongo