diff options
author | Andrew Chen <a.chen@mongodb.com> | 2020-07-01 15:48:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-30 14:48:04 +0000 |
commit | 051bd6e51b33d7fd8a1265e166efffd20178a86d (patch) | |
tree | 6a2b92974c8e1cd148e0f085d0091ea0942e21f5 /src/mongo/transport | |
parent | 41fda13f7239308239ade7f143f9b5e2fef0e87b (diff) | |
download | mongo-051bd6e51b33d7fd8a1265e166efffd20178a86d.tar.gz |
SERVER-48978 Futurize handleRequest
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/service_entry_point.h | 4 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 113 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_test.cpp | 6 |
4 files changed, 73 insertions, 58 deletions
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index 2c3ded7849a..aa970f1ef67 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -32,6 +32,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/dbmessage.h" #include "mongo/transport/session.h" +#include "mongo/util/future.h" namespace mongo { @@ -82,7 +83,8 @@ public: /** * Processes a request and fills out a DbResponse. */ - virtual DbResponse handleRequest(OperationContext* opCtx, const Message& request) = 0; + virtual Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept = 0; protected: ServiceEntryPoint() = default; diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 5fbff04a151..373a362d0df 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -467,61 +467,68 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { // The handleRequest is implemented in a subclass for mongod/mongos and actually all the // database work for this request. - DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage); - - // opCtx must be killed and delisted here so that the operation cannot show up in currentOp - // results after the response reaches the client. The destruction is postponed for later to - // mitigate its performance impact on the critical path of execution. - _serviceContext->killAndDelistOperation(opCtx.get(), ErrorCodes::OperationIsKilledAndDelisted); - invariant(!_killedOpCtx); - _killedOpCtx = std::move(opCtx); - - // Format our response, if we have one - Message& toSink = dbresponse.response; - if (!toSink.empty()) { - invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome)); - invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); - - // Update the header for the response message. - toSink.header().setId(nextMessageId()); - toSink.header().setResponseToMsgId(_inMessage.header().getId()); - if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) { + _sep->handleRequest(opCtx.get(), _inMessage) + .then([this, + &compressorMgr = compressorMgr, + opCtx = std::move(opCtx), + guard = std::move(guard)](DbResponse dbresponse) mutable -> void { + // opCtx must be killed and delisted here so that the operation cannot show up in + // currentOp results after the response reaches the client. The destruction is postponed + // for later to mitigate its performance impact on the critical path of execution. + _serviceContext->killAndDelistOperation(opCtx.get(), + ErrorCodes::OperationIsKilledAndDelisted); + invariant(!_killedOpCtx); + _killedOpCtx = std::move(opCtx); + + // Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome)); + invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); + + // Update the header for the response message. + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(_inMessage.header().getId()); + if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) { #ifdef MONGO_CONFIG_SSL - if (!SSLPeerInfo::forSession(_session()).isTLS) { - OpMsg::appendChecksum(&toSink); - } + if (!SSLPeerInfo::forSession(_session()).isTLS) { + OpMsg::appendChecksum(&toSink); + } #else - OpMsg::appendChecksum(&toSink); + OpMsg::appendChecksum(&toSink); #endif - } - - // If the incoming message has the exhaust flag set, then we bypass the normal RPC behavior. - // We will sink the response to the network, but we also synthesize a new request, as if we - // sourced a new message from the network. This new request is sent to the database once - // again to be processed. This cycle repeats as long as the command indicates the exhaust - // stream should continue. - _inMessage = makeExhaustMessage(_inMessage, &dbresponse); - _inExhaust = !_inMessage.empty(); - - networkCounter.hitLogicalOut(toSink.size()); - - if (_compressorId) { - auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value()); - uassertStatusOK(swm.getStatus()); - toSink = swm.getValue(); - } - - TrafficRecorder::get(_serviceContext) - .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink); - - _sinkMessage(std::move(guard), std::move(toSink)); - - } else { - _state.store(State::Source); - _inMessage.reset(); - _inExhaust = false; - return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask); - } + } + + // If the incoming message has the exhaust flag set, then we bypass the normal RPC + // behavior. We will sink the response to the network, but we also synthesize a new + // request, as if we sourced a new message from the network. This new request is + // sent to the database once again to be processed. This cycle repeats as long as + // the command indicates the exhaust stream should continue. + _inMessage = makeExhaustMessage(_inMessage, &dbresponse); + _inExhaust = !_inMessage.empty(); + + networkCounter.hitLogicalOut(toSink.size()); + + if (_compressorId) { + auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value()); + uassertStatusOK(swm.getStatus()); + toSink = swm.getValue(); + } + + TrafficRecorder::get(_serviceContext) + .observe( + _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink); + + _sinkMessage(std::move(guard), std::move(toSink)); + + } else { + _state.store(State::Source); + _inMessage.reset(); + _inExhaust = false; + return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask); + } + }) + .get(); } void ServiceStateMachine::runNext() { @@ -668,7 +675,7 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try { // Fire and forget. This is a best effort attempt to immediately clean up the exhaust // cursor. If the killCursors request fails here for any reasons, it will still be // cleaned up once the cursor times out. - _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)); + _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)).get(); } } catch (const DBException& e) { LOGV2(22992, diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index b5aceac7ea5..02669106853 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -72,7 +72,8 @@ public: void startSession(transport::SessionHandle session) override {} - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override try { LOGV2(22994, "In handleRequest"); _ranHandler = true; ASSERT_TRUE(haveClient()); @@ -98,7 +99,10 @@ public: } dbResponse.response = res; - return dbResponse; + return Future<DbResponse>::makeReady(std::move(dbResponse)); + } catch (const DBException& e) { + LOGV2_ERROR(4879805, "Failed to handle request", "error"_attr = redact(e)); + return e.toStatus(); } void endAllSessions(transport::Session::TagMask tags) override {} diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index 660f14852f3..a7938e031ba 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -79,7 +79,8 @@ public: return _sessions.size(); } - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override { MONGO_UNREACHABLE; } @@ -191,7 +192,8 @@ public: return 0; } - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override { MONGO_UNREACHABLE; } |