diff options
Diffstat (limited to 'src/mongo/transport/service_state_machine.cpp')
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 113 |
1 files changed, 60 insertions, 53 deletions
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 5fbff04a151..373a362d0df 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -467,61 +467,68 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { // The handleRequest is implemented in a subclass for mongod/mongos and actually all the // database work for this request. - DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage); - - // opCtx must be killed and delisted here so that the operation cannot show up in currentOp - // results after the response reaches the client. The destruction is postponed for later to - // mitigate its performance impact on the critical path of execution. - _serviceContext->killAndDelistOperation(opCtx.get(), ErrorCodes::OperationIsKilledAndDelisted); - invariant(!_killedOpCtx); - _killedOpCtx = std::move(opCtx); - - // Format our response, if we have one - Message& toSink = dbresponse.response; - if (!toSink.empty()) { - invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome)); - invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); - - // Update the header for the response message. - toSink.header().setId(nextMessageId()); - toSink.header().setResponseToMsgId(_inMessage.header().getId()); - if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) { + _sep->handleRequest(opCtx.get(), _inMessage) + .then([this, + &compressorMgr = compressorMgr, + opCtx = std::move(opCtx), + guard = std::move(guard)](DbResponse dbresponse) mutable -> void { + // opCtx must be killed and delisted here so that the operation cannot show up in + // currentOp results after the response reaches the client. The destruction is postponed + // for later to mitigate its performance impact on the critical path of execution. + _serviceContext->killAndDelistOperation(opCtx.get(), + ErrorCodes::OperationIsKilledAndDelisted); + invariant(!_killedOpCtx); + _killedOpCtx = std::move(opCtx); + + // Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome)); + invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); + + // Update the header for the response message. + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(_inMessage.header().getId()); + if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) { #ifdef MONGO_CONFIG_SSL - if (!SSLPeerInfo::forSession(_session()).isTLS) { - OpMsg::appendChecksum(&toSink); - } + if (!SSLPeerInfo::forSession(_session()).isTLS) { + OpMsg::appendChecksum(&toSink); + } #else - OpMsg::appendChecksum(&toSink); + OpMsg::appendChecksum(&toSink); #endif - } - - // If the incoming message has the exhaust flag set, then we bypass the normal RPC behavior. - // We will sink the response to the network, but we also synthesize a new request, as if we - // sourced a new message from the network. This new request is sent to the database once - // again to be processed. This cycle repeats as long as the command indicates the exhaust - // stream should continue. - _inMessage = makeExhaustMessage(_inMessage, &dbresponse); - _inExhaust = !_inMessage.empty(); - - networkCounter.hitLogicalOut(toSink.size()); - - if (_compressorId) { - auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value()); - uassertStatusOK(swm.getStatus()); - toSink = swm.getValue(); - } - - TrafficRecorder::get(_serviceContext) - .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink); - - _sinkMessage(std::move(guard), std::move(toSink)); - - } else { - _state.store(State::Source); - _inMessage.reset(); - _inExhaust = false; - return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask); - } + } + + // If the incoming message has the exhaust flag set, then we bypass the normal RPC + // behavior. We will sink the response to the network, but we also synthesize a new + // request, as if we sourced a new message from the network. This new request is + // sent to the database once again to be processed. This cycle repeats as long as + // the command indicates the exhaust stream should continue. + _inMessage = makeExhaustMessage(_inMessage, &dbresponse); + _inExhaust = !_inMessage.empty(); + + networkCounter.hitLogicalOut(toSink.size()); + + if (_compressorId) { + auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value()); + uassertStatusOK(swm.getStatus()); + toSink = swm.getValue(); + } + + TrafficRecorder::get(_serviceContext) + .observe( + _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink); + + _sinkMessage(std::move(guard), std::move(toSink)); + + } else { + _state.store(State::Source); + _inMessage.reset(); + _inExhaust = false; + return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask); + } + }) + .get(); } void ServiceStateMachine::runNext() { @@ -668,7 +675,7 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try { // Fire and forget. This is a best effort attempt to immediately clean up the exhaust // cursor. If the killCursors request fails here for any reasons, it will still be // cleaned up once the cursor times out. - _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)); + _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)).get(); } } catch (const DBException& e) { LOGV2(22992, |