author    Billy Donahue <billy.donahue@mongodb.com>    2022-12-11 15:53:26 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-12-11 16:25:07 +0000
commit    a32134f0c24464f38922821cc5300ac315d43681 (patch)
tree      3f719bd8b43ca90d37720af54cd7027c85f62c52
parent    21e9ac5c3425a3ce858f51c27cd3c30fbb0262ad (diff)
SERVER-68875 SessionWorkflow loop
-rw-r--r--    src/mongo/transport/session_workflow.cpp         376
-rw-r--r--    src/mongo/transport/session_workflow_test.cpp       3
2 files changed, 197 insertions(+), 182 deletions(-)
diff --git a/src/mongo/transport/session_workflow.cpp b/src/mongo/transport/session_workflow.cpp
index 32b49603221..4cee7eacd0f 100644
--- a/src/mongo/transport/session_workflow.cpp
+++ b/src/mongo/transport/session_workflow.cpp
@@ -33,6 +33,7 @@
#include "mongo/transport/session_workflow.h"
#include <memory>
+#include <tuple>
#include "mongo/base/status.h"
#include "mongo/config.h"
@@ -67,8 +68,7 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
-namespace mongo {
-namespace transport {
+namespace mongo::transport {
namespace {
MONGO_FAIL_POINT_DEFINE(doNotSetMoreToCome);
@@ -334,6 +334,8 @@ bool killExhaust(const Message& in, ServiceEntryPoint* sep, Client* client) {
class SessionWorkflow::Impl {
public:
+ class WorkItem;
+
Impl(SessionWorkflow* workflow, ServiceContext::UniqueClient client)
: _workflow{workflow},
_serviceContext{client->getServiceContext()},
@@ -348,7 +350,9 @@ public:
return _clientStrand->getClientPointer();
}
- void start();
+ void start() {
+ _scheduleIteration();
+ }
/*
* Terminates the associated transport Session, regardless of tags.
@@ -363,57 +367,18 @@ public:
*
* This will not block on the session terminating cleaning itself up, it returns immediately.
*/
- void terminateIfTagsDontMatch(transport::Session::TagMask tags);
-
- /*
- * This function actually calls into the database and processes a request. It's broken out
- * into its own inline function for better readability.
- */
- Future<void> processMessage();
-
- /** Get a request message from the Session (transport layer). */
- void receiveMessage();
-
- /** Send a response message to the Session (transport layer). */
- void sendMessage();
-
- /*
- * Releases all the resources associated with the session and call the cleanupHook.
- */
- void cleanupSession(const Status& status);
-
- /*
- * Schedules a new loop for this session workflow on a service executor. The status argument
- * specifies whether the last execution of the loop, if any, was successful.
- */
- void scheduleNewLoop(Status status);
-
- /*
- * Starts a new loop by running an iteration for this session workflow (e.g., source, process
- * and then sink).
- */
- void startNewLoop(const Status& execStatus);
-
- /*
- * Releases all the resources associated with the exhaust request.
- * When the session is closing, the most recently synthesized exhaust
- * `WorkItem` may refer to a cursor that we won't need anymore, so we can
- * try to kill it early as an optimization.
- */
- void cleanupExhaustResources();
+ void terminateIfTagsDontMatch(Session::TagMask tags);
- /*
- * Gets the transport::Session associated with this connection
- */
- const transport::SessionHandle& session() const {
+ const SessionHandle& session() const {
return client()->session();
}
- /*
- * Gets the transport::ServiceExecutor associated with this connection.
- */
ServiceExecutor* executor() {
- return ServiceExecutorContext::get(client())->getServiceExecutor();
+ return seCtx()->getServiceExecutor();
+ }
+
+ bool useDedicatedThread() {
+ return seCtx()->useDedicatedThread();
}
std::shared_ptr<ServiceExecutor::Executor> taskRunner() {
@@ -432,14 +397,73 @@ public:
#endif
}
-private:
- class WorkItem;
+ ServiceExecutorContext* seCtx() {
+ return ServiceExecutorContext::get(client());
+ }
+private:
struct RunnerAndSource {
std::shared_ptr<ServiceExecutor::Executor> runner;
ServiceExecutor* source = nullptr;
};
+ /** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */
+ std::shared_ptr<Impl> shared_from_this() {
+ return {_workflow->shared_from_this(), this};
+ }
+
+ /**
+ * Returns a callback that's just like `cb`, but runs under the `_clientStrand`.
+ * The wrapper binds a `shared_from_this` so `cb` doesn't need its own copy
+ * of that anchoring shared pointer.
+ */
+ unique_function<void(Status)> _captureContext(unique_function<void(Status)> cb) {
+ return [this, a = shared_from_this(), cb = std::move(cb)](Status st) mutable {
+ _clientStrand->run([&] { cb(st); });
+ };
+ }
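
// [Illustration, not part of this patch] The `shared_from_this` alias above
// relies on std::shared_ptr's aliasing constructor: the returned pointer
// shares ownership of the enclosing SessionWorkflow's control block but
// dereferences to the nested Impl. A minimal standalone sketch, with
// invented Outer/Inner names standing in for SessionWorkflow/Impl:
#include <cassert>
#include <memory>

struct Inner {
    int value = 42;
};

struct Outer : std::enable_shared_from_this<Outer> {
    Inner inner;

    std::shared_ptr<Inner> innerHandle() {
        // Aliasing constructor: ownership follows Outer, access goes to inner.
        return {shared_from_this(), &inner};
    }
};

int main() {
    std::shared_ptr<Inner> handle;
    {
        auto outer = std::make_shared<Outer>();
        handle = outer->innerHandle();
    }  // `outer` leaves scope, but the shared control block lives on.
    assert(handle->value == 42);  // Inner is still valid through the alias.
}
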
+
+ void _scheduleIteration();
+
+ Future<void> _doOneIteration();
+
+ /** Returns a Future for the next WorkItem. */
+ Future<std::unique_ptr<WorkItem>> _getNextWork() {
+ invariant(!_work);
+ if (_nextWork)
+ return Future{std::move(_nextWork)}; // Already have one ready.
+ if (useDedicatedThread())
+ return _receiveRequest();
+ auto&& [p, f] = makePromiseFuture<void>();
+ taskRunner()->runOnDataAvailable(
+ session(), _captureContext([p = std::move(p)](Status s) mutable { p.setFrom(s); }));
+ return std::move(f).then([this, anchor = shared_from_this()] { return _receiveRequest(); });
+ }
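
// [Illustration, not part of this patch] MongoDB's Future supports richer
// chaining than std::future, but the fast/slow split in `_getNextWork` can
// be sketched with standard types (all names here are invented):
#include <future>
#include <optional>
#include <string>
#include <utility>

using WorkItem = std::string;

std::optional<WorkItem> nextWork;  // like _nextWork: left over from an exhaust response

WorkItem receiveRequest() { return WorkItem{"sourced from the session"}; }

std::future<WorkItem> getNextWork() {
    if (nextWork) {
        // Fast path: already have one; hand it back as a ready future.
        std::promise<WorkItem> p;
        p.set_value(*std::exchange(nextWork, std::nullopt));
        return p.get_future();
    }
    // Slow path: defer the (possibly blocking) receive until it is awaited.
    return std::async(std::launch::deferred, receiveRequest);
}

int main() {
    nextWork = WorkItem{"synthesized exhaust request"};
    auto fast = getNextWork().get();  // served from the cache
    auto slow = getNextWork().get();  // goes to the source
    return fast.empty() || slow.empty();
}
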
+
+ /** Receives a message from the session and creates a new WorkItem from it. */
+ std::unique_ptr<WorkItem> _receiveRequest();
+
+ /** Sends work to the ServiceEntryPoint, obtaining a future for its completion. */
+ Future<DbResponse> _dispatchWork();
+
+ /** Handles the completed response from dispatched work. */
+ void _acceptResponse(DbResponse response);
+
+ /** Writes the completed work response to the Session. */
+ void _sendResponse();
+
+ void _onLoopError(Status error);
+
+ void _cleanupSession(const Status& status);
+
+ /*
+ * Releases all the resources associated with the exhaust request.
+ * When the session is closing, the most recently synthesized exhaust
+ * `WorkItem` may refer to a cursor that we won't need anymore, so we can
+ * try to kill it early as an optimization.
+ */
+ void _cleanupExhaustResources();
+
/**
* Notify the task runner that this would be a good time to yield. It might
* not actually yield, depending on implementation and on overall system
@@ -452,11 +476,6 @@ private:
taskRunner()->yieldPointReached();
}
- /** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */
- std::shared_ptr<Impl> shared_from_this() {
- return {_workflow->shared_from_this(), this};
- }
-
SessionWorkflow* const _workflow;
ServiceContext* const _serviceContext;
ServiceEntryPoint* _sep;
@@ -467,8 +486,6 @@ private:
std::unique_ptr<WorkItem> _work;
std::unique_ptr<WorkItem> _nextWork; /**< created by exhaust responses */
-
- metrics_detail::SessionWorkflowMetrics _metrics{_sep};
};
class SessionWorkflow::Impl::WorkItem {
@@ -557,15 +574,14 @@ private:
boost::optional<Message> _out;
};
-void SessionWorkflow::Impl::receiveMessage() {
- invariant(!_work);
+std::unique_ptr<SessionWorkflow::Impl::WorkItem> SessionWorkflow::Impl::_receiveRequest() {
try {
auto msg = uassertStatusOK([&] {
MONGO_IDLE_THREAD_BLOCK;
return session()->sourceMessage();
}());
invariant(!msg.empty());
- _work = std::make_unique<WorkItem>(this, std::move(msg));
+ return std::make_unique<WorkItem>(this, std::move(msg));
} catch (const DBException& ex) {
auto remote = session()->remote();
const auto& status = ex.toStatus();
@@ -595,31 +611,28 @@ void SessionWorkflow::Impl::receiveMessage() {
}
}
-void SessionWorkflow::Impl::sendMessage() {
- // Sink our response to the client
- //
- // If there was an error sinking the message to the client, then we should print an error and
- // end the session.
- //
- // Otherwise, return from this function to let startNewLoop() continue the future chaining.
-
+void SessionWorkflow::Impl::_sendResponse() {
+ if (!_work->hasOut())
+ return;
sessionWorkflowDelaySendMessage.execute([](auto&& data) {
Milliseconds delay{data["millis"].safeNumberLong()};
LOGV2(6724101, "sendMessage: failpoint-induced delay", "delay"_attr = delay);
sleepFor(delay);
});
- if (auto status = session()->sinkMessage(_work->consumeOut()); !status.isOK()) {
+ try {
+ uassertStatusOK(session()->sinkMessage(_work->consumeOut()));
+ } catch (const DBException& ex) {
LOGV2(22989,
"Error sending response to client. Ending connection from remote",
- "error"_attr = status,
+ "error"_attr = ex,
"remote"_attr = session()->remote(),
"connectionId"_attr = session()->id());
- uassertStatusOK(status);
+ throw;
}
}
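
// [Illustration, not part of this patch] The new `_sendResponse` uses a
// log-and-rethrow shape: record connection details at the failure site,
// then let the exception keep propagating so the loop-level handler can
// end the session. The same shape in isolation, with standard types:
#include <iostream>
#include <stdexcept>

void sinkMessage() { throw std::runtime_error("peer went away"); }

void sendResponse() {
    try {
        sinkMessage();
    } catch (const std::exception& ex) {
        // Log with local context, but do not swallow the error.
        std::cerr << "Error sending response to client: " << ex.what() << '\n';
        throw;
    }
}

int main() {
    try {
        sendResponse();
    } catch (const std::exception&) {
        std::cerr << "terminating session\n";  // like _onLoopError
    }
}
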
-Future<void> SessionWorkflow::Impl::processMessage() {
+Future<DbResponse> SessionWorkflow::Impl::_dispatchWork() {
invariant(_work);
invariant(!_work->in().empty());
@@ -633,119 +646,123 @@ Future<void> SessionWorkflow::Impl::processMessage() {
// Pass sourced Message to handler to generate response.
_work->initOperation();
- // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
- // database work for this request.
- return _sep->handleRequest(_work->opCtx(), _work->in())
- .then([this](DbResponse response) mutable {
- // opCtx must be killed and delisted here so that the operation cannot show up in
- // currentOp results after the response reaches the client. Destruction of the already
- // killed opCtx is postponed for later (i.e., after completion of the future-chain) to
- // mitigate its performance impact on the critical path of execution.
- // Note that destroying futures after execution, rather that postponing the destruction
- // until completion of the future-chain, would expose the cost of destroying opCtx to
- // the critical path and result in serious performance implications.
- _serviceContext->killAndDelistOperation(_work->opCtx(),
- ErrorCodes::OperationIsKilledAndDelisted);
- // Format our response, if we have one
- Message& toSink = response.response;
- if (toSink.empty())
- return;
- invariant(!OpMsg::isFlagSet(_work->in(), OpMsg::kMoreToCome));
- invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));
-
- // Update the header for the response message.
- toSink.header().setId(nextMessageId());
- toSink.header().setResponseToMsgId(_work->in().header().getId());
- if (!isTLS() && OpMsg::isFlagSet(_work->in(), OpMsg::kChecksumPresent))
- OpMsg::appendChecksum(&toSink);
-
- // If the incoming message has the exhaust flag set, then bypass the normal RPC
- // behavior. Sink the response to the network, but also synthesize a new
- // request, as if a new message was sourced from the network. This new request is
- // sent to the database once again to be processed. This cycle repeats as long as
- // the dbresponses continue to indicate the exhaust stream should continue.
- _nextWork = _work->synthesizeExhaust(response);
-
- networkCounter.hitLogicalOut(toSink.size());
-
- beforeCompressingExhaustResponse.executeIf(
- [&](auto&&) {}, [&](auto&&) { return _work->hasCompressorId() && _nextWork; });
-
- toSink = _work->compressResponse(toSink);
-
- TrafficRecorder::get(_serviceContext)
- .observe(session(), _serviceContext->getPreciseClockSource()->now(), toSink);
-
- _work->setOut(std::move(toSink));
- });
+ return _sep->handleRequest(_work->opCtx(), _work->in());
}
-void SessionWorkflow::Impl::start() {
- scheduleNewLoop(Status::OK());
-}
+void SessionWorkflow::Impl::_acceptResponse(DbResponse response) {
+ auto&& work = *_work;
+ // opCtx must be killed and delisted here so that the operation cannot show up in
+ // currentOp results after the response reaches the client. Destruction of the already
+ // killed opCtx is postponed for later (i.e., after completion of the future-chain) to
+ // mitigate its performance impact on the critical path of execution.
+ // Note that destroying futures after execution, rather that postponing the destruction
+ // until completion of the future-chain, would expose the cost of destroying opCtx to
+ // the critical path and result in serious performance implications.
+ _serviceContext->killAndDelistOperation(work.opCtx(), ErrorCodes::OperationIsKilledAndDelisted);
+ // Format our response, if we have one
+ Message& toSink = response.response;
+ if (toSink.empty())
+ return;
+ invariant(!OpMsg::isFlagSet(work.in(), OpMsg::kMoreToCome));
+ invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));
-void SessionWorkflow::Impl::scheduleNewLoop(Status status) try {
- _work = nullptr;
- uassertStatusOK(status);
+ // Update the header for the response message.
+ toSink.header().setId(nextMessageId());
+ toSink.header().setResponseToMsgId(work.in().header().getId());
+ if (!isTLS() && OpMsg::isFlagSet(work.in(), OpMsg::kChecksumPresent))
+ OpMsg::appendChecksum(&toSink);
- auto cb = [this, anchor = shared_from_this()](Status executorStatus) {
- _clientStrand->run([&] { startNewLoop(executorStatus); });
- };
+ // If the incoming message has the exhaust flag set, then bypass the normal RPC
+ // behavior. Sink the response to the network, but also synthesize a new
+ // request, as if a new message was sourced from the network. This new request is
+ // sent to the database once again to be processed. This cycle repeats as long as
+ // the dbresponses continue to indicate the exhaust stream should continue.
+ _nextWork = work.synthesizeExhaust(response);
- try {
- // Start our loop again with a new stack.
- if (_nextWork) {
- // If we're in exhaust, we're not expecting more data.
- taskRunner()->schedule(std::move(cb));
- } else {
- _yieldPointReached();
- taskRunner()->runOnDataAvailable(session(), std::move(cb));
- }
- } catch (const DBException& ex) {
- LOGV2_WARNING_OPTIONS(22993,
- {logv2::LogComponent::kExecutor},
- "Unable to schedule a new loop for the session workflow",
- "error"_attr = ex.toStatus());
- throw;
- }
-} catch (const DBException& ex) {
- LOGV2_DEBUG(5763901, 2, "Terminating session due to error", "error"_attr = ex.toStatus());
+ networkCounter.hitLogicalOut(toSink.size());
+
+ beforeCompressingExhaustResponse.executeIf(
+ [&](auto&&) {}, [&](auto&&) { return work.hasCompressorId() && _nextWork; });
+
+ toSink = work.compressResponse(toSink);
+
+ TrafficRecorder::get(_serviceContext)
+ .observe(session(), _serviceContext->getPreciseClockSource()->now(), toSink);
+
+ work.setOut(std::move(toSink));
+}
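
// [Illustration, not part of this patch] A toy model of the exhaust cycle
// described in the comments above: while a response indicates more to come,
// the next request is synthesized from the response rather than read from
// the network. All types here are invented stand-ins.
#include <iostream>
#include <optional>
#include <string>

struct Request { std::string body; };

struct Response {
    std::string body;
    bool moreToCome = false;
};

Response handle(const Request& req) {
    static int batchesLeft = 3;
    return Response{"batch for " + req.body, --batchesLeft > 0};
}

std::optional<Request> synthesizeExhaust(const Response& rsp) {
    if (!rsp.moreToCome)
        return std::nullopt;
    return Request{"getMore"};  // built locally, like WorkItem::synthesizeExhaust
}

int main() {
    std::optional<Request> next = Request{"find"};  // sourced from the network once
    while (next) {
        Response rsp = handle(*next);
        std::cout << rsp.body << '\n';  // "sink" the response
        next = synthesizeExhaust(rsp);  // maybe loop without touching the network
    }
}
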
+
+void SessionWorkflow::Impl::_onLoopError(Status error) {
+ LOGV2_DEBUG(5763901, 2, "Terminating session due to error", "error"_attr = error);
terminate();
- cleanupSession(ex.toStatus());
+ _cleanupSession(error);
}
-void SessionWorkflow::Impl::startNewLoop(const Status& executorStatus) {
- if (!executorStatus.isOK()) {
- cleanupSession(executorStatus);
- return;
- }
+/** Returns a Future representing the completion of one loop iteration. */
+Future<void> SessionWorkflow::Impl::_doOneIteration() {
+ struct Frame {
+ explicit Frame(std::shared_ptr<Impl> a) : anchor{std::move(a)} {
+ metrics.start();
+ }
+ ~Frame() {
+ metrics.finish();
+ }
- _metrics.start();
+ std::shared_ptr<Impl> anchor;
+ metrics_detail::SessionWorkflowMetrics metrics{anchor->_sep};
+ };
- makeReadyFutureWith([this] {
- if (_nextWork) {
- _work = std::move(_nextWork);
- } else {
- receiveMessage();
- }
- _metrics.received();
- return processMessage();
- })
- .then([this] {
- _metrics.processed();
- if (_work->hasOut()) {
- sendMessage();
- _metrics.sent(*session());
- _yieldPointReached();
- _metrics.yielded();
- }
+ auto fr = std::make_shared<Frame>(shared_from_this());
+ return _getNextWork()
+ .then([&, fr](auto work) {
+ fr->metrics.received();
+ invariant(!_work);
+ _work = std::move(work);
+ return _dispatchWork();
})
- .getAsync([this, anchor = shared_from_this()](Status status) {
- _metrics.finish();
- scheduleNewLoop(std::move(status));
+ .then([&, fr](auto rsp) {
+ _acceptResponse(std::move(rsp));
+ fr->metrics.processed();
+ _sendResponse();
+ fr->metrics.sent(*session());
+ _yieldPointReached();
+ fr->metrics.yielded();
});
}
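
// [Illustration, not part of this patch] The Frame above is an RAII object
// owned by a shared_ptr that each continuation captures: metrics.start()
// runs in the constructor and metrics.finish() in the destructor, so finish
// fires exactly once, when the last continuation holding the pointer is
// destroyed, however the chain ends. A standalone sketch:
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Frame {
    Frame() { std::cout << "metrics.start()\n"; }
    ~Frame() { std::cout << "metrics.finish()\n"; }
};

int main() {
    std::vector<std::function<void()>> continuations;
    {
        auto fr = std::make_shared<Frame>();  // start() fires here
        continuations.push_back([fr] { std::cout << "received\n"; });
        continuations.push_back([fr] { std::cout << "processed\n"; });
    }  // the local copy dies, but the lambdas keep the Frame alive
    for (auto& c : continuations)
        c();
    continuations.clear();  // last copy of `fr` destroyed: finish() fires here
}
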
+void SessionWorkflow::Impl::_scheduleIteration() try {
+ _work = nullptr;
+ taskRunner()->schedule(_captureContext([&](Status status) {
+ if (MONGO_unlikely(!status.isOK())) {
+ _cleanupSession(status);
+ return;
+ }
+ if (useDedicatedThread()) {
+ try {
+ _doOneIteration().get();
+ _scheduleIteration();
+ } catch (const DBException& ex) {
+ _onLoopError(ex.toStatus());
+ }
+ } else {
+ _doOneIteration().getAsync([this, anchor = shared_from_this()](Status st) {
+ if (!st.isOK()) {
+ _onLoopError(st);
+ return;
+ }
+ _scheduleIteration();
+ });
+ }
+ }));
+} catch (const DBException& ex) {
+ auto error = ex.toStatus();
+ LOGV2_WARNING_OPTIONS(22993,
+ {logv2::LogComponent::kExecutor},
+ "Unable to schedule a new loop for the session workflow",
+ "error"_attr = error);
+ _onLoopError(error);
+}
+
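// [Illustration, not part of this patch] A schematic of the two modes in
// `_scheduleIteration` above, with a deque standing in for the borrowed
// executor (all names invented): a dedicated thread may block between
// iterations and simply loop, while a borrowed reactor thread must enqueue
// the next iteration as a continuation instead of blocking. (The real code
// re-schedules each pass on the task runner to start with a fresh stack.)
#include <deque>
#include <functional>
#include <iostream>

std::deque<std::function<void()>> reactorQueue;  // stands in for the ServiceExecutor
int remainingIterations = 0;

void doOneIteration() { std::cout << "iteration\n"; }

void scheduleIteration(bool useDedicatedThread) {
    if (remainingIterations-- == 0)
        return;
    if (useDedicatedThread) {
        doOneIteration();            // safe to block here
        scheduleIteration(true);     // then go around again
    } else {
        reactorQueue.push_back([] {  // never block a borrowed thread
            doOneIteration();
            scheduleIteration(false);
        });
    }
}

int main() {
    remainingIterations = 2;
    scheduleIteration(true);   // dedicated mode: runs to completion here

    remainingIterations = 2;
    scheduleIteration(false);  // borrowed mode: drained by the "reactor" loop
    while (!reactorQueue.empty()) {
        auto task = std::move(reactorQueue.front());
        reactorQueue.pop_front();
        task();
    }
}
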
void SessionWorkflow::Impl::terminate() {
if (_isTerminated.swap(true))
return;
@@ -753,7 +770,7 @@ void SessionWorkflow::Impl::terminate() {
session()->end();
}
-void SessionWorkflow::Impl::terminateIfTagsDontMatch(transport::Session::TagMask tags) {
+void SessionWorkflow::Impl::terminateIfTagsDontMatch(Session::TagMask tags) {
if (_isTerminated.load())
return;
@@ -761,7 +778,7 @@ void SessionWorkflow::Impl::terminateIfTagsDontMatch(transport::Session::TagMask
// If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have been
// set, then skip the termination check.
- if ((sessionTags & tags) || (sessionTags & transport::Session::kPending)) {
+ if ((sessionTags & tags) || (sessionTags & Session::kPending)) {
LOGV2(
22991, "Skip closing connection for connection", "connectionId"_attr = session()->id());
return;
@@ -770,17 +787,17 @@ void SessionWorkflow::Impl::terminateIfTagsDontMatch(transport::Session::TagMask
terminate();
}
-void SessionWorkflow::Impl::cleanupExhaustResources() {
+void SessionWorkflow::Impl::_cleanupExhaustResources() {
auto clean = [&](auto& w) {
return w && w->isExhaust() && killExhaust(w->in(), _sep, client());
};
clean(_nextWork) || clean(_work);
}
-void SessionWorkflow::Impl::cleanupSession(const Status& status) {
+void SessionWorkflow::Impl::_cleanupSession(const Status& status) {
LOGV2_DEBUG(5127900, 2, "Ending session", "error"_attr = status);
_taskRunner = {};
- cleanupExhaustResources();
+ _cleanupExhaustResources();
_sep->onClientDisconnect(client());
}
@@ -801,9 +818,8 @@ void SessionWorkflow::terminate() {
_impl->terminate();
}
-void SessionWorkflow::terminateIfTagsDontMatch(transport::Session::TagMask tags) {
+void SessionWorkflow::terminateIfTagsDontMatch(Session::TagMask tags) {
_impl->terminateIfTagsDontMatch(tags);
}
-} // namespace transport
-} // namespace mongo
+} // namespace mongo::transport
diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp
index b98fa310b04..86afa2adf6d 100644
--- a/src/mongo/transport/session_workflow_test.cpp
+++ b/src/mongo/transport/session_workflow_test.cpp
@@ -440,8 +440,7 @@ private:
auto [p, f] = makePromiseFuture<DbResponse>();
ExecutorFuture<void>(_threadPool)
.then([this, opCtx, &request, p = std::move(p)]() mutable {
- auto strand = ClientStrand::get(opCtx->getClient());
- strand->run([&] { p.setWith([&] { return _processRequest(opCtx, request); }); });
+ p.setWith([&] { return _processRequest(opCtx, request); });
})
.getAsync([](auto&&) {});
return std::move(f);