summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2017-04-17 12:09:33 -0400
committerJonathan Reams <jbreams@mongodb.com>2017-04-21 11:22:53 -0400
commitf929d2cfcfe48f051c32bdded11566b885816932 (patch)
tree2d4e17c2dd4cceaa95dfcb41c773962341e7a5ec /src/mongo/transport
parentc2cb98f46c70772d054ab0885720b666ae318cf0 (diff)
downloadmongo-f929d2cfcfe48f051c32bdded11566b885816932.tar.gz
SERVER-28749 Unify ServiceEntryPointMongod and ServiceEntryPointMongos
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/service_entry_point.h8
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp169
-rw-r--r--src/mongo/transport/service_entry_point_impl.h73
-rw-r--r--src/mongo/transport/service_entry_point_mock.cpp61
-rw-r--r--src/mongo/transport/service_entry_point_mock.h6
-rw-r--r--src/mongo/transport/transport_layer_legacy_test.cpp8
7 files changed, 291 insertions, 35 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 75b589c5ec5..b365613660d 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -74,6 +74,7 @@ env.Library(
target='service_entry_point_utils',
source=[
'service_entry_point_utils.cpp',
+ 'service_entry_point_impl.cpp',
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/service_context",
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index e0a1ee7a907..f9f82ed3273 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
namespace mongo {
@@ -51,6 +52,13 @@ public:
*/
virtual void startSession(transport::SessionHandle session) = 0;
+ /**
+ * Processes a request and fills out a DbResponse.
+ */
+ virtual DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) = 0;
+
protected:
ServiceEntryPoint() = default;
};
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
new file mode 100644
index 00000000000..e6db7f875cd
--- /dev/null
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_entry_point_impl.h"
+
+#include <vector>
+
+#include "mongo/db/assemble_response.h"
+#include "mongo/db/client.h"
+#include "mongo/db/dbmessage.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/message.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/thread_idle_callback.h"
+#include "mongo/util/quick_exit.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+
+// Set up proper headers for formatting an exhaust request, if we need to
+bool setExhaustMessage(Message* m, const DbResponse& dbresponse) {
+ MsgData::View header = dbresponse.response.header();
+ QueryResult::View qr = header.view2ptr();
+ long long cursorid = qr.getCursorId();
+
+ if (!cursorid) {
+ return false;
+ }
+
+ verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
+
+ auto ns = dbresponse.exhaustNS; // reset() will free this
+
+ m->reset();
+
+ BufBuilder b(512);
+ b.appendNum(static_cast<int>(0) /* size set later in appendData() */);
+ b.appendNum(header.getId());
+ b.appendNum(header.getResponseToMsgId());
+ b.appendNum(static_cast<int>(dbGetMore));
+ b.appendNum(static_cast<int>(0));
+ b.appendStr(ns);
+ b.appendNum(static_cast<int>(0)); // ntoreturn
+ b.appendNum(cursorid);
+
+ MsgData::View(b.buf()).setLen(b.len());
+ m->setData(b.release());
+
+ return true;
+}
+
+} // namespace
+
+using transport::Session;
+using transport::TransportLayer;
+
+void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
+ // Pass ownership of the transport::SessionHandle into our worker thread. When this
+ // thread exits, the session will end.
+ launchWrappedServiceEntryWorkerThread(
+ std::move(session), [this](const transport::SessionHandle& session) {
+ _nWorkers.fetchAndAdd(1);
+ auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); });
+
+ _sessionLoop(session);
+ });
+}
+
+void ServiceEntryPointImpl::_sessionLoop(const transport::SessionHandle& session) {
+ Message inMessage;
+ bool inExhaust = false;
+ int64_t counter = 0;
+
+ while (true) {
+ // 1. Source a Message from the client (unless we are exhausting)
+ if (!inExhaust) {
+ inMessage.reset();
+ auto status = [&] {
+ MONGO_IDLE_THREAD_BLOCK;
+ return session->sourceMessage(&inMessage).wait();
+ }();
+
+ if (ErrorCodes::isInterruption(status.code()) ||
+ ErrorCodes::isNetworkError(status.code())) {
+ break;
+ }
+
+ // Our session may have been closed internally.
+ if (status == TransportLayer::TicketSessionClosedStatus) {
+ break;
+ }
+
+ uassertStatusOK(status);
+ }
+
+ // 2. Pass sourced Message to handler to generate response.
+ auto opCtx = cc().makeOperationContext();
+
+ // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
+ // database work for this request.
+ DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote());
+
+ // opCtx must be destroyed here so that the operation cannot show
+ // up in currentOp results after the response reaches the client
+ opCtx.reset();
+
+ // 3. Format our response, if we have one
+ Message& toSink = dbresponse.response;
+ if (!toSink.empty()) {
+ toSink.header().setId(nextMessageId());
+ toSink.header().setResponseToMsgId(inMessage.header().getId());
+
+ // If this is an exhaust cursor, don't source more Messages
+ if (dbresponse.exhaustNS.size() > 0 && setExhaustMessage(&inMessage, dbresponse)) {
+ inExhaust = true;
+ } else {
+ inExhaust = false;
+ }
+
+ // 4. Sink our response to the client
+ uassertStatusOK(session->sinkMessage(toSink).wait());
+ } else {
+ inExhaust = false;
+ }
+
+ if ((counter++ & 0xf) == 0) {
+ markThreadIdle();
+ }
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
new file mode 100644
index 00000000000..aeb5ce5016e
--- /dev/null
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/service_entry_point.h"
+
+namespace mongo {
+
+struct DbResponse;
+class OperationContext;
+
+namespace transport {
+class Session;
+class TransportLayer;
+} // namespace transport
+
+/**
+ * A basic entry point from the TransportLayer into a server.
+ *
+ * The server logic is implemented inside of handleRequest() by a subclass.
+ * startSession() spawns and detaches a new thread for each incoming connection
+ * (transport::Session).
+ */
+class ServiceEntryPointImpl : public ServiceEntryPoint {
+ MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);
+
+public:
+ explicit ServiceEntryPointImpl(transport::TransportLayer* tl) : _tl(tl) {}
+
+ void startSession(transport::SessionHandle session) final;
+
+ std::size_t getNumberOfActiveWorkerThreads() const {
+ return _nWorkers.load();
+ }
+
+private:
+ void _sessionLoop(const transport::SessionHandle& session);
+
+ transport::TransportLayer* _tl;
+ AtomicWord<std::size_t> _nWorkers;
+};
+
+} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp
index 667b1d5ae5e..243ebfbec94 100644
--- a/src/mongo/transport/service_entry_point_mock.cpp
+++ b/src/mongo/transport/service_entry_point_mock.cpp
@@ -42,38 +42,8 @@ namespace mongo {
using namespace transport;
-namespace {
-void setOkResponse(Message* m) {
- // Need to set up our { ok : 1 } response.
- BufBuilder b{};
-
- // Leave room for the message header
- b.skip(mongo::MsgData::MsgDataHeaderSize);
-
- // Add our response
- auto okObj = BSON("ok" << 1.0);
- okObj.appendSelfToBufBuilder(b);
-
- // Add some metadata
- auto metadata = BSONObj();
- metadata.appendSelfToBufBuilder(b);
-
- // Set Message header fields
- MsgData::View msg = b.buf();
- msg.setLen(b.len());
- msg.setOperation(dbCommandReply);
-
- // Set the message, transfer buffer ownership to Message
- m->reset();
- m->setData(b.release());
-}
-
-} // namespace
-
ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl)
- : _tl(tl), _outMessage(), _inShutdown(false) {
- setOkResponse(&_outMessage);
-}
+ : _tl(tl), _inShutdown(false) {}
ServiceEntryPointMock::~ServiceEntryPointMock() {
{
@@ -104,11 +74,38 @@ void ServiceEntryPointMock::run(transport::SessionHandle session) {
break;
}
+ auto resp = handleRequest(nullptr, inMessage, session->remote());
+
// sinkMessage()
- if (!session->sinkMessage(_outMessage).wait().isOK()) {
+ if (!session->sinkMessage(resp.response).wait().isOK()) {
break;
}
}
}
+DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) {
+ // Need to set up our { ok : 1 } response.
+ BufBuilder b{};
+
+ // Leave room for the message header
+ b.skip(mongo::MsgData::MsgDataHeaderSize);
+
+ // Add our response
+ auto okObj = BSON("ok" << 1.0);
+ okObj.appendSelfToBufBuilder(b);
+
+ // Add some metadata
+ auto metadata = BSONObj();
+ metadata.appendSelfToBufBuilder(b);
+
+ // Set Message header fields
+ MsgData::View msg = b.buf();
+ msg.setLen(b.len());
+ msg.setOperation(dbCommandReply);
+
+ return {Message(b.release()), ""};
+}
+
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h
index eadc9367fa8..94f011d9f3e 100644
--- a/src/mongo/transport/service_entry_point_mock.h
+++ b/src/mongo/transport/service_entry_point_mock.h
@@ -65,13 +65,15 @@ public:
*/
void startSession(transport::SessionHandle session) override;
+ DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) override;
+
private:
void run(transport::SessionHandle session);
transport::TransportLayer* _tl;
- Message _outMessage;
-
stdx::mutex _shutdownLock;
bool _inShutdown;
diff --git a/src/mongo/transport/transport_layer_legacy_test.cpp b/src/mongo/transport/transport_layer_legacy_test.cpp
index b61ed7559d7..286696b4d6c 100644
--- a/src/mongo/transport/transport_layer_legacy_test.cpp
+++ b/src/mongo/transport/transport_layer_legacy_test.cpp
@@ -38,7 +38,7 @@ namespace {
class ServiceEntryPointUtil : public ServiceEntryPoint {
public:
- void startSession(transport::SessionHandle session) {
+ void startSession(transport::SessionHandle session) override {
Message m;
Status s = session->sourceMessage(&m).wait();
@@ -47,6 +47,12 @@ public:
tll->end(session);
}
+ DbResponse handleRequest(OperationContext* opCtx,
+ const Message& request,
+ const HostAndPort& client) override {
+ MONGO_UNREACHABLE;
+ }
+
transport::TransportLayerLegacy* tll = nullptr;
};