summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/client/dbclientcursor.cpp6
-rw-r--r--src/mongo/client/scoped_db_conn_test.cpp6
-rw-r--r--src/mongo/db/assemble_response.cpp105
-rw-r--r--src/mongo/db/assemble_response.h7
-rw-r--r--src/mongo/db/db.cpp34
-rw-r--r--src/mongo/db/dbdirectclient.cpp9
-rw-r--r--src/mongo/db/dbmessage.cpp72
-rw-r--r--src/mongo/db/dbmessage.h74
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp135
-rw-r--r--src/mongo/db/service_entry_point_mongod.h34
-rw-r--r--src/mongo/s/commands/strategy.cpp77
-rw-r--r--src/mongo/s/commands/strategy.h11
-rw-r--r--src/mongo/s/service_entry_point_mongos.cpp176
-rw-r--r--src/mongo/s/service_entry_point_mongos.h26
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/service_entry_point.h8
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp169
-rw-r--r--src/mongo/transport/service_entry_point_impl.h73
-rw-r--r--src/mongo/transport/service_entry_point_mock.cpp61
-rw-r--r--src/mongo/transport/service_entry_point_mock.h6
-rw-r--r--src/mongo/transport/transport_layer_legacy_test.cpp8
21 files changed, 520 insertions, 578 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp
index bb74398b35f..db37bea3bf0 100644
--- a/src/mongo/client/dbclientcursor.cpp
+++ b/src/mongo/client/dbclientcursor.cpp
@@ -249,10 +249,8 @@ void DBClientCursor::commandDataReceived() {
// HACK: If we got an OP_COMMANDREPLY, take the reply object
// and shove it in to an OP_REPLY message.
if (op == dbCommandReply) {
- // Need to take ownership here as we destroy the underlying message.
- BSONObj reply = commandReply->getCommandReply().getOwned();
- batch.m.reset();
- replyToQuery(0, batch.m, reply);
+ BSONObj reply = commandReply->getCommandReply();
+ batch.m = replyToQuery(reply).response;
}
QueryResult::View qr = batch.m.singleData().view2ptr();
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp
index 41ae8d1af0c..57d196e7a88 100644
--- a/src/mongo/client/scoped_db_conn_test.cpp
+++ b/src/mongo/client/scoped_db_conn_test.cpp
@@ -95,6 +95,12 @@ public:
_replyDelay = delay;
}
+ DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) override {
+ MONGO_UNREACHABLE;
+ }
+
private:
void run(transport::SessionHandle session) {
Message inMessage;
diff --git a/src/mongo/db/assemble_response.cpp b/src/mongo/db/assemble_response.cpp
index 1c4444a2ce4..dce1610fb62 100644
--- a/src/mongo/db/assemble_response.cpp
+++ b/src/mongo/db/assemble_response.cpp
@@ -69,13 +69,13 @@ MONGO_FP_DECLARE(rsStopGetMore);
namespace {
using logger::LogComponent;
-inline void opread(Message& m) {
+inline void opread(const Message& m) {
if (_diaglog.getLevel() & 2) {
_diaglog.readop(m.singleData().view2ptr(), m.header().getLen());
}
}
-inline void opwrite(Message& m) {
+inline void opwrite(const Message& m) {
if (_diaglog.getLevel() & 1) {
_diaglog.writeop(m.singleData().view2ptr(), m.header().getLen());
}
@@ -143,15 +143,12 @@ void beginCommandOp(OperationContext* opCtx, const NamespaceString& nss, const B
curop->setNS_inlock(nss.ns());
}
-void receivedCommand(OperationContext* opCtx,
- const NamespaceString& nss,
- Client& client,
- DbResponse& dbResponse,
- Message& message) {
+DbResponse receivedCommand(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Client& client,
+ const Message& message) {
invariant(nss.isCommand());
- const int32_t responseToMsgId = message.header().getId();
-
DbMessage dbMessage(message);
QueryMessage queryMessage(dbMessage);
@@ -188,18 +185,12 @@ void receivedCommand(OperationContext* opCtx,
op->debug().responseLength = response.header().dataLen();
- dbResponse.response = std::move(response);
- dbResponse.responseToMsgId = responseToMsgId;
+ return {std::move(response)};
}
-void receivedRpc(OperationContext* opCtx,
- Client& client,
- DbResponse& dbResponse,
- Message& message) {
+DbResponse receivedRpc(OperationContext* opCtx, Client& client, const Message& message) {
invariant(message.operation() == dbCommand);
- const int32_t responseToMsgId = message.header().getId();
-
rpc::CommandReplyBuilder replyBuilder{};
auto curOp = CurOp::get(opCtx);
@@ -229,19 +220,17 @@ void receivedRpc(OperationContext* opCtx,
curOp->debug().responseLength = response.header().dataLen();
- dbResponse.response = std::move(response);
- dbResponse.responseToMsgId = responseToMsgId;
+ return {std::move(response)};
}
// In SERVER-7775 we reimplemented the pseudo-commands fsyncUnlock, inProg, and killOp
// as ordinary commands. To support old clients for another release, this helper serves
// to execute the real command from the legacy pseudo-command codepath.
// TODO: remove after MongoDB 3.2 is released
-void receivedPseudoCommand(OperationContext* opCtx,
- Client& client,
- DbResponse& dbResponse,
- Message& message,
- StringData realCommandName) {
+DbResponse receivedPseudoCommand(OperationContext* opCtx,
+ Client& client,
+ const Message& message,
+ StringData realCommandName) {
DbMessage originalDbm(message);
auto originalNToSkip = originalDbm.pullInt();
@@ -288,23 +277,21 @@ void receivedPseudoCommand(OperationContext* opCtx,
interposed.setData(dbQuery, cmdMsgBuf.buf(), cmdMsgBuf.len());
interposed.header().setId(message.header().getId());
- receivedCommand(opCtx, interposedNss, client, dbResponse, interposed);
+ return receivedCommand(opCtx, interposedNss, client, interposed);
}
-void receivedQuery(OperationContext* opCtx,
- const NamespaceString& nss,
- Client& c,
- DbResponse& dbResponse,
- Message& m) {
+DbResponse receivedQuery(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Client& c,
+ const Message& m) {
invariant(!nss.isCommand());
globalOpCounters.gotQuery();
- int32_t responseToMsgId = m.header().getId();
-
DbMessage d(m);
QueryMessage q(d);
CurOp& op = *CurOp::get(opCtx);
+ DbResponse dbResponse;
try {
Client* client = opCtx->getClient();
@@ -326,10 +313,10 @@ void receivedQuery(OperationContext* opCtx,
}
op.debug().responseLength = dbResponse.response.header().dataLen();
- dbResponse.responseToMsgId = responseToMsgId;
+ return dbResponse;
}
-void receivedKillCursors(OperationContext* opCtx, Message& m) {
+void receivedKillCursors(OperationContext* opCtx, const Message& m) {
LastError::get(opCtx->getClient()).disable();
DbMessage dbmessage(m);
int n = dbmessage.pullInt();
@@ -354,7 +341,7 @@ void receivedKillCursors(OperationContext* opCtx, Message& m) {
}
}
-void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, Message& m) {
+void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
auto insertOp = parseLegacyInsert(m);
invariant(insertOp.ns == nsString);
for (const auto& obj : insertOp.documents) {
@@ -366,7 +353,7 @@ void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, Me
performInserts(opCtx, insertOp);
}
-void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, Message& m) {
+void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
auto updateOp = parseLegacyUpdate(m);
auto& singleUpdate = updateOp.updates[0];
invariant(updateOp.ns == nsString);
@@ -387,7 +374,7 @@ void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, Me
performUpdates(opCtx, updateOp);
}
-void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, Message& m) {
+void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
auto deleteOp = parseLegacyDelete(m);
auto& singleDelete = deleteOp.deletes[0];
invariant(deleteOp.ns == nsString);
@@ -400,7 +387,10 @@ void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, Me
performDeletes(opCtx, deleteOp);
}
-bool receivedGetMore(OperationContext* opCtx, DbResponse& dbresponse, Message& m, CurOp& curop) {
+DbResponse receivedGetMore(OperationContext* opCtx,
+ const Message& m,
+ CurOp& curop,
+ bool* shouldLogOpDebug) {
globalOpCounters.gotGetMore();
DbMessage d(m);
@@ -421,6 +411,7 @@ bool receivedGetMore(OperationContext* opCtx, DbResponse& dbresponse, Message& m
bool exhaust = false;
bool isCursorAuthorized = false;
+ DbResponse dbresponse;
try {
const NamespaceString nsString(ns);
uassert(ErrorCodes::InvalidNamespace,
@@ -454,33 +445,28 @@ bool receivedGetMore(OperationContext* opCtx, DbResponse& dbresponse, Message& m
curop.debug().exceptionInfo = e.getInfo();
- replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
+ dbresponse = replyToQuery(errObj, ResultFlag_ErrSet);
curop.debug().responseLength = dbresponse.response.header().dataLen();
curop.debug().nreturned = 1;
- return false;
+ *shouldLogOpDebug = true;
+ return dbresponse;
}
curop.debug().responseLength = dbresponse.response.header().dataLen();
auto queryResult = QueryResult::ConstView(dbresponse.response.buf());
curop.debug().nreturned = queryResult.getNReturned();
- dbresponse.responseToMsgId = m.header().getId();
-
if (exhaust) {
curop.debug().exhaust = true;
dbresponse.exhaustNS = ns;
}
- return true;
+ return dbresponse;
}
} // namespace
-// Returns false when request includes 'end'
-void assembleResponse(OperationContext* opCtx,
- Message& m,
- DbResponse& dbresponse,
- const HostAndPort& remote) {
+DbResponse assembleResponse(OperationContext* opCtx, const Message& m, const HostAndPort& remote) {
// before we lock...
NetworkOp op = m.operation();
bool isCommand = false;
@@ -511,16 +497,13 @@ void assembleResponse(OperationContext* opCtx,
opwrite(m);
if (nsString.coll() == "$cmd.sys.inprog") {
- receivedPseudoCommand(opCtx, c, dbresponse, m, "currentOp");
- return;
+ return receivedPseudoCommand(opCtx, c, m, "currentOp");
}
if (nsString.coll() == "$cmd.sys.killop") {
- receivedPseudoCommand(opCtx, c, dbresponse, m, "killOp");
- return;
+ return receivedPseudoCommand(opCtx, c, m, "killOp");
}
if (nsString.coll() == "$cmd.sys.unlock") {
- receivedPseudoCommand(opCtx, c, dbresponse, m, "fsyncUnlock");
- return;
+ return receivedPseudoCommand(opCtx, c, m, "fsyncUnlock");
}
} else {
opread(m);
@@ -548,17 +531,14 @@ void assembleResponse(OperationContext* opCtx,
long long logThresholdMs = serverGlobalParams.slowMS;
bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));
+ DbResponse dbresponse;
if (op == dbQuery) {
- if (isCommand) {
- receivedCommand(opCtx, nsString, c, dbresponse, m);
- } else {
- receivedQuery(opCtx, nsString, c, dbresponse, m);
- }
+ dbresponse = isCommand ? receivedCommand(opCtx, nsString, c, m)
+ : receivedQuery(opCtx, nsString, c, m);
} else if (op == dbCommand) {
- receivedRpc(opCtx, c, dbresponse, m);
+ dbresponse = receivedRpc(opCtx, c, m);
} else if (op == dbGetMore) {
- if (!receivedGetMore(opCtx, dbresponse, m, currentOp))
- shouldLogOpDebug = true;
+ dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);
} else {
// The remaining operations do not return any response. They are fire-and-forget.
try {
@@ -644,6 +624,7 @@ void assembleResponse(OperationContext* opCtx,
}
recordCurOpMetrics(opCtx);
+ return dbresponse;
}
} // namespace mongo
diff --git a/src/mongo/db/assemble_response.h b/src/mongo/db/assemble_response.h
index 9a50dd52dd3..96d1ade53c9 100644
--- a/src/mongo/db/assemble_response.h
+++ b/src/mongo/db/assemble_response.h
@@ -40,9 +40,8 @@ class OperationContext;
// to indicate that the call is on behalf of a DBDirectClient.
extern const HostAndPort kHostAndPortForDirectClient;
-void assembleResponse(OperationContext* opCtx,
- Message& m,
- DbResponse& dbresponse,
- const HostAndPort& client);
+DbResponse assembleResponse(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client);
} // namespace mongo
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 3a7b8852a58..4bf339e49b1 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -436,7 +436,7 @@ ExitCode _initAndListen(int listenPort) {
Client::initThread("initandlisten");
_initWireSpec();
- auto globalServiceContext = getGlobalServiceContext();
+ auto globalServiceContext = checked_cast<ServiceContextMongoD*>(getGlobalServiceContext());
globalServiceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds(10)));
globalServiceContext->setOpObserver(stdx::make_unique<OpObserverImpl>());
@@ -470,20 +470,18 @@ ExitCode _initAndListen(int listenPort) {
logProcessDetails();
- checked_cast<ServiceContextMongoD*>(getGlobalServiceContext())->createLockFile();
+ globalServiceContext->createLockFile();
transport::TransportLayerLegacy::Options options;
options.port = listenPort;
options.ipList = serverGlobalParams.bind_ip;
- auto sep =
- stdx::make_unique<ServiceEntryPointMongod>(getGlobalServiceContext()->getTransportLayer());
- auto sepPtr = sep.get();
-
- getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));
+ globalServiceContext->setServiceEntryPoint(
+ stdx::make_unique<ServiceEntryPointMongod>(globalServiceContext->getTransportLayer()));
// Create, start, and attach the TL
- auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(options, sepPtr);
+ auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(
+ options, globalServiceContext->getServiceEntryPoint());
auto res = transportLayer->setup();
if (!res.isOK()) {
error() << "Failed to set up listener: " << res;
@@ -494,7 +492,7 @@ ExitCode _initAndListen(int listenPort) {
if (serverGlobalParams.isHttpInterfaceEnabled) {
dbWebServer.reset(new DbWebServer(serverGlobalParams.bind_ip,
serverGlobalParams.port + 1000,
- getGlobalServiceContext(),
+ globalServiceContext,
new RestAdminAccess()));
if (!dbWebServer->setupSockets()) {
error() << "Failed to set up sockets for HTTP interface during startup.";
@@ -502,7 +500,7 @@ ExitCode _initAndListen(int listenPort) {
}
}
- getGlobalServiceContext()->initializeGlobalStorageEngine();
+ globalServiceContext->initializeGlobalStorageEngine();
#ifdef MONGO_CONFIG_WIREDTIGER_ENABLED
if (EncryptionHooks::get(getGlobalServiceContext())->restartRequired()) {
@@ -525,7 +523,7 @@ ExitCode _initAndListen(int listenPort) {
}
// Warn if field name matches non-active registered storage engine.
- if (getGlobalServiceContext()->isRegisteredStorageEngine(e.fieldName())) {
+ if (globalServiceContext->isRegisteredStorageEngine(e.fieldName())) {
warning() << "Detected configuration for non-active storage engine "
<< e.fieldName() << " when current storage engine is "
<< storageGlobalParams.engine;
@@ -533,7 +531,7 @@ ExitCode _initAndListen(int listenPort) {
}
}
- if (!getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager()) {
+ if (!globalServiceContext->getGlobalStorageEngine()->getSnapshotManager()) {
if (moe::startupOptionsParsed.count("replication.enableMajorityReadConcern") &&
moe::startupOptionsParsed["replication.enableMajorityReadConcern"].as<bool>()) {
// Note: we are intentionally only erroring if the user explicitly requested that we
@@ -580,7 +578,7 @@ ExitCode _initAndListen(int listenPort) {
ScriptEngine::setup();
}
- auto startupOpCtx = getGlobalServiceContext()->makeOperationContext(&cc());
+ auto startupOpCtx = globalServiceContext->makeOperationContext(&cc());
repairDatabasesAndCheckVersion(startupOpCtx.get());
@@ -697,7 +695,7 @@ ExitCode _initAndListen(int listenPort) {
storageGlobalParams.engine != "devnull") {
Lock::GlobalWrite lk(startupOpCtx.get());
FeatureCompatibilityVersion::setIfCleanStartup(
- startupOpCtx.get(), repl::StorageInterface::get(getGlobalServiceContext()));
+ startupOpCtx.get(), repl::StorageInterface::get(globalServiceContext));
}
if (replSettings.usingReplSets() || (!replSettings.isMaster() && replSettings.isSlave()) ||
@@ -714,7 +712,7 @@ ExitCode _initAndListen(int listenPort) {
// operation context anymore
startupOpCtx.reset();
- auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer));
+ auto start = globalServiceContext->addAndStartTransportLayer(std::move(transportLayer));
if (!start.isOK()) {
error() << "Failed to start the listener: " << start.toString();
return EXIT_NET_ERROR;
@@ -960,10 +958,9 @@ static void shutdownTask() {
if (auto sr = grid.shardRegistry()) { // TODO: race: sr is a naked pointer
sr->shutdown();
}
-#if __has_feature(address_sanitizer)
- auto sep = static_cast<ServiceEntryPointMongod*>(serviceContext->getServiceEntryPoint());
- if (sep) {
+#if __has_feature(address_sanitizer)
+ if (auto sep = checked_cast<ServiceEntryPointImpl*>(serviceContext->getServiceEntryPoint())) {
// When running under address sanitizer, we get false positive leaks due to disorder around
// the lifecycle of a connection and request. When we are running under ASAN, we try a lot
// harder to dry up the server from active connections before going on to really shut down.
@@ -1001,7 +998,6 @@ static void shutdownTask() {
<< " active workers to drain; continuing with shutdown... ";
}
}
-
#endif
// Shutdown Full-Time Data Capture
diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp
index 5b1499da351..60a001f534e 100644
--- a/src/mongo/db/dbdirectclient.cpp
+++ b/src/mongo/db/dbdirectclient.cpp
@@ -123,10 +123,9 @@ bool DBDirectClient::call(Message& toSend, Message& response, bool assertOk, str
DirectClientScope directClientScope(_opCtx);
LastError::get(_opCtx->getClient()).startRequest();
- DbResponse dbResponse;
CurOp curOp(_opCtx);
- assembleResponse(_opCtx, toSend, dbResponse, kHostAndPortForDirectClient);
- verify(!dbResponse.response.empty());
+ auto dbResponse = assembleResponse(_opCtx, toSend, kHostAndPortForDirectClient);
+ invariant(!dbResponse.response.empty());
response = std::move(dbResponse.response);
return true;
@@ -136,9 +135,9 @@ void DBDirectClient::say(Message& toSend, bool isRetry, string* actualServer) {
DirectClientScope directClientScope(_opCtx);
LastError::get(_opCtx->getClient()).startRequest();
- DbResponse dbResponse;
CurOp curOp(_opCtx);
- assembleResponse(_opCtx, toSend, dbResponse, kHostAndPortForDirectClient);
+ auto dbResponse = assembleResponse(_opCtx, toSend, kHostAndPortForDirectClient);
+ invariant(dbResponse.response.empty());
}
unique_ptr<DBClientCursor> DBDirectClient::query(const string& ns,
diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp
index 6941a260bcc..8dd7def0cb3 100644
--- a/src/mongo/db/dbmessage.cpp
+++ b/src/mongo/db/dbmessage.cpp
@@ -30,10 +30,9 @@
#include "mongo/platform/basic.h"
#include "mongo/db/dbmessage.h"
-#include "mongo/db/operation_context.h"
+
#include "mongo/platform/strnlen.h"
#include "mongo/rpc/object_check.h"
-#include "mongo/transport/session.h"
namespace mongo {
@@ -177,28 +176,10 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) {
_buffer.skip(sizeof(QueryResult::Value));
}
-void OpQueryReplyBuilder::send(const transport::SessionHandle& session,
- int queryResultFlags,
- const Message& requestMsg,
- int nReturned,
- int startingFrom,
- long long cursorId) {
- Message response;
- putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId);
-
- response.header().setId(nextMessageId());
- response.header().setResponseToMsgId(requestMsg.header().getId());
-
- uassertStatusOK(session->sinkMessage(response).wait());
-}
-
-void OpQueryReplyBuilder::sendCommandReply(const transport::SessionHandle& session,
- const Message& requestMsg) {
- send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1);
-}
-
-void OpQueryReplyBuilder::putInMessage(
- Message* out, int queryResultFlags, int nReturned, int startingFrom, long long cursorId) {
+Message OpQueryReplyBuilder::toQueryReply(int queryResultFlags,
+ int nReturned,
+ int startingFrom,
+ long long cursorId) {
QueryResult::View qr = _buffer.buf();
qr.setResultFlags(queryResultFlags);
qr.msgdata().setLen(_buffer.len());
@@ -206,44 +187,17 @@ void OpQueryReplyBuilder::putInMessage(
qr.setCursorId(cursorId);
qr.setStartingFrom(startingFrom);
qr.setNReturned(nReturned);
- out->setData(_buffer.release()); // transport will free
+ return Message(_buffer.release());
}
-void replyToQuery(int queryResultFlags,
- const transport::SessionHandle& session,
- const Message& requestMsg,
- const void* data,
- int size,
- int nReturned,
- int startingFrom,
- long long cursorId) {
+DbResponse replyToQuery(int queryResultFlags,
+ const void* data,
+ int size,
+ int nReturned,
+ int startingFrom,
+ long long cursorId) {
OpQueryReplyBuilder reply;
reply.bufBuilderForResults().appendBuf(data, size);
- reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId);
-}
-
-void replyToQuery(int queryResultFlags,
- const transport::SessionHandle& session,
- const Message& requestMsg,
- const BSONObj& responseObj) {
- replyToQuery(queryResultFlags,
- session,
- requestMsg,
- (void*)responseObj.objdata(),
- responseObj.objsize(),
- 1);
-}
-
-void replyToQuery(int queryResultFlags, const Message& m, DbResponse& dbresponse, BSONObj obj) {
- Message resp;
- replyToQuery(queryResultFlags, resp, obj);
- dbresponse.response = std::move(resp);
- dbresponse.responseToMsgId = m.header().getId();
-}
-
-void replyToQuery(int queryResultFlags, Message& response, const BSONObj& resultObj) {
- OpQueryReplyBuilder reply;
- resultObj.appendSelfToBufBuilder(reply.bufBuilderForResults());
- reply.putInMessage(&response, queryResultFlags, /*nReturned*/ 1);
+ return DbResponse{reply.toQueryReply(queryResultFlags, nReturned, startingFrom, cursorId)};
}
}
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index d6ed3a68684..959dc59edd0 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -35,8 +35,6 @@
#include "mongo/client/constants.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/server_options.h"
-#include "mongo/transport/session.h"
-#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/message.h"
namespace mongo {
@@ -317,13 +315,12 @@ public:
/**
* A response to a DbMessage.
+ *
+ * Order of fields makes DbResponse{funcReturningMessage()} valid.
*/
struct DbResponse {
- Message response;
- int32_t responseToMsgId;
- std::string exhaustNS; /* points to ns if exhaust mode. 0=normal mode*/
- DbResponse(Message r, int32_t rtId) : response(std::move(r)), responseToMsgId(rtId) {}
- DbResponse() = default;
+ Message response; // If empty, nothing will be returned to the client.
+ std::string exhaustNS; // Namespace of cursor if exhaust mode, else "".
};
/**
@@ -348,58 +345,39 @@ public:
}
/**
- * Finishes the reply and transfers the message buffer into 'out'.
+ * Finishes the reply and returns the message buffer.
*/
- void putInMessage(Message* out,
- int queryResultFlags,
- int nReturned,
- int startingFrom = 0,
- long long cursorId = 0);
+ Message toQueryReply(int queryResultFlags,
+ int nReturned,
+ int startingFrom = 0,
+ long long cursorId = 0);
/**
- * Finishes the reply and sends the message out to 'destination'.
+ * Similar to toQueryReply() but used for replying to a command.
*/
- void send(const transport::SessionHandle& session,
- int queryResultFlags,
- const Message& requestMsg,
- int nReturned,
- int startingFrom = 0,
- long long cursorId = 0);
-
- /**
- * Similar to send() but used for replying to a command.
- */
- void sendCommandReply(const transport::SessionHandle& session, const Message& requestMsg);
+ Message toCommandReply() {
+ return toQueryReply(0, 1);
+ }
private:
BufBuilder _buffer;
};
-void replyToQuery(int queryResultFlags,
- const transport::SessionHandle& session,
- const Message& requestMsg,
- const void* data,
- int size,
- int nReturned,
- int startingFrom = 0,
- long long cursorId = 0);
-
-/* object reply helper. */
-void replyToQuery(int queryResultFlags,
- const transport::SessionHandle& session,
- const Message& requestMsg,
- const BSONObj& responseObj);
+/**
+ * Helper to build a DbResponse from a buffer containing an OP_QUERY response.
+ */
+DbResponse replyToQuery(int queryResultFlags,
+ const void* data,
+ int size,
+ int nReturned,
+ int startingFrom = 0,
+ long long cursorId = 0);
-/* helper to do a reply using a DbResponse object */
-void replyToQuery(int queryResultFlags, const Message& m, DbResponse& dbresponse, BSONObj obj);
/**
- * Helper method for setting up a response object.
- *
- * @param queryResultFlags The flags to set to the response object.
- * @param response The object to be used for building the response. The internal buffer of
- * this object will contain the raw data from resultObj after a successful call.
- * @param resultObj The bson object that contains the reply data.
+ * Helper to build a DbRespose for OP_QUERY with a single reply object.
*/
-void replyToQuery(int queryResultFlags, Message& response, const BSONObj& resultObj);
+inline DbResponse replyToQuery(const BSONObj& obj, int queryResultFlags = 0) {
+ return replyToQuery(queryResultFlags, obj.objdata(), obj.objsize(), /*nReturned*/ 1);
+}
} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 376a13b695f..c2abbecbe82 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016 MongoDB Inc.
+ * Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
@@ -32,139 +32,14 @@
#include "mongo/db/service_entry_point_mongod.h"
-#include <vector>
-
#include "mongo/db/assemble_response.h"
-#include "mongo/db/client.h"
-#include "mongo/db/dbmessage.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_entry_point_utils.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/util/concurrency/idle_thread_block.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/log.h"
-#include "mongo/util/net/message.h"
-#include "mongo/util/net/socket_exception.h"
-#include "mongo/util/net/thread_idle_callback.h"
-#include "mongo/util/quick_exit.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
-namespace {
-
-// Set up proper headers for formatting an exhaust request, if we need to
-bool setExhaustMessage(Message* m, const DbResponse& dbresponse) {
- MsgData::View header = dbresponse.response.header();
- QueryResult::View qr = header.view2ptr();
- long long cursorid = qr.getCursorId();
-
- if (!cursorid) {
- return false;
- }
-
- verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
-
- auto ns = dbresponse.exhaustNS; // reset() will free this
-
- m->reset();
-
- BufBuilder b(512);
- b.appendNum(static_cast<int>(0) /* size set later in appendData() */);
- b.appendNum(header.getId());
- b.appendNum(header.getResponseToMsgId());
- b.appendNum(static_cast<int>(dbGetMore));
- b.appendNum(static_cast<int>(0));
- b.appendStr(ns);
- b.appendNum(static_cast<int>(0)); // ntoreturn
- b.appendNum(cursorid);
-
- MsgData::View(b.buf()).setLen(b.len());
- m->setData(b.release());
-
- return true;
-}
-
-} // namespace
-
-using transport::Session;
-using transport::TransportLayer;
-
-ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {}
-
-void ServiceEntryPointMongod::startSession(transport::SessionHandle session) {
- // Pass ownership of the transport::SessionHandle into our worker thread. When this
- // thread exits, the session will end.
- launchWrappedServiceEntryWorkerThread(
- std::move(session), [this](const transport::SessionHandle& session) {
- _nWorkers.fetchAndAdd(1);
- auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); });
-
- _sessionLoop(session);
- });
-}
-
-void ServiceEntryPointMongod::_sessionLoop(const transport::SessionHandle& session) {
- Message inMessage;
- bool inExhaust = false;
- int64_t counter = 0;
-
- while (true) {
- // 1. Source a Message from the client (unless we are exhausting)
- if (!inExhaust) {
- inMessage.reset();
- auto status = [&] {
- MONGO_IDLE_THREAD_BLOCK;
- return session->sourceMessage(&inMessage).wait();
- }();
-
- if (ErrorCodes::isInterruption(status.code()) ||
- ErrorCodes::isNetworkError(status.code())) {
- break;
- }
-
- // Our session may have been closed internally.
- if (status == TransportLayer::TicketSessionClosedStatus) {
- break;
- }
-
- uassertStatusOK(status);
- }
-
- // 2. Pass sourced Message up to mongod
- DbResponse dbresponse;
- {
- auto opCtx = cc().makeOperationContext();
- assembleResponse(opCtx.get(), inMessage, dbresponse, session->remote());
-
- // opCtx must go out of scope here so that the operation cannot show
- // up in currentOp results after the response reaches the client
- }
-
- // 3. Format our response, if we have one
- Message& toSink = dbresponse.response;
- if (!toSink.empty()) {
- toSink.header().setId(nextMessageId());
- toSink.header().setResponseToMsgId(inMessage.header().getId());
-
- // If this is an exhaust cursor, don't source more Messages
- if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) {
- inExhaust = true;
- } else {
- inExhaust = false;
- }
-
- // 4. Sink our response to the client
- uassertStatusOK(session->sinkMessage(toSink).wait());
- } else {
- inExhaust = false;
- }
- if ((counter++ & 0xf) == 0) {
- markThreadIdle();
- }
- }
+DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) {
+ return assembleResponse(opCtx, request, client);
}
} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h
index 4917430f02f..cffc2280ab7 100644
--- a/src/mongo/db/service_entry_point_mongod.h
+++ b/src/mongo/db/service_entry_point_mongod.h
@@ -28,42 +28,22 @@
#pragma once
-#include <vector>
-
#include "mongo/base/disallow_copying.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_entry_point_impl.h"
namespace mongo {
-namespace transport {
-class Session;
-class TransportLayer;
-} // namespace transport
-
/**
- * The entry point from the TransportLayer into Mongod. startSession() spawns and
- * detaches a new thread for each incoming connection (transport::Session).
+ * The entry point into mongod. Just a wrapper around assembleResponse.
*/
-class ServiceEntryPointMongod final : public ServiceEntryPoint {
+class ServiceEntryPointMongod final : public ServiceEntryPointImpl {
MONGO_DISALLOW_COPYING(ServiceEntryPointMongod);
public:
- explicit ServiceEntryPointMongod(transport::TransportLayer* tl);
-
- virtual ~ServiceEntryPointMongod() = default;
-
- void startSession(transport::SessionHandle session) override;
-
- std::size_t getNumberOfActiveWorkerThreads() const {
- return _nWorkers.load();
- }
-
-private:
- void _sessionLoop(const transport::SessionHandle& session);
-
- transport::TransportLayer* _tl;
- AtomicWord<std::size_t> _nWorkers;
+ using ServiceEntryPointImpl::ServiceEntryPointImpl;
+ DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) override;
};
} // namespace mongo
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 66386fd5767..25ac876dfd2 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -195,7 +195,7 @@ MONGO_INITIALIZER(InitializeRegisterErrorHandler)(InitializerContext* const) {
} // namespace
-void Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
+DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
globalOpCounters.gotQuery();
const QueryMessage q(*dbm);
@@ -254,15 +254,7 @@ void Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMe
opCtx, findCommand, queryRequest, verbosity, metadata, &explainBuilder));
BSONObj explainObj = explainBuilder.done();
- replyToQuery(0, // query result flags
- client->session(),
- dbm->msg(),
- static_cast<const void*>(explainObj.objdata()),
- explainObj.objsize(),
- 1, // numResults
- 0, // startingFrom
- CursorId(0));
- return;
+ return replyToQuery(explainObj);
}
// Do the work to generate the first batch of results. This blocks waiting to get responses from
@@ -293,21 +285,17 @@ void Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMe
numResults++;
}
- reply.send(client->session(),
- 0, // query result flags
- dbm->msg(),
- numResults,
- 0, // startingFrom
- cursorId.getValue());
+ return DbResponse{reply.toQueryReply(0, // query result flags
+ numResults,
+ 0, // startingFrom
+ cursorId.getValue())};
}
-void Strategy::clientCommandOp(OperationContext* opCtx,
- const NamespaceString& nss,
- DbMessage* dbm) {
+DbResponse Strategy::clientCommandOp(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DbMessage* dbm) {
const QueryMessage q(*dbm);
- Client* const client = opCtx->getClient();
-
LOG(3) << "command: " << q.ns << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn
<< " options: " << q.queryOptions;
@@ -336,22 +324,17 @@ void Strategy::clientCommandOp(OperationContext* opCtx,
const NamespaceString interposedNss("admin", "$cmd");
BSONObjBuilder reply;
runAgainstRegistered(opCtx, interposedNss, interposedCmd, reply, q.queryOptions);
- replyToQuery(0, client->session(), dbm->msg(), reply.done());
+ return replyToQuery(reply.done());
};
if (nss.coll() == "$cmd.sys.inprog") {
- upgradeToRealCommand("currentOp");
- return;
+ return upgradeToRealCommand("currentOp");
} else if (nss.coll() == "$cmd.sys.killop") {
- upgradeToRealCommand("killOp");
- return;
+ return upgradeToRealCommand("killOp");
} else if (nss.coll() == "$cmd.sys.unlock") {
- replyToQuery(0,
- client->session(),
- dbm->msg(),
- BSON("err"
- << "can't do unlock through mongos"));
- return;
+ return replyToQuery(BSON("$err"
+ << "can't do unlock through mongos"),
+ ResultFlag_ErrSet);
}
// No pseudo-command found, fall through to execute as a regular query
@@ -418,8 +401,7 @@ void Strategy::clientCommandOp(OperationContext* opCtx,
BSONObjBuilder builder(reply.bufBuilderForResults());
runAgainstRegistered(opCtx, NamespaceString(q.ns), cmdObj, builder, q.queryOptions);
}
- reply.sendCommandReply(client->session(), dbm->msg());
- return;
+ return DbResponse{reply.toCommandReply()};
} catch (const StaleConfigException& e) {
if (loops <= 0)
throw e;
@@ -444,8 +426,7 @@ void Strategy::clientCommandOp(OperationContext* opCtx,
BSONObjBuilder builder(reply.bufBuilderForResults());
Command::appendCommandStatus(builder, e.toStatus());
}
- reply.sendCommandReply(client->session(), dbm->msg());
- return;
+ return DbResponse{reply.toCommandReply()};
}
}
}
@@ -479,7 +460,7 @@ void Strategy::commandOp(OperationContext* opCtx,
}
}
-void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
+DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
const int ntoreturn = dbm->pullInt();
uassert(
34424, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0);
@@ -487,15 +468,12 @@ void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMe
globalOpCounters.gotGetMore();
- Client* const client = opCtx->getClient();
-
// TODO: Handle stale config exceptions here from coll being dropped or sharded during op for
// now has same semantics as legacy request.
auto statusGetDb = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db());
if (statusGetDb == ErrorCodes::NamespaceNotFound) {
- replyToQuery(ResultFlag_CursorNotFound, client->session(), dbm->msg(), 0, 0, 0);
- return;
+ return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0);
}
uassertStatusOK(statusGetDb);
@@ -508,8 +486,7 @@ void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMe
auto cursorResponse = ClusterFind::runGetMore(opCtx, getMoreRequest);
if (cursorResponse == ErrorCodes::CursorNotFound) {
- replyToQuery(ResultFlag_CursorNotFound, client->session(), dbm->msg(), 0, 0, 0);
- return;
+ return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0);
}
uassertStatusOK(cursorResponse.getStatus());
@@ -522,14 +499,12 @@ void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMe
++numResults;
}
- replyToQuery(0,
- client->session(),
- dbm->msg(),
- buffer.buf(),
- buffer.len(),
- numResults,
- cursorResponse.getValue().getNumReturnedSoFar().value_or(0),
- cursorResponse.getValue().getCursorId());
+ return replyToQuery(0,
+ buffer.buf(),
+ buffer.len(),
+ numResults,
+ cursorResponse.getValue().getNumReturnedSoFar().value_or(0),
+ cursorResponse.getValue().getCursorId());
}
void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) {
diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h
index 03a1ab0077b..f91e3d61793 100644
--- a/src/mongo/s/commands/strategy.h
+++ b/src/mongo/s/commands/strategy.h
@@ -37,6 +37,7 @@
namespace mongo {
class DbMessage;
+struct DbResponse;
class NamespaceString;
class OperationContext;
class QueryRequest;
@@ -56,13 +57,13 @@ public:
*
* Must not be called with legacy '.$cmd' commands.
*/
- static void queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm);
+ static DbResponse queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm);
/**
* Handles a legacy-style getMore request and sends the response back on success (or cursor not
* found) or throws on error.
*/
- static void getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm);
+ static DbResponse getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm);
/**
* Handles a legacy-style killCursors request. Doesn't send any response on success or throws on
@@ -84,9 +85,9 @@ public:
* Catches StaleConfigException errors and retries the command automatically after refreshing
* the metadata for the failing namespace.
*/
- static void clientCommandOp(OperationContext* opCtx,
- const NamespaceString& nss,
- DbMessage* dbm);
+ static DbResponse clientCommandOp(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DbMessage* dbm);
/**
* Helper to run an explain of a find operation on the shards. Fills 'out' with the result of
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index 240c1392c33..118296a2690 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -40,19 +40,12 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/strategy.h"
-#include "mongo/transport/service_entry_point_utils.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/log.h"
#include "mongo/util/net/message.h"
-#include "mongo/util/net/thread_idle_callback.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
-using transport::TransportLayer;
-
namespace {
BSONObj buildErrReply(const DBException& ex) {
@@ -67,126 +60,89 @@ BSONObj buildErrReply(const DBException& ex) {
} // namespace
-ServiceEntryPointMongos::ServiceEntryPointMongos(TransportLayer* tl) : _tl(tl) {}
-void ServiceEntryPointMongos::startSession(transport::SessionHandle session) {
- launchWrappedServiceEntryWorkerThread(
- std::move(session),
- [this](const transport::SessionHandle& session) { _sessionLoop(session); });
-}
+DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
+ const Message& message,
+ const HostAndPort& remote) {
+ // Release any cached egress connections for client back to pool before destroying
+ auto guard = MakeGuard(ShardConnection::releaseMyConnections);
-void ServiceEntryPointMongos::_sessionLoop(const transport::SessionHandle& session) {
- int64_t counter = 0;
+ const int32_t msgId = message.header().getId();
+ const NetworkOp op = message.operation();
- while (true) {
- // Release any cached egress connections for client back to pool before destroying
- auto guard = MakeGuard(ShardConnection::releaseMyConnections);
+ // This exception will not be returned to the caller, but will be logged and will close the
+ // connection
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Message type " << op << " is not supported.",
+ isSupportedNetworkOp(op));
- Message message;
+ // Start a new LastError session. Any exceptions thrown from here onwards will be returned
+ // to the caller (if the type of the message permits it).
+ auto client = opCtx->getClient();
+ if (!ClusterLastErrorInfo::get(client)) {
+ ClusterLastErrorInfo::get(client) = std::make_shared<ClusterLastErrorInfo>();
+ }
+ ClusterLastErrorInfo::get(client)->newRequest();
+ LastError::get(client).startRequest();
- // Source a Message from the client
- {
- auto status = [&] {
- MONGO_IDLE_THREAD_BLOCK;
- return session->sourceMessage(&message).wait();
- }();
+ DbMessage dbm(message);
- if (ErrorCodes::isInterruption(status.code()) ||
- ErrorCodes::isNetworkError(status.code())) {
- break;
- }
+ NamespaceString nss;
+ DbResponse dbResponse;
+ try {
+ if (dbm.messageShouldHaveNs()) {
+ nss = NamespaceString(StringData(dbm.getns()));
- // Our session may have been closed internally.
- if (status == TransportLayer::TicketSessionClosedStatus) {
- break;
- }
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid ns [" << nss.ns() << "]",
+ nss.isValid());
- uassertStatusOK(status);
+ uassert(ErrorCodes::IllegalOperation,
+ "Can't use 'local' database through mongos",
+ nss.db() != NamespaceString::kLocalDb);
}
- auto opCtx = cc().makeOperationContext();
+ AuthorizationSession::get(opCtx->getClient())->startRequest(opCtx);
- const int32_t msgId = message.header().getId();
+ LOG(3) << "Request::process begin ns: " << nss << " msg id: " << msgId
+ << " op: " << networkOpToString(op);
- const NetworkOp op = message.operation();
+ switch (op) {
+ case dbQuery:
+ if (nss.isCommand() || nss.isSpecialCommand()) {
+ dbResponse = Strategy::clientCommandOp(opCtx, nss, &dbm);
+ } else {
+ dbResponse = Strategy::queryOp(opCtx, nss, &dbm);
+ }
+ break;
+ case dbGetMore:
+ dbResponse = Strategy::getMore(opCtx, nss, &dbm);
+ break;
+ case dbKillCursors:
+ Strategy::killCursors(opCtx, &dbm); // No Response.
+ break;
+ default:
+ Strategy::writeOp(opCtx, &dbm); // No Response.
+ break;
+ }
- // This exception will not be returned to the caller, but will be logged and will close the
- // connection
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Message type " << op << " is not supported.",
- isSupportedNetworkOp(op));
+ LOG(3) << "Request::process end ns: " << nss << " msg id: " << msgId
+ << " op: " << networkOpToString(op);
- // Start a new LastError session. Any exceptions thrown from here onwards will be returned
- // to the caller (if the type of the message permits it).
- auto client = opCtx->getClient();
- if (!ClusterLastErrorInfo::get(client)) {
- ClusterLastErrorInfo::get(client) = std::make_shared<ClusterLastErrorInfo>();
- }
- ClusterLastErrorInfo::get(client)->newRequest();
- LastError::get(client).startRequest();
-
- DbMessage dbm(message);
-
- NamespaceString nss;
-
- try {
-
- if (dbm.messageShouldHaveNs()) {
- nss = NamespaceString(StringData(dbm.getns()));
-
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid ns [" << nss.ns() << "]",
- nss.isValid());
-
- uassert(ErrorCodes::IllegalOperation,
- "Can't use 'local' database through mongos",
- nss.db() != NamespaceString::kLocalDb);
- }
-
- AuthorizationSession::get(opCtx->getClient())->startRequest(opCtx.get());
-
- LOG(3) << "Request::process begin ns: " << nss << " msg id: " << msgId
- << " op: " << networkOpToString(op);
-
- switch (op) {
- case dbQuery:
- if (nss.isCommand() || nss.isSpecialCommand()) {
- Strategy::clientCommandOp(opCtx.get(), nss, &dbm);
- } else {
- Strategy::queryOp(opCtx.get(), nss, &dbm);
- }
- break;
- case dbGetMore:
- Strategy::getMore(opCtx.get(), nss, &dbm);
- break;
- case dbKillCursors:
- Strategy::killCursors(opCtx.get(), &dbm);
- break;
- default:
- Strategy::writeOp(opCtx.get(), &dbm);
- break;
- }
-
- LOG(3) << "Request::process end ns: " << nss << " msg id: " << msgId
- << " op: " << networkOpToString(op);
-
- } catch (const DBException& ex) {
- LOG(1) << "Exception thrown"
- << " while processing " << networkOpToString(op) << " op"
- << " for " << nss.ns() << causedBy(ex);
-
- if (op == dbQuery || op == dbGetMore) {
- replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex));
- }
-
- // We *always* populate the last error for now
- LastError::get(opCtx->getClient()).setLastError(ex.getCode(), ex.what());
- }
+ } catch (const DBException& ex) {
+ LOG(1) << "Exception thrown while processing " << networkOpToString(op) << " op for "
+ << nss.ns() << causedBy(ex);
- if ((counter++ & 0xf) == 0) {
- markThreadIdle();
+ if (op == dbQuery || op == dbGetMore) {
+ dbResponse = replyToQuery(buildErrReply(ex), ResultFlag_ErrSet);
+ } else {
+ // No Response.
}
+
+ // We *always* populate the last error for now
+ LastError::get(opCtx->getClient()).setLastError(ex.getCode(), ex.what());
}
+ return dbResponse;
}
} // namespace mongo
diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h
index acf42d89fc9..d6440f69264 100644
--- a/src/mongo/s/service_entry_point_mongos.h
+++ b/src/mongo/s/service_entry_point_mongos.h
@@ -31,33 +31,21 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
-#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_entry_point_impl.h"
namespace mongo {
-namespace transport {
-class Session;
-class TransportLayer;
-} // namespace transport
-
/**
- * The entry point from the TransportLayer into Mongos. startSession() spawns and
- * detaches a new thread for each incoming connection (transport::Session).
+ * The entry point from the TransportLayer into Mongos.
*/
-class ServiceEntryPointMongos final : public ServiceEntryPoint {
+class ServiceEntryPointMongos final : public ServiceEntryPointImpl {
MONGO_DISALLOW_COPYING(ServiceEntryPointMongos);
public:
- ServiceEntryPointMongos(transport::TransportLayer* tl);
-
- virtual ~ServiceEntryPointMongos() = default;
-
- void startSession(transport::SessionHandle session) override;
-
-private:
- void _sessionLoop(const transport::SessionHandle& session);
-
- transport::TransportLayer* _tl;
+ using ServiceEntryPointImpl::ServiceEntryPointImpl;
+ DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) override;
};
} // namespace mongo
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 75b589c5ec5..b365613660d 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -74,6 +74,7 @@ env.Library(
target='service_entry_point_utils',
source=[
'service_entry_point_utils.cpp',
+ 'service_entry_point_impl.cpp',
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/service_context",
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index e0a1ee7a907..f9f82ed3273 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
namespace mongo {
@@ -51,6 +52,13 @@ public:
*/
virtual void startSession(transport::SessionHandle session) = 0;
+ /**
+ * Processes a request and fills out a DbResponse.
+ */
+ virtual DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) = 0;
+
protected:
ServiceEntryPoint() = default;
};
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
new file mode 100644
index 00000000000..e6db7f875cd
--- /dev/null
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_entry_point_impl.h"
+
+#include <vector>
+
+#include "mongo/db/assemble_response.h"
+#include "mongo/db/client.h"
+#include "mongo/db/dbmessage.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/thread_idle_callback.h"
+#include "mongo/util/quick_exit.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+
+// Set up proper headers for formatting an exhaust request, if we need to
+bool setExhaustMessage(Message* m, const DbResponse& dbresponse) {
+ MsgData::View header = dbresponse.response.header();
+ QueryResult::View qr = header.view2ptr();
+ long long cursorid = qr.getCursorId();
+
+ if (!cursorid) {
+ return false;
+ }
+
+ verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
+
+ auto ns = dbresponse.exhaustNS; // reset() will free this
+
+ m->reset();
+
+ BufBuilder b(512);
+ b.appendNum(static_cast<int>(0) /* size set later in appendData() */);
+ b.appendNum(header.getId());
+ b.appendNum(header.getResponseToMsgId());
+ b.appendNum(static_cast<int>(dbGetMore));
+ b.appendNum(static_cast<int>(0));
+ b.appendStr(ns);
+ b.appendNum(static_cast<int>(0)); // ntoreturn
+ b.appendNum(cursorid);
+
+ MsgData::View(b.buf()).setLen(b.len());
+ m->setData(b.release());
+
+ return true;
+}
+
+} // namespace
+
+using transport::Session;
+using transport::TransportLayer;
+
+void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
+ // Pass ownership of the transport::SessionHandle into our worker thread. When this
+ // thread exits, the session will end.
+ launchWrappedServiceEntryWorkerThread(
+ std::move(session), [this](const transport::SessionHandle& session) {
+ _nWorkers.fetchAndAdd(1);
+ auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); });
+
+ _sessionLoop(session);
+ });
+}
+
+void ServiceEntryPointImpl::_sessionLoop(const transport::SessionHandle& session) {
+ Message inMessage;
+ bool inExhaust = false;
+ int64_t counter = 0;
+
+ while (true) {
+ // 1. Source a Message from the client (unless we are exhausting)
+ if (!inExhaust) {
+ inMessage.reset();
+ auto status = [&] {
+ MONGO_IDLE_THREAD_BLOCK;
+ return session->sourceMessage(&inMessage).wait();
+ }();
+
+ if (ErrorCodes::isInterruption(status.code()) ||
+ ErrorCodes::isNetworkError(status.code())) {
+ break;
+ }
+
+ // Our session may have been closed internally.
+ if (status == TransportLayer::TicketSessionClosedStatus) {
+ break;
+ }
+
+ uassertStatusOK(status);
+ }
+
+ // 2. Pass sourced Message to handler to generate response.
+ auto opCtx = cc().makeOperationContext();
+
+ // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
+ // database work for this request.
+ DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote());
+
+ // opCtx must be destroyed here so that the operation cannot show
+ // up in currentOp results after the response reaches the client
+ opCtx.reset();
+
+ // 3. Format our response, if we have one
+ Message& toSink = dbresponse.response;
+ if (!toSink.empty()) {
+ toSink.header().setId(nextMessageId());
+ toSink.header().setResponseToMsgId(inMessage.header().getId());
+
+ // If this is an exhaust cursor, don't source more Messages
+ if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) {
+ inExhaust = true;
+ } else {
+ inExhaust = false;
+ }
+
+ // 4. Sink our response to the client
+ uassertStatusOK(session->sinkMessage(toSink).wait());
+ } else {
+ inExhaust = false;
+ }
+
+ if ((counter++ & 0xf) == 0) {
+ markThreadIdle();
+ }
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
new file mode 100644
index 00000000000..aeb5ce5016e
--- /dev/null
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/service_entry_point.h"
+
+namespace mongo {
+
+struct DbResponse;
+class OperationContext;
+
+namespace transport {
+class Session;
+class TransportLayer;
+} // namespace transport
+
+/**
+ * A basic entry point from the TransportLayer into a server.
+ *
+ * The server logic is implemented inside of handleRequest() by a subclass.
+ * startSession() spawns and detaches a new thread for each incoming connection
+ * (transport::Session).
+ */
+class ServiceEntryPointImpl : public ServiceEntryPoint {
+ MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);
+
+public:
+ explicit ServiceEntryPointImpl(transport::TransportLayer* tl) : _tl(tl) {}
+
+ void startSession(transport::SessionHandle session) final;
+
+ std::size_t getNumberOfActiveWorkerThreads() const {
+ return _nWorkers.load();
+ }
+
+private:
+ void _sessionLoop(const transport::SessionHandle& session);
+
+ transport::TransportLayer* _tl;
+ AtomicWord<std::size_t> _nWorkers;
+};
+
+} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp
index 667b1d5ae5e..243ebfbec94 100644
--- a/src/mongo/transport/service_entry_point_mock.cpp
+++ b/src/mongo/transport/service_entry_point_mock.cpp
@@ -42,38 +42,8 @@ namespace mongo {
using namespace transport;
-namespace {
-void setOkResponse(Message* m) {
- // Need to set up our { ok : 1 } response.
- BufBuilder b{};
-
- // Leave room for the message header
- b.skip(mongo::MsgData::MsgDataHeaderSize);
-
- // Add our response
- auto okObj = BSON("ok" << 1.0);
- okObj.appendSelfToBufBuilder(b);
-
- // Add some metadata
- auto metadata = BSONObj();
- metadata.appendSelfToBufBuilder(b);
-
- // Set Message header fields
- MsgData::View msg = b.buf();
- msg.setLen(b.len());
- msg.setOperation(dbCommandReply);
-
- // Set the message, transfer buffer ownership to Message
- m->reset();
- m->setData(b.release());
-}
-
-} // namespace
-
ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl)
- : _tl(tl), _outMessage(), _inShutdown(false) {
- setOkResponse(&_outMessage);
-}
+ : _tl(tl), _inShutdown(false) {}
ServiceEntryPointMock::~ServiceEntryPointMock() {
{
@@ -104,11 +74,38 @@ void ServiceEntryPointMock::run(transport::SessionHandle session) {
break;
}
+ auto resp = handleRequest(nullptr, inMessage, session->remote());
+
// sinkMessage()
- if (!session->sinkMessage(_outMessage).wait().isOK()) {
+ if (!session->sinkMessage(resp.response).wait().isOK()) {
break;
}
}
}
+DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) {
+ // Need to set up our { ok : 1 } response.
+ BufBuilder b{};
+
+ // Leave room for the message header
+ b.skip(mongo::MsgData::MsgDataHeaderSize);
+
+ // Add our response
+ auto okObj = BSON("ok" << 1.0);
+ okObj.appendSelfToBufBuilder(b);
+
+ // Add some metadata
+ auto metadata = BSONObj();
+ metadata.appendSelfToBufBuilder(b);
+
+ // Set Message header fields
+ MsgData::View msg = b.buf();
+ msg.setLen(b.len());
+ msg.setOperation(dbCommandReply);
+
+ return {Message(b.release()), ""};
+}
+
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h
index eadc9367fa8..94f011d9f3e 100644
--- a/src/mongo/transport/service_entry_point_mock.h
+++ b/src/mongo/transport/service_entry_point_mock.h
@@ -65,13 +65,15 @@ public:
*/
void startSession(transport::SessionHandle session) override;
+ DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) override;
+
private:
void run(transport::SessionHandle session);
transport::TransportLayer* _tl;
- Message _outMessage;
-
stdx::mutex _shutdownLock;
bool _inShutdown;
diff --git a/src/mongo/transport/transport_layer_legacy_test.cpp b/src/mongo/transport/transport_layer_legacy_test.cpp
index b61ed7559d7..286696b4d6c 100644
--- a/src/mongo/transport/transport_layer_legacy_test.cpp
+++ b/src/mongo/transport/transport_layer_legacy_test.cpp
@@ -38,7 +38,7 @@ namespace {
class ServiceEntryPointUtil : public ServiceEntryPoint {
public:
- void startSession(transport::SessionHandle session) {
+ void startSession(transport::SessionHandle session) override {
Message m;
Status s = session->sourceMessage(&m).wait();
@@ -47,6 +47,12 @@ public:
tll->end(session);
}
+ DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) override {
+ MONGO_UNREACHABLE;
+ }
+
transport::TransportLayerLegacy* tll = nullptr;
};