diff options
author | Alex Li <alex.li@mongodb.com> | 2022-08-05 14:00:03 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-05 14:29:45 +0000 |
commit | 4b2edd31a69c2a51f8791c871e5bfb6213d9002e (patch) | |
tree | e9e0173f89a229c0c376d4a8c54ee00dddfe1535 /src/mongo/transport/session_workflow_test.cpp | |
parent | bc1652aba351de2c4596a63dfa3abe6849fd56f3 (diff) | |
download | mongo-4b2edd31a69c2a51f8791c871e5bfb6213d9002e.tar.gz |
SERVER-67425 ServiceStateMachineTest: separate mock and utility types from the fixture
Diffstat (limited to 'src/mongo/transport/session_workflow_test.cpp')
-rw-r--r-- | src/mongo/transport/session_workflow_test.cpp | 229 |
1 files changed, 118 insertions, 111 deletions
diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp index 9c4fe021273..17644aaac7d 100644 --- a/src/mongo/transport/session_workflow_test.cpp +++ b/src/mongo/transport/session_workflow_test.cpp @@ -202,6 +202,77 @@ std::ostream& operator<<(std::ostream& os, RequestKind kind) { return os << toString(kind); } +class CallbackMockSession : public MockSessionBase { +public: + TransportLayer* getTransportLayer() const override { + return getTransportLayerCb(); + } + + void end() override { + endCb(); + } + + bool isConnected() override { + return isConnectedCb(); + } + + Status waitForData() noexcept override { + return waitForDataCb(); + } + + StatusWith<Message> sourceMessage() noexcept override { + return sourceMessageCb(); + } + + Status sinkMessage(Message message) noexcept override { + return sinkMessageCb(message); + } + + Future<void> asyncWaitForData() noexcept override { + return asyncWaitForDataCb(); + } + + Future<Message> asyncSourceMessage(const BatonHandle& handle) noexcept override { + return asyncSourceMessageCb(handle); + } + + Future<void> asyncSinkMessage(Message message, const BatonHandle& handle) noexcept override { + return asyncSinkMessageCb(message, handle); + } + + std::function<TransportLayer*(void)> getTransportLayerCb; + std::function<void(void)> endCb; + std::function<bool(void)> isConnectedCb; + std::function<Status(void)> waitForDataCb; + std::function<StatusWith<Message>(void)> sourceMessageCb; + std::function<Status(Message)> sinkMessageCb; + std::function<Future<void>(void)> asyncWaitForDataCb; + std::function<Future<Message>(const BatonHandle&)> asyncSourceMessageCb; + std::function<Future<void>(Message, const BatonHandle&)> asyncSinkMessageCb; +}; + +class MockServiceEntryPoint : public ServiceEntryPointImpl { +public: + explicit MockServiceEntryPoint(ServiceContext* svcCtx) : ServiceEntryPointImpl(svcCtx) {} + + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override { + return handleRequestCb(opCtx, request); + } + + void onEndSession(const transport::SessionHandle& handle) override { + onEndSessionCb(handle); + } + + void derivedOnClientDisconnect(Client* client) override { + derivedOnClientDisconnectCb(client); + } + + std::function<Future<DbResponse>(OperationContext*, const Message&)> handleRequestCb; + std::function<void(const transport::SessionHandle)> onEndSessionCb; + std::function<void(Client*)> derivedOnClientDisconnectCb; +}; + /** * The SessionWorkflowTest is a fixture that mocks the external inputs into the * SessionWorkflow so as to provide a deterministic way to evaluate the session workflow @@ -209,9 +280,6 @@ std::ostream& operator<<(std::ostream& os, RequestKind kind) { */ class SessionWorkflowTest : public LockerNoopServiceContextTest { public: - class ServiceEntryPoint; - class Session; - /** * Make a generic thread pool to deliver external inputs out of line (mocking the network or * database workers). @@ -368,7 +436,7 @@ private: /** * Use an external result to mock handling a request. */ - StatusWith<DbResponse> _handleRequest(OperationContext* opCtx, const Message&) noexcept { + StatusWith<DbResponse> _processRequest(OperationContext* opCtx, const Message&) noexcept { _stateQueue.push(SessionState::kProcess); auto result = [&]() -> StatusWith<DbResponse> { @@ -389,6 +457,17 @@ private: return result; } + Future<DbResponse> _handleRequest(OperationContext* opCtx, const Message& request) noexcept { + auto [p, f] = makePromiseFuture<DbResponse>(); + ExecutorFuture<void>(_threadPool) + .then([this, opCtx, &request, p = std::move(p)]() mutable { + auto strand = ClientStrand::get(opCtx->getClient()); + strand->run([&] { p.setWith([&] { return _processRequest(opCtx, request); }); }); + }) + .getAsync([](auto&&) {}); + return std::move(f); + } + /** * Use an external result to mock polling for data and observe the state. */ @@ -458,126 +537,36 @@ private: return result; } - /** - * Observe the end of the session. - */ - void _cleanup(const transport::SessionHandle& session) { - invariant(session == _session, - "This fixture and the SessionWorkflow should have handles to the same Session"); - - _stateQueue.push(SessionState::kEnd); - } - - void _onClientDisconnect() { - ++_onClientDisconnectCalled; - } - - SessionWorkflowTest::ServiceEntryPoint* _sep; - - const std::shared_ptr<ThreadPool> _threadPool = makeThreadPool(); - - std::unique_ptr<StateResult> _stateResult; - - std::shared_ptr<SessionWorkflowTest::Session> _session; - SingleProducerSingleConsumerQueue<SessionState> _stateQueue; - - int _onClientDisconnectCalled{0}; -}; - -/** - * This class is a simple wrapper that delegates Session behavior to the fixture. - */ -class SessionWorkflowTest::Session final : public transport::MockSessionBase { -public: - explicit Session(SessionWorkflowTest* fixture) - : transport::MockSessionBase(), _fixture{fixture} {} - ~Session() override = default; - - TransportLayer* getTransportLayer() const override { - MONGO_UNREACHABLE; - } - - void end() override { - _fixture->endSession(); - } - - bool isConnected() override { - return _fixture->isConnected(); - } - - Status waitForData() noexcept override { - return _fixture->_waitForData(); - } - - StatusWith<Message> sourceMessage() noexcept override { - return _fixture->_sourceMessage(); - } - - Status sinkMessage(Message message) noexcept override { - return _fixture->_sinkMessage(std::move(message)); - } - - Future<void> asyncWaitForData() noexcept override { - return ExecutorFuture<void>(_fixture->_threadPool) - .then([this] { return _fixture->_waitForData(); }) + Future<void> _asyncWaitForData() noexcept { + return ExecutorFuture<void>(_threadPool) + .then([this] { return _waitForData(); }) .unsafeToInlineFuture(); } - Future<Message> asyncSourceMessage(const BatonHandle&) noexcept override { - return ExecutorFuture<void>(_fixture->_threadPool) - .then([this] { return _fixture->_sourceMessage(); }) + Future<Message> _asyncSourceMessage() noexcept { + return ExecutorFuture<void>(_threadPool) + .then([this] { return _sourceMessage(); }) .unsafeToInlineFuture(); } - Future<void> asyncSinkMessage(Message message, const BatonHandle&) noexcept override { - return ExecutorFuture<void>(_fixture->_threadPool) + Future<void> _asyncSinkMessage(Message message) noexcept { + return ExecutorFuture<void>(_threadPool) .then([this, message = std::move(message)]() mutable { - return _fixture->_sinkMessage(std::move(message)); + return _sinkMessage(std::move(message)); }) .unsafeToInlineFuture(); } -private: - SessionWorkflowTest* const _fixture; -}; - -/** - * This class is a simple wrapper that delegates ServiceEntryPoint behavior to the fixture. - * - * TODO(SERVER-54143) ServiceEntryPointImpl does a surprising amount of management for - * SessionWorkflows. Once we separate that concern, we should be able to define onEndSession as - * a hook or override for that type and derive from ServiceEntryPoint instead. - */ -class SessionWorkflowTest::ServiceEntryPoint final : public ServiceEntryPointImpl { -public: - explicit ServiceEntryPoint(SessionWorkflowTest* fixture) - : ServiceEntryPointImpl(fixture->getServiceContext()), _fixture(fixture) {} - ~ServiceEntryPoint() override = default; + MockServiceEntryPoint* _sep; - Future<DbResponse> handleRequest(OperationContext* opCtx, - const Message& request) noexcept override { - auto [p, f] = makePromiseFuture<DbResponse>(); - ExecutorFuture<void>(_fixture->_threadPool) - .then([this, opCtx, &request, p = std::move(p)]() mutable { - auto strand = ClientStrand::get(opCtx->getClient()); - strand->run( - [&] { p.setWith([&] { return _fixture->_handleRequest(opCtx, request); }); }); - }) - .getAsync([](auto) {}); - return std::move(f); - } + const std::shared_ptr<ThreadPool> _threadPool = makeThreadPool(); - void onEndSession(const transport::SessionHandle& session) override { - _fixture->_cleanup(session); - } + std::unique_ptr<StateResult> _stateResult; - void derivedOnClientDisconnect(Client* client) override { - invariant(client); - _fixture->_onClientDisconnect(); - } + std::shared_ptr<CallbackMockSession> _session; + SingleProducerSingleConsumerQueue<SessionState> _stateQueue; -private: - SessionWorkflowTest* const _fixture; + int _onClientDisconnectCalled{0}; }; /** @@ -739,7 +728,17 @@ private: void SessionWorkflowTest::initNewSession() { assertNoSessionState(); - _session = std::make_shared<Session>(this); + _session = std::make_shared<CallbackMockSession>(); + _session->endCb = [&] { endSession(); }; + _session->isConnectedCb = [&] { return isConnected(); }; + _session->waitForDataCb = [&] { return _waitForData(); }; + _session->sourceMessageCb = [&] { return _sourceMessage(); }; + _session->sinkMessageCb = [&](Message message) { return _sinkMessage(std::move(message)); }; + _session->asyncWaitForDataCb = [&] { return _asyncWaitForData(); }; + _session->asyncSourceMessageCb = [&](const BatonHandle&) { return _asyncSourceMessage(); }; + _session->asyncSinkMessageCb = [&](Message message, const BatonHandle&) { + return _asyncSinkMessage(std::move(message)); + }; _stateResult->isConnected.store(true); } @@ -760,7 +759,15 @@ void SessionWorkflowTest::terminateViaServiceEntryPoint() { void SessionWorkflowTest::setUp() { ServiceContextTest::setUp(); - auto sep = std::make_unique<SessionWorkflowTest::ServiceEntryPoint>(this); + auto sep = std::make_unique<MockServiceEntryPoint>(getServiceContext()); + sep->handleRequestCb = [&](OperationContext* opCtx, const Message& request) { + return _handleRequest(opCtx, request); + }; + sep->onEndSessionCb = [&](const transport::SessionHandle& session) { + invariant(session == _session); + _stateQueue.push(SessionState::kEnd); + }; + sep->derivedOnClientDisconnectCb = [&](Client*) { ++_onClientDisconnectCalled; }; _sep = sep.get(); getServiceContext()->setServiceEntryPoint(std::move(sep)); invariant(_sep->start()); |