summaryrefslogtreecommitdiff
path: root/src/mongo/transport/session_workflow_test.cpp
diff options
context:
space:
mode:
authorAlex Li <alex.li@mongodb.com>2022-08-05 14:00:03 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-05 14:29:45 +0000
commit4b2edd31a69c2a51f8791c871e5bfb6213d9002e (patch)
treee9e0173f89a229c0c376d4a8c54ee00dddfe1535 /src/mongo/transport/session_workflow_test.cpp
parentbc1652aba351de2c4596a63dfa3abe6849fd56f3 (diff)
downloadmongo-4b2edd31a69c2a51f8791c871e5bfb6213d9002e.tar.gz
SERVER-67425 ServiceStateMachineTest: separate mock and utility types from the fixture
Diffstat (limited to 'src/mongo/transport/session_workflow_test.cpp')
-rw-r--r--src/mongo/transport/session_workflow_test.cpp229
1 files changed, 118 insertions, 111 deletions
diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp
index 9c4fe021273..17644aaac7d 100644
--- a/src/mongo/transport/session_workflow_test.cpp
+++ b/src/mongo/transport/session_workflow_test.cpp
@@ -202,6 +202,77 @@ std::ostream& operator<<(std::ostream& os, RequestKind kind) {
return os << toString(kind);
}
+class CallbackMockSession : public MockSessionBase {
+public:
+ TransportLayer* getTransportLayer() const override {
+ return getTransportLayerCb();
+ }
+
+ void end() override {
+ endCb();
+ }
+
+ bool isConnected() override {
+ return isConnectedCb();
+ }
+
+ Status waitForData() noexcept override {
+ return waitForDataCb();
+ }
+
+ StatusWith<Message> sourceMessage() noexcept override {
+ return sourceMessageCb();
+ }
+
+ Status sinkMessage(Message message) noexcept override {
+ return sinkMessageCb(message);
+ }
+
+ Future<void> asyncWaitForData() noexcept override {
+ return asyncWaitForDataCb();
+ }
+
+ Future<Message> asyncSourceMessage(const BatonHandle& handle) noexcept override {
+ return asyncSourceMessageCb(handle);
+ }
+
+ Future<void> asyncSinkMessage(Message message, const BatonHandle& handle) noexcept override {
+ return asyncSinkMessageCb(message, handle);
+ }
+
+ std::function<TransportLayer*(void)> getTransportLayerCb;
+ std::function<void(void)> endCb;
+ std::function<bool(void)> isConnectedCb;
+ std::function<Status(void)> waitForDataCb;
+ std::function<StatusWith<Message>(void)> sourceMessageCb;
+ std::function<Status(Message)> sinkMessageCb;
+ std::function<Future<void>(void)> asyncWaitForDataCb;
+ std::function<Future<Message>(const BatonHandle&)> asyncSourceMessageCb;
+ std::function<Future<void>(Message, const BatonHandle&)> asyncSinkMessageCb;
+};
+
+class MockServiceEntryPoint : public ServiceEntryPointImpl {
+public:
+ explicit MockServiceEntryPoint(ServiceContext* svcCtx) : ServiceEntryPointImpl(svcCtx) {}
+
+ Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) noexcept override {
+ return handleRequestCb(opCtx, request);
+ }
+
+ void onEndSession(const transport::SessionHandle& handle) override {
+ onEndSessionCb(handle);
+ }
+
+ void derivedOnClientDisconnect(Client* client) override {
+ derivedOnClientDisconnectCb(client);
+ }
+
+ std::function<Future<DbResponse>(OperationContext*, const Message&)> handleRequestCb;
+ std::function<void(const transport::SessionHandle)> onEndSessionCb;
+ std::function<void(Client*)> derivedOnClientDisconnectCb;
+};
+
/**
* The SessionWorkflowTest is a fixture that mocks the external inputs into the
* SessionWorkflow so as to provide a deterministic way to evaluate the session workflow
@@ -209,9 +280,6 @@ std::ostream& operator<<(std::ostream& os, RequestKind kind) {
*/
class SessionWorkflowTest : public LockerNoopServiceContextTest {
public:
- class ServiceEntryPoint;
- class Session;
-
/**
* Make a generic thread pool to deliver external inputs out of line (mocking the network or
* database workers).
@@ -368,7 +436,7 @@ private:
/**
* Use an external result to mock handling a request.
*/
- StatusWith<DbResponse> _handleRequest(OperationContext* opCtx, const Message&) noexcept {
+ StatusWith<DbResponse> _processRequest(OperationContext* opCtx, const Message&) noexcept {
_stateQueue.push(SessionState::kProcess);
auto result = [&]() -> StatusWith<DbResponse> {
@@ -389,6 +457,17 @@ private:
return result;
}
+ Future<DbResponse> _handleRequest(OperationContext* opCtx, const Message& request) noexcept {
+ auto [p, f] = makePromiseFuture<DbResponse>();
+ ExecutorFuture<void>(_threadPool)
+ .then([this, opCtx, &request, p = std::move(p)]() mutable {
+ auto strand = ClientStrand::get(opCtx->getClient());
+ strand->run([&] { p.setWith([&] { return _processRequest(opCtx, request); }); });
+ })
+ .getAsync([](auto&&) {});
+ return std::move(f);
+ }
+
/**
* Use an external result to mock polling for data and observe the state.
*/
@@ -458,126 +537,36 @@ private:
return result;
}
- /**
- * Observe the end of the session.
- */
- void _cleanup(const transport::SessionHandle& session) {
- invariant(session == _session,
- "This fixture and the SessionWorkflow should have handles to the same Session");
-
- _stateQueue.push(SessionState::kEnd);
- }
-
- void _onClientDisconnect() {
- ++_onClientDisconnectCalled;
- }
-
- SessionWorkflowTest::ServiceEntryPoint* _sep;
-
- const std::shared_ptr<ThreadPool> _threadPool = makeThreadPool();
-
- std::unique_ptr<StateResult> _stateResult;
-
- std::shared_ptr<SessionWorkflowTest::Session> _session;
- SingleProducerSingleConsumerQueue<SessionState> _stateQueue;
-
- int _onClientDisconnectCalled{0};
-};
-
-/**
- * This class is a simple wrapper that delegates Session behavior to the fixture.
- */
-class SessionWorkflowTest::Session final : public transport::MockSessionBase {
-public:
- explicit Session(SessionWorkflowTest* fixture)
- : transport::MockSessionBase(), _fixture{fixture} {}
- ~Session() override = default;
-
- TransportLayer* getTransportLayer() const override {
- MONGO_UNREACHABLE;
- }
-
- void end() override {
- _fixture->endSession();
- }
-
- bool isConnected() override {
- return _fixture->isConnected();
- }
-
- Status waitForData() noexcept override {
- return _fixture->_waitForData();
- }
-
- StatusWith<Message> sourceMessage() noexcept override {
- return _fixture->_sourceMessage();
- }
-
- Status sinkMessage(Message message) noexcept override {
- return _fixture->_sinkMessage(std::move(message));
- }
-
- Future<void> asyncWaitForData() noexcept override {
- return ExecutorFuture<void>(_fixture->_threadPool)
- .then([this] { return _fixture->_waitForData(); })
+ Future<void> _asyncWaitForData() noexcept {
+ return ExecutorFuture<void>(_threadPool)
+ .then([this] { return _waitForData(); })
.unsafeToInlineFuture();
}
- Future<Message> asyncSourceMessage(const BatonHandle&) noexcept override {
- return ExecutorFuture<void>(_fixture->_threadPool)
- .then([this] { return _fixture->_sourceMessage(); })
+ Future<Message> _asyncSourceMessage() noexcept {
+ return ExecutorFuture<void>(_threadPool)
+ .then([this] { return _sourceMessage(); })
.unsafeToInlineFuture();
}
- Future<void> asyncSinkMessage(Message message, const BatonHandle&) noexcept override {
- return ExecutorFuture<void>(_fixture->_threadPool)
+ Future<void> _asyncSinkMessage(Message message) noexcept {
+ return ExecutorFuture<void>(_threadPool)
.then([this, message = std::move(message)]() mutable {
- return _fixture->_sinkMessage(std::move(message));
+ return _sinkMessage(std::move(message));
})
.unsafeToInlineFuture();
}
-private:
- SessionWorkflowTest* const _fixture;
-};
-
-/**
- * This class is a simple wrapper that delegates ServiceEntryPoint behavior to the fixture.
- *
- * TODO(SERVER-54143) ServiceEntryPointImpl does a surprising amount of management for
- * SessionWorkflows. Once we separate that concern, we should be able to define onEndSession as
- * a hook or override for that type and derive from ServiceEntryPoint instead.
- */
-class SessionWorkflowTest::ServiceEntryPoint final : public ServiceEntryPointImpl {
-public:
- explicit ServiceEntryPoint(SessionWorkflowTest* fixture)
- : ServiceEntryPointImpl(fixture->getServiceContext()), _fixture(fixture) {}
- ~ServiceEntryPoint() override = default;
+ MockServiceEntryPoint* _sep;
- Future<DbResponse> handleRequest(OperationContext* opCtx,
- const Message& request) noexcept override {
- auto [p, f] = makePromiseFuture<DbResponse>();
- ExecutorFuture<void>(_fixture->_threadPool)
- .then([this, opCtx, &request, p = std::move(p)]() mutable {
- auto strand = ClientStrand::get(opCtx->getClient());
- strand->run(
- [&] { p.setWith([&] { return _fixture->_handleRequest(opCtx, request); }); });
- })
- .getAsync([](auto) {});
- return std::move(f);
- }
+ const std::shared_ptr<ThreadPool> _threadPool = makeThreadPool();
- void onEndSession(const transport::SessionHandle& session) override {
- _fixture->_cleanup(session);
- }
+ std::unique_ptr<StateResult> _stateResult;
- void derivedOnClientDisconnect(Client* client) override {
- invariant(client);
- _fixture->_onClientDisconnect();
- }
+ std::shared_ptr<CallbackMockSession> _session;
+ SingleProducerSingleConsumerQueue<SessionState> _stateQueue;
-private:
- SessionWorkflowTest* const _fixture;
+ int _onClientDisconnectCalled{0};
};
/**
@@ -739,7 +728,17 @@ private:
void SessionWorkflowTest::initNewSession() {
assertNoSessionState();
- _session = std::make_shared<Session>(this);
+ _session = std::make_shared<CallbackMockSession>();
+ _session->endCb = [&] { endSession(); };
+ _session->isConnectedCb = [&] { return isConnected(); };
+ _session->waitForDataCb = [&] { return _waitForData(); };
+ _session->sourceMessageCb = [&] { return _sourceMessage(); };
+ _session->sinkMessageCb = [&](Message message) { return _sinkMessage(std::move(message)); };
+ _session->asyncWaitForDataCb = [&] { return _asyncWaitForData(); };
+ _session->asyncSourceMessageCb = [&](const BatonHandle&) { return _asyncSourceMessage(); };
+ _session->asyncSinkMessageCb = [&](Message message, const BatonHandle&) {
+ return _asyncSinkMessage(std::move(message));
+ };
_stateResult->isConnected.store(true);
}
@@ -760,7 +759,15 @@ void SessionWorkflowTest::terminateViaServiceEntryPoint() {
void SessionWorkflowTest::setUp() {
ServiceContextTest::setUp();
- auto sep = std::make_unique<SessionWorkflowTest::ServiceEntryPoint>(this);
+ auto sep = std::make_unique<MockServiceEntryPoint>(getServiceContext());
+ sep->handleRequestCb = [&](OperationContext* opCtx, const Message& request) {
+ return _handleRequest(opCtx, request);
+ };
+ sep->onEndSessionCb = [&](const transport::SessionHandle& session) {
+ invariant(session == _session);
+ _stateQueue.push(SessionState::kEnd);
+ };
+ sep->derivedOnClientDisconnectCb = [&](Client*) { ++_onClientDisconnectCalled; };
_sep = sep.get();
getServiceContext()->setServiceEntryPoint(std::move(sep));
invariant(_sep->start());