diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2022-12-11 15:53:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-11 16:25:07 +0000 |
commit | a32134f0c24464f38922821cc5300ac315d43681 (patch) | |
tree | 3f719bd8b43ca90d37720af54cd7027c85f62c52 | |
parent | 21e9ac5c3425a3ce858f51c27cd3c30fbb0262ad (diff) | |
download | mongo-a32134f0c24464f38922821cc5300ac315d43681.tar.gz |
SERVER-68875 SessionWorkflow loop
-rw-r--r-- | src/mongo/transport/session_workflow.cpp | 376 | ||||
-rw-r--r-- | src/mongo/transport/session_workflow_test.cpp | 3 |
2 files changed, 197 insertions, 182 deletions
diff --git a/src/mongo/transport/session_workflow.cpp b/src/mongo/transport/session_workflow.cpp index 32b49603221..4cee7eacd0f 100644 --- a/src/mongo/transport/session_workflow.cpp +++ b/src/mongo/transport/session_workflow.cpp @@ -33,6 +33,7 @@ #include "mongo/transport/session_workflow.h" #include <memory> +#include <tuple> #include "mongo/base/status.h" #include "mongo/config.h" @@ -67,8 +68,7 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor -namespace mongo { -namespace transport { +namespace mongo::transport { namespace { MONGO_FAIL_POINT_DEFINE(doNotSetMoreToCome); @@ -334,6 +334,8 @@ bool killExhaust(const Message& in, ServiceEntryPoint* sep, Client* client) { class SessionWorkflow::Impl { public: + class WorkItem; + Impl(SessionWorkflow* workflow, ServiceContext::UniqueClient client) : _workflow{workflow}, _serviceContext{client->getServiceContext()}, @@ -348,7 +350,9 @@ public: return _clientStrand->getClientPointer(); } - void start(); + void start() { + _scheduleIteration(); + } /* * Terminates the associated transport Session, regardless of tags. @@ -363,57 +367,18 @@ public: * * This will not block on the session terminating cleaning itself up, it returns immediately. */ - void terminateIfTagsDontMatch(transport::Session::TagMask tags); - - /* - * This function actually calls into the database and processes a request. It's broken out - * into its own inline function for better readability. - */ - Future<void> processMessage(); - - /** Get a request message from the Session (transport layer). */ - void receiveMessage(); - - /** Send a response message to the Session (transport layer). */ - void sendMessage(); - - /* - * Releases all the resources associated with the session and call the cleanupHook. - */ - void cleanupSession(const Status& status); - - /* - * Schedules a new loop for this session workflow on a service executor. The status argument - * specifies whether the last execution of the loop, if any, was successful. - */ - void scheduleNewLoop(Status status); - - /* - * Starts a new loop by running an iteration for this session workflow (e.g., source, process - * and then sink). - */ - void startNewLoop(const Status& execStatus); - - /* - * Releases all the resources associated with the exhaust request. - * When the session is closing, the most recently synthesized exhaust - * `WorkItem` may refer to a cursor that we won't need anymore, so we can - * try to kill it early as an optimization. - */ - void cleanupExhaustResources(); + void terminateIfTagsDontMatch(Session::TagMask tags); - /* - * Gets the transport::Session associated with this connection - */ - const transport::SessionHandle& session() const { + const SessionHandle& session() const { return client()->session(); } - /* - * Gets the transport::ServiceExecutor associated with this connection. - */ ServiceExecutor* executor() { - return ServiceExecutorContext::get(client())->getServiceExecutor(); + return seCtx()->getServiceExecutor(); + } + + bool useDedicatedThread() { + return seCtx()->useDedicatedThread(); } std::shared_ptr<ServiceExecutor::Executor> taskRunner() { @@ -432,14 +397,73 @@ public: #endif } -private: - class WorkItem; + ServiceExecutorContext* seCtx() { + return ServiceExecutorContext::get(client()); + } +private: struct RunnerAndSource { std::shared_ptr<ServiceExecutor::Executor> runner; ServiceExecutor* source = nullptr; }; + /** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */ + std::shared_ptr<Impl> shared_from_this() { + return {_workflow->shared_from_this(), this}; + } + + /** + * Returns a callback that's just like `cb`, but runs under the `_clientStrand`. + * The wrapper binds a `shared_from_this` so `cb` doesn't need its own copy + * of that anchoring shared pointer. + */ + unique_function<void(Status)> _captureContext(unique_function<void(Status)> cb) { + return [this, a = shared_from_this(), cb = std::move(cb)](Status st) mutable { + _clientStrand->run([&] { cb(st); }); + }; + } + + void _scheduleIteration(); + + Future<void> _doOneIteration(); + + /** Returns a Future for the next WorkItem. */ + Future<std::unique_ptr<WorkItem>> _getNextWork() { + invariant(!_work); + if (_nextWork) + return Future{std::move(_nextWork)}; // Already have one ready. + if (useDedicatedThread()) + return _receiveRequest(); + auto&& [p, f] = makePromiseFuture<void>(); + taskRunner()->runOnDataAvailable( + session(), _captureContext([p = std::move(p)](Status s) mutable { p.setFrom(s); })); + return std::move(f).then([this, anchor = shared_from_this()] { return _receiveRequest(); }); + } + + /** Receives a message from the session and creates a new WorkItem from it. */ + std::unique_ptr<WorkItem> _receiveRequest(); + + /** Sends work to the ServiceEntryPoint, obtaining a future for its completion. */ + Future<DbResponse> _dispatchWork(); + + /** Handles the completed response from dispatched work. */ + void _acceptResponse(DbResponse response); + + /** Writes the completed work response to the Session. */ + void _sendResponse(); + + void _onLoopError(Status error); + + void _cleanupSession(const Status& status); + + /* + * Releases all the resources associated with the exhaust request. + * When the session is closing, the most recently synthesized exhaust + * `WorkItem` may refer to a cursor that we won't need anymore, so we can + * try to kill it early as an optimization. + */ + void _cleanupExhaustResources(); + /** * Notify the task runner that this would be a good time to yield. It might * not actually yield, depending on implementation and on overall system @@ -452,11 +476,6 @@ private: taskRunner()->yieldPointReached(); } - /** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */ - std::shared_ptr<Impl> shared_from_this() { - return {_workflow->shared_from_this(), this}; - } - SessionWorkflow* const _workflow; ServiceContext* const _serviceContext; ServiceEntryPoint* _sep; @@ -467,8 +486,6 @@ private: std::unique_ptr<WorkItem> _work; std::unique_ptr<WorkItem> _nextWork; /**< created by exhaust responses */ - - metrics_detail::SessionWorkflowMetrics _metrics{_sep}; }; class SessionWorkflow::Impl::WorkItem { @@ -557,15 +574,14 @@ private: boost::optional<Message> _out; }; -void SessionWorkflow::Impl::receiveMessage() { - invariant(!_work); +std::unique_ptr<SessionWorkflow::Impl::WorkItem> SessionWorkflow::Impl::_receiveRequest() { try { auto msg = uassertStatusOK([&] { MONGO_IDLE_THREAD_BLOCK; return session()->sourceMessage(); }()); invariant(!msg.empty()); - _work = std::make_unique<WorkItem>(this, std::move(msg)); + return std::make_unique<WorkItem>(this, std::move(msg)); } catch (const DBException& ex) { auto remote = session()->remote(); const auto& status = ex.toStatus(); @@ -595,31 +611,28 @@ void SessionWorkflow::Impl::receiveMessage() { } } -void SessionWorkflow::Impl::sendMessage() { - // Sink our response to the client - // - // If there was an error sinking the message to the client, then we should print an error and - // end the session. - // - // Otherwise, return from this function to let startNewLoop() continue the future chaining. - +void SessionWorkflow::Impl::_sendResponse() { + if (!_work->hasOut()) + return; sessionWorkflowDelaySendMessage.execute([](auto&& data) { Milliseconds delay{data["millis"].safeNumberLong()}; LOGV2(6724101, "sendMessage: failpoint-induced delay", "delay"_attr = delay); sleepFor(delay); }); - if (auto status = session()->sinkMessage(_work->consumeOut()); !status.isOK()) { + try { + uassertStatusOK(session()->sinkMessage(_work->consumeOut())); + } catch (const DBException& ex) { LOGV2(22989, "Error sending response to client. Ending connection from remote", - "error"_attr = status, + "error"_attr = ex, "remote"_attr = session()->remote(), "connectionId"_attr = session()->id()); - uassertStatusOK(status); + throw; } } -Future<void> SessionWorkflow::Impl::processMessage() { +Future<DbResponse> SessionWorkflow::Impl::_dispatchWork() { invariant(_work); invariant(!_work->in().empty()); @@ -633,119 +646,123 @@ Future<void> SessionWorkflow::Impl::processMessage() { // Pass sourced Message to handler to generate response. _work->initOperation(); - // The handleRequest is implemented in a subclass for mongod/mongos and actually all the - // database work for this request. - return _sep->handleRequest(_work->opCtx(), _work->in()) - .then([this](DbResponse response) mutable { - // opCtx must be killed and delisted here so that the operation cannot show up in - // currentOp results after the response reaches the client. Destruction of the already - // killed opCtx is postponed for later (i.e., after completion of the future-chain) to - // mitigate its performance impact on the critical path of execution. - // Note that destroying futures after execution, rather that postponing the destruction - // until completion of the future-chain, would expose the cost of destroying opCtx to - // the critical path and result in serious performance implications. - _serviceContext->killAndDelistOperation(_work->opCtx(), - ErrorCodes::OperationIsKilledAndDelisted); - // Format our response, if we have one - Message& toSink = response.response; - if (toSink.empty()) - return; - invariant(!OpMsg::isFlagSet(_work->in(), OpMsg::kMoreToCome)); - invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); - - // Update the header for the response message. - toSink.header().setId(nextMessageId()); - toSink.header().setResponseToMsgId(_work->in().header().getId()); - if (!isTLS() && OpMsg::isFlagSet(_work->in(), OpMsg::kChecksumPresent)) - OpMsg::appendChecksum(&toSink); - - // If the incoming message has the exhaust flag set, then bypass the normal RPC - // behavior. Sink the response to the network, but also synthesize a new - // request, as if a new message was sourced from the network. This new request is - // sent to the database once again to be processed. This cycle repeats as long as - // the dbresponses continue to indicate the exhaust stream should continue. - _nextWork = _work->synthesizeExhaust(response); - - networkCounter.hitLogicalOut(toSink.size()); - - beforeCompressingExhaustResponse.executeIf( - [&](auto&&) {}, [&](auto&&) { return _work->hasCompressorId() && _nextWork; }); - - toSink = _work->compressResponse(toSink); - - TrafficRecorder::get(_serviceContext) - .observe(session(), _serviceContext->getPreciseClockSource()->now(), toSink); - - _work->setOut(std::move(toSink)); - }); + return _sep->handleRequest(_work->opCtx(), _work->in()); } -void SessionWorkflow::Impl::start() { - scheduleNewLoop(Status::OK()); -} +void SessionWorkflow::Impl::_acceptResponse(DbResponse response) { + auto&& work = *_work; + // opCtx must be killed and delisted here so that the operation cannot show up in + // currentOp results after the response reaches the client. Destruction of the already + // killed opCtx is postponed for later (i.e., after completion of the future-chain) to + // mitigate its performance impact on the critical path of execution. + // Note that destroying futures after execution, rather that postponing the destruction + // until completion of the future-chain, would expose the cost of destroying opCtx to + // the critical path and result in serious performance implications. + _serviceContext->killAndDelistOperation(work.opCtx(), ErrorCodes::OperationIsKilledAndDelisted); + // Format our response, if we have one + Message& toSink = response.response; + if (toSink.empty()) + return; + invariant(!OpMsg::isFlagSet(work.in(), OpMsg::kMoreToCome)); + invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); -void SessionWorkflow::Impl::scheduleNewLoop(Status status) try { - _work = nullptr; - uassertStatusOK(status); + // Update the header for the response message. + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(work.in().header().getId()); + if (!isTLS() && OpMsg::isFlagSet(work.in(), OpMsg::kChecksumPresent)) + OpMsg::appendChecksum(&toSink); - auto cb = [this, anchor = shared_from_this()](Status executorStatus) { - _clientStrand->run([&] { startNewLoop(executorStatus); }); - }; + // If the incoming message has the exhaust flag set, then bypass the normal RPC + // behavior. Sink the response to the network, but also synthesize a new + // request, as if a new message was sourced from the network. This new request is + // sent to the database once again to be processed. This cycle repeats as long as + // the dbresponses continue to indicate the exhaust stream should continue. + _nextWork = work.synthesizeExhaust(response); - try { - // Start our loop again with a new stack. - if (_nextWork) { - // If we're in exhaust, we're not expecting more data. - taskRunner()->schedule(std::move(cb)); - } else { - _yieldPointReached(); - taskRunner()->runOnDataAvailable(session(), std::move(cb)); - } - } catch (const DBException& ex) { - LOGV2_WARNING_OPTIONS(22993, - {logv2::LogComponent::kExecutor}, - "Unable to schedule a new loop for the session workflow", - "error"_attr = ex.toStatus()); - throw; - } -} catch (const DBException& ex) { - LOGV2_DEBUG(5763901, 2, "Terminating session due to error", "error"_attr = ex.toStatus()); + networkCounter.hitLogicalOut(toSink.size()); + + beforeCompressingExhaustResponse.executeIf( + [&](auto&&) {}, [&](auto&&) { return work.hasCompressorId() && _nextWork; }); + + toSink = work.compressResponse(toSink); + + TrafficRecorder::get(_serviceContext) + .observe(session(), _serviceContext->getPreciseClockSource()->now(), toSink); + + work.setOut(std::move(toSink)); +} + +void SessionWorkflow::Impl::_onLoopError(Status error) { + LOGV2_DEBUG(5763901, 2, "Terminating session due to error", "error"_attr = error); terminate(); - cleanupSession(ex.toStatus()); + _cleanupSession(error); } -void SessionWorkflow::Impl::startNewLoop(const Status& executorStatus) { - if (!executorStatus.isOK()) { - cleanupSession(executorStatus); - return; - } +/** Returns a Future representing the completion of one loop iteration. */ +Future<void> SessionWorkflow::Impl::_doOneIteration() { + struct Frame { + explicit Frame(std::shared_ptr<Impl> a) : anchor{std::move(a)} { + metrics.start(); + } + ~Frame() { + metrics.finish(); + } - _metrics.start(); + std::shared_ptr<Impl> anchor; + metrics_detail::SessionWorkflowMetrics metrics{anchor->_sep}; + }; - makeReadyFutureWith([this] { - if (_nextWork) { - _work = std::move(_nextWork); - } else { - receiveMessage(); - } - _metrics.received(); - return processMessage(); - }) - .then([this] { - _metrics.processed(); - if (_work->hasOut()) { - sendMessage(); - _metrics.sent(*session()); - _yieldPointReached(); - _metrics.yielded(); - } + auto fr = std::make_shared<Frame>(shared_from_this()); + return _getNextWork() + .then([&, fr](auto work) { + fr->metrics.received(); + invariant(!_work); + _work = std::move(work); + return _dispatchWork(); }) - .getAsync([this, anchor = shared_from_this()](Status status) { - _metrics.finish(); - scheduleNewLoop(std::move(status)); + .then([&, fr](auto rsp) { + _acceptResponse(std::move(rsp)); + fr->metrics.processed(); + _sendResponse(); + fr->metrics.sent(*session()); + _yieldPointReached(); + fr->metrics.yielded(); }); } +void SessionWorkflow::Impl::_scheduleIteration() try { + _work = nullptr; + taskRunner()->schedule(_captureContext([&](Status status) { + if (MONGO_unlikely(!status.isOK())) { + _cleanupSession(status); + return; + } + if (useDedicatedThread()) { + try { + _doOneIteration().get(); + _scheduleIteration(); + } catch (const DBException& ex) { + _onLoopError(ex.toStatus()); + } + } else { + _doOneIteration().getAsync([this, anchor = shared_from_this()](Status st) { + if (!st.isOK()) { + _onLoopError(st); + return; + } + _scheduleIteration(); + }); + } + })); +} catch (const DBException& ex) { + auto error = ex.toStatus(); + LOGV2_WARNING_OPTIONS(22993, + {logv2::LogComponent::kExecutor}, + "Unable to schedule a new loop for the session workflow", + "error"_attr = error); + _onLoopError(error); +} + void SessionWorkflow::Impl::terminate() { if (_isTerminated.swap(true)) return; @@ -753,7 +770,7 @@ void SessionWorkflow::Impl::terminate() { session()->end(); } -void SessionWorkflow::Impl::terminateIfTagsDontMatch(transport::Session::TagMask tags) { +void SessionWorkflow::Impl::terminateIfTagsDontMatch(Session::TagMask tags) { if (_isTerminated.load()) return; @@ -761,7 +778,7 @@ void SessionWorkflow::Impl::terminateIfTagsDontMatch(transport::Session::TagMask // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have been // set, then skip the termination check. - if ((sessionTags & tags) || (sessionTags & transport::Session::kPending)) { + if ((sessionTags & tags) || (sessionTags & Session::kPending)) { LOGV2( 22991, "Skip closing connection for connection", "connectionId"_attr = session()->id()); return; @@ -770,17 +787,17 @@ void SessionWorkflow::Impl::terminateIfTagsDontMatch(transport::Session::TagMask terminate(); } -void SessionWorkflow::Impl::cleanupExhaustResources() { +void SessionWorkflow::Impl::_cleanupExhaustResources() { auto clean = [&](auto& w) { return w && w->isExhaust() && killExhaust(w->in(), _sep, client()); }; clean(_nextWork) || clean(_work); } -void SessionWorkflow::Impl::cleanupSession(const Status& status) { +void SessionWorkflow::Impl::_cleanupSession(const Status& status) { LOGV2_DEBUG(5127900, 2, "Ending session", "error"_attr = status); _taskRunner = {}; - cleanupExhaustResources(); + _cleanupExhaustResources(); _sep->onClientDisconnect(client()); } @@ -801,9 +818,8 @@ void SessionWorkflow::terminate() { _impl->terminate(); } -void SessionWorkflow::terminateIfTagsDontMatch(transport::Session::TagMask tags) { +void SessionWorkflow::terminateIfTagsDontMatch(Session::TagMask tags) { _impl->terminateIfTagsDontMatch(tags); } -} // namespace transport -} // namespace mongo +} // namespace mongo::transport diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp index b98fa310b04..86afa2adf6d 100644 --- a/src/mongo/transport/session_workflow_test.cpp +++ b/src/mongo/transport/session_workflow_test.cpp @@ -440,8 +440,7 @@ private: auto [p, f] = makePromiseFuture<DbResponse>(); ExecutorFuture<void>(_threadPool) .then([this, opCtx, &request, p = std::move(p)]() mutable { - auto strand = ClientStrand::get(opCtx->getClient()); - strand->run([&] { p.setWith([&] { return _processRequest(opCtx, request); }); }); + p.setWith([&] { return _processRequest(opCtx, request); }); }) .getAsync([](auto&&) {}); return std::move(f); |