diff options
Diffstat (limited to 'src/mongo')
21 files changed, 520 insertions, 578 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index bb74398b35f..db37bea3bf0 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -249,10 +249,8 @@ void DBClientCursor::commandDataReceived() { // HACK: If we got an OP_COMMANDREPLY, take the reply object // and shove it in to an OP_REPLY message. if (op == dbCommandReply) { - // Need to take ownership here as we destroy the underlying message. - BSONObj reply = commandReply->getCommandReply().getOwned(); - batch.m.reset(); - replyToQuery(0, batch.m, reply); + BSONObj reply = commandReply->getCommandReply(); + batch.m = replyToQuery(reply).response; } QueryResult::View qr = batch.m.singleData().view2ptr(); diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index 41ae8d1af0c..57d196e7a88 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -95,6 +95,12 @@ public: _replyDelay = delay; } + DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) override { + MONGO_UNREACHABLE; + } + private: void run(transport::SessionHandle session) { Message inMessage; diff --git a/src/mongo/db/assemble_response.cpp b/src/mongo/db/assemble_response.cpp index 1c4444a2ce4..dce1610fb62 100644 --- a/src/mongo/db/assemble_response.cpp +++ b/src/mongo/db/assemble_response.cpp @@ -69,13 +69,13 @@ MONGO_FP_DECLARE(rsStopGetMore); namespace { using logger::LogComponent; -inline void opread(Message& m) { +inline void opread(const Message& m) { if (_diaglog.getLevel() & 2) { _diaglog.readop(m.singleData().view2ptr(), m.header().getLen()); } } -inline void opwrite(Message& m) { +inline void opwrite(const Message& m) { if (_diaglog.getLevel() & 1) { _diaglog.writeop(m.singleData().view2ptr(), m.header().getLen()); } @@ -143,15 +143,12 @@ void beginCommandOp(OperationContext* opCtx, const NamespaceString& nss, const B curop->setNS_inlock(nss.ns()); } -void receivedCommand(OperationContext* opCtx, - const NamespaceString& nss, - Client& client, - DbResponse& dbResponse, - Message& message) { +DbResponse receivedCommand(OperationContext* opCtx, + const NamespaceString& nss, + Client& client, + const Message& message) { invariant(nss.isCommand()); - const int32_t responseToMsgId = message.header().getId(); - DbMessage dbMessage(message); QueryMessage queryMessage(dbMessage); @@ -188,18 +185,12 @@ void receivedCommand(OperationContext* opCtx, op->debug().responseLength = response.header().dataLen(); - dbResponse.response = std::move(response); - dbResponse.responseToMsgId = responseToMsgId; + return {std::move(response)}; } -void receivedRpc(OperationContext* opCtx, - Client& client, - DbResponse& dbResponse, - Message& message) { +DbResponse receivedRpc(OperationContext* opCtx, Client& client, const Message& message) { invariant(message.operation() == dbCommand); - const int32_t responseToMsgId = message.header().getId(); - rpc::CommandReplyBuilder replyBuilder{}; auto curOp = CurOp::get(opCtx); @@ -229,19 +220,17 @@ void receivedRpc(OperationContext* opCtx, curOp->debug().responseLength = response.header().dataLen(); - dbResponse.response = std::move(response); - dbResponse.responseToMsgId = responseToMsgId; + return {std::move(response)}; } // In SERVER-7775 we reimplemented the pseudo-commands fsyncUnlock, inProg, and killOp // as ordinary commands. To support old clients for another release, this helper serves // to execute the real command from the legacy pseudo-command codepath. // TODO: remove after MongoDB 3.2 is released -void receivedPseudoCommand(OperationContext* opCtx, - Client& client, - DbResponse& dbResponse, - Message& message, - StringData realCommandName) { +DbResponse receivedPseudoCommand(OperationContext* opCtx, + Client& client, + const Message& message, + StringData realCommandName) { DbMessage originalDbm(message); auto originalNToSkip = originalDbm.pullInt(); @@ -288,23 +277,21 @@ void receivedPseudoCommand(OperationContext* opCtx, interposed.setData(dbQuery, cmdMsgBuf.buf(), cmdMsgBuf.len()); interposed.header().setId(message.header().getId()); - receivedCommand(opCtx, interposedNss, client, dbResponse, interposed); + return receivedCommand(opCtx, interposedNss, client, interposed); } -void receivedQuery(OperationContext* opCtx, - const NamespaceString& nss, - Client& c, - DbResponse& dbResponse, - Message& m) { +DbResponse receivedQuery(OperationContext* opCtx, + const NamespaceString& nss, + Client& c, + const Message& m) { invariant(!nss.isCommand()); globalOpCounters.gotQuery(); - int32_t responseToMsgId = m.header().getId(); - DbMessage d(m); QueryMessage q(d); CurOp& op = *CurOp::get(opCtx); + DbResponse dbResponse; try { Client* client = opCtx->getClient(); @@ -326,10 +313,10 @@ void receivedQuery(OperationContext* opCtx, } op.debug().responseLength = dbResponse.response.header().dataLen(); - dbResponse.responseToMsgId = responseToMsgId; + return dbResponse; } -void receivedKillCursors(OperationContext* opCtx, Message& m) { +void receivedKillCursors(OperationContext* opCtx, const Message& m) { LastError::get(opCtx->getClient()).disable(); DbMessage dbmessage(m); int n = dbmessage.pullInt(); @@ -354,7 +341,7 @@ void receivedKillCursors(OperationContext* opCtx, Message& m) { } } -void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, Message& m) { +void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { auto insertOp = parseLegacyInsert(m); invariant(insertOp.ns == nsString); for (const auto& obj : insertOp.documents) { @@ -366,7 +353,7 @@ void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, Me performInserts(opCtx, insertOp); } -void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, Message& m) { +void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { auto updateOp = parseLegacyUpdate(m); auto& singleUpdate = updateOp.updates[0]; invariant(updateOp.ns == nsString); @@ -387,7 +374,7 @@ void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, Me performUpdates(opCtx, updateOp); } -void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, Message& m) { +void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { auto deleteOp = parseLegacyDelete(m); auto& singleDelete = deleteOp.deletes[0]; invariant(deleteOp.ns == nsString); @@ -400,7 +387,10 @@ void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, Me performDeletes(opCtx, deleteOp); } -bool receivedGetMore(OperationContext* opCtx, DbResponse& dbresponse, Message& m, CurOp& curop) { +DbResponse receivedGetMore(OperationContext* opCtx, + const Message& m, + CurOp& curop, + bool* shouldLogOpDebug) { globalOpCounters.gotGetMore(); DbMessage d(m); @@ -421,6 +411,7 @@ bool receivedGetMore(OperationContext* opCtx, DbResponse& dbresponse, Message& m bool exhaust = false; bool isCursorAuthorized = false; + DbResponse dbresponse; try { const NamespaceString nsString(ns); uassert(ErrorCodes::InvalidNamespace, @@ -454,33 +445,28 @@ bool receivedGetMore(OperationContext* opCtx, DbResponse& dbresponse, Message& m curop.debug().exceptionInfo = e.getInfo(); - replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj); + dbresponse = replyToQuery(errObj, ResultFlag_ErrSet); curop.debug().responseLength = dbresponse.response.header().dataLen(); curop.debug().nreturned = 1; - return false; + *shouldLogOpDebug = true; + return dbresponse; } curop.debug().responseLength = dbresponse.response.header().dataLen(); auto queryResult = QueryResult::ConstView(dbresponse.response.buf()); curop.debug().nreturned = queryResult.getNReturned(); - dbresponse.responseToMsgId = m.header().getId(); - if (exhaust) { curop.debug().exhaust = true; dbresponse.exhaustNS = ns; } - return true; + return dbresponse; } } // namespace -// Returns false when request includes 'end' -void assembleResponse(OperationContext* opCtx, - Message& m, - DbResponse& dbresponse, - const HostAndPort& remote) { +DbResponse assembleResponse(OperationContext* opCtx, const Message& m, const HostAndPort& remote) { // before we lock... NetworkOp op = m.operation(); bool isCommand = false; @@ -511,16 +497,13 @@ void assembleResponse(OperationContext* opCtx, opwrite(m); if (nsString.coll() == "$cmd.sys.inprog") { - receivedPseudoCommand(opCtx, c, dbresponse, m, "currentOp"); - return; + return receivedPseudoCommand(opCtx, c, m, "currentOp"); } if (nsString.coll() == "$cmd.sys.killop") { - receivedPseudoCommand(opCtx, c, dbresponse, m, "killOp"); - return; + return receivedPseudoCommand(opCtx, c, m, "killOp"); } if (nsString.coll() == "$cmd.sys.unlock") { - receivedPseudoCommand(opCtx, c, dbresponse, m, "fsyncUnlock"); - return; + return receivedPseudoCommand(opCtx, c, m, "fsyncUnlock"); } } else { opread(m); @@ -548,17 +531,14 @@ void assembleResponse(OperationContext* opCtx, long long logThresholdMs = serverGlobalParams.slowMS; bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); + DbResponse dbresponse; if (op == dbQuery) { - if (isCommand) { - receivedCommand(opCtx, nsString, c, dbresponse, m); - } else { - receivedQuery(opCtx, nsString, c, dbresponse, m); - } + dbresponse = isCommand ? receivedCommand(opCtx, nsString, c, m) + : receivedQuery(opCtx, nsString, c, m); } else if (op == dbCommand) { - receivedRpc(opCtx, c, dbresponse, m); + dbresponse = receivedRpc(opCtx, c, m); } else if (op == dbGetMore) { - if (!receivedGetMore(opCtx, dbresponse, m, currentOp)) - shouldLogOpDebug = true; + dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); } else { // The remaining operations do not return any response. They are fire-and-forget. try { @@ -644,6 +624,7 @@ void assembleResponse(OperationContext* opCtx, } recordCurOpMetrics(opCtx); + return dbresponse; } } // namespace mongo diff --git a/src/mongo/db/assemble_response.h b/src/mongo/db/assemble_response.h index 9a50dd52dd3..96d1ade53c9 100644 --- a/src/mongo/db/assemble_response.h +++ b/src/mongo/db/assemble_response.h @@ -40,9 +40,8 @@ class OperationContext; // to indicate that the call is on behalf of a DBDirectClient. extern const HostAndPort kHostAndPortForDirectClient; -void assembleResponse(OperationContext* opCtx, - Message& m, - DbResponse& dbresponse, - const HostAndPort& client); +DbResponse assembleResponse(OperationContext* opCtx, + const Message& request, + const HostAndPort& client); } // namespace mongo diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 3a7b8852a58..4bf339e49b1 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -436,7 +436,7 @@ ExitCode _initAndListen(int listenPort) { Client::initThread("initandlisten"); _initWireSpec(); - auto globalServiceContext = getGlobalServiceContext(); + auto globalServiceContext = checked_cast<ServiceContextMongoD*>(getGlobalServiceContext()); globalServiceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds(10))); globalServiceContext->setOpObserver(stdx::make_unique<OpObserverImpl>()); @@ -470,20 +470,18 @@ ExitCode _initAndListen(int listenPort) { logProcessDetails(); - checked_cast<ServiceContextMongoD*>(getGlobalServiceContext())->createLockFile(); + globalServiceContext->createLockFile(); transport::TransportLayerLegacy::Options options; options.port = listenPort; options.ipList = serverGlobalParams.bind_ip; - auto sep = - stdx::make_unique<ServiceEntryPointMongod>(getGlobalServiceContext()->getTransportLayer()); - auto sepPtr = sep.get(); - - getGlobalServiceContext()->setServiceEntryPoint(std::move(sep)); + globalServiceContext->setServiceEntryPoint( + stdx::make_unique<ServiceEntryPointMongod>(globalServiceContext->getTransportLayer())); // Create, start, and attach the TL - auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(options, sepPtr); + auto transportLayer = stdx::make_unique<transport::TransportLayerLegacy>( + options, globalServiceContext->getServiceEntryPoint()); auto res = transportLayer->setup(); if (!res.isOK()) { error() << "Failed to set up listener: " << res; @@ -494,7 +492,7 @@ ExitCode _initAndListen(int listenPort) { if (serverGlobalParams.isHttpInterfaceEnabled) { dbWebServer.reset(new DbWebServer(serverGlobalParams.bind_ip, serverGlobalParams.port + 1000, - getGlobalServiceContext(), + globalServiceContext, new RestAdminAccess())); if (!dbWebServer->setupSockets()) { error() << "Failed to set up sockets for HTTP interface during startup."; @@ -502,7 +500,7 @@ ExitCode _initAndListen(int listenPort) { } } - getGlobalServiceContext()->initializeGlobalStorageEngine(); + globalServiceContext->initializeGlobalStorageEngine(); #ifdef MONGO_CONFIG_WIREDTIGER_ENABLED if (EncryptionHooks::get(getGlobalServiceContext())->restartRequired()) { @@ -525,7 +523,7 @@ ExitCode _initAndListen(int listenPort) { } // Warn if field name matches non-active registered storage engine. - if (getGlobalServiceContext()->isRegisteredStorageEngine(e.fieldName())) { + if (globalServiceContext->isRegisteredStorageEngine(e.fieldName())) { warning() << "Detected configuration for non-active storage engine " << e.fieldName() << " when current storage engine is " << storageGlobalParams.engine; @@ -533,7 +531,7 @@ ExitCode _initAndListen(int listenPort) { } } - if (!getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager()) { + if (!globalServiceContext->getGlobalStorageEngine()->getSnapshotManager()) { if (moe::startupOptionsParsed.count("replication.enableMajorityReadConcern") && moe::startupOptionsParsed["replication.enableMajorityReadConcern"].as<bool>()) { // Note: we are intentionally only erroring if the user explicitly requested that we @@ -580,7 +578,7 @@ ExitCode _initAndListen(int listenPort) { ScriptEngine::setup(); } - auto startupOpCtx = getGlobalServiceContext()->makeOperationContext(&cc()); + auto startupOpCtx = globalServiceContext->makeOperationContext(&cc()); repairDatabasesAndCheckVersion(startupOpCtx.get()); @@ -697,7 +695,7 @@ ExitCode _initAndListen(int listenPort) { storageGlobalParams.engine != "devnull") { Lock::GlobalWrite lk(startupOpCtx.get()); FeatureCompatibilityVersion::setIfCleanStartup( - startupOpCtx.get(), repl::StorageInterface::get(getGlobalServiceContext())); + startupOpCtx.get(), repl::StorageInterface::get(globalServiceContext)); } if (replSettings.usingReplSets() || (!replSettings.isMaster() && replSettings.isSlave()) || @@ -714,7 +712,7 @@ ExitCode _initAndListen(int listenPort) { // operation context anymore startupOpCtx.reset(); - auto start = getGlobalServiceContext()->addAndStartTransportLayer(std::move(transportLayer)); + auto start = globalServiceContext->addAndStartTransportLayer(std::move(transportLayer)); if (!start.isOK()) { error() << "Failed to start the listener: " << start.toString(); return EXIT_NET_ERROR; @@ -960,10 +958,9 @@ static void shutdownTask() { if (auto sr = grid.shardRegistry()) { // TODO: race: sr is a naked pointer sr->shutdown(); } -#if __has_feature(address_sanitizer) - auto sep = static_cast<ServiceEntryPointMongod*>(serviceContext->getServiceEntryPoint()); - if (sep) { +#if __has_feature(address_sanitizer) + if (auto sep = checked_cast<ServiceEntryPointImpl*>(serviceContext->getServiceEntryPoint())) { // When running under address sanitizer, we get false positive leaks due to disorder around // the lifecycle of a connection and request. When we are running under ASAN, we try a lot // harder to dry up the server from active connections before going on to really shut down. @@ -1001,7 +998,6 @@ static void shutdownTask() { << " active workers to drain; continuing with shutdown... "; } } - #endif // Shutdown Full-Time Data Capture diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp index 5b1499da351..60a001f534e 100644 --- a/src/mongo/db/dbdirectclient.cpp +++ b/src/mongo/db/dbdirectclient.cpp @@ -123,10 +123,9 @@ bool DBDirectClient::call(Message& toSend, Message& response, bool assertOk, str DirectClientScope directClientScope(_opCtx); LastError::get(_opCtx->getClient()).startRequest(); - DbResponse dbResponse; CurOp curOp(_opCtx); - assembleResponse(_opCtx, toSend, dbResponse, kHostAndPortForDirectClient); - verify(!dbResponse.response.empty()); + auto dbResponse = assembleResponse(_opCtx, toSend, kHostAndPortForDirectClient); + invariant(!dbResponse.response.empty()); response = std::move(dbResponse.response); return true; @@ -136,9 +135,9 @@ void DBDirectClient::say(Message& toSend, bool isRetry, string* actualServer) { DirectClientScope directClientScope(_opCtx); LastError::get(_opCtx->getClient()).startRequest(); - DbResponse dbResponse; CurOp curOp(_opCtx); - assembleResponse(_opCtx, toSend, dbResponse, kHostAndPortForDirectClient); + auto dbResponse = assembleResponse(_opCtx, toSend, kHostAndPortForDirectClient); + invariant(dbResponse.response.empty()); } unique_ptr<DBClientCursor> DBDirectClient::query(const string& ns, diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index 6941a260bcc..8dd7def0cb3 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -30,10 +30,9 @@ #include "mongo/platform/basic.h" #include "mongo/db/dbmessage.h" -#include "mongo/db/operation_context.h" + #include "mongo/platform/strnlen.h" #include "mongo/rpc/object_check.h" -#include "mongo/transport/session.h" namespace mongo { @@ -177,28 +176,10 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) { _buffer.skip(sizeof(QueryResult::Value)); } -void OpQueryReplyBuilder::send(const transport::SessionHandle& session, - int queryResultFlags, - const Message& requestMsg, - int nReturned, - int startingFrom, - long long cursorId) { - Message response; - putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId); - - response.header().setId(nextMessageId()); - response.header().setResponseToMsgId(requestMsg.header().getId()); - - uassertStatusOK(session->sinkMessage(response).wait()); -} - -void OpQueryReplyBuilder::sendCommandReply(const transport::SessionHandle& session, - const Message& requestMsg) { - send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); -} - -void OpQueryReplyBuilder::putInMessage( - Message* out, int queryResultFlags, int nReturned, int startingFrom, long long cursorId) { +Message OpQueryReplyBuilder::toQueryReply(int queryResultFlags, + int nReturned, + int startingFrom, + long long cursorId) { QueryResult::View qr = _buffer.buf(); qr.setResultFlags(queryResultFlags); qr.msgdata().setLen(_buffer.len()); @@ -206,44 +187,17 @@ void OpQueryReplyBuilder::putInMessage( qr.setCursorId(cursorId); qr.setStartingFrom(startingFrom); qr.setNReturned(nReturned); - out->setData(_buffer.release()); // transport will free + return Message(_buffer.release()); } -void replyToQuery(int queryResultFlags, - const transport::SessionHandle& session, - const Message& requestMsg, - const void* data, - int size, - int nReturned, - int startingFrom, - long long cursorId) { +DbResponse replyToQuery(int queryResultFlags, + const void* data, + int size, + int nReturned, + int startingFrom, + long long cursorId) { OpQueryReplyBuilder reply; reply.bufBuilderForResults().appendBuf(data, size); - reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); -} - -void replyToQuery(int queryResultFlags, - const transport::SessionHandle& session, - const Message& requestMsg, - const BSONObj& responseObj) { - replyToQuery(queryResultFlags, - session, - requestMsg, - (void*)responseObj.objdata(), - responseObj.objsize(), - 1); -} - -void replyToQuery(int queryResultFlags, const Message& m, DbResponse& dbresponse, BSONObj obj) { - Message resp; - replyToQuery(queryResultFlags, resp, obj); - dbresponse.response = std::move(resp); - dbresponse.responseToMsgId = m.header().getId(); -} - -void replyToQuery(int queryResultFlags, Message& response, const BSONObj& resultObj) { - OpQueryReplyBuilder reply; - resultObj.appendSelfToBufBuilder(reply.bufBuilderForResults()); - reply.putInMessage(&response, queryResultFlags, /*nReturned*/ 1); + return DbResponse{reply.toQueryReply(queryResultFlags, nReturned, startingFrom, cursorId)}; } } diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index d6ed3a68684..959dc59edd0 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -35,8 +35,6 @@ #include "mongo/client/constants.h" #include "mongo/db/jsobj.h" #include "mongo/db/server_options.h" -#include "mongo/transport/session.h" -#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/message.h" namespace mongo { @@ -317,13 +315,12 @@ public: /** * A response to a DbMessage. + * + * Order of fields makes DbResponse{funcReturningMessage()} valid. */ struct DbResponse { - Message response; - int32_t responseToMsgId; - std::string exhaustNS; /* points to ns if exhaust mode. 0=normal mode*/ - DbResponse(Message r, int32_t rtId) : response(std::move(r)), responseToMsgId(rtId) {} - DbResponse() = default; + Message response; // If empty, nothing will be returned to the client. + std::string exhaustNS; // Namespace of cursor if exhaust mode, else "". }; /** @@ -348,58 +345,39 @@ public: } /** - * Finishes the reply and transfers the message buffer into 'out'. + * Finishes the reply and returns the message buffer. */ - void putInMessage(Message* out, - int queryResultFlags, - int nReturned, - int startingFrom = 0, - long long cursorId = 0); + Message toQueryReply(int queryResultFlags, + int nReturned, + int startingFrom = 0, + long long cursorId = 0); /** - * Finishes the reply and sends the message out to 'destination'. + * Similar to toQueryReply() but used for replying to a command. */ - void send(const transport::SessionHandle& session, - int queryResultFlags, - const Message& requestMsg, - int nReturned, - int startingFrom = 0, - long long cursorId = 0); - - /** - * Similar to send() but used for replying to a command. - */ - void sendCommandReply(const transport::SessionHandle& session, const Message& requestMsg); + Message toCommandReply() { + return toQueryReply(0, 1); + } private: BufBuilder _buffer; }; -void replyToQuery(int queryResultFlags, - const transport::SessionHandle& session, - const Message& requestMsg, - const void* data, - int size, - int nReturned, - int startingFrom = 0, - long long cursorId = 0); - -/* object reply helper. */ -void replyToQuery(int queryResultFlags, - const transport::SessionHandle& session, - const Message& requestMsg, - const BSONObj& responseObj); +/** + * Helper to build a DbResponse from a buffer containing an OP_QUERY response. + */ +DbResponse replyToQuery(int queryResultFlags, + const void* data, + int size, + int nReturned, + int startingFrom = 0, + long long cursorId = 0); -/* helper to do a reply using a DbResponse object */ -void replyToQuery(int queryResultFlags, const Message& m, DbResponse& dbresponse, BSONObj obj); /** - * Helper method for setting up a response object. - * - * @param queryResultFlags The flags to set to the response object. - * @param response The object to be used for building the response. The internal buffer of - * this object will contain the raw data from resultObj after a successful call. - * @param resultObj The bson object that contains the reply data. + * Helper to build a DbRespose for OP_QUERY with a single reply object. */ -void replyToQuery(int queryResultFlags, Message& response, const BSONObj& resultObj); +inline DbResponse replyToQuery(const BSONObj& obj, int queryResultFlags = 0) { + return replyToQuery(queryResultFlags, obj.objdata(), obj.objsize(), /*nReturned*/ 1); +} } // namespace mongo diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 376a13b695f..c2abbecbe82 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016 MongoDB Inc. + * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -32,139 +32,14 @@ #include "mongo/db/service_entry_point_mongod.h" -#include <vector> - #include "mongo/db/assemble_response.h" -#include "mongo/db/client.h" -#include "mongo/db/dbmessage.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_entry_point_utils.h" -#include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/socket_exception.h" -#include "mongo/util/net/thread_idle_callback.h" -#include "mongo/util/quick_exit.h" -#include "mongo/util/scopeguard.h" namespace mongo { -namespace { - -// Set up proper headers for formatting an exhaust request, if we need to -bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { - MsgData::View header = dbresponse.response.header(); - QueryResult::View qr = header.view2ptr(); - long long cursorid = qr.getCursorId(); - - if (!cursorid) { - return false; - } - - verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); - - auto ns = dbresponse.exhaustNS; // reset() will free this - - m->reset(); - - BufBuilder b(512); - b.appendNum(static_cast<int>(0) /* size set later in appendData() */); - b.appendNum(header.getId()); - b.appendNum(header.getResponseToMsgId()); - b.appendNum(static_cast<int>(dbGetMore)); - b.appendNum(static_cast<int>(0)); - b.appendStr(ns); - b.appendNum(static_cast<int>(0)); // ntoreturn - b.appendNum(cursorid); - - MsgData::View(b.buf()).setLen(b.len()); - m->setData(b.release()); - - return true; -} - -} // namespace - -using transport::Session; -using transport::TransportLayer; - -ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {} - -void ServiceEntryPointMongod::startSession(transport::SessionHandle session) { - // Pass ownership of the transport::SessionHandle into our worker thread. When this - // thread exits, the session will end. - launchWrappedServiceEntryWorkerThread( - std::move(session), [this](const transport::SessionHandle& session) { - _nWorkers.fetchAndAdd(1); - auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); }); - - _sessionLoop(session); - }); -} - -void ServiceEntryPointMongod::_sessionLoop(const transport::SessionHandle& session) { - Message inMessage; - bool inExhaust = false; - int64_t counter = 0; - - while (true) { - // 1. Source a Message from the client (unless we are exhausting) - if (!inExhaust) { - inMessage.reset(); - auto status = [&] { - MONGO_IDLE_THREAD_BLOCK; - return session->sourceMessage(&inMessage).wait(); - }(); - - if (ErrorCodes::isInterruption(status.code()) || - ErrorCodes::isNetworkError(status.code())) { - break; - } - - // Our session may have been closed internally. - if (status == TransportLayer::TicketSessionClosedStatus) { - break; - } - - uassertStatusOK(status); - } - - // 2. Pass sourced Message up to mongod - DbResponse dbresponse; - { - auto opCtx = cc().makeOperationContext(); - assembleResponse(opCtx.get(), inMessage, dbresponse, session->remote()); - - // opCtx must go out of scope here so that the operation cannot show - // up in currentOp results after the response reaches the client - } - - // 3. Format our response, if we have one - Message& toSink = dbresponse.response; - if (!toSink.empty()) { - toSink.header().setId(nextMessageId()); - toSink.header().setResponseToMsgId(inMessage.header().getId()); - - // If this is an exhaust cursor, don't source more Messages - if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) { - inExhaust = true; - } else { - inExhaust = false; - } - - // 4. Sink our response to the client - uassertStatusOK(session->sinkMessage(toSink).wait()); - } else { - inExhaust = false; - } - if ((counter++ & 0xf) == 0) { - markThreadIdle(); - } - } +DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) { + return assembleResponse(opCtx, request, client); } } // namespace mongo diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h index 4917430f02f..cffc2280ab7 100644 --- a/src/mongo/db/service_entry_point_mongod.h +++ b/src/mongo/db/service_entry_point_mongod.h @@ -28,42 +28,22 @@ #pragma once -#include <vector> - #include "mongo/base/disallow_copying.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/transport/service_entry_point.h" +#include "mongo/transport/service_entry_point_impl.h" namespace mongo { -namespace transport { -class Session; -class TransportLayer; -} // namespace transport - /** - * The entry point from the TransportLayer into Mongod. startSession() spawns and - * detaches a new thread for each incoming connection (transport::Session). + * The entry point into mongod. Just a wrapper around assembleResponse. */ -class ServiceEntryPointMongod final : public ServiceEntryPoint { +class ServiceEntryPointMongod final : public ServiceEntryPointImpl { MONGO_DISALLOW_COPYING(ServiceEntryPointMongod); public: - explicit ServiceEntryPointMongod(transport::TransportLayer* tl); - - virtual ~ServiceEntryPointMongod() = default; - - void startSession(transport::SessionHandle session) override; - - std::size_t getNumberOfActiveWorkerThreads() const { - return _nWorkers.load(); - } - -private: - void _sessionLoop(const transport::SessionHandle& session); - - transport::TransportLayer* _tl; - AtomicWord<std::size_t> _nWorkers; + using ServiceEntryPointImpl::ServiceEntryPointImpl; + DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) override; }; } // namespace mongo diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 66386fd5767..25ac876dfd2 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -195,7 +195,7 @@ MONGO_INITIALIZER(InitializeRegisterErrorHandler)(InitializerContext* const) { } // namespace -void Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { +DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { globalOpCounters.gotQuery(); const QueryMessage q(*dbm); @@ -254,15 +254,7 @@ void Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMe opCtx, findCommand, queryRequest, verbosity, metadata, &explainBuilder)); BSONObj explainObj = explainBuilder.done(); - replyToQuery(0, // query result flags - client->session(), - dbm->msg(), - static_cast<const void*>(explainObj.objdata()), - explainObj.objsize(), - 1, // numResults - 0, // startingFrom - CursorId(0)); - return; + return replyToQuery(explainObj); } // Do the work to generate the first batch of results. This blocks waiting to get responses from @@ -293,21 +285,17 @@ void Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMe numResults++; } - reply.send(client->session(), - 0, // query result flags - dbm->msg(), - numResults, - 0, // startingFrom - cursorId.getValue()); + return DbResponse{reply.toQueryReply(0, // query result flags + numResults, + 0, // startingFrom + cursorId.getValue())}; } -void Strategy::clientCommandOp(OperationContext* opCtx, - const NamespaceString& nss, - DbMessage* dbm) { +DbResponse Strategy::clientCommandOp(OperationContext* opCtx, + const NamespaceString& nss, + DbMessage* dbm) { const QueryMessage q(*dbm); - Client* const client = opCtx->getClient(); - LOG(3) << "command: " << q.ns << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn << " options: " << q.queryOptions; @@ -336,22 +324,17 @@ void Strategy::clientCommandOp(OperationContext* opCtx, const NamespaceString interposedNss("admin", "$cmd"); BSONObjBuilder reply; runAgainstRegistered(opCtx, interposedNss, interposedCmd, reply, q.queryOptions); - replyToQuery(0, client->session(), dbm->msg(), reply.done()); + return replyToQuery(reply.done()); }; if (nss.coll() == "$cmd.sys.inprog") { - upgradeToRealCommand("currentOp"); - return; + return upgradeToRealCommand("currentOp"); } else if (nss.coll() == "$cmd.sys.killop") { - upgradeToRealCommand("killOp"); - return; + return upgradeToRealCommand("killOp"); } else if (nss.coll() == "$cmd.sys.unlock") { - replyToQuery(0, - client->session(), - dbm->msg(), - BSON("err" - << "can't do unlock through mongos")); - return; + return replyToQuery(BSON("$err" + << "can't do unlock through mongos"), + ResultFlag_ErrSet); } // No pseudo-command found, fall through to execute as a regular query @@ -418,8 +401,7 @@ void Strategy::clientCommandOp(OperationContext* opCtx, BSONObjBuilder builder(reply.bufBuilderForResults()); runAgainstRegistered(opCtx, NamespaceString(q.ns), cmdObj, builder, q.queryOptions); } - reply.sendCommandReply(client->session(), dbm->msg()); - return; + return DbResponse{reply.toCommandReply()}; } catch (const StaleConfigException& e) { if (loops <= 0) throw e; @@ -444,8 +426,7 @@ void Strategy::clientCommandOp(OperationContext* opCtx, BSONObjBuilder builder(reply.bufBuilderForResults()); Command::appendCommandStatus(builder, e.toStatus()); } - reply.sendCommandReply(client->session(), dbm->msg()); - return; + return DbResponse{reply.toCommandReply()}; } } } @@ -479,7 +460,7 @@ void Strategy::commandOp(OperationContext* opCtx, } } -void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { +DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { const int ntoreturn = dbm->pullInt(); uassert( 34424, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); @@ -487,15 +468,12 @@ void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMe globalOpCounters.gotGetMore(); - Client* const client = opCtx->getClient(); - // TODO: Handle stale config exceptions here from coll being dropped or sharded during op for // now has same semantics as legacy request. auto statusGetDb = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db()); if (statusGetDb == ErrorCodes::NamespaceNotFound) { - replyToQuery(ResultFlag_CursorNotFound, client->session(), dbm->msg(), 0, 0, 0); - return; + return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0); } uassertStatusOK(statusGetDb); @@ -508,8 +486,7 @@ void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMe auto cursorResponse = ClusterFind::runGetMore(opCtx, getMoreRequest); if (cursorResponse == ErrorCodes::CursorNotFound) { - replyToQuery(ResultFlag_CursorNotFound, client->session(), dbm->msg(), 0, 0, 0); - return; + return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0); } uassertStatusOK(cursorResponse.getStatus()); @@ -522,14 +499,12 @@ void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMe ++numResults; } - replyToQuery(0, - client->session(), - dbm->msg(), - buffer.buf(), - buffer.len(), - numResults, - cursorResponse.getValue().getNumReturnedSoFar().value_or(0), - cursorResponse.getValue().getCursorId()); + return replyToQuery(0, + buffer.buf(), + buffer.len(), + numResults, + cursorResponse.getValue().getNumReturnedSoFar().value_or(0), + cursorResponse.getValue().getCursorId()); } void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) { diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h index 03a1ab0077b..f91e3d61793 100644 --- a/src/mongo/s/commands/strategy.h +++ b/src/mongo/s/commands/strategy.h @@ -37,6 +37,7 @@ namespace mongo { class DbMessage; +struct DbResponse; class NamespaceString; class OperationContext; class QueryRequest; @@ -56,13 +57,13 @@ public: * * Must not be called with legacy '.$cmd' commands. */ - static void queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm); + static DbResponse queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm); /** * Handles a legacy-style getMore request and sends the response back on success (or cursor not * found) or throws on error. */ - static void getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm); + static DbResponse getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm); /** * Handles a legacy-style killCursors request. Doesn't send any response on success or throws on @@ -84,9 +85,9 @@ public: * Catches StaleConfigException errors and retries the command automatically after refreshing * the metadata for the failing namespace. */ - static void clientCommandOp(OperationContext* opCtx, - const NamespaceString& nss, - DbMessage* dbm); + static DbResponse clientCommandOp(OperationContext* opCtx, + const NamespaceString& nss, + DbMessage* dbm); /** * Helper to run an explain of a find operation on the shards. Fills 'out' with the result of diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp index 240c1392c33..118296a2690 100644 --- a/src/mongo/s/service_entry_point_mongos.cpp +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -40,19 +40,12 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/strategy.h" -#include "mongo/transport/service_entry_point_utils.h" -#include "mongo/transport/session.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/log.h" #include "mongo/util/net/message.h" -#include "mongo/util/net/thread_idle_callback.h" #include "mongo/util/scopeguard.h" namespace mongo { -using transport::TransportLayer; - namespace { BSONObj buildErrReply(const DBException& ex) { @@ -67,126 +60,89 @@ BSONObj buildErrReply(const DBException& ex) { } // namespace -ServiceEntryPointMongos::ServiceEntryPointMongos(TransportLayer* tl) : _tl(tl) {} -void ServiceEntryPointMongos::startSession(transport::SessionHandle session) { - launchWrappedServiceEntryWorkerThread( - std::move(session), - [this](const transport::SessionHandle& session) { _sessionLoop(session); }); -} +DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, + const Message& message, + const HostAndPort& remote) { + // Release any cached egress connections for client back to pool before destroying + auto guard = MakeGuard(ShardConnection::releaseMyConnections); -void ServiceEntryPointMongos::_sessionLoop(const transport::SessionHandle& session) { - int64_t counter = 0; + const int32_t msgId = message.header().getId(); + const NetworkOp op = message.operation(); - while (true) { - // Release any cached egress connections for client back to pool before destroying - auto guard = MakeGuard(ShardConnection::releaseMyConnections); + // This exception will not be returned to the caller, but will be logged and will close the + // connection + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Message type " << op << " is not supported.", + isSupportedNetworkOp(op)); - Message message; + // Start a new LastError session. Any exceptions thrown from here onwards will be returned + // to the caller (if the type of the message permits it). + auto client = opCtx->getClient(); + if (!ClusterLastErrorInfo::get(client)) { + ClusterLastErrorInfo::get(client) = std::make_shared<ClusterLastErrorInfo>(); + } + ClusterLastErrorInfo::get(client)->newRequest(); + LastError::get(client).startRequest(); - // Source a Message from the client - { - auto status = [&] { - MONGO_IDLE_THREAD_BLOCK; - return session->sourceMessage(&message).wait(); - }(); + DbMessage dbm(message); - if (ErrorCodes::isInterruption(status.code()) || - ErrorCodes::isNetworkError(status.code())) { - break; - } + NamespaceString nss; + DbResponse dbResponse; + try { + if (dbm.messageShouldHaveNs()) { + nss = NamespaceString(StringData(dbm.getns())); - // Our session may have been closed internally. - if (status == TransportLayer::TicketSessionClosedStatus) { - break; - } + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid ns [" << nss.ns() << "]", + nss.isValid()); - uassertStatusOK(status); + uassert(ErrorCodes::IllegalOperation, + "Can't use 'local' database through mongos", + nss.db() != NamespaceString::kLocalDb); } - auto opCtx = cc().makeOperationContext(); + AuthorizationSession::get(opCtx->getClient())->startRequest(opCtx); - const int32_t msgId = message.header().getId(); + LOG(3) << "Request::process begin ns: " << nss << " msg id: " << msgId + << " op: " << networkOpToString(op); - const NetworkOp op = message.operation(); + switch (op) { + case dbQuery: + if (nss.isCommand() || nss.isSpecialCommand()) { + dbResponse = Strategy::clientCommandOp(opCtx, nss, &dbm); + } else { + dbResponse = Strategy::queryOp(opCtx, nss, &dbm); + } + break; + case dbGetMore: + dbResponse = Strategy::getMore(opCtx, nss, &dbm); + break; + case dbKillCursors: + Strategy::killCursors(opCtx, &dbm); // No Response. + break; + default: + Strategy::writeOp(opCtx, &dbm); // No Response. + break; + } - // This exception will not be returned to the caller, but will be logged and will close the - // connection - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Message type " << op << " is not supported.", - isSupportedNetworkOp(op)); + LOG(3) << "Request::process end ns: " << nss << " msg id: " << msgId + << " op: " << networkOpToString(op); - // Start a new LastError session. Any exceptions thrown from here onwards will be returned - // to the caller (if the type of the message permits it). - auto client = opCtx->getClient(); - if (!ClusterLastErrorInfo::get(client)) { - ClusterLastErrorInfo::get(client) = std::make_shared<ClusterLastErrorInfo>(); - } - ClusterLastErrorInfo::get(client)->newRequest(); - LastError::get(client).startRequest(); - - DbMessage dbm(message); - - NamespaceString nss; - - try { - - if (dbm.messageShouldHaveNs()) { - nss = NamespaceString(StringData(dbm.getns())); - - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid ns [" << nss.ns() << "]", - nss.isValid()); - - uassert(ErrorCodes::IllegalOperation, - "Can't use 'local' database through mongos", - nss.db() != NamespaceString::kLocalDb); - } - - AuthorizationSession::get(opCtx->getClient())->startRequest(opCtx.get()); - - LOG(3) << "Request::process begin ns: " << nss << " msg id: " << msgId - << " op: " << networkOpToString(op); - - switch (op) { - case dbQuery: - if (nss.isCommand() || nss.isSpecialCommand()) { - Strategy::clientCommandOp(opCtx.get(), nss, &dbm); - } else { - Strategy::queryOp(opCtx.get(), nss, &dbm); - } - break; - case dbGetMore: - Strategy::getMore(opCtx.get(), nss, &dbm); - break; - case dbKillCursors: - Strategy::killCursors(opCtx.get(), &dbm); - break; - default: - Strategy::writeOp(opCtx.get(), &dbm); - break; - } - - LOG(3) << "Request::process end ns: " << nss << " msg id: " << msgId - << " op: " << networkOpToString(op); - - } catch (const DBException& ex) { - LOG(1) << "Exception thrown" - << " while processing " << networkOpToString(op) << " op" - << " for " << nss.ns() << causedBy(ex); - - if (op == dbQuery || op == dbGetMore) { - replyToQuery(ResultFlag_ErrSet, session, message, buildErrReply(ex)); - } - - // We *always* populate the last error for now - LastError::get(opCtx->getClient()).setLastError(ex.getCode(), ex.what()); - } + } catch (const DBException& ex) { + LOG(1) << "Exception thrown while processing " << networkOpToString(op) << " op for " + << nss.ns() << causedBy(ex); - if ((counter++ & 0xf) == 0) { - markThreadIdle(); + if (op == dbQuery || op == dbGetMore) { + dbResponse = replyToQuery(buildErrReply(ex), ResultFlag_ErrSet); + } else { + // No Response. } + + // We *always* populate the last error for now + LastError::get(opCtx->getClient()).setLastError(ex.getCode(), ex.what()); } + return dbResponse; } } // namespace mongo diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h index acf42d89fc9..d6440f69264 100644 --- a/src/mongo/s/service_entry_point_mongos.h +++ b/src/mongo/s/service_entry_point_mongos.h @@ -31,33 +31,21 @@ #include <vector> #include "mongo/base/disallow_copying.h" -#include "mongo/transport/service_entry_point.h" +#include "mongo/transport/service_entry_point_impl.h" namespace mongo { -namespace transport { -class Session; -class TransportLayer; -} // namespace transport - /** - * The entry point from the TransportLayer into Mongos. startSession() spawns and - * detaches a new thread for each incoming connection (transport::Session). + * The entry point from the TransportLayer into Mongos. */ -class ServiceEntryPointMongos final : public ServiceEntryPoint { +class ServiceEntryPointMongos final : public ServiceEntryPointImpl { MONGO_DISALLOW_COPYING(ServiceEntryPointMongos); public: - ServiceEntryPointMongos(transport::TransportLayer* tl); - - virtual ~ServiceEntryPointMongos() = default; - - void startSession(transport::SessionHandle session) override; - -private: - void _sessionLoop(const transport::SessionHandle& session); - - transport::TransportLayer* _tl; + using ServiceEntryPointImpl::ServiceEntryPointImpl; + DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) override; }; } // namespace mongo diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 75b589c5ec5..b365613660d 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -74,6 +74,7 @@ env.Library( target='service_entry_point_utils', source=[ 'service_entry_point_utils.cpp', + 'service_entry_point_impl.cpp', ], LIBDEPS=[ "$BUILD_DIR/mongo/db/service_context", diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index e0a1ee7a907..f9f82ed3273 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/db/dbmessage.h" #include "mongo/transport/session.h" namespace mongo { @@ -51,6 +52,13 @@ public: */ virtual void startSession(transport::SessionHandle session) = 0; + /** + * Processes a request and fills out a DbResponse. + */ + virtual DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) = 0; + protected: ServiceEntryPoint() = default; }; diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp new file mode 100644 index 00000000000..e6db7f875cd --- /dev/null +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_entry_point_impl.h" + +#include <vector> + +#include "mongo/db/assemble_response.h" +#include "mongo/db/client.h" +#include "mongo/db/dbmessage.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/quick_exit.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace { + +// Set up proper headers for formatting an exhaust request, if we need to +bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { + MsgData::View header = dbresponse.response.header(); + QueryResult::View qr = header.view2ptr(); + long long cursorid = qr.getCursorId(); + + if (!cursorid) { + return false; + } + + verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); + + auto ns = dbresponse.exhaustNS; // reset() will free this + + m->reset(); + + BufBuilder b(512); + b.appendNum(static_cast<int>(0) /* size set later in appendData() */); + b.appendNum(header.getId()); + b.appendNum(header.getResponseToMsgId()); + b.appendNum(static_cast<int>(dbGetMore)); + b.appendNum(static_cast<int>(0)); + b.appendStr(ns); + b.appendNum(static_cast<int>(0)); // ntoreturn + b.appendNum(cursorid); + + MsgData::View(b.buf()).setLen(b.len()); + m->setData(b.release()); + + return true; +} + +} // namespace + +using transport::Session; +using transport::TransportLayer; + +void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { + // Pass ownership of the transport::SessionHandle into our worker thread. When this + // thread exits, the session will end. + launchWrappedServiceEntryWorkerThread( + std::move(session), [this](const transport::SessionHandle& session) { + _nWorkers.fetchAndAdd(1); + auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); }); + + _sessionLoop(session); + }); +} + +void ServiceEntryPointImpl::_sessionLoop(const transport::SessionHandle& session) { + Message inMessage; + bool inExhaust = false; + int64_t counter = 0; + + while (true) { + // 1. Source a Message from the client (unless we are exhausting) + if (!inExhaust) { + inMessage.reset(); + auto status = [&] { + MONGO_IDLE_THREAD_BLOCK; + return session->sourceMessage(&inMessage).wait(); + }(); + + if (ErrorCodes::isInterruption(status.code()) || + ErrorCodes::isNetworkError(status.code())) { + break; + } + + // Our session may have been closed internally. + if (status == TransportLayer::TicketSessionClosedStatus) { + break; + } + + uassertStatusOK(status); + } + + // 2. Pass sourced Message to handler to generate response. + auto opCtx = cc().makeOperationContext(); + + // The handleRequest is implemented in a subclass for mongod/mongos and actually all the + // database work for this request. + DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote()); + + // opCtx must be destroyed here so that the operation cannot show + // up in currentOp results after the response reaches the client + opCtx.reset(); + + // 3. Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(inMessage.header().getId()); + + // If this is an exhaust cursor, don't source more Messages + if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) { + inExhaust = true; + } else { + inExhaust = false; + } + + // 4. Sink our response to the client + uassertStatusOK(session->sinkMessage(toSink).wait()); + } else { + inExhaust = false; + } + + if ((counter++ & 0xf) == 0) { + markThreadIdle(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h new file mode 100644 index 00000000000..aeb5ce5016e --- /dev/null +++ b/src/mongo/transport/service_entry_point_impl.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/transport/service_entry_point.h" + +namespace mongo { + +struct DbResponse; +class OperationContext; + +namespace transport { +class Session; +class TransportLayer; +} // namespace transport + +/** + * A basic entry point from the TransportLayer into a server. + * + * The server logic is implemented inside of handleRequest() by a subclass. + * startSession() spawns and detaches a new thread for each incoming connection + * (transport::Session). + */ +class ServiceEntryPointImpl : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(ServiceEntryPointImpl); + +public: + explicit ServiceEntryPointImpl(transport::TransportLayer* tl) : _tl(tl) {} + + void startSession(transport::SessionHandle session) final; + + std::size_t getNumberOfActiveWorkerThreads() const { + return _nWorkers.load(); + } + +private: + void _sessionLoop(const transport::SessionHandle& session); + + transport::TransportLayer* _tl; + AtomicWord<std::size_t> _nWorkers; +}; + +} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 667b1d5ae5e..243ebfbec94 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -42,38 +42,8 @@ namespace mongo { using namespace transport; -namespace { -void setOkResponse(Message* m) { - // Need to set up our { ok : 1 } response. - BufBuilder b{}; - - // Leave room for the message header - b.skip(mongo::MsgData::MsgDataHeaderSize); - - // Add our response - auto okObj = BSON("ok" << 1.0); - okObj.appendSelfToBufBuilder(b); - - // Add some metadata - auto metadata = BSONObj(); - metadata.appendSelfToBufBuilder(b); - - // Set Message header fields - MsgData::View msg = b.buf(); - msg.setLen(b.len()); - msg.setOperation(dbCommandReply); - - // Set the message, transfer buffer ownership to Message - m->reset(); - m->setData(b.release()); -} - -} // namespace - ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl) - : _tl(tl), _outMessage(), _inShutdown(false) { - setOkResponse(&_outMessage); -} + : _tl(tl), _inShutdown(false) {} ServiceEntryPointMock::~ServiceEntryPointMock() { { @@ -104,11 +74,38 @@ void ServiceEntryPointMock::run(transport::SessionHandle session) { break; } + auto resp = handleRequest(nullptr, inMessage, session->remote()); + // sinkMessage() - if (!session->sinkMessage(_outMessage).wait().isOK()) { + if (!session->sinkMessage(resp.response).wait().isOK()) { break; } } } +DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) { + // Need to set up our { ok : 1 } response. + BufBuilder b{}; + + // Leave room for the message header + b.skip(mongo::MsgData::MsgDataHeaderSize); + + // Add our response + auto okObj = BSON("ok" << 1.0); + okObj.appendSelfToBufBuilder(b); + + // Add some metadata + auto metadata = BSONObj(); + metadata.appendSelfToBufBuilder(b); + + // Set Message header fields + MsgData::View msg = b.buf(); + msg.setLen(b.len()); + msg.setOperation(dbCommandReply); + + return {Message(b.release()), ""}; +} + } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h index eadc9367fa8..94f011d9f3e 100644 --- a/src/mongo/transport/service_entry_point_mock.h +++ b/src/mongo/transport/service_entry_point_mock.h @@ -65,13 +65,15 @@ public: */ void startSession(transport::SessionHandle session) override; + DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) override; + private: void run(transport::SessionHandle session); transport::TransportLayer* _tl; - Message _outMessage; - stdx::mutex _shutdownLock; bool _inShutdown; diff --git a/src/mongo/transport/transport_layer_legacy_test.cpp b/src/mongo/transport/transport_layer_legacy_test.cpp index b61ed7559d7..286696b4d6c 100644 --- a/src/mongo/transport/transport_layer_legacy_test.cpp +++ b/src/mongo/transport/transport_layer_legacy_test.cpp @@ -38,7 +38,7 @@ namespace { class ServiceEntryPointUtil : public ServiceEntryPoint { public: - void startSession(transport::SessionHandle session) { + void startSession(transport::SessionHandle session) override { Message m; Status s = session->sourceMessage(&m).wait(); @@ -47,6 +47,12 @@ public: tll->end(session); } + DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) override { + MONGO_UNREACHABLE; + } + transport::TransportLayerLegacy* tll = nullptr; }; |