summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorBilly Donahue <billy.donahue@mongodb.com>2023-01-26 04:45:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-26 05:53:27 +0000
commit875e14f1adb92cb0ab0e8b2aaffa3c7f73d49e4a (patch)
tree18b5fda43b387ca5fc1d36b24501755892204a87 /src/mongo/transport
parentbe8fd031fb851f5c0c2288585abafd92439c8d0a (diff)
downloadmongo-875e14f1adb92cb0ab0e8b2aaffa3c7f73d49e4a.tar.gz
SERVER-68723 SessionWorkflowTest refactor (fix and unrevert)
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/session_workflow_test.cpp1261
1 files changed, 577 insertions, 684 deletions
diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp
index a9baba0dfb5..7721a1f39e2 100644
--- a/src/mongo/transport/session_workflow_test.cpp
+++ b/src/mongo/transport/session_workflow_test.cpp
@@ -30,8 +30,12 @@
#include "mongo/platform/basic.h"
+#include <array>
+#include <deque>
+#include <initializer_list>
#include <memory>
#include <type_traits>
+#include <utility>
#include <vector>
#include "mongo/base/checked_cast.h"
@@ -55,19 +59,15 @@
#include "mongo/transport/service_executor_utils.h"
#include "mongo/transport/session_workflow.h"
#include "mongo/transport/session_workflow_test_util.h"
-#include "mongo/transport/transport_layer_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/clock_source_mock.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/producer_consumer_queue.h"
-#include "mongo/util/tick_source_mock.h"
+#include "mongo/util/synchronized_value.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-namespace mongo {
-namespace transport {
+namespace mongo::transport {
namespace {
const Status kClosedSessionError{ErrorCodes::SocketException, "Session is closed"};
@@ -75,815 +75,708 @@ const Status kNetworkError{ErrorCodes::HostUnreachable, "Someone is unreachable"
const Status kShutdownError{ErrorCodes::ShutdownInProgress, "Something is shutting down"};
const Status kArbitraryError{ErrorCodes::InternalError, "Something happened"};
-/**
- * FailureCondition represents a set of the ways any state in the SessionWorkflow can fail.
- */
-enum class FailureCondition {
- kNone,
- kTerminate, // External termination via the ServiceEntryPoint.
- kDisconnect, // Socket disconnection by peer.
- kNetwork, // Unspecified network failure (ala host unreachable).
- kShutdown, // System shutdown.
- kArbitrary, // An arbitrary error that does not fall under the other conditions.
-};
-
-constexpr StringData toString(FailureCondition fail) {
- switch (fail) {
- case FailureCondition::kNone:
- return "None"_sd;
- case FailureCondition::kTerminate:
- return "Terminate"_sd;
- case FailureCondition::kDisconnect:
- return "Disconnect"_sd;
- case FailureCondition::kNetwork:
- return "Network"_sd;
- case FailureCondition::kShutdown:
- return "Shutdown"_sd;
- case FailureCondition::kArbitrary:
- return "Arbitrary"_sd;
- };
-
- return "Unknown"_sd;
-}
-
-std::ostream& operator<<(std::ostream& os, FailureCondition fail) {
- return os << toString(fail);
+template <typename T, size_t N>
+StringData findEnumName(const std::pair<T, StringData> (&arr)[N], T k) {
+ using std::begin;
+ using std::end;
+ return std::find_if(begin(arr), end(arr), [&](auto&& e) { return e.first == k; })->second;
}
/**
- * SessionState represents the externally observable state of the SessionWorkflow. These
- * states map relatively closely to the internals of the SessionWorkflow::Impl. That said,
- * this enum represents the SessionWorkflowTest's external understanding of the internal
- * state.
+ * Events generated by SessionWorkflow via virtual function calls to mock
+ * objects. They are a means to observe and indirectly manipulate
+ * SessionWorkflow's behavior to reproduce test scenarios.
+ *
+ * They are named for the mock object and function that emits them.
*/
-enum class SessionState {
- kStart,
- kPoll,
- kSource,
- kProcess,
- kSink,
- kEnd,
+enum class Event {
+ kSessionWaitForData,
+ kSessionSourceMessage,
+ kSepHandleRequest,
+ kSessionSinkMessage,
+ kSepEndSession,
};
-constexpr StringData toString(SessionState state) {
- switch (state) {
- case SessionState::kStart:
- return "Start"_sd;
- case SessionState::kPoll:
- return "Poll"_sd;
- case SessionState::kSource:
- return "Source"_sd;
- case SessionState::kProcess:
- return "Process"_sd;
- case SessionState::kSink:
- return "Sink"_sd;
- case SessionState::kEnd:
- return "End"_sd;
- };
-
- return "Unknown"_sd;
+StringData toString(Event e) {
+ return findEnumName(
+ {
+ {Event::kSessionWaitForData, "Session.waitForData"_sd},
+ {Event::kSessionSourceMessage, "Session.sourceMessage"_sd},
+ {Event::kSepHandleRequest, "Sep.handleRequest"_sd},
+ {Event::kSessionSinkMessage, "Session.sinkMessage"_sd},
+ {Event::kSepEndSession, "Sep.endSession"_sd},
+ },
+ e);
}
-std::ostream& operator<<(std::ostream& os, SessionState state) {
- return os << toString(state);
+std::ostream& operator<<(std::ostream& os, Event e) {
+ return os << toString(e);
}
-/**
- * RequestKind represents the type of operation of the SessionWorkflow. Depending on various
- * message flags and conditions, the SessionWorkflow will transition between states
- * differently.
- */
-enum class RequestKind {
- kDefault,
- kExhaust,
- kMoreToCome,
-};
-
-constexpr StringData toString(RequestKind kind) {
- switch (kind) {
- case RequestKind::kDefault:
- return "Default"_sd;
- case RequestKind::kExhaust:
- return "Exhaust"_sd;
- case RequestKind::kMoreToCome:
- return "MoreToCome"_sd;
- };
- return "Unknown"_sd;
+template <typename T>
+static std::string typeName() {
+ return demangleName(typeid(T));
}
-
-std::ostream& operator<<(std::ostream& os, RequestKind kind) {
- return os << toString(kind);
+template <typename T>
+static std::ostream& stream(std::ostream& os, const T&) {
+ return os << "[{}]"_format(typeName<T>());
+}
+static std::ostream& stream(std::ostream& os, const Status& v) {
+ return os << v;
+}
+template <typename T>
+static std::ostream& stream(std::ostream& os, const StatusWith<T>& v) {
+ if (!v.isOK())
+ return stream(os, v.getStatus());
+ return stream(os, v.getValue());
}
-class ResultValue {
-public:
- ResultValue() = default;
- explicit ResultValue(Status status) : _value(std::move(status)) {}
- explicit ResultValue(StatusWith<Message> message) : _value(std::move(message)) {}
- explicit ResultValue(StatusWith<DbResponse> response) : _value(std::move(response)) {}
-
- void setResponse(StatusWith<DbResponse> response) {
- _value = response;
- }
-
- StatusWith<DbResponse> getResponse() const {
- return _convertTo<StatusWith<DbResponse>>();
- }
+class Result {
+ using Variant = stdx::variant<stdx::monostate, Status, StatusWith<Message>, Future<DbResponse>>;
- void setMessage(StatusWith<Message> message) {
- _value = message;
- }
+public:
+ Result() = default;
- StatusWith<Message> getMessage() const {
- return _convertTo<StatusWith<Message>>();
- }
+ Result(Result&&) = default;
+ Result& operator=(Result&&) = default;
- void setStatus(Status status) {
- _value = status;
- }
+ template <typename T, std::enable_if_t<std::is_constructible_v<Variant, T&&>, int> = 0>
+ explicit Result(T&& v) : _value{std::forward<T>(v)} {}
- Status getStatus() const {
- return _convertTo<Status>();
- }
-
- bool empty() const {
- return _value.index() == 0;
+ template <typename T>
+ T consumeAs() && {
+ return stdx::visit(
+ [](auto&& alt) -> T {
+ using A = decltype(alt);
+ if constexpr (std::is_convertible<A, T>())
+ return std::forward<A>(alt);
+ invariant(0, "{} => {}"_format(typeName<A>(), typeName<T>()));
+ MONGO_UNREACHABLE;
+ },
+ std::exchange(_value, {}));
}
explicit operator bool() const {
- return !empty();
+ return _value.index() != 0;
}
private:
- template <typename Target>
- Target _convertTo() const {
+ friend std::string toString(const Result& r) {
return stdx::visit(
- [](auto alt) -> Target {
- if constexpr (std::is_convertible<decltype(alt), Target>())
- return alt;
- invariant(false, "ResultValue not convertible to target type");
- MONGO_COMPILER_UNREACHABLE;
+ [](auto&& alt) -> std::string {
+ using A = std::decay_t<decltype(alt)>;
+ std::ostringstream os;
+ stream(os << "[{}]"_format(typeName<A>()), alt);
+ return os.str();
},
- _value);
+ r._value);
}
- stdx::variant<stdx::monostate, Status, StatusWith<Message>, StatusWith<DbResponse>> _value;
+ Variant _value;
};
-/**
- * This class stores and synchronizes the shared state result between the test
- * fixture and its various wrappers.
- */
-struct StateResult {
- Mutex mutex = MONGO_MAKE_LATCH("StateResult::_mutex");
- stdx::condition_variable cv;
+Message makeOpMsg() {
+ static auto nextId = AtomicWord<int>{0};
+ auto omb = OpMsgBuilder{};
+ omb.setBody(BSONObjBuilder{}.append("id", nextId.fetchAndAdd(1)).obj());
+ return omb.finish();
+}
- AtomicWord<bool> isConnected{true};
+DbResponse makeResponse(Message m) {
+ DbResponse response{};
+ response.response = m;
+ return response;
+}
- ResultValue result;
- SessionState state;
-};
+DbResponse setExhaust(DbResponse response) {
+ response.shouldRunAgainForExhaust = true;
+ return response;
+}
-/**
- * The SessionWorkflowTest is a fixture that mocks the external inputs into the
- * SessionWorkflow so as to provide a deterministic way to evaluate the session workflow
- * implemenation.
- */
-class SessionWorkflowTest : public LockerNoopServiceContextTest {
-public:
- /**
- * Make a generic thread pool to deliver external inputs out of line (mocking the network or
- * database workers).
- */
- static std::shared_ptr<ThreadPool> makeThreadPool() {
- auto options = ThreadPool::Options{};
- options.poolName = "SessionWorkflowTest";
+Message setExhaustSupported(Message msg) {
+ OpMsg::setFlag(&msg, OpMsg::kExhaustSupported);
+ return msg;
+}
- return std::make_shared<ThreadPool>(std::move(options));
- }
+Message setMoreToCome(Message msg) {
+ OpMsg::setFlag(&msg, OpMsg::kMoreToCome);
+ return msg;
+}
- void setUp() override;
- void tearDown() override;
+/** Fixture that mocks interactions with a `SessionWorkflow`. */
+class SessionWorkflowTest : public LockerNoopServiceContextTest {
+ using Base = LockerNoopServiceContextTest;
- /**
- * This function blocks until the SessionWorkflowTest observes a state change.
- */
- SessionState popSessionState() {
- return _stateQueue.pop();
+public:
+ void setUp() override {
+ Base::setUp();
+ auto sc = getServiceContext();
+ sc->setServiceEntryPoint(_makeServiceEntryPoint(sc));
+ initializeNewSession();
+ invariant(sep()->start());
+ _threadPool->startup();
}
- /**
- * This function asserts that the SessionWorkflowTest has not yet observed a state change.
- *
- * Note that this function does not guarantee that it will not observe a state change in the
- * future.
- */
- void assertNoSessionState() {
- if (auto maybeState = _stateQueue.tryPop()) {
- FAIL("The queue is not empty, state: ") << *maybeState;
- }
+ void tearDown() override {
+ ScopeGuard guard = [&] { Base::tearDown(); };
+ // Normal shutdown is a noop outside of ASAN.
+ invariant(sep()->shutdownAndWait(Seconds{10}));
+ _threadPool->shutdown();
+ _threadPool->join();
}
- /**
- * This function stores an external response to be delivered out of line to the
- * SessionWorkflow.
- */
- void setResult(SessionState state, ResultValue result) {
- auto lk = stdx::lock_guard(_stateResult->mutex);
- invariant(state == SessionState::kPoll || state == SessionState::kSource ||
- state == SessionState::kProcess || state == SessionState::kSink);
- _stateResult->result = std::move(result);
- _stateResult->state = state;
- _stateResult->cv.notify_one();
+ void initializeNewSession() {
+ _session = std::make_shared<CustomMockSession>(this);
}
- /**
- * This function makes a generic result appropriate for a successful state change given
- * SessionState and RequestKind.
- */
- ResultValue makeGenericResult(SessionState state, RequestKind kind) {
- ResultValue result;
- switch (state) {
- case SessionState::kPoll:
- case SessionState::kSink:
- result.setStatus(Status::OK());
- break;
- case SessionState::kSource: {
- Message message = _makeIndexedBson();
- switch (kind) {
- case RequestKind::kExhaust:
- OpMsg::setFlag(&message, OpMsg::kExhaustSupported);
- break;
- case RequestKind::kDefault:
- case RequestKind::kMoreToCome:
- break;
- }
- result.setMessage(StatusWith<Message>(message));
- } break;
- case SessionState::kProcess: {
- DbResponse response;
- switch (kind) {
- case RequestKind::kDefault:
- response.response = _makeIndexedBson();
- break;
- case RequestKind::kExhaust:
- response.response = _makeIndexedBson();
- response.shouldRunAgainForExhaust = true;
- break;
- case RequestKind::kMoreToCome:
- break;
- }
- result.setResponse(response);
- } break;
- default:
- invariant(
- false,
- "Unable to make generic result for this state: {}"_format(toString(state)));
- }
- return result;
+ /** Waits for the current Session and SessionWorkflow to end. */
+ void joinSessions() {
+ ASSERT(sep()->waitForNoSessions(Seconds{1}));
}
- /**
- * Initialize a new Session.
- */
- void initNewSession();
-
- /**
- * Launch a SessionWorkflow for the current session.
- */
- void startSession();
+ /** Launches a SessionWorkflow for the current session. */
+ void startSession() {
+ LOGV2(6742613, "Starting session");
+ sep()->startSession(_session);
+ }
- /**
- * Wait for the current Session and SessionWorkflow to end.
- */
- void joinSession();
+ MockServiceEntryPoint* sep() {
+ return checked_cast<MockServiceEntryPoint*>(getServiceContext()->getServiceEntryPoint());
+ }
/**
- * Mark the session as no longer connected.
+ * Installs an arbitrary one-shot mock handler callback for the next event.
+ * The next incoming mock event will invoke this callback and destroy it.
*/
- void endSession() {
- auto lk = stdx::lock_guard(_stateResult->mutex);
- if (_stateResult->isConnected.swap(false)) {
- LOGV2(5014101, "Ending session");
- _stateResult->cv.notify_one();
- }
+ void injectMockResponse(unique_function<Result(Event)> cb) {
+ _expect.push(std::move(cb));
}
/**
- * Start a brand new session, run the given function, and then join the session.
+ * Wrapper around `injectMockResponse`. Installs a handler for the `expected`
+ * mock event, that will return the specified `result`.
+ * Returns a `Future` that is fulfilled when that mock event occurs.
*/
- template <typename F>
- void runWithNewSession(F&& func) {
- initNewSession();
- startSession();
-
- auto firstState = popSessionState();
- ASSERT(firstState == SessionState::kSource || firstState == SessionState::kPoll)
- << "State was instead: " << toString(firstState);
-
- std::forward<F>(func)();
-
- joinSession();
- }
-
- void terminateViaServiceEntryPoint();
-
- bool isConnected() const {
- return _stateResult->isConnected.load();
+ Future<void> asyncExpect(Event expected, Result r) {
+ auto pf = std::make_shared<PromiseAndFuture<void>>();
+ injectMockResponse([r = std::move(r), expected, pf](Event event) mutable {
+ invariant(event == expected,
+ "Expected {}, but got {}"_format(toString(expected), toString(event)));
+ pf->promise.emplaceValue();
+ return std::move(r);
+ });
+ return std::move(pf->future);
}
- int onClientDisconnectCalledTimes() const {
- return _onClientDisconnectCalled;
+ void expect(Event expected, Result r) {
+ return asyncExpect(expected, std::move(r)).get();
}
private:
- /**
- * Generate a resonably generic BSON with an id for use in debugging.
- */
- static Message _makeIndexedBson() {
- auto bob = BSONObjBuilder();
- static auto nextId = AtomicWord<int>{0};
- bob.append("id", nextId.fetchAndAdd(1));
-
- auto omb = OpMsgBuilder{};
- omb.setBody(bob.obj());
- return omb.finish();
- }
+ class MockExpectationSlot {
+ public:
+ void push(unique_function<Result(Event)> cb) {
+ stdx::lock_guard lk{_mutex};
+ invariant(!_cb);
+ _cb = std::move(cb);
+ _cv.notify_one();
+ }
- /**
- * Use an external result to mock handling a request.
- */
- StatusWith<DbResponse> _processRequest(OperationContext* opCtx, const Message&) noexcept {
- _stateQueue.push(SessionState::kProcess);
-
- auto result = [&]() -> StatusWith<DbResponse> {
- auto lk = stdx::unique_lock(_stateResult->mutex);
- _stateResult->cv.wait(lk, [this] {
- return (_stateResult->result && _stateResult->state == SessionState::kProcess) ||
- !isConnected();
- });
-
- if (!isConnected()) {
- return kClosedSessionError;
- }
+ unique_function<Result(Event)> pop() {
+ stdx::unique_lock lk{_mutex};
+ _cv.wait(lk, [&] { return !!_cb; });
+ return std::exchange(_cb, {});
+ }
- invariant(_stateResult->result);
- return std::exchange(_stateResult->result, {}).getResponse();
- }();
+ private:
+ mutable Mutex _mutex;
+ stdx::condition_variable _cv;
+ unique_function<Result(Event)> _cb;
+ };
- LOGV2(5014100, "Handled request", "error"_attr = result.getStatus());
+ class CustomMockSession : public CallbackMockSession {
+ public:
+ explicit CustomMockSession(SessionWorkflowTest* fixture) {
+ endCb = [this] { *_connected = false; };
+ isConnectedCb = [this] { return *_connected; };
+ waitForDataCb = [fixture] {
+ return fixture->_onMockEvent<Status>(Event::kSessionWaitForData);
+ };
+ sourceMessageCb = [fixture] {
+ return fixture->_onMockEvent<StatusWith<Message>>(Event::kSessionSourceMessage);
+ };
+ sinkMessageCb = [fixture](Message) {
+ return fixture->_onMockEvent<Status>(Event::kSessionSinkMessage);
+ };
+ // The async variants will just run the same callback on `_threadPool`.
+ auto async = [fixture](auto cb) {
+ return ExecutorFuture<void>(fixture->_threadPool).then(cb).unsafeToInlineFuture();
+ };
+ asyncWaitForDataCb = [=, cb = waitForDataCb] { return async([cb] { return cb(); }); };
+ asyncSourceMessageCb = [=, cb = sourceMessageCb](const BatonHandle&) {
+ return async([cb] { return cb(); });
+ };
+ asyncSinkMessageCb = [=, cb = sinkMessageCb](Message m, const BatonHandle&) {
+ return async([cb, m = std::move(m)]() mutable { return cb(std::move(m)); });
+ };
+ }
- return result;
- }
+ private:
+ synchronized_value<bool> _connected{true}; // Born in the connected state.
+ };
- Future<DbResponse> _handleRequest(OperationContext* opCtx, const Message& request) noexcept {
- auto [p, f] = makePromiseFuture<DbResponse>();
- ExecutorFuture<void>(_threadPool)
- .then([this, opCtx, &request, p = std::move(p)]() mutable {
- p.setWith([&] { return _processRequest(opCtx, request); });
- })
- .getAsync([](auto&&) {});
- return std::move(f);
+ std::shared_ptr<ThreadPool> _makeThreadPool() {
+ ThreadPool::Options options{};
+ options.poolName = "SessionWorkflowTest";
+ return std::make_shared<ThreadPool>(std::move(options));
}
/**
- * Use an external result to mock polling for data and observe the state.
+ * Simulates an async command implemented under the borrowed thread model.
+ * The promise must be fulfilled while holding ClientStrand.
*/
- Status _waitForData() {
- _stateQueue.push(SessionState::kPoll);
-
- auto result = [&]() -> Status {
- auto lk = stdx::unique_lock(_stateResult->mutex);
- _stateResult->cv.wait(lk, [this] {
- return (_stateResult->result && _stateResult->state == SessionState::kPoll) ||
- !isConnected();
- });
-
- if (!isConnected()) {
- return kClosedSessionError;
- }
-
- invariant(_stateResult->result);
- return std::exchange(_stateResult->result, {}).getStatus();
- }();
-
- LOGV2(5014102, "Finished waiting for data", "error"_attr = result);
- return result;
+ Future<DbResponse> _asyncHandleRequest(OperationContext* opCtx,
+ unique_function<Future<DbResponse>()> cb) {
+ auto pf = PromiseAndFuture<DbResponse>();
+ ExecutorFuture<void>(_threadPool)
+ .then([strand = ClientStrand::get(opCtx->getClient()),
+ cb = std::move(cb),
+ p = std::move(pf.promise)]() mutable {
+ strand->run([&] { p.setWith([&] { return cb(); }); });
+ })
+ .getAsync([](auto&&) {});
+ return std::move(pf.future);
}
- /**
- * Use an external result to mock reading data and observe the state.
- */
- StatusWith<Message> _sourceMessage() {
- _stateQueue.push(SessionState::kSource);
-
- auto result = [&]() -> StatusWith<Message> {
- auto lk = stdx::unique_lock(_stateResult->mutex);
- _stateResult->cv.wait(lk, [this] {
- return (_stateResult->result && _stateResult->state == SessionState::kSource) ||
- !isConnected();
- });
-
- if (!isConnected()) {
- return kClosedSessionError;
- }
-
- invariant(_stateResult->result);
- return std::exchange(_stateResult->result, {}).getMessage();
- }();
-
- LOGV2(5014103, "Sourced message", "error"_attr = result.getStatus());
-
- return result;
+ std::unique_ptr<MockServiceEntryPoint> _makeServiceEntryPoint(ServiceContext* sc) {
+ auto sep = std::make_unique<MockServiceEntryPoint>(sc);
+ sep->handleRequestCb = [=](OperationContext* opCtx, const Message&) {
+ auto cb = [this] { return _onMockEvent<Future<DbResponse>>(Event::kSepHandleRequest); };
+ if (!gInitialUseDedicatedThread)
+ return _asyncHandleRequest(opCtx, std::move(cb));
+ return cb();
+ };
+ sep->onEndSessionCb = [=](const SessionHandle&) {
+ _onMockEvent<void>(Event::kSepEndSession);
+ };
+ sep->derivedOnClientDisconnectCb = [&](Client*) {};
+ return sep;
}
/**
- * Use an external result to mock writing data and observe the state.
+ * Called by all mock functions to notify the main thread and get a value with which to respond.
+ * The mock function call is identified by an `event`. If there isn't already an expectation,
+ * the mock object will wait for one to be injected via a call to `injectMockResponse`.
*/
- Status _sinkMessage(Message message) {
- _stateQueue.push(SessionState::kSink);
-
- auto result = [&]() -> Status {
- auto lk = stdx::unique_lock(_stateResult->mutex);
- _stateResult->cv.wait(lk, [this] {
- return (_stateResult->result && _stateResult->state == SessionState::kSink) ||
- !isConnected();
- });
-
- if (!isConnected()) {
- return kClosedSessionError;
- }
-
- invariant(_stateResult->result);
- return std::exchange(_stateResult->result, {}).getStatus();
- }();
-
- LOGV2(5014104, "Sunk message", "error"_attr = result);
- return result;
- }
-
- Future<void> _asyncWaitForData() noexcept {
- return ExecutorFuture<void>(_threadPool)
- .then([this] { return _waitForData(); })
- .unsafeToInlineFuture();
- }
-
- Future<Message> _asyncSourceMessage() noexcept {
- return ExecutorFuture<void>(_threadPool)
- .then([this] { return _sourceMessage(); })
- .unsafeToInlineFuture();
- }
-
- Future<void> _asyncSinkMessage(Message message) noexcept {
- return ExecutorFuture<void>(_threadPool)
- .then([this, message = std::move(message)]() mutable {
- return _sinkMessage(std::move(message));
- })
- .unsafeToInlineFuture();
+ template <typename Target>
+ Target _onMockEvent(Event event) {
+ LOGV2_DEBUG(6742616, 2, "Mock event arrived", "event"_attr = event);
+ Result r = _expect.pop()(event);
+ LOGV2_DEBUG(6742618, 2, "Responding", "event"_attr = event, "result"_attr = toString(r));
+ if constexpr (std::is_same_v<Target, void>) {
+ std::move(r).consumeAs<stdx::monostate>();
+ return;
+ } else {
+ return std::move(r).consumeAs<Target>();
+ }
}
- MockServiceEntryPoint* _sep;
+ MockExpectationSlot _expect;
+ std::shared_ptr<CustomMockSession> _session;
+ std::shared_ptr<ThreadPool> _threadPool = _makeThreadPool();
+};
- const std::shared_ptr<ThreadPool> _threadPool = makeThreadPool();
+TEST_F(SessionWorkflowTest, StartThenEndSession) {
+ startSession();
+ expect(Event::kSessionSourceMessage, Result{kClosedSessionError});
+ expect(Event::kSepEndSession, Result{});
+ joinSessions();
+}
- std::unique_ptr<StateResult> _stateResult;
+TEST_F(SessionWorkflowTest, OneNormalCommand) {
+ startSession();
+ expect(Event::kSessionSourceMessage, Result{makeOpMsg()});
+ expect(Event::kSepHandleRequest, Result{makeResponse(makeOpMsg())});
+ expect(Event::kSessionSinkMessage, Result{Status::OK()});
+ expect(Event::kSessionSourceMessage, Result{kClosedSessionError});
+ expect(Event::kSepEndSession, Result{});
+ joinSessions();
+}
- std::shared_ptr<CallbackMockSession> _session;
- SingleProducerSingleConsumerQueue<SessionState> _stateQueue;
+TEST_F(SessionWorkflowTest, OnClientDisconnectCalledOnCleanup) {
+ int disconnects = 0;
+ sep()->derivedOnClientDisconnectCb = [&](Client*) { ++disconnects; };
+ startSession();
+ ASSERT_EQ(disconnects, 0);
+ expect(Event::kSessionSourceMessage, Result{kClosedSessionError});
+ expect(Event::kSepEndSession, Result{});
+ joinSessions();
+ ASSERT_EQ(disconnects, 1);
+}
- int _onClientDisconnectCalled{0};
-};
+/** Repro of one formerly troublesome scenario generated by the StepRunner test below. */
+TEST_F(SessionWorkflowTest, MoreToComeDisconnectAtSource3) {
+ startSession();
+ // One more-to-come command, yields an empty response per wire protocol
+ expect(Event::kSessionSourceMessage, Result{setMoreToCome(makeOpMsg())});
+ expect(Event::kSepHandleRequest, Result{makeResponse({})});
+ // Another message from session, this time a normal RPC.
+ expect(Event::kSessionSourceMessage, Result{makeOpMsg()});
+ expect(Event::kSepHandleRequest, Result{makeResponse(makeOpMsg())});
+ expect(Event::kSessionSinkMessage, Result{Status::OK()});
+ // Client disconnects while we're waiting for their next command.
+ expect(Event::kSessionSourceMessage, Result{kShutdownError});
+ expect(Event::kSepEndSession, Result{});
+ joinSessions();
+}
/**
- * This class iterates over the potential methods of failure for a set of steps.
+ * Check the behavior of an interrupted "getMore" exhaust command.
+ * SessionWorkflow looks specifically for the "getMore" command name to trigger
+ * this cleanup.
*/
-class StepRunner {
- /**
- * This is a simple data structure describing the external response for one state in the
- * session workflow.
- */
- struct Step {
- SessionState state;
- RequestKind kind;
+TEST_F(SessionWorkflowTest, CleanupFromGetMore) {
+ // For this test, SEP handleRequest will stash a copy of the request.
+ // When a handleRequest has been observed, the test can check this variable
+ // to peek at it.
+ auto stashedRequest = std::make_shared<synchronized_value<Message>>();
+ sep()->handleRequestCb = [orig = sep()->handleRequestCb,
+ stashedRequest](OperationContext* opCtx, const Message& msg) {
+ **stashedRequest = msg;
+ return orig(opCtx, msg);
};
- using StepList = std::vector<Step>;
-public:
- StepRunner(SessionWorkflowTest* fixture) : _fixture{fixture} {}
- ~StepRunner() {
- invariant(_runCount > 0, "StepRunner expects to be run at least once");
- }
+ initializeNewSession();
+ startSession();
- /**
- * Given a FailureCondition, cause an external result to be delivered that is appropriate for
- * the given state and request kind.
- */
- SessionState doGenericStep(const Step& step, FailureCondition fail) {
- switch (fail) {
- case FailureCondition::kNone: {
- _fixture->setResult(step.state, _fixture->makeGenericResult(step.state, step.kind));
- } break;
- case FailureCondition::kTerminate: {
- _fixture->terminateViaServiceEntryPoint();
- // We expect that the session will be disconnected via the SEP, no need to set any
- // result.
- } break;
- case FailureCondition::kDisconnect: {
- _fixture->endSession();
- // We expect that the session will be disconnected directly, no need to set any
- // result.
- } break;
- case FailureCondition::kNetwork: {
- _fixture->setResult(step.state, ResultValue(kNetworkError));
- } break;
- case FailureCondition::kShutdown: {
- _fixture->setResult(step.state, ResultValue(kShutdownError));
- } break;
- case FailureCondition::kArbitrary: {
- _fixture->setResult(step.state, ResultValue(kArbitraryError));
- } break;
- };
+ auto makeGetMoreRequest = [](int64_t cursorId) {
+ OpMsgBuilder omb;
+ omb.setBody(BSONObjBuilder{}
+ .append("getMore", cursorId)
+ .append("collection", "testColl")
+ .append("$db", "testDb")
+ .obj());
+ return setExhaustSupported(omb.finish());
+ };
- return _fixture->popSessionState();
- }
+ auto makeGetMoreResponse = [] {
+ DbResponse response;
+ OpMsgBuilder omb;
+ omb.setBody(BSONObjBuilder{}.append("id", int64_t{0}).obj());
+ response.response = omb.finish();
+ return response;
+ };
- /**
- * Mark an additional expected state in the session workflow.
- */
- void expectNextState(SessionState state, RequestKind kind) {
- auto step = Step{};
- step.state = state;
- step.kind = kind;
- _steps.emplace_back(std::move(step));
+ // Produce the condition of having an active `getMore` exhaust command.
+ expect(Event::kSessionSourceMessage, Result{makeGetMoreRequest(123)});
+ expect(Event::kSepHandleRequest, Result{setExhaust(makeGetMoreResponse())});
+
+ // Simulate a disconnect during the session sink call. The cleanup of
+ // exhaust resources happens when the session disconnects. So after the send
+ // of the "getMore" response returns the injected error, expect the
+ // SessionWorkflow to issue a fire-and-forget "killCursors".
+ expect(Event::kSessionSinkMessage, Result{kClosedSessionError});
+ {
+ auto killCursors = std::make_shared<Notification<Message>>();
+ auto unpause = std::make_shared<Notification<void>>();
+ ScopeGuard unpauseGuard = [&] { unpause->set(); };
+ injectMockResponse([killCursors, unpause, stashedRequest](Event e) {
+ invariant(e == Event::kSepHandleRequest);
+ killCursors->set(**stashedRequest);
+ unpause->get(); // Pause while main thread examines killCursors.
+ return Result{makeResponse({})}; // fire-and-forget disallows response.
+ });
+ ASSERT_EQ(OpMsgRequest::parse(killCursors->get()).getCommandName(), "killCursors"_sd);
}
+ // Because they're fire-and-forget commands, we will only observe `handleRequest`
+ // calls to the SEP for the cleanup "killCursors", and the next thing to happen
+ // will be the end of the session.
+ expect(Event::kSepEndSession, Result{});
+ joinSessions();
+}
+
+class StepRunnerSessionWorkflowTest : public SessionWorkflowTest {
+public:
/**
- * Mark the final expected state in the session workflow.
+ * Concisely encode the ways this test might respond to mock events.
+ * The OK Result contents depend on which Event it's responding to.
*/
- void expectFinalState(SessionState state) {
- _finalState = state;
+ enum class Action {
+ kDefault, // OK result for a basic (request and response) command.
+ kExhaust, // OK result for a exhuast command.
+ kMoreToCome, // OK result for a fire-and-forget command.
+
+ kErrTerminate, // External termination via the ServiceEntryPoint.
+ kErrDisconnect, // Socket disconnection by peer.
+ kErrNetwork, // Unspecified network failure (ala host unreachable).
+ kErrShutdown, // System shutdown.
+ kErrArbitrary, // An arbitrary error that does not fall under the other conditions.
+ };
+
+ friend StringData toString(Action k) {
+ return findEnumName({{Action::kDefault, "Default"_sd},
+ {Action::kExhaust, "Exhaust"_sd},
+ {Action::kMoreToCome, "MoreToCome"_sd},
+ {Action::kErrTerminate, "ErrTerminate"_sd},
+ {Action::kErrDisconnect, "ErrDisconnect"_sd},
+ {Action::kErrNetwork, "ErrNetwork"_sd},
+ {Action::kErrShutdown, "ErrShutdown"_sd},
+ {Action::kErrArbitrary, "ErrArbitrary"_sd}},
+ k);
}
/**
- * Run a set of variations on the steps, failing further and further along the way.
+ * Given a list of steps, performs a series of tests exercising that list.
+ *
+ * The `run()` function performs a set of variations on the steps, failing
+ * further and further along the way, with different errors tried at each
+ * step.
+ *
+ * It first sets a baseline by running all the steps without injecting
+ * failure. Then it checks each failure condition for each step in the
+ * sequence. For example, if we have steps[NS] and failure conditions
+ * fails[NF], it will run these pseudocode trials:
+ *
+ * // First, no errors.
+ * { steps[0](OK); steps[1](OK); ... steps[NS-1](OK); }
*
- * This function first runs all the steps without failure to set a baseline. It then checks each
- * failure condition for each step going forward. For example, if we have steps Source, Sink,
- * and End and failure conditions None, Network, and Terminate, it will run these variations in
- * this order:
- * [(Source, None), (Sink, None), (End)]
- * [(Source, Network), (End)]
- * [(Source, Terminate), (End)]
- * [(Source, None), (Sink, Network), (End)]
- * [(Source, None), (Sink, Terminate), (End)]
+ * // Inject each kind of failure at steps[0].
+ * { steps[0](fails[0]); }
+ * { steps[0](fails[1]); }
+ * ... and so on for fails[NF].
+ *
+ * // Now let steps[0] succeed, but inject each kind of failure at steps[1].
+ * { steps[0](OK); steps[1](fails[0]); }
+ * { steps[0](OK); steps[1](fails[1]); }
+ * ... and so on for fails[NF].
+ *
+ * // And so on the NS steps....
*/
- void run() {
- invariant(_finalState);
-
- auto getExpectedPostState = [&](auto iter) {
- auto nextIter = ++iter;
- if (nextIter == _steps.end()) {
- return *_finalState;
- }
- return nextIter->state;
+ class RunAllErrorsAtAllSteps {
+ public:
+ /** The set of failures is hardcoded. */
+ static constexpr std::array fails{Action::kErrTerminate,
+ Action::kErrDisconnect,
+ Action::kErrNetwork,
+ Action::kErrShutdown,
+ Action::kErrArbitrary};
+
+ /** Encodes a response to `event` by taking `action`. */
+ struct Step {
+ Event event;
+ Action action = Action::kDefault;
};
- // Do one entirely clean run.
- LOGV2(5014106, "Running success case");
- _fixture->runWithNewSession([&] {
- for (auto iter = _steps.begin(); iter != _steps.end(); ++iter) {
- ASSERT_EQ(doGenericStep(*iter, FailureCondition::kNone),
- getExpectedPostState(iter));
+ // The final step is assumed to have `kErrDisconnect` as an action,
+ // yielding an implied `kEnd` step.
+ RunAllErrorsAtAllSteps(SessionWorkflowTest* fixture, std::deque<Step> steps)
+ : _fixture{fixture}, _steps{[&, at = steps.size() - 1] {
+ return _appendTermination(std::move(steps), at, Action::kErrDisconnect);
+ }()} {}
+
+ /**
+ * Run all of the trials specified by the constructor.
+ */
+ void run() {
+ const std::deque<Step> baseline(_steps.begin(), _steps.end());
+ LOGV2(5014106, "Running one entirely clean run");
+ _runSteps(baseline);
+ // Incrementally push forward the step where we fail.
+ for (size_t failAt = 0; failAt + 1 < baseline.size(); ++failAt) {
+ LOGV2(6742614, "Injecting failures", "failAt"_attr = failAt);
+ for (auto fail : fails)
+ _runSteps(_appendTermination(baseline, failAt, fail));
}
+ }
- _fixture->endSession();
- ASSERT_EQ(_fixture->popSessionState(), SessionState::kEnd);
- });
+ private:
+ /**
+ * Returns a new steps sequence, formed by copying the specified `q`, and
+ * modifying the copy to be terminated with a `fail` at the `failAt` index.
+ */
+ std::deque<Step> _appendTermination(std::deque<Step> q, size_t failAt, Action fail) const {
+ LOGV2(
+ 6742617, "appendTermination", "fail"_attr = toString(fail), "failAt"_attr = failAt);
+ invariant(failAt < q.size());
+ q.erase(q.begin() + failAt + 1, q.end());
+ q.back().action = fail;
+ q.push_back({Event::kSepEndSession});
+ return q;
+ }
- const auto failList = std::vector<FailureCondition>{FailureCondition::kTerminate,
- FailureCondition::kDisconnect,
- FailureCondition::kNetwork,
- FailureCondition::kShutdown,
- FailureCondition::kArbitrary};
+ template <typename T>
+ void _dumpTransitions(const T& q) {
+ BSONArrayBuilder bab;
+ for (auto&& t : q) {
+ BSONObjBuilder{bab.subobjStart()}
+ .append("event", toString(t.event))
+ .append("action", toString(t.action));
+ }
+ LOGV2(6742615, "Run transitions", "transitions"_attr = bab.arr());
+ }
- for (auto failIter = _steps.begin(); failIter != _steps.end(); ++failIter) {
- // Incrementally push forward the step where we fail.
- for (auto fail : failList) {
- LOGV2(5014105,
- "Running failure case",
- "failureCase"_attr = fail,
- "sessionState"_attr = failIter->state,
- "requestKind"_attr = failIter->kind);
-
- _fixture->runWithNewSession([&] {
- auto iter = _steps.begin();
- for (; iter != failIter; ++iter) {
- // Run through each step until our point of failure with
- // FailureCondition::kNone.
- ASSERT_EQ(doGenericStep(*iter, FailureCondition::kNone),
- getExpectedPostState(iter))
- << "Current state: (" << iter->state << ", " << iter->kind << ")";
+ /** Makes a result for a successful event. */
+ Result _successResult(Event event, Action action) {
+ switch (event) {
+ case Event::kSepEndSession:
+ return Result{};
+ case Event::kSessionWaitForData:
+ case Event::kSessionSinkMessage:
+ return Result{Status::OK()};
+ case Event::kSessionSourceMessage: {
+ Message m = makeOpMsg();
+ if (action == Action::kExhaust)
+ m = setExhaustSupported(m);
+ return Result{StatusWith{std::move(m)}};
+ }
+ case Event::kSepHandleRequest:
+ switch (action) {
+ case Action::kDefault:
+ return Result{StatusWith{makeResponse(makeOpMsg())}};
+ case Action::kExhaust:
+ return Result{StatusWith{setExhaust(makeResponse(makeOpMsg()))}};
+ case Action::kMoreToCome:
+ return Result{StatusWith{DbResponse{}}};
+ default:
+ MONGO_UNREACHABLE;
}
-
- // Finally fail on a given step.
- ASSERT_EQ(doGenericStep(*iter, fail), SessionState::kEnd);
- });
}
+ MONGO_UNREACHABLE;
}
- _runCount += 1;
- }
-
-private:
- SessionWorkflowTest* const _fixture;
-
- boost::optional<SessionState> _finalState;
- StepList _steps;
+ void injectStep(const Step& t) {
+ LOGV2_DEBUG(
+ 6872301, 3, "Inject step", "event"_attr = t.event, "action"_attr = t.action);
+ switch (t.action) {
+ case Action::kErrTerminate: {
+ // Has a side effect of simulating a ServiceEntryPoint shutdown
+ // before responding with a shutdown error.
+ auto pf = std::make_shared<PromiseAndFuture<void>>();
+ _fixture->injectMockResponse([this, t, pf](Event e) {
+ invariant(e == t.event);
+ _fixture->sep()->endAllSessionsNoTagMask();
+ pf->promise.emplaceValue();
+ return Result{kShutdownError};
+ });
+ pf->future.get();
+ } break;
+ case Action::kErrDisconnect:
+ _fixture->expect(t.event, Result{kClosedSessionError});
+ break;
+ case Action::kErrNetwork:
+ _fixture->expect(t.event, Result(kNetworkError));
+ break;
+ case Action::kErrShutdown:
+ _fixture->expect(t.event, Result(kShutdownError));
+ break;
+ case Action::kErrArbitrary:
+ _fixture->expect(t.event, Result(kArbitraryError));
+ break;
+ case Action::kDefault:
+ case Action::kExhaust:
+ case Action::kMoreToCome:
+ _fixture->expect(t.event, _successResult(t.event, t.action));
+ break;
+ }
+ }
- // This variable is currently used as a post-condition to make sure that the StepRunner has been
- // run. In the current form, it could be a boolean. That said, if you need to stress test the
- // SessionWorkflow, you will want to check this variable to make sure you have run as many
- // times as you expect.
- size_t _runCount = 0;
-};
+ /** Start a new session, run the `steps` sequence, and join the session. */
+ void _runSteps(std::deque<Step> q) {
+ _dumpTransitions(q);
+ _fixture->initializeNewSession();
+ _fixture->startSession();
+ for (; !q.empty(); q.pop_front())
+ injectStep(q.front());
+ _fixture->joinSessions();
+ }
-void SessionWorkflowTest::initNewSession() {
- assertNoSessionState();
-
- _session = std::make_shared<CallbackMockSession>();
- _session->endCb = [&] { endSession(); };
- _session->isConnectedCb = [&] { return isConnected(); };
- _session->waitForDataCb = [&] { return _waitForData(); };
- _session->sourceMessageCb = [&] { return _sourceMessage(); };
- _session->sinkMessageCb = [&](Message message) { return _sinkMessage(std::move(message)); };
- _session->asyncWaitForDataCb = [&] { return _asyncWaitForData(); };
- _session->asyncSourceMessageCb = [&](const BatonHandle&) { return _asyncSourceMessage(); };
- _session->asyncSinkMessageCb = [&](Message message, const BatonHandle&) {
- return _asyncSinkMessage(std::move(message));
+ SessionWorkflowTest* _fixture;
+ std::deque<Step> _steps;
};
- _stateResult->isConnected.store(true);
-}
-
-void SessionWorkflowTest::joinSession() {
- ASSERT(_sep->waitForNoSessions(Seconds{1}));
-
- assertNoSessionState();
-}
-void SessionWorkflowTest::startSession() {
- _sep->startSession(_session);
-}
-
-void SessionWorkflowTest::terminateViaServiceEntryPoint() {
- _sep->endAllSessionsNoTagMask();
-}
-
-void SessionWorkflowTest::setUp() {
- ServiceContextTest::setUp();
-
- auto sep = std::make_unique<MockServiceEntryPoint>(getServiceContext());
- sep->handleRequestCb = [&](OperationContext* opCtx, const Message& request) {
- return _handleRequest(opCtx, request);
- };
- sep->onEndSessionCb = [&](const transport::SessionHandle& session) {
- invariant(session == _session);
- _stateQueue.push(SessionState::kEnd);
- };
- sep->derivedOnClientDisconnectCb = [&](Client*) { ++_onClientDisconnectCalled; };
- _sep = sep.get();
- getServiceContext()->setServiceEntryPoint(std::move(sep));
- invariant(_sep->start());
-
- _threadPool->startup();
-
- _stateResult = std::make_unique<StateResult>();
-}
-
-void SessionWorkflowTest::tearDown() {
- ON_BLOCK_EXIT([&] { ServiceContextTest::tearDown(); });
-
- endSession();
+ void runSteps(std::deque<RunAllErrorsAtAllSteps::Step> steps) {
+ RunAllErrorsAtAllSteps{this, steps}.run();
+ }
- // Normal shutdown is a noop outside of ASAN.
- invariant(_sep->shutdownAndWait(Seconds{10}));
+ std::deque<RunAllErrorsAtAllSteps::Step> defaultLoop() const {
+ return {
+ {Event::kSessionSourceMessage},
+ {Event::kSepHandleRequest},
+ {Event::kSessionSinkMessage},
+ {Event::kSessionSourceMessage},
+ };
+ }
- _threadPool->shutdown();
- _threadPool->join();
-}
+ std::deque<RunAllErrorsAtAllSteps::Step> exhaustLoop() const {
+ return {
+ {Event::kSessionSourceMessage, Action::kExhaust},
+ {Event::kSepHandleRequest, Action::kExhaust},
+ {Event::kSessionSinkMessage},
+ {Event::kSepHandleRequest},
+ {Event::kSessionSinkMessage},
+ {Event::kSessionSourceMessage},
+ };
+ }
-template <bool useDedicatedThread>
-class DedicatedThreadOverrideTest : public SessionWorkflowTest {
- ScopedValueOverride<bool> _svo{gInitialUseDedicatedThread, useDedicatedThread};
+ std::deque<RunAllErrorsAtAllSteps::Step> moreToComeLoop() const {
+ return {
+ {Event::kSessionSourceMessage, Action::kMoreToCome},
+ {Event::kSepHandleRequest, Action::kMoreToCome},
+ {Event::kSessionSourceMessage},
+ {Event::kSepHandleRequest},
+ {Event::kSessionSinkMessage},
+ {Event::kSessionSourceMessage},
+ };
+ }
};
-using SessionWorkflowWithBorrowedThreadsTest = DedicatedThreadOverrideTest<false>;
-using SessionWorkflowWithDedicatedThreadsTest = DedicatedThreadOverrideTest<true>;
-
-TEST_F(SessionWorkflowTest, StartThenEndSession) {
- initNewSession();
- startSession();
-
- ASSERT_EQ(popSessionState(), SessionState::kSource);
-
- endSession();
-}
-
-TEST_F(SessionWorkflowTest, EndBeforeStartSession) {
- initNewSession();
- endSession();
- startSession();
-}
-
-TEST_F(SessionWorkflowTest, OnClientDisconnectCalledOnCleanup) {
- initNewSession();
- startSession();
- ASSERT_EQ(popSessionState(), SessionState::kSource);
- ASSERT_EQ(onClientDisconnectCalledTimes(), 0);
- endSession();
- ASSERT_EQ(popSessionState(), SessionState::kEnd);
- joinSession();
- ASSERT_EQ(onClientDisconnectCalledTimes(), 1);
-}
+class SessionWorkflowWithDedicatedThreadsTest : public StepRunnerSessionWorkflowTest {
+ ScopedValueOverride<bool> _svo{gInitialUseDedicatedThread, true};
+};
TEST_F(SessionWorkflowWithDedicatedThreadsTest, DefaultLoop) {
- auto runner = StepRunner(this);
-
- runner.expectNextState(SessionState::kSource, RequestKind::kDefault);
- runner.expectNextState(SessionState::kProcess, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSink, RequestKind::kDefault);
- runner.expectFinalState(SessionState::kSource);
-
- runner.run();
+ runSteps(defaultLoop());
}
TEST_F(SessionWorkflowWithDedicatedThreadsTest, ExhaustLoop) {
- auto runner = StepRunner(this);
-
- runner.expectNextState(SessionState::kSource, RequestKind::kExhaust);
- runner.expectNextState(SessionState::kProcess, RequestKind::kExhaust);
- runner.expectNextState(SessionState::kSink, RequestKind::kExhaust);
- runner.expectNextState(SessionState::kProcess, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSink, RequestKind::kDefault);
- runner.expectFinalState(SessionState::kSource);
-
- runner.run();
+ runSteps(exhaustLoop());
}
TEST_F(SessionWorkflowWithDedicatedThreadsTest, MoreToComeLoop) {
- auto runner = StepRunner(this);
-
- runner.expectNextState(SessionState::kSource, RequestKind::kMoreToCome);
- runner.expectNextState(SessionState::kProcess, RequestKind::kMoreToCome);
- runner.expectNextState(SessionState::kSource, RequestKind::kDefault);
- runner.expectNextState(SessionState::kProcess, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSink, RequestKind::kDefault);
- runner.expectFinalState(SessionState::kSource);
-
- runner.run();
+ runSteps(moreToComeLoop());
}
-TEST_F(SessionWorkflowWithBorrowedThreadsTest, DefaultLoop) {
- auto runner = StepRunner(this);
+class SessionWorkflowWithBorrowedThreadsTest : public StepRunnerSessionWorkflowTest {
+public:
+ /**
+ * Under the borrowed thread model, the steps are the same as for dedicated thread model,
+ * except that Session sourceMessage events are preceded by Session waitForData events.
+ */
+ std::deque<RunAllErrorsAtAllSteps::Step> convertStepsToBorrowed(
+ std::deque<RunAllErrorsAtAllSteps::Step> q) {
+ for (auto iter = q.begin(); iter != q.end(); ++iter)
+ if (iter->event == Event::kSessionSourceMessage)
+ iter = std::next(q.insert(iter, {Event::kSessionWaitForData}));
+ return q;
+ }
- runner.expectNextState(SessionState::kPoll, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSource, RequestKind::kDefault);
- runner.expectNextState(SessionState::kProcess, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSink, RequestKind::kDefault);
- runner.expectFinalState(SessionState::kPoll);
+private:
+ ScopedValueOverride<bool> _svo{gInitialUseDedicatedThread, false};
+};
- runner.run();
+TEST_F(SessionWorkflowWithBorrowedThreadsTest, DefaultLoop) {
+ runSteps(convertStepsToBorrowed(defaultLoop()));
}
TEST_F(SessionWorkflowWithBorrowedThreadsTest, ExhaustLoop) {
- auto runner = StepRunner(this);
-
- runner.expectNextState(SessionState::kPoll, RequestKind::kExhaust);
- runner.expectNextState(SessionState::kSource, RequestKind::kExhaust);
- runner.expectNextState(SessionState::kProcess, RequestKind::kExhaust);
- runner.expectNextState(SessionState::kSink, RequestKind::kExhaust);
- runner.expectNextState(SessionState::kProcess, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSink, RequestKind::kDefault);
- runner.expectFinalState(SessionState::kPoll);
-
- runner.run();
+ runSteps(convertStepsToBorrowed(exhaustLoop()));
}
TEST_F(SessionWorkflowWithBorrowedThreadsTest, MoreToComeLoop) {
- auto runner = StepRunner(this);
-
- runner.expectNextState(SessionState::kPoll, RequestKind::kMoreToCome);
- runner.expectNextState(SessionState::kSource, RequestKind::kMoreToCome);
- runner.expectNextState(SessionState::kProcess, RequestKind::kMoreToCome);
- runner.expectNextState(SessionState::kPoll, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSource, RequestKind::kDefault);
- runner.expectNextState(SessionState::kProcess, RequestKind::kDefault);
- runner.expectNextState(SessionState::kSink, RequestKind::kDefault);
- runner.expectFinalState(SessionState::kPoll);
-
- runner.run();
+ runSteps(convertStepsToBorrowed(moreToComeLoop()));
}
} // namespace
-} // namespace transport
-} // namespace mongo
+} // namespace mongo::transport