summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_state_machine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/service_state_machine.cpp')
-rw-r--r--src/mongo/transport/service_state_machine.cpp113
1 files changed, 60 insertions, 53 deletions
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5fbff04a151..373a362d0df 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -467,61 +467,68 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) {
// The handleRequest is implemented in a subclass for mongod/mongos and actually all the
// database work for this request.
- DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);
-
- // opCtx must be killed and delisted here so that the operation cannot show up in currentOp
- // results after the response reaches the client. The destruction is postponed for later to
- // mitigate its performance impact on the critical path of execution.
- _serviceContext->killAndDelistOperation(opCtx.get(), ErrorCodes::OperationIsKilledAndDelisted);
- invariant(!_killedOpCtx);
- _killedOpCtx = std::move(opCtx);
-
- // Format our response, if we have one
- Message& toSink = dbresponse.response;
- if (!toSink.empty()) {
- invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome));
- invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));
-
- // Update the header for the response message.
- toSink.header().setId(nextMessageId());
- toSink.header().setResponseToMsgId(_inMessage.header().getId());
- if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) {
+ _sep->handleRequest(opCtx.get(), _inMessage)
+ .then([this,
+ &compressorMgr = compressorMgr,
+ opCtx = std::move(opCtx),
+ guard = std::move(guard)](DbResponse dbresponse) mutable -> void {
+ // opCtx must be killed and delisted here so that the operation cannot show up in
+ // currentOp results after the response reaches the client. The destruction is postponed
+ // for later to mitigate its performance impact on the critical path of execution.
+ _serviceContext->killAndDelistOperation(opCtx.get(),
+ ErrorCodes::OperationIsKilledAndDelisted);
+ invariant(!_killedOpCtx);
+ _killedOpCtx = std::move(opCtx);
+
+ // Format our response, if we have one
+ Message& toSink = dbresponse.response;
+ if (!toSink.empty()) {
+ invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome));
+ invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));
+
+ // Update the header for the response message.
+ toSink.header().setId(nextMessageId());
+ toSink.header().setResponseToMsgId(_inMessage.header().getId());
+ if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) {
#ifdef MONGO_CONFIG_SSL
- if (!SSLPeerInfo::forSession(_session()).isTLS) {
- OpMsg::appendChecksum(&toSink);
- }
+ if (!SSLPeerInfo::forSession(_session()).isTLS) {
+ OpMsg::appendChecksum(&toSink);
+ }
#else
- OpMsg::appendChecksum(&toSink);
+ OpMsg::appendChecksum(&toSink);
#endif
- }
-
- // If the incoming message has the exhaust flag set, then we bypass the normal RPC behavior.
- // We will sink the response to the network, but we also synthesize a new request, as if we
- // sourced a new message from the network. This new request is sent to the database once
- // again to be processed. This cycle repeats as long as the command indicates the exhaust
- // stream should continue.
- _inMessage = makeExhaustMessage(_inMessage, &dbresponse);
- _inExhaust = !_inMessage.empty();
-
- networkCounter.hitLogicalOut(toSink.size());
-
- if (_compressorId) {
- auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value());
- uassertStatusOK(swm.getStatus());
- toSink = swm.getValue();
- }
-
- TrafficRecorder::get(_serviceContext)
- .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
-
- _sinkMessage(std::move(guard), std::move(toSink));
-
- } else {
- _state.store(State::Source);
- _inMessage.reset();
- _inExhaust = false;
- return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask);
- }
+ }
+
+ // If the incoming message has the exhaust flag set, then we bypass the normal RPC
+ // behavior. We will sink the response to the network, but we also synthesize a new
+ // request, as if we sourced a new message from the network. This new request is
+ // sent to the database once again to be processed. This cycle repeats as long as
+ // the command indicates the exhaust stream should continue.
+ _inMessage = makeExhaustMessage(_inMessage, &dbresponse);
+ _inExhaust = !_inMessage.empty();
+
+ networkCounter.hitLogicalOut(toSink.size());
+
+ if (_compressorId) {
+ auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value());
+ uassertStatusOK(swm.getStatus());
+ toSink = swm.getValue();
+ }
+
+ TrafficRecorder::get(_serviceContext)
+ .observe(
+ _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+
+ _sinkMessage(std::move(guard), std::move(toSink));
+
+ } else {
+ _state.store(State::Source);
+ _inMessage.reset();
+ _inExhaust = false;
+ return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask);
+ }
+ })
+ .get();
}
void ServiceStateMachine::runNext() {
@@ -668,7 +675,7 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
// Fire and forget. This is a best effort attempt to immediately clean up the exhaust
// cursor. If the killCursors request fails here for any reasons, it will still be
// cleaned up once the cursor times out.
- _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId));
+ _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)).get();
}
} catch (const DBException& e) {
LOGV2(22992,