summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorAndrew Chen <a.chen@mongodb.com>2020-07-01 15:48:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-30 14:48:04 +0000
commit051bd6e51b33d7fd8a1265e166efffd20178a86d (patch)
tree6a2b92974c8e1cd148e0f085d0091ea0942e21f5 /src/mongo/transport
parent41fda13f7239308239ade7f143f9b5e2fef0e87b (diff)
downloadmongo-051bd6e51b33d7fd8a1265e166efffd20178a86d.tar.gz
SERVER-48978 Futurize handleRequest
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/service_entry_point.h4
-rw-r--r--src/mongo/transport/service_state_machine.cpp113
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp8
-rw-r--r--src/mongo/transport/transport_layer_asio_test.cpp6
4 files changed, 73 insertions, 58 deletions
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index 2c3ded7849a..aa970f1ef67 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -32,6 +32,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
+#include "mongo/util/future.h"
namespace mongo {
@@ -82,7 +83,8 @@ public:
/**
* Processes a request and fills out a DbResponse.
*/
- virtual DbResponse handleRequest(OperationContext* opCtx, const Message& request) = 0;
+ virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) noexcept = 0;
protected:
ServiceEntryPoint() = default;
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5fbff04a151..373a362d0df 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -467,61 +467,68 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) {
// The handleRequest is implemented in a subclass for mongod/mongos and actually all the
// database work for this request.
- DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);
-
- // opCtx must be killed and delisted here so that the operation cannot show up in currentOp
- // results after the response reaches the client. The destruction is postponed for later to
- // mitigate its performance impact on the critical path of execution.
- _serviceContext->killAndDelistOperation(opCtx.get(), ErrorCodes::OperationIsKilledAndDelisted);
- invariant(!_killedOpCtx);
- _killedOpCtx = std::move(opCtx);
-
- // Format our response, if we have one
- Message& toSink = dbresponse.response;
- if (!toSink.empty()) {
- invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome));
- invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));
-
- // Update the header for the response message.
- toSink.header().setId(nextMessageId());
- toSink.header().setResponseToMsgId(_inMessage.header().getId());
- if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) {
+ _sep->handleRequest(opCtx.get(), _inMessage)
+ .then([this,
+ &compressorMgr = compressorMgr,
+ opCtx = std::move(opCtx),
+ guard = std::move(guard)](DbResponse dbresponse) mutable -> void {
+ // opCtx must be killed and delisted here so that the operation cannot show up in
+ // currentOp results after the response reaches the client. The destruction is postponed
+ // for later to mitigate its performance impact on the critical path of execution.
+ _serviceContext->killAndDelistOperation(opCtx.get(),
+ ErrorCodes::OperationIsKilledAndDelisted);
+ invariant(!_killedOpCtx);
+ _killedOpCtx = std::move(opCtx);
+
+ // Format our response, if we have one
+ Message& toSink = dbresponse.response;
+ if (!toSink.empty()) {
+ invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome));
+ invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));
+
+ // Update the header for the response message.
+ toSink.header().setId(nextMessageId());
+ toSink.header().setResponseToMsgId(_inMessage.header().getId());
+ if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) {
#ifdef MONGO_CONFIG_SSL
- if (!SSLPeerInfo::forSession(_session()).isTLS) {
- OpMsg::appendChecksum(&toSink);
- }
+ if (!SSLPeerInfo::forSession(_session()).isTLS) {
+ OpMsg::appendChecksum(&toSink);
+ }
#else
- OpMsg::appendChecksum(&toSink);
+ OpMsg::appendChecksum(&toSink);
#endif
- }
-
- // If the incoming message has the exhaust flag set, then we bypass the normal RPC behavior.
- // We will sink the response to the network, but we also synthesize a new request, as if we
- // sourced a new message from the network. This new request is sent to the database once
- // again to be processed. This cycle repeats as long as the command indicates the exhaust
- // stream should continue.
- _inMessage = makeExhaustMessage(_inMessage, &dbresponse);
- _inExhaust = !_inMessage.empty();
-
- networkCounter.hitLogicalOut(toSink.size());
-
- if (_compressorId) {
- auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value());
- uassertStatusOK(swm.getStatus());
- toSink = swm.getValue();
- }
-
- TrafficRecorder::get(_serviceContext)
- .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
-
- _sinkMessage(std::move(guard), std::move(toSink));
-
- } else {
- _state.store(State::Source);
- _inMessage.reset();
- _inExhaust = false;
- return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask);
- }
+ }
+
+ // If the incoming message has the exhaust flag set, then we bypass the normal RPC
+ // behavior. We will sink the response to the network, but we also synthesize a new
+ // request, as if we sourced a new message from the network. This new request is
+ // sent to the database once again to be processed. This cycle repeats as long as
+ // the command indicates the exhaust stream should continue.
+ _inMessage = makeExhaustMessage(_inMessage, &dbresponse);
+ _inExhaust = !_inMessage.empty();
+
+ networkCounter.hitLogicalOut(toSink.size());
+
+ if (_compressorId) {
+ auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value());
+ uassertStatusOK(swm.getStatus());
+ toSink = swm.getValue();
+ }
+
+ TrafficRecorder::get(_serviceContext)
+ .observe(
+ _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+
+ _sinkMessage(std::move(guard), std::move(toSink));
+
+ } else {
+ _state.store(State::Source);
+ _inMessage.reset();
+ _inExhaust = false;
+ return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask);
+ }
+ })
+ .get();
}
void ServiceStateMachine::runNext() {
@@ -668,7 +675,7 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
// Fire and forget. This is a best effort attempt to immediately clean up the exhaust
// cursor. If the killCursors request fails here for any reasons, it will still be
// cleaned up once the cursor times out.
- _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId));
+ _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)).get();
}
} catch (const DBException& e) {
LOGV2(22992,
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index b5aceac7ea5..02669106853 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -72,7 +72,8 @@ public:
void startSession(transport::SessionHandle session) override {}
- DbResponse handleRequest(OperationContext* opCtx, const Message& request) override {
+ Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) noexcept override try {
LOGV2(22994, "In handleRequest");
_ranHandler = true;
ASSERT_TRUE(haveClient());
@@ -98,7 +99,10 @@ public:
}
dbResponse.response = res;
- return dbResponse;
+ return Future<DbResponse>::makeReady(std::move(dbResponse));
+ } catch (const DBException& e) {
+ LOGV2_ERROR(4879805, "Failed to handle request", "error"_attr = redact(e));
+ return e.toStatus();
}
void endAllSessions(transport::Session::TagMask tags) override {}
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 660f14852f3..a7938e031ba 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -79,7 +79,8 @@ public:
return _sessions.size();
}
- DbResponse handleRequest(OperationContext* opCtx, const Message& request) override {
+ Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) noexcept override {
MONGO_UNREACHABLE;
}
@@ -191,7 +192,8 @@ public:
return 0;
}
- DbResponse handleRequest(OperationContext* opCtx, const Message& request) override {
+ Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) noexcept override {
MONGO_UNREACHABLE;
}