diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2023-01-26 04:45:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-26 05:53:27 +0000 |
commit | 875e14f1adb92cb0ab0e8b2aaffa3c7f73d49e4a (patch) | |
tree | 18b5fda43b387ca5fc1d36b24501755892204a87 /src/mongo/transport | |
parent | be8fd031fb851f5c0c2288585abafd92439c8d0a (diff) | |
download | mongo-875e14f1adb92cb0ab0e8b2aaffa3c7f73d49e4a.tar.gz |
SERVER-68723 SessionWorkflowTest refactor (fix and unrevert)
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/session_workflow_test.cpp | 1261 |
1 files changed, 577 insertions, 684 deletions
diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp index a9baba0dfb5..7721a1f39e2 100644 --- a/src/mongo/transport/session_workflow_test.cpp +++ b/src/mongo/transport/session_workflow_test.cpp @@ -30,8 +30,12 @@ #include "mongo/platform/basic.h" +#include <array> +#include <deque> +#include <initializer_list> #include <memory> #include <type_traits> +#include <utility> #include <vector> #include "mongo/base/checked_cast.h" @@ -55,19 +59,15 @@ #include "mongo/transport/service_executor_utils.h" #include "mongo/transport/session_workflow.h" #include "mongo/transport/session_workflow_test_util.h" -#include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" -#include "mongo/util/clock_source_mock.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/producer_consumer_queue.h" -#include "mongo/util/tick_source_mock.h" +#include "mongo/util/synchronized_value.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -namespace mongo { -namespace transport { +namespace mongo::transport { namespace { const Status kClosedSessionError{ErrorCodes::SocketException, "Session is closed"}; @@ -75,815 +75,708 @@ const Status kNetworkError{ErrorCodes::HostUnreachable, "Someone is unreachable" const Status kShutdownError{ErrorCodes::ShutdownInProgress, "Something is shutting down"}; const Status kArbitraryError{ErrorCodes::InternalError, "Something happened"}; -/** - * FailureCondition represents a set of the ways any state in the SessionWorkflow can fail. - */ -enum class FailureCondition { - kNone, - kTerminate, // External termination via the ServiceEntryPoint. - kDisconnect, // Socket disconnection by peer. - kNetwork, // Unspecified network failure (ala host unreachable). - kShutdown, // System shutdown. - kArbitrary, // An arbitrary error that does not fall under the other conditions. -}; - -constexpr StringData toString(FailureCondition fail) { - switch (fail) { - case FailureCondition::kNone: - return "None"_sd; - case FailureCondition::kTerminate: - return "Terminate"_sd; - case FailureCondition::kDisconnect: - return "Disconnect"_sd; - case FailureCondition::kNetwork: - return "Network"_sd; - case FailureCondition::kShutdown: - return "Shutdown"_sd; - case FailureCondition::kArbitrary: - return "Arbitrary"_sd; - }; - - return "Unknown"_sd; -} - -std::ostream& operator<<(std::ostream& os, FailureCondition fail) { - return os << toString(fail); +template <typename T, size_t N> +StringData findEnumName(const std::pair<T, StringData> (&arr)[N], T k) { + using std::begin; + using std::end; + return std::find_if(begin(arr), end(arr), [&](auto&& e) { return e.first == k; })->second; } /** - * SessionState represents the externally observable state of the SessionWorkflow. These - * states map relatively closely to the internals of the SessionWorkflow::Impl. That said, - * this enum represents the SessionWorkflowTest's external understanding of the internal - * state. + * Events generated by SessionWorkflow via virtual function calls to mock + * objects. They are a means to observe and indirectly manipulate + * SessionWorkflow's behavior to reproduce test scenarios. + * + * They are named for the mock object and function that emits them. */ -enum class SessionState { - kStart, - kPoll, - kSource, - kProcess, - kSink, - kEnd, +enum class Event { + kSessionWaitForData, + kSessionSourceMessage, + kSepHandleRequest, + kSessionSinkMessage, + kSepEndSession, }; -constexpr StringData toString(SessionState state) { - switch (state) { - case SessionState::kStart: - return "Start"_sd; - case SessionState::kPoll: - return "Poll"_sd; - case SessionState::kSource: - return "Source"_sd; - case SessionState::kProcess: - return "Process"_sd; - case SessionState::kSink: - return "Sink"_sd; - case SessionState::kEnd: - return "End"_sd; - }; - - return "Unknown"_sd; +StringData toString(Event e) { + return findEnumName( + { + {Event::kSessionWaitForData, "Session.waitForData"_sd}, + {Event::kSessionSourceMessage, "Session.sourceMessage"_sd}, + {Event::kSepHandleRequest, "Sep.handleRequest"_sd}, + {Event::kSessionSinkMessage, "Session.sinkMessage"_sd}, + {Event::kSepEndSession, "Sep.endSession"_sd}, + }, + e); } -std::ostream& operator<<(std::ostream& os, SessionState state) { - return os << toString(state); +std::ostream& operator<<(std::ostream& os, Event e) { + return os << toString(e); } -/** - * RequestKind represents the type of operation of the SessionWorkflow. Depending on various - * message flags and conditions, the SessionWorkflow will transition between states - * differently. - */ -enum class RequestKind { - kDefault, - kExhaust, - kMoreToCome, -}; - -constexpr StringData toString(RequestKind kind) { - switch (kind) { - case RequestKind::kDefault: - return "Default"_sd; - case RequestKind::kExhaust: - return "Exhaust"_sd; - case RequestKind::kMoreToCome: - return "MoreToCome"_sd; - }; - return "Unknown"_sd; +template <typename T> +static std::string typeName() { + return demangleName(typeid(T)); } - -std::ostream& operator<<(std::ostream& os, RequestKind kind) { - return os << toString(kind); +template <typename T> +static std::ostream& stream(std::ostream& os, const T&) { + return os << "[{}]"_format(typeName<T>()); +} +static std::ostream& stream(std::ostream& os, const Status& v) { + return os << v; +} +template <typename T> +static std::ostream& stream(std::ostream& os, const StatusWith<T>& v) { + if (!v.isOK()) + return stream(os, v.getStatus()); + return stream(os, v.getValue()); } -class ResultValue { -public: - ResultValue() = default; - explicit ResultValue(Status status) : _value(std::move(status)) {} - explicit ResultValue(StatusWith<Message> message) : _value(std::move(message)) {} - explicit ResultValue(StatusWith<DbResponse> response) : _value(std::move(response)) {} - - void setResponse(StatusWith<DbResponse> response) { - _value = response; - } - - StatusWith<DbResponse> getResponse() const { - return _convertTo<StatusWith<DbResponse>>(); - } +class Result { + using Variant = stdx::variant<stdx::monostate, Status, StatusWith<Message>, Future<DbResponse>>; - void setMessage(StatusWith<Message> message) { - _value = message; - } +public: + Result() = default; - StatusWith<Message> getMessage() const { - return _convertTo<StatusWith<Message>>(); - } + Result(Result&&) = default; + Result& operator=(Result&&) = default; - void setStatus(Status status) { - _value = status; - } + template <typename T, std::enable_if_t<std::is_constructible_v<Variant, T&&>, int> = 0> + explicit Result(T&& v) : _value{std::forward<T>(v)} {} - Status getStatus() const { - return _convertTo<Status>(); - } - - bool empty() const { - return _value.index() == 0; + template <typename T> + T consumeAs() && { + return stdx::visit( + [](auto&& alt) -> T { + using A = decltype(alt); + if constexpr (std::is_convertible<A, T>()) + return std::forward<A>(alt); + invariant(0, "{} => {}"_format(typeName<A>(), typeName<T>())); + MONGO_UNREACHABLE; + }, + std::exchange(_value, {})); } explicit operator bool() const { - return !empty(); + return _value.index() != 0; } private: - template <typename Target> - Target _convertTo() const { + friend std::string toString(const Result& r) { return stdx::visit( - [](auto alt) -> Target { - if constexpr (std::is_convertible<decltype(alt), Target>()) - return alt; - invariant(false, "ResultValue not convertible to target type"); - MONGO_COMPILER_UNREACHABLE; + [](auto&& alt) -> std::string { + using A = std::decay_t<decltype(alt)>; + std::ostringstream os; + stream(os << "[{}]"_format(typeName<A>()), alt); + return os.str(); }, - _value); + r._value); } - stdx::variant<stdx::monostate, Status, StatusWith<Message>, StatusWith<DbResponse>> _value; + Variant _value; }; -/** - * This class stores and synchronizes the shared state result between the test - * fixture and its various wrappers. - */ -struct StateResult { - Mutex mutex = MONGO_MAKE_LATCH("StateResult::_mutex"); - stdx::condition_variable cv; +Message makeOpMsg() { + static auto nextId = AtomicWord<int>{0}; + auto omb = OpMsgBuilder{}; + omb.setBody(BSONObjBuilder{}.append("id", nextId.fetchAndAdd(1)).obj()); + return omb.finish(); +} - AtomicWord<bool> isConnected{true}; +DbResponse makeResponse(Message m) { + DbResponse response{}; + response.response = m; + return response; +} - ResultValue result; - SessionState state; -}; +DbResponse setExhaust(DbResponse response) { + response.shouldRunAgainForExhaust = true; + return response; +} -/** - * The SessionWorkflowTest is a fixture that mocks the external inputs into the - * SessionWorkflow so as to provide a deterministic way to evaluate the session workflow - * implemenation. - */ -class SessionWorkflowTest : public LockerNoopServiceContextTest { -public: - /** - * Make a generic thread pool to deliver external inputs out of line (mocking the network or - * database workers). - */ - static std::shared_ptr<ThreadPool> makeThreadPool() { - auto options = ThreadPool::Options{}; - options.poolName = "SessionWorkflowTest"; +Message setExhaustSupported(Message msg) { + OpMsg::setFlag(&msg, OpMsg::kExhaustSupported); + return msg; +} - return std::make_shared<ThreadPool>(std::move(options)); - } +Message setMoreToCome(Message msg) { + OpMsg::setFlag(&msg, OpMsg::kMoreToCome); + return msg; +} - void setUp() override; - void tearDown() override; +/** Fixture that mocks interactions with a `SessionWorkflow`. */ +class SessionWorkflowTest : public LockerNoopServiceContextTest { + using Base = LockerNoopServiceContextTest; - /** - * This function blocks until the SessionWorkflowTest observes a state change. - */ - SessionState popSessionState() { - return _stateQueue.pop(); +public: + void setUp() override { + Base::setUp(); + auto sc = getServiceContext(); + sc->setServiceEntryPoint(_makeServiceEntryPoint(sc)); + initializeNewSession(); + invariant(sep()->start()); + _threadPool->startup(); } - /** - * This function asserts that the SessionWorkflowTest has not yet observed a state change. - * - * Note that this function does not guarantee that it will not observe a state change in the - * future. - */ - void assertNoSessionState() { - if (auto maybeState = _stateQueue.tryPop()) { - FAIL("The queue is not empty, state: ") << *maybeState; - } + void tearDown() override { + ScopeGuard guard = [&] { Base::tearDown(); }; + // Normal shutdown is a noop outside of ASAN. + invariant(sep()->shutdownAndWait(Seconds{10})); + _threadPool->shutdown(); + _threadPool->join(); } - /** - * This function stores an external response to be delivered out of line to the - * SessionWorkflow. - */ - void setResult(SessionState state, ResultValue result) { - auto lk = stdx::lock_guard(_stateResult->mutex); - invariant(state == SessionState::kPoll || state == SessionState::kSource || - state == SessionState::kProcess || state == SessionState::kSink); - _stateResult->result = std::move(result); - _stateResult->state = state; - _stateResult->cv.notify_one(); + void initializeNewSession() { + _session = std::make_shared<CustomMockSession>(this); } - /** - * This function makes a generic result appropriate for a successful state change given - * SessionState and RequestKind. - */ - ResultValue makeGenericResult(SessionState state, RequestKind kind) { - ResultValue result; - switch (state) { - case SessionState::kPoll: - case SessionState::kSink: - result.setStatus(Status::OK()); - break; - case SessionState::kSource: { - Message message = _makeIndexedBson(); - switch (kind) { - case RequestKind::kExhaust: - OpMsg::setFlag(&message, OpMsg::kExhaustSupported); - break; - case RequestKind::kDefault: - case RequestKind::kMoreToCome: - break; - } - result.setMessage(StatusWith<Message>(message)); - } break; - case SessionState::kProcess: { - DbResponse response; - switch (kind) { - case RequestKind::kDefault: - response.response = _makeIndexedBson(); - break; - case RequestKind::kExhaust: - response.response = _makeIndexedBson(); - response.shouldRunAgainForExhaust = true; - break; - case RequestKind::kMoreToCome: - break; - } - result.setResponse(response); - } break; - default: - invariant( - false, - "Unable to make generic result for this state: {}"_format(toString(state))); - } - return result; + /** Waits for the current Session and SessionWorkflow to end. */ + void joinSessions() { + ASSERT(sep()->waitForNoSessions(Seconds{1})); } - /** - * Initialize a new Session. - */ - void initNewSession(); - - /** - * Launch a SessionWorkflow for the current session. - */ - void startSession(); + /** Launches a SessionWorkflow for the current session. */ + void startSession() { + LOGV2(6742613, "Starting session"); + sep()->startSession(_session); + } - /** - * Wait for the current Session and SessionWorkflow to end. - */ - void joinSession(); + MockServiceEntryPoint* sep() { + return checked_cast<MockServiceEntryPoint*>(getServiceContext()->getServiceEntryPoint()); + } /** - * Mark the session as no longer connected. + * Installs an arbitrary one-shot mock handler callback for the next event. + * The next incoming mock event will invoke this callback and destroy it. */ - void endSession() { - auto lk = stdx::lock_guard(_stateResult->mutex); - if (_stateResult->isConnected.swap(false)) { - LOGV2(5014101, "Ending session"); - _stateResult->cv.notify_one(); - } + void injectMockResponse(unique_function<Result(Event)> cb) { + _expect.push(std::move(cb)); } /** - * Start a brand new session, run the given function, and then join the session. + * Wrapper around `injectMockResponse`. Installs a handler for the `expected` + * mock event, that will return the specified `result`. + * Returns a `Future` that is fulfilled when that mock event occurs. */ - template <typename F> - void runWithNewSession(F&& func) { - initNewSession(); - startSession(); - - auto firstState = popSessionState(); - ASSERT(firstState == SessionState::kSource || firstState == SessionState::kPoll) - << "State was instead: " << toString(firstState); - - std::forward<F>(func)(); - - joinSession(); - } - - void terminateViaServiceEntryPoint(); - - bool isConnected() const { - return _stateResult->isConnected.load(); + Future<void> asyncExpect(Event expected, Result r) { + auto pf = std::make_shared<PromiseAndFuture<void>>(); + injectMockResponse([r = std::move(r), expected, pf](Event event) mutable { + invariant(event == expected, + "Expected {}, but got {}"_format(toString(expected), toString(event))); + pf->promise.emplaceValue(); + return std::move(r); + }); + return std::move(pf->future); } - int onClientDisconnectCalledTimes() const { - return _onClientDisconnectCalled; + void expect(Event expected, Result r) { + return asyncExpect(expected, std::move(r)).get(); } private: - /** - * Generate a resonably generic BSON with an id for use in debugging. - */ - static Message _makeIndexedBson() { - auto bob = BSONObjBuilder(); - static auto nextId = AtomicWord<int>{0}; - bob.append("id", nextId.fetchAndAdd(1)); - - auto omb = OpMsgBuilder{}; - omb.setBody(bob.obj()); - return omb.finish(); - } + class MockExpectationSlot { + public: + void push(unique_function<Result(Event)> cb) { + stdx::lock_guard lk{_mutex}; + invariant(!_cb); + _cb = std::move(cb); + _cv.notify_one(); + } - /** - * Use an external result to mock handling a request. - */ - StatusWith<DbResponse> _processRequest(OperationContext* opCtx, const Message&) noexcept { - _stateQueue.push(SessionState::kProcess); - - auto result = [&]() -> StatusWith<DbResponse> { - auto lk = stdx::unique_lock(_stateResult->mutex); - _stateResult->cv.wait(lk, [this] { - return (_stateResult->result && _stateResult->state == SessionState::kProcess) || - !isConnected(); - }); - - if (!isConnected()) { - return kClosedSessionError; - } + unique_function<Result(Event)> pop() { + stdx::unique_lock lk{_mutex}; + _cv.wait(lk, [&] { return !!_cb; }); + return std::exchange(_cb, {}); + } - invariant(_stateResult->result); - return std::exchange(_stateResult->result, {}).getResponse(); - }(); + private: + mutable Mutex _mutex; + stdx::condition_variable _cv; + unique_function<Result(Event)> _cb; + }; - LOGV2(5014100, "Handled request", "error"_attr = result.getStatus()); + class CustomMockSession : public CallbackMockSession { + public: + explicit CustomMockSession(SessionWorkflowTest* fixture) { + endCb = [this] { *_connected = false; }; + isConnectedCb = [this] { return *_connected; }; + waitForDataCb = [fixture] { + return fixture->_onMockEvent<Status>(Event::kSessionWaitForData); + }; + sourceMessageCb = [fixture] { + return fixture->_onMockEvent<StatusWith<Message>>(Event::kSessionSourceMessage); + }; + sinkMessageCb = [fixture](Message) { + return fixture->_onMockEvent<Status>(Event::kSessionSinkMessage); + }; + // The async variants will just run the same callback on `_threadPool`. + auto async = [fixture](auto cb) { + return ExecutorFuture<void>(fixture->_threadPool).then(cb).unsafeToInlineFuture(); + }; + asyncWaitForDataCb = [=, cb = waitForDataCb] { return async([cb] { return cb(); }); }; + asyncSourceMessageCb = [=, cb = sourceMessageCb](const BatonHandle&) { + return async([cb] { return cb(); }); + }; + asyncSinkMessageCb = [=, cb = sinkMessageCb](Message m, const BatonHandle&) { + return async([cb, m = std::move(m)]() mutable { return cb(std::move(m)); }); + }; + } - return result; - } + private: + synchronized_value<bool> _connected{true}; // Born in the connected state. + }; - Future<DbResponse> _handleRequest(OperationContext* opCtx, const Message& request) noexcept { - auto [p, f] = makePromiseFuture<DbResponse>(); - ExecutorFuture<void>(_threadPool) - .then([this, opCtx, &request, p = std::move(p)]() mutable { - p.setWith([&] { return _processRequest(opCtx, request); }); - }) - .getAsync([](auto&&) {}); - return std::move(f); + std::shared_ptr<ThreadPool> _makeThreadPool() { + ThreadPool::Options options{}; + options.poolName = "SessionWorkflowTest"; + return std::make_shared<ThreadPool>(std::move(options)); } /** - * Use an external result to mock polling for data and observe the state. + * Simulates an async command implemented under the borrowed thread model. + * The promise must be fulfilled while holding ClientStrand. */ - Status _waitForData() { - _stateQueue.push(SessionState::kPoll); - - auto result = [&]() -> Status { - auto lk = stdx::unique_lock(_stateResult->mutex); - _stateResult->cv.wait(lk, [this] { - return (_stateResult->result && _stateResult->state == SessionState::kPoll) || - !isConnected(); - }); - - if (!isConnected()) { - return kClosedSessionError; - } - - invariant(_stateResult->result); - return std::exchange(_stateResult->result, {}).getStatus(); - }(); - - LOGV2(5014102, "Finished waiting for data", "error"_attr = result); - return result; + Future<DbResponse> _asyncHandleRequest(OperationContext* opCtx, + unique_function<Future<DbResponse>()> cb) { + auto pf = PromiseAndFuture<DbResponse>(); + ExecutorFuture<void>(_threadPool) + .then([strand = ClientStrand::get(opCtx->getClient()), + cb = std::move(cb), + p = std::move(pf.promise)]() mutable { + strand->run([&] { p.setWith([&] { return cb(); }); }); + }) + .getAsync([](auto&&) {}); + return std::move(pf.future); } - /** - * Use an external result to mock reading data and observe the state. - */ - StatusWith<Message> _sourceMessage() { - _stateQueue.push(SessionState::kSource); - - auto result = [&]() -> StatusWith<Message> { - auto lk = stdx::unique_lock(_stateResult->mutex); - _stateResult->cv.wait(lk, [this] { - return (_stateResult->result && _stateResult->state == SessionState::kSource) || - !isConnected(); - }); - - if (!isConnected()) { - return kClosedSessionError; - } - - invariant(_stateResult->result); - return std::exchange(_stateResult->result, {}).getMessage(); - }(); - - LOGV2(5014103, "Sourced message", "error"_attr = result.getStatus()); - - return result; + std::unique_ptr<MockServiceEntryPoint> _makeServiceEntryPoint(ServiceContext* sc) { + auto sep = std::make_unique<MockServiceEntryPoint>(sc); + sep->handleRequestCb = [=](OperationContext* opCtx, const Message&) { + auto cb = [this] { return _onMockEvent<Future<DbResponse>>(Event::kSepHandleRequest); }; + if (!gInitialUseDedicatedThread) + return _asyncHandleRequest(opCtx, std::move(cb)); + return cb(); + }; + sep->onEndSessionCb = [=](const SessionHandle&) { + _onMockEvent<void>(Event::kSepEndSession); + }; + sep->derivedOnClientDisconnectCb = [&](Client*) {}; + return sep; } /** - * Use an external result to mock writing data and observe the state. + * Called by all mock functions to notify the main thread and get a value with which to respond. + * The mock function call is identified by an `event`. If there isn't already an expectation, + * the mock object will wait for one to be injected via a call to `injectMockResponse`. */ - Status _sinkMessage(Message message) { - _stateQueue.push(SessionState::kSink); - - auto result = [&]() -> Status { - auto lk = stdx::unique_lock(_stateResult->mutex); - _stateResult->cv.wait(lk, [this] { - return (_stateResult->result && _stateResult->state == SessionState::kSink) || - !isConnected(); - }); - - if (!isConnected()) { - return kClosedSessionError; - } - - invariant(_stateResult->result); - return std::exchange(_stateResult->result, {}).getStatus(); - }(); - - LOGV2(5014104, "Sunk message", "error"_attr = result); - return result; - } - - Future<void> _asyncWaitForData() noexcept { - return ExecutorFuture<void>(_threadPool) - .then([this] { return _waitForData(); }) - .unsafeToInlineFuture(); - } - - Future<Message> _asyncSourceMessage() noexcept { - return ExecutorFuture<void>(_threadPool) - .then([this] { return _sourceMessage(); }) - .unsafeToInlineFuture(); - } - - Future<void> _asyncSinkMessage(Message message) noexcept { - return ExecutorFuture<void>(_threadPool) - .then([this, message = std::move(message)]() mutable { - return _sinkMessage(std::move(message)); - }) - .unsafeToInlineFuture(); + template <typename Target> + Target _onMockEvent(Event event) { + LOGV2_DEBUG(6742616, 2, "Mock event arrived", "event"_attr = event); + Result r = _expect.pop()(event); + LOGV2_DEBUG(6742618, 2, "Responding", "event"_attr = event, "result"_attr = toString(r)); + if constexpr (std::is_same_v<Target, void>) { + std::move(r).consumeAs<stdx::monostate>(); + return; + } else { + return std::move(r).consumeAs<Target>(); + } } - MockServiceEntryPoint* _sep; + MockExpectationSlot _expect; + std::shared_ptr<CustomMockSession> _session; + std::shared_ptr<ThreadPool> _threadPool = _makeThreadPool(); +}; - const std::shared_ptr<ThreadPool> _threadPool = makeThreadPool(); +TEST_F(SessionWorkflowTest, StartThenEndSession) { + startSession(); + expect(Event::kSessionSourceMessage, Result{kClosedSessionError}); + expect(Event::kSepEndSession, Result{}); + joinSessions(); +} - std::unique_ptr<StateResult> _stateResult; +TEST_F(SessionWorkflowTest, OneNormalCommand) { + startSession(); + expect(Event::kSessionSourceMessage, Result{makeOpMsg()}); + expect(Event::kSepHandleRequest, Result{makeResponse(makeOpMsg())}); + expect(Event::kSessionSinkMessage, Result{Status::OK()}); + expect(Event::kSessionSourceMessage, Result{kClosedSessionError}); + expect(Event::kSepEndSession, Result{}); + joinSessions(); +} - std::shared_ptr<CallbackMockSession> _session; - SingleProducerSingleConsumerQueue<SessionState> _stateQueue; +TEST_F(SessionWorkflowTest, OnClientDisconnectCalledOnCleanup) { + int disconnects = 0; + sep()->derivedOnClientDisconnectCb = [&](Client*) { ++disconnects; }; + startSession(); + ASSERT_EQ(disconnects, 0); + expect(Event::kSessionSourceMessage, Result{kClosedSessionError}); + expect(Event::kSepEndSession, Result{}); + joinSessions(); + ASSERT_EQ(disconnects, 1); +} - int _onClientDisconnectCalled{0}; -}; +/** Repro of one formerly troublesome scenario generated by the StepRunner test below. */ +TEST_F(SessionWorkflowTest, MoreToComeDisconnectAtSource3) { + startSession(); + // One more-to-come command, yields an empty response per wire protocol + expect(Event::kSessionSourceMessage, Result{setMoreToCome(makeOpMsg())}); + expect(Event::kSepHandleRequest, Result{makeResponse({})}); + // Another message from session, this time a normal RPC. + expect(Event::kSessionSourceMessage, Result{makeOpMsg()}); + expect(Event::kSepHandleRequest, Result{makeResponse(makeOpMsg())}); + expect(Event::kSessionSinkMessage, Result{Status::OK()}); + // Client disconnects while we're waiting for their next command. + expect(Event::kSessionSourceMessage, Result{kShutdownError}); + expect(Event::kSepEndSession, Result{}); + joinSessions(); +} /** - * This class iterates over the potential methods of failure for a set of steps. + * Check the behavior of an interrupted "getMore" exhaust command. + * SessionWorkflow looks specifically for the "getMore" command name to trigger + * this cleanup. */ -class StepRunner { - /** - * This is a simple data structure describing the external response for one state in the - * session workflow. - */ - struct Step { - SessionState state; - RequestKind kind; +TEST_F(SessionWorkflowTest, CleanupFromGetMore) { + // For this test, SEP handleRequest will stash a copy of the request. + // When a handleRequest has been observed, the test can check this variable + // to peek at it. + auto stashedRequest = std::make_shared<synchronized_value<Message>>(); + sep()->handleRequestCb = [orig = sep()->handleRequestCb, + stashedRequest](OperationContext* opCtx, const Message& msg) { + **stashedRequest = msg; + return orig(opCtx, msg); }; - using StepList = std::vector<Step>; -public: - StepRunner(SessionWorkflowTest* fixture) : _fixture{fixture} {} - ~StepRunner() { - invariant(_runCount > 0, "StepRunner expects to be run at least once"); - } + initializeNewSession(); + startSession(); - /** - * Given a FailureCondition, cause an external result to be delivered that is appropriate for - * the given state and request kind. - */ - SessionState doGenericStep(const Step& step, FailureCondition fail) { - switch (fail) { - case FailureCondition::kNone: { - _fixture->setResult(step.state, _fixture->makeGenericResult(step.state, step.kind)); - } break; - case FailureCondition::kTerminate: { - _fixture->terminateViaServiceEntryPoint(); - // We expect that the session will be disconnected via the SEP, no need to set any - // result. - } break; - case FailureCondition::kDisconnect: { - _fixture->endSession(); - // We expect that the session will be disconnected directly, no need to set any - // result. - } break; - case FailureCondition::kNetwork: { - _fixture->setResult(step.state, ResultValue(kNetworkError)); - } break; - case FailureCondition::kShutdown: { - _fixture->setResult(step.state, ResultValue(kShutdownError)); - } break; - case FailureCondition::kArbitrary: { - _fixture->setResult(step.state, ResultValue(kArbitraryError)); - } break; - }; + auto makeGetMoreRequest = [](int64_t cursorId) { + OpMsgBuilder omb; + omb.setBody(BSONObjBuilder{} + .append("getMore", cursorId) + .append("collection", "testColl") + .append("$db", "testDb") + .obj()); + return setExhaustSupported(omb.finish()); + }; - return _fixture->popSessionState(); - } + auto makeGetMoreResponse = [] { + DbResponse response; + OpMsgBuilder omb; + omb.setBody(BSONObjBuilder{}.append("id", int64_t{0}).obj()); + response.response = omb.finish(); + return response; + }; - /** - * Mark an additional expected state in the session workflow. - */ - void expectNextState(SessionState state, RequestKind kind) { - auto step = Step{}; - step.state = state; - step.kind = kind; - _steps.emplace_back(std::move(step)); + // Produce the condition of having an active `getMore` exhaust command. + expect(Event::kSessionSourceMessage, Result{makeGetMoreRequest(123)}); + expect(Event::kSepHandleRequest, Result{setExhaust(makeGetMoreResponse())}); + + // Simulate a disconnect during the session sink call. The cleanup of + // exhaust resources happens when the session disconnects. So after the send + // of the "getMore" response returns the injected error, expect the + // SessionWorkflow to issue a fire-and-forget "killCursors". + expect(Event::kSessionSinkMessage, Result{kClosedSessionError}); + { + auto killCursors = std::make_shared<Notification<Message>>(); + auto unpause = std::make_shared<Notification<void>>(); + ScopeGuard unpauseGuard = [&] { unpause->set(); }; + injectMockResponse([killCursors, unpause, stashedRequest](Event e) { + invariant(e == Event::kSepHandleRequest); + killCursors->set(**stashedRequest); + unpause->get(); // Pause while main thread examines killCursors. + return Result{makeResponse({})}; // fire-and-forget disallows response. + }); + ASSERT_EQ(OpMsgRequest::parse(killCursors->get()).getCommandName(), "killCursors"_sd); } + // Because they're fire-and-forget commands, we will only observe `handleRequest` + // calls to the SEP for the cleanup "killCursors", and the next thing to happen + // will be the end of the session. + expect(Event::kSepEndSession, Result{}); + joinSessions(); +} + +class StepRunnerSessionWorkflowTest : public SessionWorkflowTest { +public: /** - * Mark the final expected state in the session workflow. + * Concisely encode the ways this test might respond to mock events. + * The OK Result contents depend on which Event it's responding to. */ - void expectFinalState(SessionState state) { - _finalState = state; + enum class Action { + kDefault, // OK result for a basic (request and response) command. + kExhaust, // OK result for a exhuast command. + kMoreToCome, // OK result for a fire-and-forget command. + + kErrTerminate, // External termination via the ServiceEntryPoint. + kErrDisconnect, // Socket disconnection by peer. + kErrNetwork, // Unspecified network failure (ala host unreachable). + kErrShutdown, // System shutdown. + kErrArbitrary, // An arbitrary error that does not fall under the other conditions. + }; + + friend StringData toString(Action k) { + return findEnumName({{Action::kDefault, "Default"_sd}, + {Action::kExhaust, "Exhaust"_sd}, + {Action::kMoreToCome, "MoreToCome"_sd}, + {Action::kErrTerminate, "ErrTerminate"_sd}, + {Action::kErrDisconnect, "ErrDisconnect"_sd}, + {Action::kErrNetwork, "ErrNetwork"_sd}, + {Action::kErrShutdown, "ErrShutdown"_sd}, + {Action::kErrArbitrary, "ErrArbitrary"_sd}}, + k); } /** - * Run a set of variations on the steps, failing further and further along the way. + * Given a list of steps, performs a series of tests exercising that list. + * + * The `run()` function performs a set of variations on the steps, failing + * further and further along the way, with different errors tried at each + * step. + * + * It first sets a baseline by running all the steps without injecting + * failure. Then it checks each failure condition for each step in the + * sequence. For example, if we have steps[NS] and failure conditions + * fails[NF], it will run these pseudocode trials: + * + * // First, no errors. + * { steps[0](OK); steps[1](OK); ... steps[NS-1](OK); } * - * This function first runs all the steps without failure to set a baseline. It then checks each - * failure condition for each step going forward. For example, if we have steps Source, Sink, - * and End and failure conditions None, Network, and Terminate, it will run these variations in - * this order: - * [(Source, None), (Sink, None), (End)] - * [(Source, Network), (End)] - * [(Source, Terminate), (End)] - * [(Source, None), (Sink, Network), (End)] - * [(Source, None), (Sink, Terminate), (End)] + * // Inject each kind of failure at steps[0]. + * { steps[0](fails[0]); } + * { steps[0](fails[1]); } + * ... and so on for fails[NF]. + * + * // Now let steps[0] succeed, but inject each kind of failure at steps[1]. + * { steps[0](OK); steps[1](fails[0]); } + * { steps[0](OK); steps[1](fails[1]); } + * ... and so on for fails[NF]. + * + * // And so on the NS steps.... */ - void run() { - invariant(_finalState); - - auto getExpectedPostState = [&](auto iter) { - auto nextIter = ++iter; - if (nextIter == _steps.end()) { - return *_finalState; - } - return nextIter->state; + class RunAllErrorsAtAllSteps { + public: + /** The set of failures is hardcoded. */ + static constexpr std::array fails{Action::kErrTerminate, + Action::kErrDisconnect, + Action::kErrNetwork, + Action::kErrShutdown, + Action::kErrArbitrary}; + + /** Encodes a response to `event` by taking `action`. */ + struct Step { + Event event; + Action action = Action::kDefault; }; - // Do one entirely clean run. - LOGV2(5014106, "Running success case"); - _fixture->runWithNewSession([&] { - for (auto iter = _steps.begin(); iter != _steps.end(); ++iter) { - ASSERT_EQ(doGenericStep(*iter, FailureCondition::kNone), - getExpectedPostState(iter)); + // The final step is assumed to have `kErrDisconnect` as an action, + // yielding an implied `kEnd` step. + RunAllErrorsAtAllSteps(SessionWorkflowTest* fixture, std::deque<Step> steps) + : _fixture{fixture}, _steps{[&, at = steps.size() - 1] { + return _appendTermination(std::move(steps), at, Action::kErrDisconnect); + }()} {} + + /** + * Run all of the trials specified by the constructor. + */ + void run() { + const std::deque<Step> baseline(_steps.begin(), _steps.end()); + LOGV2(5014106, "Running one entirely clean run"); + _runSteps(baseline); + // Incrementally push forward the step where we fail. + for (size_t failAt = 0; failAt + 1 < baseline.size(); ++failAt) { + LOGV2(6742614, "Injecting failures", "failAt"_attr = failAt); + for (auto fail : fails) + _runSteps(_appendTermination(baseline, failAt, fail)); } + } - _fixture->endSession(); - ASSERT_EQ(_fixture->popSessionState(), SessionState::kEnd); - }); + private: + /** + * Returns a new steps sequence, formed by copying the specified `q`, and + * modifying the copy to be terminated with a `fail` at the `failAt` index. + */ + std::deque<Step> _appendTermination(std::deque<Step> q, size_t failAt, Action fail) const { + LOGV2( + 6742617, "appendTermination", "fail"_attr = toString(fail), "failAt"_attr = failAt); + invariant(failAt < q.size()); + q.erase(q.begin() + failAt + 1, q.end()); + q.back().action = fail; + q.push_back({Event::kSepEndSession}); + return q; + } - const auto failList = std::vector<FailureCondition>{FailureCondition::kTerminate, - FailureCondition::kDisconnect, - FailureCondition::kNetwork, - FailureCondition::kShutdown, - FailureCondition::kArbitrary}; + template <typename T> + void _dumpTransitions(const T& q) { + BSONArrayBuilder bab; + for (auto&& t : q) { + BSONObjBuilder{bab.subobjStart()} + .append("event", toString(t.event)) + .append("action", toString(t.action)); + } + LOGV2(6742615, "Run transitions", "transitions"_attr = bab.arr()); + } - for (auto failIter = _steps.begin(); failIter != _steps.end(); ++failIter) { - // Incrementally push forward the step where we fail. - for (auto fail : failList) { - LOGV2(5014105, - "Running failure case", - "failureCase"_attr = fail, - "sessionState"_attr = failIter->state, - "requestKind"_attr = failIter->kind); - - _fixture->runWithNewSession([&] { - auto iter = _steps.begin(); - for (; iter != failIter; ++iter) { - // Run through each step until our point of failure with - // FailureCondition::kNone. - ASSERT_EQ(doGenericStep(*iter, FailureCondition::kNone), - getExpectedPostState(iter)) - << "Current state: (" << iter->state << ", " << iter->kind << ")"; + /** Makes a result for a successful event. */ + Result _successResult(Event event, Action action) { + switch (event) { + case Event::kSepEndSession: + return Result{}; + case Event::kSessionWaitForData: + case Event::kSessionSinkMessage: + return Result{Status::OK()}; + case Event::kSessionSourceMessage: { + Message m = makeOpMsg(); + if (action == Action::kExhaust) + m = setExhaustSupported(m); + return Result{StatusWith{std::move(m)}}; + } + case Event::kSepHandleRequest: + switch (action) { + case Action::kDefault: + return Result{StatusWith{makeResponse(makeOpMsg())}}; + case Action::kExhaust: + return Result{StatusWith{setExhaust(makeResponse(makeOpMsg()))}}; + case Action::kMoreToCome: + return Result{StatusWith{DbResponse{}}}; + default: + MONGO_UNREACHABLE; } - - // Finally fail on a given step. - ASSERT_EQ(doGenericStep(*iter, fail), SessionState::kEnd); - }); } + MONGO_UNREACHABLE; } - _runCount += 1; - } - -private: - SessionWorkflowTest* const _fixture; - - boost::optional<SessionState> _finalState; - StepList _steps; + void injectStep(const Step& t) { + LOGV2_DEBUG( + 6872301, 3, "Inject step", "event"_attr = t.event, "action"_attr = t.action); + switch (t.action) { + case Action::kErrTerminate: { + // Has a side effect of simulating a ServiceEntryPoint shutdown + // before responding with a shutdown error. + auto pf = std::make_shared<PromiseAndFuture<void>>(); + _fixture->injectMockResponse([this, t, pf](Event e) { + invariant(e == t.event); + _fixture->sep()->endAllSessionsNoTagMask(); + pf->promise.emplaceValue(); + return Result{kShutdownError}; + }); + pf->future.get(); + } break; + case Action::kErrDisconnect: + _fixture->expect(t.event, Result{kClosedSessionError}); + break; + case Action::kErrNetwork: + _fixture->expect(t.event, Result(kNetworkError)); + break; + case Action::kErrShutdown: + _fixture->expect(t.event, Result(kShutdownError)); + break; + case Action::kErrArbitrary: + _fixture->expect(t.event, Result(kArbitraryError)); + break; + case Action::kDefault: + case Action::kExhaust: + case Action::kMoreToCome: + _fixture->expect(t.event, _successResult(t.event, t.action)); + break; + } + } - // This variable is currently used as a post-condition to make sure that the StepRunner has been - // run. In the current form, it could be a boolean. That said, if you need to stress test the - // SessionWorkflow, you will want to check this variable to make sure you have run as many - // times as you expect. - size_t _runCount = 0; -}; + /** Start a new session, run the `steps` sequence, and join the session. */ + void _runSteps(std::deque<Step> q) { + _dumpTransitions(q); + _fixture->initializeNewSession(); + _fixture->startSession(); + for (; !q.empty(); q.pop_front()) + injectStep(q.front()); + _fixture->joinSessions(); + } -void SessionWorkflowTest::initNewSession() { - assertNoSessionState(); - - _session = std::make_shared<CallbackMockSession>(); - _session->endCb = [&] { endSession(); }; - _session->isConnectedCb = [&] { return isConnected(); }; - _session->waitForDataCb = [&] { return _waitForData(); }; - _session->sourceMessageCb = [&] { return _sourceMessage(); }; - _session->sinkMessageCb = [&](Message message) { return _sinkMessage(std::move(message)); }; - _session->asyncWaitForDataCb = [&] { return _asyncWaitForData(); }; - _session->asyncSourceMessageCb = [&](const BatonHandle&) { return _asyncSourceMessage(); }; - _session->asyncSinkMessageCb = [&](Message message, const BatonHandle&) { - return _asyncSinkMessage(std::move(message)); + SessionWorkflowTest* _fixture; + std::deque<Step> _steps; }; - _stateResult->isConnected.store(true); -} - -void SessionWorkflowTest::joinSession() { - ASSERT(_sep->waitForNoSessions(Seconds{1})); - - assertNoSessionState(); -} -void SessionWorkflowTest::startSession() { - _sep->startSession(_session); -} - -void SessionWorkflowTest::terminateViaServiceEntryPoint() { - _sep->endAllSessionsNoTagMask(); -} - -void SessionWorkflowTest::setUp() { - ServiceContextTest::setUp(); - - auto sep = std::make_unique<MockServiceEntryPoint>(getServiceContext()); - sep->handleRequestCb = [&](OperationContext* opCtx, const Message& request) { - return _handleRequest(opCtx, request); - }; - sep->onEndSessionCb = [&](const transport::SessionHandle& session) { - invariant(session == _session); - _stateQueue.push(SessionState::kEnd); - }; - sep->derivedOnClientDisconnectCb = [&](Client*) { ++_onClientDisconnectCalled; }; - _sep = sep.get(); - getServiceContext()->setServiceEntryPoint(std::move(sep)); - invariant(_sep->start()); - - _threadPool->startup(); - - _stateResult = std::make_unique<StateResult>(); -} - -void SessionWorkflowTest::tearDown() { - ON_BLOCK_EXIT([&] { ServiceContextTest::tearDown(); }); - - endSession(); + void runSteps(std::deque<RunAllErrorsAtAllSteps::Step> steps) { + RunAllErrorsAtAllSteps{this, steps}.run(); + } - // Normal shutdown is a noop outside of ASAN. - invariant(_sep->shutdownAndWait(Seconds{10})); + std::deque<RunAllErrorsAtAllSteps::Step> defaultLoop() const { + return { + {Event::kSessionSourceMessage}, + {Event::kSepHandleRequest}, + {Event::kSessionSinkMessage}, + {Event::kSessionSourceMessage}, + }; + } - _threadPool->shutdown(); - _threadPool->join(); -} + std::deque<RunAllErrorsAtAllSteps::Step> exhaustLoop() const { + return { + {Event::kSessionSourceMessage, Action::kExhaust}, + {Event::kSepHandleRequest, Action::kExhaust}, + {Event::kSessionSinkMessage}, + {Event::kSepHandleRequest}, + {Event::kSessionSinkMessage}, + {Event::kSessionSourceMessage}, + }; + } -template <bool useDedicatedThread> -class DedicatedThreadOverrideTest : public SessionWorkflowTest { - ScopedValueOverride<bool> _svo{gInitialUseDedicatedThread, useDedicatedThread}; + std::deque<RunAllErrorsAtAllSteps::Step> moreToComeLoop() const { + return { + {Event::kSessionSourceMessage, Action::kMoreToCome}, + {Event::kSepHandleRequest, Action::kMoreToCome}, + {Event::kSessionSourceMessage}, + {Event::kSepHandleRequest}, + {Event::kSessionSinkMessage}, + {Event::kSessionSourceMessage}, + }; + } }; -using SessionWorkflowWithBorrowedThreadsTest = DedicatedThreadOverrideTest<false>; -using SessionWorkflowWithDedicatedThreadsTest = DedicatedThreadOverrideTest<true>; - -TEST_F(SessionWorkflowTest, StartThenEndSession) { - initNewSession(); - startSession(); - - ASSERT_EQ(popSessionState(), SessionState::kSource); - - endSession(); -} - -TEST_F(SessionWorkflowTest, EndBeforeStartSession) { - initNewSession(); - endSession(); - startSession(); -} - -TEST_F(SessionWorkflowTest, OnClientDisconnectCalledOnCleanup) { - initNewSession(); - startSession(); - ASSERT_EQ(popSessionState(), SessionState::kSource); - ASSERT_EQ(onClientDisconnectCalledTimes(), 0); - endSession(); - ASSERT_EQ(popSessionState(), SessionState::kEnd); - joinSession(); - ASSERT_EQ(onClientDisconnectCalledTimes(), 1); -} +class SessionWorkflowWithDedicatedThreadsTest : public StepRunnerSessionWorkflowTest { + ScopedValueOverride<bool> _svo{gInitialUseDedicatedThread, true}; +}; TEST_F(SessionWorkflowWithDedicatedThreadsTest, DefaultLoop) { - auto runner = StepRunner(this); - - runner.expectNextState(SessionState::kSource, RequestKind::kDefault); - runner.expectNextState(SessionState::kProcess, RequestKind::kDefault); - runner.expectNextState(SessionState::kSink, RequestKind::kDefault); - runner.expectFinalState(SessionState::kSource); - - runner.run(); + runSteps(defaultLoop()); } TEST_F(SessionWorkflowWithDedicatedThreadsTest, ExhaustLoop) { - auto runner = StepRunner(this); - - runner.expectNextState(SessionState::kSource, RequestKind::kExhaust); - runner.expectNextState(SessionState::kProcess, RequestKind::kExhaust); - runner.expectNextState(SessionState::kSink, RequestKind::kExhaust); - runner.expectNextState(SessionState::kProcess, RequestKind::kDefault); - runner.expectNextState(SessionState::kSink, RequestKind::kDefault); - runner.expectFinalState(SessionState::kSource); - - runner.run(); + runSteps(exhaustLoop()); } TEST_F(SessionWorkflowWithDedicatedThreadsTest, MoreToComeLoop) { - auto runner = StepRunner(this); - - runner.expectNextState(SessionState::kSource, RequestKind::kMoreToCome); - runner.expectNextState(SessionState::kProcess, RequestKind::kMoreToCome); - runner.expectNextState(SessionState::kSource, RequestKind::kDefault); - runner.expectNextState(SessionState::kProcess, RequestKind::kDefault); - runner.expectNextState(SessionState::kSink, RequestKind::kDefault); - runner.expectFinalState(SessionState::kSource); - - runner.run(); + runSteps(moreToComeLoop()); } -TEST_F(SessionWorkflowWithBorrowedThreadsTest, DefaultLoop) { - auto runner = StepRunner(this); +class SessionWorkflowWithBorrowedThreadsTest : public StepRunnerSessionWorkflowTest { +public: + /** + * Under the borrowed thread model, the steps are the same as for dedicated thread model, + * except that Session sourceMessage events are preceded by Session waitForData events. + */ + std::deque<RunAllErrorsAtAllSteps::Step> convertStepsToBorrowed( + std::deque<RunAllErrorsAtAllSteps::Step> q) { + for (auto iter = q.begin(); iter != q.end(); ++iter) + if (iter->event == Event::kSessionSourceMessage) + iter = std::next(q.insert(iter, {Event::kSessionWaitForData})); + return q; + } - runner.expectNextState(SessionState::kPoll, RequestKind::kDefault); - runner.expectNextState(SessionState::kSource, RequestKind::kDefault); - runner.expectNextState(SessionState::kProcess, RequestKind::kDefault); - runner.expectNextState(SessionState::kSink, RequestKind::kDefault); - runner.expectFinalState(SessionState::kPoll); +private: + ScopedValueOverride<bool> _svo{gInitialUseDedicatedThread, false}; +}; - runner.run(); +TEST_F(SessionWorkflowWithBorrowedThreadsTest, DefaultLoop) { + runSteps(convertStepsToBorrowed(defaultLoop())); } TEST_F(SessionWorkflowWithBorrowedThreadsTest, ExhaustLoop) { - auto runner = StepRunner(this); - - runner.expectNextState(SessionState::kPoll, RequestKind::kExhaust); - runner.expectNextState(SessionState::kSource, RequestKind::kExhaust); - runner.expectNextState(SessionState::kProcess, RequestKind::kExhaust); - runner.expectNextState(SessionState::kSink, RequestKind::kExhaust); - runner.expectNextState(SessionState::kProcess, RequestKind::kDefault); - runner.expectNextState(SessionState::kSink, RequestKind::kDefault); - runner.expectFinalState(SessionState::kPoll); - - runner.run(); + runSteps(convertStepsToBorrowed(exhaustLoop())); } TEST_F(SessionWorkflowWithBorrowedThreadsTest, MoreToComeLoop) { - auto runner = StepRunner(this); - - runner.expectNextState(SessionState::kPoll, RequestKind::kMoreToCome); - runner.expectNextState(SessionState::kSource, RequestKind::kMoreToCome); - runner.expectNextState(SessionState::kProcess, RequestKind::kMoreToCome); - runner.expectNextState(SessionState::kPoll, RequestKind::kDefault); - runner.expectNextState(SessionState::kSource, RequestKind::kDefault); - runner.expectNextState(SessionState::kProcess, RequestKind::kDefault); - runner.expectNextState(SessionState::kSink, RequestKind::kDefault); - runner.expectFinalState(SessionState::kPoll); - - runner.run(); + runSteps(convertStepsToBorrowed(moreToComeLoop())); } } // namespace -} // namespace transport -} // namespace mongo +} // namespace mongo::transport |