diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2017-04-17 12:09:33 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2017-04-21 11:22:53 -0400 |
commit | f929d2cfcfe48f051c32bdded11566b885816932 (patch) | |
tree | 2d4e17c2dd4cceaa95dfcb41c773962341e7a5ec /src/mongo/transport | |
parent | c2cb98f46c70772d054ab0885720b666ae318cf0 (diff) | |
download | mongo-f929d2cfcfe48f051c32bdded11566b885816932.tar.gz |
SERVER-28749 Unify ServiceEntryPointMongod and ServiceEntryPointMongos
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point.h | 8 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp | 169 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.h | 73 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_mock.cpp | 61 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_legacy_test.cpp | 8 |
7 files changed, 291 insertions, 35 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 75b589c5ec5..b365613660d 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -74,6 +74,7 @@ env.Library( target='service_entry_point_utils', source=[ 'service_entry_point_utils.cpp', + 'service_entry_point_impl.cpp', ], LIBDEPS=[ "$BUILD_DIR/mongo/db/service_context", diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index e0a1ee7a907..f9f82ed3273 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/db/dbmessage.h" #include "mongo/transport/session.h" namespace mongo { @@ -51,6 +52,13 @@ public: */ virtual void startSession(transport::SessionHandle session) = 0; + /** + * Processes a request and fills out a DbResponse. + */ + virtual DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) = 0; + protected: ServiceEntryPoint() = default; }; diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp new file mode 100644 index 00000000000..e6db7f875cd --- /dev/null +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_entry_point_impl.h" + +#include <vector> + +#include "mongo/db/assemble_response.h" +#include "mongo/db/client.h" +#include "mongo/db/dbmessage.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/quick_exit.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace { + +// Set up proper headers for formatting an exhaust request, if we need to +bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { + MsgData::View header = dbresponse.response.header(); + QueryResult::View qr = header.view2ptr(); + long long cursorid = qr.getCursorId(); + + if (!cursorid) { + return false; + } + + verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]); + + auto ns = dbresponse.exhaustNS; // reset() will free this + + m->reset(); + + BufBuilder b(512); + b.appendNum(static_cast<int>(0) /* size set later in appendData() */); + b.appendNum(header.getId()); + b.appendNum(header.getResponseToMsgId()); + b.appendNum(static_cast<int>(dbGetMore)); + b.appendNum(static_cast<int>(0)); + b.appendStr(ns); + b.appendNum(static_cast<int>(0)); // ntoreturn + b.appendNum(cursorid); + + MsgData::View(b.buf()).setLen(b.len()); + m->setData(b.release()); + + return true; +} + +} // namespace + +using transport::Session; +using transport::TransportLayer; + +void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { + // Pass ownership of the transport::SessionHandle into our worker thread. When this + // thread exits, the session will end. + launchWrappedServiceEntryWorkerThread( + std::move(session), [this](const transport::SessionHandle& session) { + _nWorkers.fetchAndAdd(1); + auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); }); + + _sessionLoop(session); + }); +} + +void ServiceEntryPointImpl::_sessionLoop(const transport::SessionHandle& session) { + Message inMessage; + bool inExhaust = false; + int64_t counter = 0; + + while (true) { + // 1. Source a Message from the client (unless we are exhausting) + if (!inExhaust) { + inMessage.reset(); + auto status = [&] { + MONGO_IDLE_THREAD_BLOCK; + return session->sourceMessage(&inMessage).wait(); + }(); + + if (ErrorCodes::isInterruption(status.code()) || + ErrorCodes::isNetworkError(status.code())) { + break; + } + + // Our session may have been closed internally. + if (status == TransportLayer::TicketSessionClosedStatus) { + break; + } + + uassertStatusOK(status); + } + + // 2. Pass sourced Message to handler to generate response. + auto opCtx = cc().makeOperationContext(); + + // The handleRequest is implemented in a subclass for mongod/mongos and actually all the + // database work for this request. + DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote()); + + // opCtx must be destroyed here so that the operation cannot show + // up in currentOp results after the response reaches the client + opCtx.reset(); + + // 3. Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(inMessage.header().getId()); + + // If this is an exhaust cursor, don't source more Messages + if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) { + inExhaust = true; + } else { + inExhaust = false; + } + + // 4. Sink our response to the client + uassertStatusOK(session->sinkMessage(toSink).wait()); + } else { + inExhaust = false; + } + + if ((counter++ & 0xf) == 0) { + markThreadIdle(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h new file mode 100644 index 00000000000..aeb5ce5016e --- /dev/null +++ b/src/mongo/transport/service_entry_point_impl.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/transport/service_entry_point.h" + +namespace mongo { + +struct DbResponse; +class OperationContext; + +namespace transport { +class Session; +class TransportLayer; +} // namespace transport + +/** + * A basic entry point from the TransportLayer into a server. + * + * The server logic is implemented inside of handleRequest() by a subclass. + * startSession() spawns and detaches a new thread for each incoming connection + * (transport::Session). + */ +class ServiceEntryPointImpl : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(ServiceEntryPointImpl); + +public: + explicit ServiceEntryPointImpl(transport::TransportLayer* tl) : _tl(tl) {} + + void startSession(transport::SessionHandle session) final; + + std::size_t getNumberOfActiveWorkerThreads() const { + return _nWorkers.load(); + } + +private: + void _sessionLoop(const transport::SessionHandle& session); + + transport::TransportLayer* _tl; + AtomicWord<std::size_t> _nWorkers; +}; + +} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 667b1d5ae5e..243ebfbec94 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -42,38 +42,8 @@ namespace mongo { using namespace transport; -namespace { -void setOkResponse(Message* m) { - // Need to set up our { ok : 1 } response. - BufBuilder b{}; - - // Leave room for the message header - b.skip(mongo::MsgData::MsgDataHeaderSize); - - // Add our response - auto okObj = BSON("ok" << 1.0); - okObj.appendSelfToBufBuilder(b); - - // Add some metadata - auto metadata = BSONObj(); - metadata.appendSelfToBufBuilder(b); - - // Set Message header fields - MsgData::View msg = b.buf(); - msg.setLen(b.len()); - msg.setOperation(dbCommandReply); - - // Set the message, transfer buffer ownership to Message - m->reset(); - m->setData(b.release()); -} - -} // namespace - ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl) - : _tl(tl), _outMessage(), _inShutdown(false) { - setOkResponse(&_outMessage); -} + : _tl(tl), _inShutdown(false) {} ServiceEntryPointMock::~ServiceEntryPointMock() { { @@ -104,11 +74,38 @@ void ServiceEntryPointMock::run(transport::SessionHandle session) { break; } + auto resp = handleRequest(nullptr, inMessage, session->remote()); + // sinkMessage() - if (!session->sinkMessage(_outMessage).wait().isOK()) { + if (!session->sinkMessage(resp.response).wait().isOK()) { break; } } } +DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) { + // Need to set up our { ok : 1 } response. + BufBuilder b{}; + + // Leave room for the message header + b.skip(mongo::MsgData::MsgDataHeaderSize); + + // Add our response + auto okObj = BSON("ok" << 1.0); + okObj.appendSelfToBufBuilder(b); + + // Add some metadata + auto metadata = BSONObj(); + metadata.appendSelfToBufBuilder(b); + + // Set Message header fields + MsgData::View msg = b.buf(); + msg.setLen(b.len()); + msg.setOperation(dbCommandReply); + + return {Message(b.release()), ""}; +} + } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h index eadc9367fa8..94f011d9f3e 100644 --- a/src/mongo/transport/service_entry_point_mock.h +++ b/src/mongo/transport/service_entry_point_mock.h @@ -65,13 +65,15 @@ public: */ void startSession(transport::SessionHandle session) override; + DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) override; + private: void run(transport::SessionHandle session); transport::TransportLayer* _tl; - Message _outMessage; - stdx::mutex _shutdownLock; bool _inShutdown; diff --git a/src/mongo/transport/transport_layer_legacy_test.cpp b/src/mongo/transport/transport_layer_legacy_test.cpp index b61ed7559d7..286696b4d6c 100644 --- a/src/mongo/transport/transport_layer_legacy_test.cpp +++ b/src/mongo/transport/transport_layer_legacy_test.cpp @@ -38,7 +38,7 @@ namespace { class ServiceEntryPointUtil : public ServiceEntryPoint { public: - void startSession(transport::SessionHandle session) { + void startSession(transport::SessionHandle session) override { Message m; Status s = session->sourceMessage(&m).wait(); @@ -47,6 +47,12 @@ public: tll->end(session); } + DbResponse handleRequest(OperationContext* opCtx, + const Message& request, + const HostAndPort& client) override { + MONGO_UNREACHABLE; + } + transport::TransportLayerLegacy* tll = nullptr; }; |