/** * Copyright (C) 2018-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include #include #include #include #include #include #include #include #include #include "mongo/base/checked_cast.h" #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/client.h" #include "mongo/db/client_strand.h" #include "mongo/db/concurrency/locker_noop_service_context_test_fixture.h" #include "mongo/db/dbmessage.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" #include "mongo/platform/compiler.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/op_msg.h" #include "mongo/transport/mock_session.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_entry_point_impl.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/service_executor_utils.h" #include "mongo/transport/session_workflow.h" #include "mongo/transport/session_workflow_test_util.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/synchronized_value.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest namespace mongo::transport { namespace { const Status kClosedSessionError{ErrorCodes::SocketException, "Session is closed"}; const Status kNetworkError{ErrorCodes::HostUnreachable, "Someone is unreachable"}; const Status kShutdownError{ErrorCodes::ShutdownInProgress, "Something is shutting down"}; const Status kArbitraryError{ErrorCodes::InternalError, "Something happened"}; template struct FunctionTraits; template struct FunctionTraits { using function_type = R(A...); using result_type = R; using tied_arguments_type = std::tuple; }; /** X-Macro defining the mocked event names and their function signatures */ #define EVENT_TABLE(X) \ /* Session functions */ \ X(sessionWaitForData, Status()) \ X(sessionSourceMessage, StatusWith()) \ X(sessionSinkMessage, Status(const Message&)) \ /* ServiceEntryPoint functions */ \ X(sepHandleRequest, Future(OperationContext*, const Message&)) \ X(sepEndSession, void(const std::shared_ptr&)) \ /**/ /** * Events generated by SessionWorkflow via virtual function calls to mock * objects. They are a means to observe and indirectly manipulate * SessionWorkflow's behavior to reproduce test scenarios. * * They are named for the mock object and function that emits them. */ #define X(e, sig) e, enum class Event { EVENT_TABLE(X) }; #undef X StringData toString(Event e) { #define X(e, sig) #e ""_sd, return std::array{EVENT_TABLE(X)}[static_cast(e)]; #undef X } std::ostream& operator<<(std::ostream& os, Event e) { return os << toString(e); } /** * Trait that maps the Event enum to a function type. * Captures and analyzes the function type of each mocked Event. * We'll use these to enable type erasure in the framework. */ template struct EventTraits; #define X(e, sig) \ template <> \ struct EventTraits : FunctionTraits {}; EVENT_TABLE(X) #undef X template using EventSigT = typename EventTraits::function_type; template using EventTiedArgumentsT = typename EventTraits::tied_arguments_type; template using EventResultT = typename EventTraits::result_type; Message makeOpMsg() { static auto nextId = AtomicWord{0}; auto omb = OpMsgBuilder{}; omb.setBody(BSONObjBuilder{}.append("id", nextId.fetchAndAdd(1)).obj()); return omb.finish(); } DbResponse makeResponse(Message m) { DbResponse response{}; response.response = m; return response; } DbResponse setExhaust(DbResponse response) { response.shouldRunAgainForExhaust = true; return response; } Message setExhaustSupported(Message msg) { OpMsg::setFlag(&msg, OpMsg::kExhaustSupported); return msg; } Message setMoreToCome(Message msg) { OpMsg::setFlag(&msg, OpMsg::kMoreToCome); return msg; } class MockExpectationSlot { public: struct BasicExpectation { virtual ~BasicExpectation() = default; virtual Event event() const = 0; }; template struct Expectation : BasicExpectation { explicit Expectation(unique_function> cb) : cb{std::move(cb)} {} Event event() const override { return e; } unique_function> cb; }; template void push(unique_function> cb) { stdx::lock_guard lk{_mutex}; invariant(!_cb); _cb = std::make_unique>(std::move(cb)); _cv.notify_one(); } template unique_function> pop() { stdx::unique_lock lk{_mutex}; _cv.wait(lk, [&] { return !!_cb; }); auto h = std::exchange(_cb, {}); invariant(h->event() == e, "Expecting {}, got {}"_format(h->event(), e)); return std::move(static_cast&>(*h).cb); } private: mutable Mutex _mutex; stdx::condition_variable _cv; std::unique_ptr _cb; }; /** Fixture that mocks interactions with a `SessionWorkflow`. */ class SessionWorkflowTest : public LockerNoopServiceContextTest { using Base = LockerNoopServiceContextTest; public: void setUp() override { Base::setUp(); auto sc = getServiceContext(); sc->setServiceEntryPoint(_makeServiceEntryPoint(sc)); initializeNewSession(); invariant(sep()->start()); _threadPool->startup(); } void tearDown() override { ScopeGuard guard = [&] { Base::tearDown(); }; // Normal shutdown is a noop outside of ASAN. invariant(sep()->shutdownAndWait(Seconds{10})); _threadPool->shutdown(); _threadPool->join(); } void initializeNewSession() { _session = std::make_shared(this); } /** Waits for the current Session and SessionWorkflow to end. */ void joinSessions() { ASSERT(sep()->waitForNoSessions(Seconds{1})); } /** Launches a SessionWorkflow for the current session. */ void startSession() { LOGV2(6742613, "Starting session"); sep()->startSession(_session); } MockServiceEntryPoint* sep() { return checked_cast(getServiceContext()->getServiceEntryPoint()); } /** * Installs an arbitrary one-shot mock handler callback for the next event. * The next incoming mock event will invoke this callback and destroy it. */ template void injectMockResponse(unique_function> cb) { _expect.push(std::move(cb)); } /** * Wrapper around `injectMockResponse`. Installs a handler for the `expected` * mock event, that will return the specified `result`. * Returns a `Future` that is fulfilled when that mock event occurs. */ template Future asyncExpect(EventResultT r) { auto pf = std::make_shared>(); injectMockResponse([r = std::move(r), pf](auto&&...) mutable { pf->promise.emplaceValue(); return std::move(r); }); return std::move(pf->future); } template Future asyncExpect() { auto pf = std::make_shared>(); injectMockResponse( [pf](const EventTiedArgumentsT& args) mutable { pf->promise.emplaceValue(); }); return std::move(pf->future); } template void expect(EventResultT r) { asyncExpect(std::move(r)).get(); } template void expect() { asyncExpect().get(); } private: class CustomMockSession : public CallbackMockSession { public: explicit CustomMockSession(SessionWorkflowTest* fixture) { endCb = [this] { *_connected = false; }; isConnectedCb = [this] { return *_connected; }; waitForDataCb = [fixture] { return fixture->_onMockEvent(std::tie()); }; sourceMessageCb = [fixture] { return fixture->_onMockEvent(std::tie()); }; sinkMessageCb = [fixture](const Message& m) { return fixture->_onMockEvent(std::tie(m)); }; // The async variants will just run the same callback on `_threadPool`. auto async = [fixture](auto cb) { return ExecutorFuture(fixture->_threadPool).then(cb).unsafeToInlineFuture(); }; asyncWaitForDataCb = [=, cb = waitForDataCb] { return async([cb] { return cb(); }); }; asyncSourceMessageCb = [=, cb = sourceMessageCb](const BatonHandle&) { return async([cb] { return cb(); }); }; asyncSinkMessageCb = [=, cb = sinkMessageCb](Message m, const BatonHandle&) { return async([cb, m = std::move(m)]() mutable { return cb(std::move(m)); }); }; } private: synchronized_value _connected{true}; // Born in the connected state. }; std::shared_ptr _makeThreadPool() { ThreadPool::Options options{}; options.poolName = "SessionWorkflowTest"; return std::make_shared(std::move(options)); } std::unique_ptr _makeServiceEntryPoint(ServiceContext* sc) { auto sep = std::make_unique(sc); sep->handleRequestCb = [=, this](OperationContext* opCtx, const Message& msg) { if (!gInitialUseDedicatedThread) { // Simulates an async command implemented under the borrowed // thread model. The returned future will be fulfilled while // holding the ClientStrand for opCtx. auto pf = PromiseAndFuture(); ExecutorFuture(_threadPool) .then([this, strand = ClientStrand::get(opCtx->getClient()), opCtx, msg = std::move(msg), p = std::move(pf.promise)]() mutable { strand->run([&] { p.setWith([&] { return _onMockEvent(std::tie(opCtx, msg)); }); }); }) .getAsync([](auto&&) {}); return std::move(pf.future); } return _onMockEvent(std::tie(opCtx, msg)); }; sep->onEndSessionCb = [=, this](const std::shared_ptr& session) { _onMockEvent(std::tie(session)); }; sep->derivedOnClientDisconnectCb = [&](Client*) { }; return sep; } /** * Called by all mock functions to notify the main thread and get a value with which to respond. * The mock function call is identified by an `event`. If there isn't already an expectation, * the mock object will wait for one to be injected via a call to `injectMockResponse`. */ template EventResultT _onMockEvent(const EventTiedArgumentsT& args) { LOGV2_DEBUG(6742616, 2, "Mock event arrived", "event"_attr = event); return std::apply(_expect.pop(), args); } MockExpectationSlot _expect; std::shared_ptr _session; std::shared_ptr _threadPool = _makeThreadPool(); }; TEST_F(SessionWorkflowTest, StartThenEndSession) { startSession(); expect(kClosedSessionError); expect(); joinSessions(); } TEST_F(SessionWorkflowTest, OneNormalCommand) { startSession(); expect(makeOpMsg()); expect(makeResponse(makeOpMsg())); expect(Status::OK()); expect(kClosedSessionError); expect(); joinSessions(); } TEST_F(SessionWorkflowTest, OnClientDisconnectCalledOnCleanup) { int disconnects = 0; sep()->derivedOnClientDisconnectCb = [&](Client*) { ++disconnects; }; startSession(); ASSERT_EQ(disconnects, 0); expect(kClosedSessionError); expect(); joinSessions(); ASSERT_EQ(disconnects, 1); } /** Repro of one formerly troublesome scenario generated by the StepRunner test below. */ TEST_F(SessionWorkflowTest, MoreToComeDisconnectAtSource3) { startSession(); // One more-to-come command, yields an empty response per wire protocol expect(setMoreToCome(makeOpMsg())); expect(makeResponse({})); // Another message from session, this time a normal RPC. expect(makeOpMsg()); expect(makeResponse(makeOpMsg())); expect(Status::OK()); // Client disconnects while we're waiting for their next command. expect(kShutdownError); expect(); joinSessions(); } /** * Check the behavior of an interrupted "getMore" exhaust command. * SessionWorkflow looks specifically for the "getMore" command name to trigger * this cleanup. */ TEST_F(SessionWorkflowTest, CleanupFromGetMore) { initializeNewSession(); startSession(); auto makeGetMoreRequest = [](int64_t cursorId) { OpMsgBuilder omb; omb.setBody(BSONObjBuilder{} .append("getMore", cursorId) .append("collection", "testColl") .append("$db", "testDb") .obj()); return setExhaustSupported(omb.finish()); }; auto makeGetMoreResponse = [] { DbResponse response; OpMsgBuilder omb; omb.setBody(BSONObjBuilder{}.append("id", int64_t{0}).obj()); response.response = omb.finish(); return response; }; // Produce the condition of having an active `getMore` exhaust command. expect(makeGetMoreRequest(123)); expect(setExhaust(makeGetMoreResponse())); // Simulate a disconnect during the session sink call. The cleanup of // exhaust resources happens when the session disconnects. So after the send // of the "getMore" response returns the injected error, expect the // SessionWorkflow to issue a fire-and-forget "killCursors". expect(kClosedSessionError); PromiseAndFuture killCursors; injectMockResponse( [p = std::move(killCursors.promise)](OperationContext*, const Message& msg) mutable { p.emplaceValue(msg); return makeResponse({}); }); ASSERT_EQ(OpMsgRequest::parse(killCursors.future.get()).getCommandName(), "killCursors"_sd); // Because they're fire-and-forget commands, we will only observe `handleRequest` // calls to the SEP for the cleanup "killCursors", and the next thing to happen // will be the end of the session. expect(); joinSessions(); } class StepRunnerSessionWorkflowTest : public SessionWorkflowTest { public: /** * Concisely encode the ways this test might respond to mock events. * The OK Result contents depend on which Event it's responding to. */ #define ACTION_TABLE(X) \ X(basic) /* OK result for a basic (request and response) command. */ \ X(exhaust) /* OK result for a exhuast command. */ \ X(moreToCome) /* OK result for a fire-and-forget command. */ \ /**/ \ X(errTerminate) /* External termination via the ServiceEntryPoint. */ \ X(errDisconnect) /* Socket disconnection by peer. */ \ X(errNetwork) /* Unspecified network failure (host unreachable). */ \ X(errShutdown) /* System shutdown. */ \ X(errArbitrary) /* An arbitrary miscellaneous error. */ #define X(e) e, enum class Action { ACTION_TABLE(X) }; #undef X friend StringData toString(Action action) { #define X(a) #a ""_sd, return std::array{ACTION_TABLE(X)}[static_cast(action)]; #undef X } /** * Given a list of steps, performs a series of tests exercising that list. * * The `run()` function performs a set of variations on the steps, failing * further and further along the way, with different errors tried at each * step. * * It first sets a baseline by running all the steps without injecting * failure. Then it checks each failure condition for each step in the * sequence. For example, if we have steps[NS] and failure conditions * fails[NF], it will run these pseudocode trials: * * // First, no errors. * { steps[0](OK); steps[1](OK); ... steps[NS-1](OK); } * * // Inject each kind of failure at steps[0]. * { steps[0](fails[0]); } * { steps[0](fails[1]); } * ... and so on for fails[NF]. * * // Now let steps[0] succeed, but inject each kind of failure at steps[1]. * { steps[0](OK); steps[1](fails[0]); } * { steps[0](OK); steps[1](fails[1]); } * ... and so on for fails[NF]. * * // And so on the NS steps.... */ class RunAllErrorsAtAllSteps { public: /** The set of failures is hardcoded. */ static constexpr std::array fails{Action::errTerminate, Action::errDisconnect, Action::errNetwork, Action::errShutdown, Action::errArbitrary}; /** Encodes a response to `event` by taking `action`. */ struct Step { Event event; Action action = Action::basic; }; // The final step is assumed to have `errDisconnect` as an action, // yielding an implied `kEnd` step. RunAllErrorsAtAllSteps(SessionWorkflowTest* fixture, std::deque steps) : _fixture{fixture}, _steps{[&, at = steps.size() - 1] { return _appendTermination(std::move(steps), at, Action::errDisconnect); }()} {} /** * Run all of the trials specified by the constructor. */ void run() { const std::deque baseline(_steps.begin(), _steps.end()); LOGV2(5014106, "Running one entirely clean run"); _runSteps(baseline); // Incrementally push forward the step where we fail. for (size_t failAt = 0; failAt + 1 < baseline.size(); ++failAt) { LOGV2(6742614, "Injecting failures", "failAt"_attr = failAt); for (auto fail : fails) _runSteps(_appendTermination(baseline, failAt, fail)); } } private: /** * Returns a new steps sequence, formed by copying the specified `q`, and * modifying the copy to be terminated with a `fail` at the `failAt` index. */ std::deque _appendTermination(std::deque q, size_t failAt, Action fail) const { LOGV2(6742617, "appendTermination", "fail"_attr = fail, "failAt"_attr = failAt); invariant(failAt < q.size()); q.erase(q.begin() + failAt + 1, q.end()); q.back().action = fail; q.push_back({Event::sepEndSession}); return q; } template void _dumpTransitions(const T& q) { BSONArrayBuilder bab; for (auto&& t : q) { BSONObjBuilder{bab.subobjStart()} .append("event", toString(t.event)) .append("action", toString(t.action)); } LOGV2(6742615, "Run transitions", "transitions"_attr = bab.arr()); } template >, int> = 0> void _setExpectation() { _fixture->expect(); } // The scenario generator will try to inject an error status into // functions That don't report errors, so that injected Status must be ignored. template >, int> = 0> void _setExpectation(Status) { _fixture->expect(); } template >, int> = 0> void _setExpectation(EventResultT r) { _fixture->expect(std::move(r)); } template void injectStep(const Action& action) { LOGV2_DEBUG(6872301, 3, "Inject step", "event"_attr = event, "action"_attr = action); switch (action) { case Action::errTerminate: { // Has a side effect of simulating a ServiceEntryPoint shutdown // before responding with a shutdown error. auto pf = std::make_shared>(); _fixture->injectMockResponse([this, pf](auto&&...) { _fixture->sep()->endAllSessionsNoTagMask(); pf->promise.emplaceValue(); if constexpr (std::is_void_v>) { return; } else { return kShutdownError; } }); pf->future.get(); } break; case Action::errDisconnect: _setExpectation(kClosedSessionError); break; case Action::errNetwork: _setExpectation(kNetworkError); break; case Action::errShutdown: _setExpectation(kShutdownError); break; case Action::errArbitrary: _setExpectation(kArbitraryError); break; case Action::basic: case Action::exhaust: case Action::moreToCome: if constexpr (event == Event::sepEndSession) { _setExpectation(); return; } else if constexpr (event == Event::sessionWaitForData || event == Event::sessionSinkMessage) { _setExpectation(Status::OK()); return; } else if constexpr (event == Event::sessionSourceMessage) { Message m = makeOpMsg(); if (action == Action::exhaust) m = setExhaustSupported(m); _setExpectation(StatusWith{std::move(m)}); return; } else if constexpr (event == Event::sepHandleRequest) { switch (action) { case Action::basic: _setExpectation(StatusWith{makeResponse(makeOpMsg())}); return; case Action::exhaust: _setExpectation( StatusWith{setExhaust(makeResponse(makeOpMsg()))}); return; case Action::moreToCome: _setExpectation(StatusWith{DbResponse{}}); return; default: MONGO_UNREACHABLE; } break; } break; } } void injectStep(const Step& t) { // The event table is expanded to generate the cases of a switch, // effectively transforming the runtime value `t.event` into a // template parameter. The `sig` is unused in the expansion. switch (t.event) { #define X(e, sig) \ case Event::e: \ injectStep(t.action); \ break; EVENT_TABLE(X) #undef X } } /** Start a new session, run the `steps` sequence, and join the session. */ void _runSteps(std::deque q) { _dumpTransitions(q); _fixture->initializeNewSession(); _fixture->startSession(); for (; !q.empty(); q.pop_front()) injectStep(q.front()); _fixture->joinSessions(); } SessionWorkflowTest* _fixture; std::deque _steps; }; void runSteps(std::deque steps) { RunAllErrorsAtAllSteps{this, steps}.run(); } std::deque defaultLoop() const { return { {Event::sessionSourceMessage}, {Event::sepHandleRequest}, {Event::sessionSinkMessage}, {Event::sessionSourceMessage}, }; } std::deque exhaustLoop() const { return { {Event::sessionSourceMessage, Action::exhaust}, {Event::sepHandleRequest, Action::exhaust}, {Event::sessionSinkMessage}, {Event::sepHandleRequest}, {Event::sessionSinkMessage}, {Event::sessionSourceMessage}, }; } std::deque moreToComeLoop() const { return { {Event::sessionSourceMessage, Action::moreToCome}, {Event::sepHandleRequest, Action::moreToCome}, {Event::sessionSourceMessage}, {Event::sepHandleRequest}, {Event::sessionSinkMessage}, {Event::sessionSourceMessage}, }; } }; class SessionWorkflowWithDedicatedThreadsTest : public StepRunnerSessionWorkflowTest { ScopedValueOverride _svo{gInitialUseDedicatedThread, true}; }; TEST_F(SessionWorkflowWithDedicatedThreadsTest, DefaultLoop) { runSteps(defaultLoop()); } TEST_F(SessionWorkflowWithDedicatedThreadsTest, ExhaustLoop) { runSteps(exhaustLoop()); } TEST_F(SessionWorkflowWithDedicatedThreadsTest, MoreToComeLoop) { runSteps(moreToComeLoop()); } class SessionWorkflowWithBorrowedThreadsTest : public StepRunnerSessionWorkflowTest { public: /** * Under the borrowed thread model, the steps are the same as for dedicated thread model, * except that Session sourceMessage events are preceded by Session waitForData events. */ std::deque convertStepsToBorrowed( std::deque q) { for (auto iter = q.begin(); iter != q.end(); ++iter) if (iter->event == Event::sessionSourceMessage) iter = std::next(q.insert(iter, {Event::sessionWaitForData})); return q; } private: ScopedValueOverride _svo{gInitialUseDedicatedThread, false}; }; TEST_F(SessionWorkflowWithBorrowedThreadsTest, DefaultLoop) { runSteps(convertStepsToBorrowed(defaultLoop())); } TEST_F(SessionWorkflowWithBorrowedThreadsTest, ExhaustLoop) { runSteps(convertStepsToBorrowed(exhaustLoop())); } TEST_F(SessionWorkflowWithBorrowedThreadsTest, MoreToComeLoop) { runSteps(convertStepsToBorrowed(moreToComeLoop())); } } // namespace } // namespace mongo::transport