diff options
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/service_entry_point.h | 7 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_mock.cpp | 8 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_test_suite.cpp | 58 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_test_suite.h | 30 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_utils.cpp | 26 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_utils.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/session.cpp | 32 | ||||
-rw-r--r-- | src/mongo/transport/session.h | 23 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer.h | 10 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_legacy.cpp | 35 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_legacy.h | 12 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.cpp | 22 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.h | 10 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_mock.cpp | 53 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_mock.h | 24 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_mock_test.cpp | 64 |
17 files changed, 213 insertions, 216 deletions
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index a1c031c5e3e..e0a1ee7a907 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -29,13 +29,10 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/transport/session.h" namespace mongo { -namespace transport { -class Session; -} // namespace transport - /** * This is the entrypoint from the transport layer into mongod or mongos. * @@ -52,7 +49,7 @@ public: /** * Begin running a new Session. This method returns immediately. */ - virtual void startSession(transport::Session&& session) = 0; + virtual void startSession(transport::SessionHandle session) = 0; protected: ServiceEntryPoint() = default; diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 8d4530d7f13..667b1d5ae5e 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -86,11 +86,11 @@ ServiceEntryPointMock::~ServiceEntryPointMock() { } } -void ServiceEntryPointMock::startSession(transport::Session&& session) { +void ServiceEntryPointMock::startSession(transport::SessionHandle session) { _threads.emplace_back(&ServiceEntryPointMock::run, this, std::move(session)); } -void ServiceEntryPointMock::run(transport::Session&& session) { +void ServiceEntryPointMock::run(transport::SessionHandle session) { Message inMessage; while (true) { { @@ -100,12 +100,12 @@ void ServiceEntryPointMock::run(transport::Session&& session) { } // sourceMessage() - if (!session.sourceMessage(&inMessage).wait().isOK()) { + if (!session->sourceMessage(&inMessage).wait().isOK()) { break; } // sinkMessage() - if (!session.sinkMessage(_outMessage).wait().isOK()) { + if (!session->sinkMessage(_outMessage).wait().isOK()) { break; } } diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h index ceeca2a0985..eadc9367fa8 100644 --- a/src/mongo/transport/service_entry_point_mock.h +++ b/src/mongo/transport/service_entry_point_mock.h @@ -34,13 +34,13 @@ #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/transport/service_entry_point.h" +#include "mongo/transport/session.h" #include "mongo/util/net/message.h" namespace mongo { namespace transport { -class Session; class TransportLayer; } // namespace transport @@ -63,10 +63,10 @@ public: * * ...repeat until wait() returns an error. */ - void startSession(transport::Session&& session) override; + void startSession(transport::SessionHandle session) override; private: - void run(transport::Session&& session); + void run(transport::SessionHandle session); transport::TransportLayer* _tl; diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index eefb9593218..4587ac8ea8b 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -90,7 +90,7 @@ void setPingCommand(Message* m) { } // Some default method implementations -const auto kDefaultEnd = [](const Session& session) { return; }; +const auto kDefaultEnd = [](const SessionHandle& session) { return; }; const auto kDefaultDestroyHook = [](Session& session) { return; }; const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); }; const auto kNoopFunction = [] { return; }; @@ -100,13 +100,13 @@ const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connectio } // namespace -ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session, +ServiceEntryPointTestSuite::MockTicket::MockTicket(const SessionHandle& session, Message* message, Date_t expiration) - : _message(message), _sessionId(session.id()), _expiration(expiration) {} + : _message(message), _sessionId(session->id()), _expiration(expiration) {} -ServiceEntryPointTestSuite::MockTicket::MockTicket(const Session& session, Date_t expiration) - : _sessionId(session.id()), _expiration(expiration) {} +ServiceEntryPointTestSuite::MockTicket::MockTicket(const SessionHandle& session, Date_t expiration) + : _sessionId(session->id()), _expiration(expiration) {} Session::Id ServiceEntryPointTestSuite::MockTicket::sessionId() const { return _sessionId; @@ -129,13 +129,13 @@ ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness() _asyncWait(kDefaultAsyncWait), _end(kDefaultEnd) {} -Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(Session& session, +Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(const SessionHandle& session, Message* message, Date_t expiration) { return _sourceMessage(session, message, expiration); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(Session& session, +Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration) { return _sinkMessage(session, message, expiration); @@ -151,17 +151,17 @@ void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket, } SSLPeerInfo ServiceEntryPointTestSuite::MockTLHarness::getX509PeerInfo( - const Session& session) const { + const ConstSessionHandle& session) const { return SSLPeerInfo("mock", stdx::unordered_set<RoleName>{}); } -void ServiceEntryPointTestSuite::MockTLHarness::registerTags(const Session& session) {} +void ServiceEntryPointTestSuite::MockTLHarness::registerTags(const ConstSessionHandle& session) {} TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() { return Stats(); } -void ServiceEntryPointTestSuite::MockTLHarness::end(Session& session) { +void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session) { return _end(session); } @@ -194,17 +194,19 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport:: return _defaultWait(std::move(ticket)); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(Session& s, Message* m, Date_t d) { +Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const SessionHandle& s, + Message* m, + Date_t d) { return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d)); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(Session& s, +Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHandle& s, const Message&, Date_t d) { return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d)); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(Session& s, +Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s, const Message& m, Date_t d) { _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, this, _1); @@ -251,10 +253,10 @@ void ServiceEntryPointTestSuite::noLifeCycleTest() { _tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1); // Step 3: SEP destroys the session, which calls end() - _tl->_destroy_hook = [&testComplete](const Session&) { testComplete.set_value(); }; + _tl->_destroy_hook = [&testComplete](Session&) { testComplete.set_value(); }; // Kick off the SEP - Session s(HostAndPort(), HostAndPort(), _tl.get()); + auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); _sep->startSession(std::move(s)); testFuture.wait(); @@ -270,7 +272,7 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() { // Step 1: SEP gets a ticket to source a Message // Step 2: SEP calls wait() on the ticket and receives a Message // Step 3: SEP gets a ticket to sink a Message - _tl->_sinkMessage = [this](Session& session, const Message& m, Date_t expiration) { + _tl->_sinkMessage = [this](const SessionHandle& session, const Message& m, Date_t expiration) { // Step 4: SEP calls wait() on the ticket and receives an error _tl->_wait = @@ -280,10 +282,10 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() { }; // Step 5: SEP destroys the session, which calls end() - _tl->_destroy_hook = [&testComplete](const Session&) { testComplete.set_value(); }; + _tl->_destroy_hook = [&testComplete](Session&) { testComplete.set_value(); }; // Kick off the SEP - Session s(HostAndPort(), HostAndPort(), _tl.get()); + auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); _sep->startSession(std::move(s)); testFuture.wait(); @@ -306,20 +308,20 @@ void ServiceEntryPointTestSuite::fullLifeCycleTest() { // Step 5: SEP gets a ticket to source a Message // Step 6: SEP calls wait() on the ticket and receives and error // Step 7: SEP destroys the session, which calls end() - _tl->_destroy_hook = [&testComplete](const Session& session) { testComplete.set_value(); }; + _tl->_destroy_hook = [&testComplete](Session& session) { testComplete.set_value(); }; // Kick off the SEP - Session s(HostAndPort(), HostAndPort(), _tl.get()); + auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); _sep->startSession(std::move(s)); testFuture.wait(); } void ServiceEntryPointTestSuite::interruptingSessionTest() { - Session sA(HostAndPort(), HostAndPort(), _tl.get()); - Session sB(HostAndPort(), HostAndPort(), _tl.get()); - auto idA = sA.id(); - auto idB = sB.id(); + auto sA = Session::create(HostAndPort(), HostAndPort(), _tl.get()); + auto sB = Session::create(HostAndPort(), HostAndPort(), _tl.get()); + auto idA = sA->id(); + auto idB = sB->id(); int waitCountB = 0; stdx::promise<void> startB; @@ -366,7 +368,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() { // Step 7: SEP calls sourceMessage() for B, gets tB3 // Step 8: SEP calls wait() for tB3, gets an error // Step 9: SEP calls end(B) - _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](const Session& session) { + _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](Session& session) { // When end(B) is called, time to resume session A if (session.id() == idB) { // Resume session A @@ -450,18 +452,18 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions, }; // When we end the last session, end the test. - _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](const Session& session) { + _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](Session& session) { if (ended.fetchAndAdd(1) == (numSessions - 1)) { allSessionsComplete.set_value(); } }; for (int i = 0; i < numSessions; i++) { - Session s(HostAndPort(), HostAndPort(), _tl.get()); + auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); { // This operation may cause a re-hash. stdx::lock_guard<stdx::mutex> lock(cyclesLock); - completedCycles.emplace(s.id(), 0); + completedCycles.emplace(s->id(), 0); } _sep->startSession(std::move(s)); } diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index ba1d60fd027..d1d3042cc5e 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -92,10 +92,10 @@ private: class MockTicket : public transport::TicketImpl { public: // Source constructor - MockTicket(const transport::Session& session, Message* message, Date_t expiration); + MockTicket(const transport::SessionHandle& session, Message* message, Date_t expiration); // Sink constructor - MockTicket(const transport::Session& session, Date_t expiration); + MockTicket(const transport::SessionHandle& session, Date_t expiration); MockTicket(MockTicket&&) = default; MockTicket& operator=(MockTicket&&) = default; @@ -121,19 +121,19 @@ private: MockTLHarness(); transport::Ticket sourceMessage( - transport::Session& session, + const transport::SessionHandle& session, Message* message, Date_t expiration = transport::Ticket::kNoExpirationDate) override; transport::Ticket sinkMessage( - transport::Session& session, + const transport::SessionHandle& session, const Message& message, Date_t expiration = transport::Ticket::kNoExpirationDate) override; Status wait(transport::Ticket&& ticket) override; void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override; - SSLPeerInfo getX509PeerInfo(const transport::Session& session) const override; - void registerTags(const transport::Session& session) override; + SSLPeerInfo getX509PeerInfo(const transport::ConstSessionHandle& session) const override; + void registerTags(const transport::ConstSessionHandle& session) override; Stats sessionStats() override; - void end(transport::Session& session) override; + void end(const transport::SessionHandle& session) override; void endAllSessions(transport::Session::TagMask tags) override; Status start() override; void shutdown() override; @@ -141,11 +141,13 @@ private: ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket); // Mocked method hooks - stdx::function<transport::Ticket(transport::Session&, Message*, Date_t)> _sourceMessage; - stdx::function<transport::Ticket(transport::Session&, const Message&, Date_t)> _sinkMessage; + stdx::function<transport::Ticket(const transport::SessionHandle&, Message*, Date_t)> + _sourceMessage; + stdx::function<transport::Ticket(const transport::SessionHandle&, const Message&, Date_t)> + _sinkMessage; stdx::function<Status(transport::Ticket)> _wait; stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait; - stdx::function<void(const transport::Session&)> _end; + stdx::function<void(const transport::SessionHandle&)> _end; stdx::function<void(transport::Session& session)> _destroy_hook; stdx::function<void(transport::Session::TagMask tags)> _endAllSessions = [](transport::Session::TagMask tags) {}; @@ -153,9 +155,11 @@ private: stdx::function<void(void)> _shutdown = [] {}; // Pre-set hook methods - transport::Ticket _defaultSource(transport::Session& s, Message* m, Date_t d); - transport::Ticket _defaultSink(transport::Session& s, const Message&, Date_t d); - transport::Ticket _sinkThenErrorOnWait(transport::Session& s, const Message& m, Date_t d); + transport::Ticket _defaultSource(const transport::SessionHandle& s, Message* m, Date_t d); + transport::Ticket _defaultSink(const transport::SessionHandle& s, const Message&, Date_t d); + transport::Ticket _sinkThenErrorOnWait(const transport::SessionHandle& s, + const Message& m, + Date_t d); Status _defaultWait(transport::Ticket ticket); Status _waitError(transport::Ticket ticket); diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_entry_point_utils.cpp index ac0a6109e03..9d74de81a01 100644 --- a/src/mongo/transport/service_entry_point_utils.cpp +++ b/src/mongo/transport/service_entry_point_utils.cpp @@ -55,23 +55,27 @@ namespace mongo { namespace { +/** + * This object takes ownership of transport::SessionHandle. + */ struct Context { - Context(transport::Session session, stdx::function<void(transport::Session*)> task) + Context(transport::SessionHandle session, + stdx::function<void(const transport::SessionHandle&)> task) : session(std::move(session)), task(std::move(task)) {} - transport::Session session; - stdx::function<void(transport::Session*)> task; + transport::SessionHandle session; + stdx::function<void(const transport::SessionHandle&)> task; }; void* runFunc(void* ptr) { std::unique_ptr<Context> ctx(static_cast<Context*>(ptr)); - auto tl = ctx->session.getTransportLayer(); - Client::initThread("conn", &ctx->session); - setThreadName(std::string(str::stream() << "conn" << ctx->session.id())); + auto tl = ctx->session->getTransportLayer(); + Client::initThread("conn", ctx->session); + setThreadName(std::string(str::stream() << "conn" << ctx->session->id())); try { - ctx->task(&ctx->session); + ctx->task(ctx->session); } catch (const AssertionException& e) { log() << "AssertionException handling request, closing client connection: " << e; } catch (const SocketException& e) { @@ -89,7 +93,7 @@ void* runFunc(void* ptr) { if (!serverGlobalParams.quiet) { auto conns = tl->sessionStats().numOpenSessions; const char* word = (conns == 1 ? " connection" : " connections"); - log() << "end connection " << ctx->session.remote() << " (" << conns << word + log() << "end connection " << ctx->session->remote() << " (" << conns << word << " now open)"; } @@ -99,8 +103,8 @@ void* runFunc(void* ptr) { } } // namespace -void launchWrappedServiceEntryWorkerThread(transport::Session&& session, - stdx::function<void(transport::Session*)> task) { +void launchWrappedServiceEntryWorkerThread( + transport::SessionHandle session, stdx::function<void(const transport::SessionHandle&)> task) { auto ctx = stdx::make_unique<Context>(std::move(session), std::move(task)); try { @@ -147,7 +151,7 @@ void launchWrappedServiceEntryWorkerThread(transport::Session&& session, #endif // __linux__ } catch (...) { - log() << "failed to create service entry worker thread for " << ctx->session.remote(); + log() << "failed to create service entry worker thread for " << ctx->session->remote(); } } diff --git a/src/mongo/transport/service_entry_point_utils.h b/src/mongo/transport/service_entry_point_utils.h index 42be4079f9c..1c1634af6d5 100644 --- a/src/mongo/transport/service_entry_point_utils.h +++ b/src/mongo/transport/service_entry_point_utils.h @@ -29,14 +29,11 @@ #pragma once #include "mongo/stdx/functional.h" +#include "mongo/transport/session.h" namespace mongo { -namespace transport { -class Session; -} // namespace transport - -void launchWrappedServiceEntryWorkerThread(transport::Session&& session, - stdx::function<void(transport::Session*)> task); +void launchWrappedServiceEntryWorkerThread( + transport::SessionHandle session, stdx::function<void(const transport::SessionHandle&)> task); } // namespace mongo diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp index b32c42609b8..4287d62cfd2 100644 --- a/src/mongo/transport/session.cpp +++ b/src/mongo/transport/session.cpp @@ -56,44 +56,26 @@ Session::~Session() { } } -Session::Session(Session&& other) - : _id(other._id), - _remote(std::move(other._remote)), - _local(std::move(other._local)), - _tl(other._tl) { - // We do not want to call tl->destroy() on moved-from Sessions. - other._tl = nullptr; -} - -Session& Session::operator=(Session&& other) { - if (&other == this) { - return *this; - } - - _id = other._id; - _remote = std::move(other._remote); - _local = std::move(other._local); - _tl = other._tl; - other._tl = nullptr; - - return *this; +SessionHandle Session::create(HostAndPort remote, HostAndPort local, TransportLayer* tl) { + std::shared_ptr<Session> handle(new Session(std::move(remote), std::move(local), tl)); + return handle; } void Session::replaceTags(TagMask tags) { _tags = tags; - _tl->registerTags(*this); + _tl->registerTags(shared_from_this()); } Ticket Session::sourceMessage(Message* message, Date_t expiration) { - return _tl->sourceMessage(*this, message, expiration); + return _tl->sourceMessage(shared_from_this(), message, expiration); } Ticket Session::sinkMessage(const Message& message, Date_t expiration) { - return _tl->sinkMessage(*this, message, expiration); + return _tl->sinkMessage(shared_from_this(), message, expiration); } SSLPeerInfo Session::getX509PeerInfo() const { - return _tl->getX509PeerInfo(*this); + return _tl->getX509PeerInfo(shared_from_this()); } } // namespace transport diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 026a3445bb4..29e54333ceb 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -28,6 +28,8 @@ #pragma once +#include <memory> + #include "mongo/base/disallow_copying.h" #include "mongo/transport/message_compressor_manager.h" #include "mongo/transport/session_id.h" @@ -43,12 +45,16 @@ struct SSLPeerInfo; namespace transport { class TransportLayer; +class Session; + +using SessionHandle = std::shared_ptr<Session>; +using ConstSessionHandle = std::shared_ptr<const Session>; /** * This type contains data needed to associate Messages with connections * (on the transport side) and Messages with Client objects (on the database side). */ -class Session { +class Session : public std::enable_shared_from_this<Session> { MONGO_DISALLOW_COPYING(Session); public: @@ -68,20 +74,14 @@ public: static constexpr TagMask kKeepOpen = 1; /** - * Construct a new session. - */ - Session(HostAndPort remote, HostAndPort local, TransportLayer* tl); - - /** * Destroys a session, calling end() for this session in its TransportLayer. */ ~Session(); /** - * Move constructor and assignment operator. + * A factory for sessions. */ - Session(Session&& other); - Session& operator=(Session&& other); + static SessionHandle create(HostAndPort remote, HostAndPort local, TransportLayer* tl); /** * Return the id for this session. @@ -149,6 +149,11 @@ public: } private: + /** + * Construct a new session. + */ + Session(HostAndPort remote, HostAndPort local, TransportLayer* tl); + Id _id; HostAndPort _remote; diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 3313a68108d..b09ff8a8eb9 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -101,7 +101,7 @@ public: * TransportLayer is unable to source a Message, this will be a failed status, * and the passed-in Message buffer may be left in an invalid state. */ - virtual Ticket sourceMessage(Session& session, + virtual Ticket sourceMessage(const SessionHandle& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) = 0; @@ -120,7 +120,7 @@ public: * This method does NOT take ownership of the sunk Message, which must be cleaned * up by the caller. */ - virtual Ticket sinkMessage(Session& session, + virtual Ticket sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) = 0; @@ -154,13 +154,13 @@ public: * * Before calling this method, use Session::replaceTags() to set the desired TagMask. */ - virtual void registerTags(const Session& session) = 0; + virtual void registerTags(const ConstSessionHandle& session) = 0; /** * Return the stored X509 peer information for this session. If the session does not * exist in this TransportLayer, returns a default constructed object. */ - virtual SSLPeerInfo getX509PeerInfo(const Session& session) const = 0; + virtual SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const = 0; /** * Returns the number of sessions currently open in the transport layer. @@ -178,7 +178,7 @@ public: * * This method is idempotent and synchronous. */ - virtual void end(Session& session) = 0; + virtual void end(const SessionHandle& session) = 0; /** * End all active sessions in the TransportLayer. Tickets that have already been started via diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp index a410ff09fdb..b5e3c96c6e4 100644 --- a/src/mongo/transport/transport_layer_legacy.cpp +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -64,10 +64,10 @@ TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& _running(false), _options(opts) {} -TransportLayerLegacy::LegacyTicket::LegacyTicket(const Session& session, +TransportLayerLegacy::LegacyTicket::LegacyTicket(const SessionHandle& session, Date_t expiration, WorkHandle work) - : _sessionId(session.id()), _expiration(expiration), _fill(std::move(work)) {} + : _sessionId(session->id()), _expiration(expiration), _fill(std::move(work)) {} Session::Id TransportLayerLegacy::LegacyTicket::sessionId() const { return _sessionId; @@ -98,8 +98,10 @@ Status TransportLayerLegacy::start() { TransportLayerLegacy::~TransportLayerLegacy() = default; -Ticket TransportLayerLegacy::sourceMessage(Session& session, Message* message, Date_t expiration) { - auto& compressorMgr = session.getCompressorManager(); +Ticket TransportLayerLegacy::sourceMessage(const SessionHandle& session, + Message* message, + Date_t expiration) { + auto& compressorMgr = session->getCompressorManager(); auto sourceCb = [message, &compressorMgr](AbstractMessagingPort* amp) -> Status { if (!amp->recv(*message)) { return {ErrorCodes::HostUnreachable, "Recv failed"}; @@ -119,10 +121,10 @@ Ticket TransportLayerLegacy::sourceMessage(Session& session, Message* message, D return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sourceCb))); } -SSLPeerInfo TransportLayerLegacy::getX509PeerInfo(const Session& session) const { +SSLPeerInfo TransportLayerLegacy::getX509PeerInfo(const ConstSessionHandle& session) const { { stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); - auto conn = _connections.find(session.id()); + auto conn = _connections.find(session->id()); if (conn == _connections.end()) { // Return empty string if the session is not found return SSLPeerInfo(); @@ -145,10 +147,10 @@ TransportLayer::Stats TransportLayerLegacy::sessionStats() { return stats; } -Ticket TransportLayerLegacy::sinkMessage(Session& session, +Ticket TransportLayerLegacy::sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration) { - auto& compressorMgr = session.getCompressorManager(); + auto& compressorMgr = session->getCompressorManager(); auto sinkCb = [&message, &compressorMgr](AbstractMessagingPort* amp) -> Status { try { networkCounter.hitLogical(0, message.size()); @@ -179,19 +181,19 @@ void TransportLayerLegacy::asyncWait(Ticket&& ticket, TicketCallback callback) { MONGO_UNREACHABLE; } -void TransportLayerLegacy::end(Session& session) { +void TransportLayerLegacy::end(const SessionHandle& session) { stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); - auto conn = _connections.find(session.id()); + auto conn = _connections.find(session->id()); if (conn != _connections.end()) { _endSession_inlock(conn); } } -void TransportLayerLegacy::registerTags(const Session& session) { +void TransportLayerLegacy::registerTags(const ConstSessionHandle& session) { stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); - auto conn = _connections.find(session.id()); + auto conn = _connections.find(session->id()); if (conn != _connections.end()) { - conn->second.tags = session.getTags(); + conn->second.tags = session->getTags(); } } @@ -311,15 +313,16 @@ void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagin return; } - Session session(amp->remote(), HostAndPort(amp->localAddr().toString(true)), this); + auto session = + Session::create(amp->remote(), HostAndPort(amp->localAddr().toString(true)), this); amp->setLogLevel(logger::LogSeverity::Debug(1)); { stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); _connections.emplace(std::piecewise_construct, - std::forward_as_tuple(session.id()), - std::forward_as_tuple(std::move(amp), false, session.getTags())); + std::forward_as_tuple(session->id()), + std::forward_as_tuple(std::move(amp), false, session->getTags())); } invariant(_sep); diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h index 8135f559070..3d348076ad7 100644 --- a/src/mongo/transport/transport_layer_legacy.h +++ b/src/mongo/transport/transport_layer_legacy.h @@ -66,23 +66,23 @@ public: Status setup(); Status start() override; - Ticket sourceMessage(Session& session, + Ticket sourceMessage(const SessionHandle& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(Session& session, + Ticket sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) override; Status wait(Ticket&& ticket) override; void asyncWait(Ticket&& ticket, TicketCallback callback) override; - void registerTags(const Session& session) override; - SSLPeerInfo getX509PeerInfo(const Session& session) const override; + void registerTags(const ConstSessionHandle& session) override; + SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override; Stats sessionStats() override; - void end(Session& session) override; + void end(const SessionHandle& session) override; void endAllSessions(transport::Session::TagMask tags) override; void shutdown() override; @@ -105,7 +105,7 @@ private: MONGO_DISALLOW_COPYING(LegacyTicket); public: - LegacyTicket(const Session& session, Date_t expiration, WorkHandle work); + LegacyTicket(const SessionHandle& session, Date_t expiration, WorkHandle work); SessionId sessionId() const override; Date_t expiration() const override; diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index 2c12fe98c25..d29c01f54f5 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -44,14 +44,16 @@ namespace transport { TransportLayerManager::TransportLayerManager() = default; -Ticket TransportLayerManager::sourceMessage(Session& session, Message* message, Date_t expiration) { - return session.getTransportLayer()->sourceMessage(session, message, expiration); +Ticket TransportLayerManager::sourceMessage(const SessionHandle& session, + Message* message, + Date_t expiration) { + return session->getTransportLayer()->sourceMessage(session, message, expiration); } -Ticket TransportLayerManager::sinkMessage(Session& session, +Ticket TransportLayerManager::sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration) { - return session.getTransportLayer()->sinkMessage(session, message, expiration); + return session->getTransportLayer()->sinkMessage(session, message, expiration); } Status TransportLayerManager::wait(Ticket&& ticket) { @@ -62,8 +64,8 @@ void TransportLayerManager::asyncWait(Ticket&& ticket, TicketCallback callback) return getTicketTransportLayer(ticket)->asyncWait(std::move(ticket), std::move(callback)); } -SSLPeerInfo TransportLayerManager::getX509PeerInfo(const Session& session) const { - return session.getX509PeerInfo(); +SSLPeerInfo TransportLayerManager::getX509PeerInfo(const ConstSessionHandle& session) const { + return session->getX509PeerInfo(); } template <typename Callable> @@ -95,12 +97,12 @@ TransportLayer::Stats TransportLayerManager::sessionStats() { return stats; } -void TransportLayerManager::registerTags(const Session& session) { - session.getTransportLayer()->registerTags(session); +void TransportLayerManager::registerTags(const ConstSessionHandle& session) { + session->getTransportLayer()->registerTags(session); } -void TransportLayerManager::end(Session& session) { - session.getTransportLayer()->end(session); +void TransportLayerManager::end(const SessionHandle& session) { + session->getTransportLayer()->end(session); } void TransportLayerManager::endAllSessions(Session::TagMask tags) { diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index bfdf7046da1..deba17146cc 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -54,22 +54,22 @@ class TransportLayerManager final : public TransportLayer { public: TransportLayerManager(); - Ticket sourceMessage(Session& session, + Ticket sourceMessage(const SessionHandle& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(Session& session, + Ticket sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) override; Status wait(Ticket&& ticket) override; void asyncWait(Ticket&& ticket, TicketCallback callback) override; - SSLPeerInfo getX509PeerInfo(const Session& session) const override; - void registerTags(const Session& session) override; + SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override; + void registerTags(const ConstSessionHandle& session) override; Stats sessionStats() override; - void end(Session& session) override; + void end(const SessionHandle& session) override; void endAllSessions(Session::TagMask tags) override; Status start() override; diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index 623980a19fc..550c8585f95 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -42,12 +42,12 @@ namespace mongo { namespace transport { -TransportLayerMock::TicketMock::TicketMock(const Session* session, +TransportLayerMock::TicketMock::TicketMock(const SessionHandle& session, Message* message, Date_t expiration) : _session(session), _message(message), _expiration(expiration) {} -TransportLayerMock::TicketMock::TicketMock(const Session* session, Date_t expiration) +TransportLayerMock::TicketMock::TicketMock(const SessionHandle& session, Date_t expiration) : _session(session), _expiration(expiration) {} Session::Id TransportLayerMock::TicketMock::sessionId() const { @@ -64,31 +64,33 @@ boost::optional<Message*> TransportLayerMock::TicketMock::msg() const { TransportLayerMock::TransportLayerMock() : _shutdown(false) {} -Ticket TransportLayerMock::sourceMessage(Session& session, Message* message, Date_t expiration) { +Ticket TransportLayerMock::sourceMessage(const SessionHandle& session, + Message* message, + Date_t expiration) { if (inShutdown()) { return Ticket(TransportLayer::ShutdownStatus); - } else if (!owns(session.id())) { + } else if (!owns(session->id())) { return Ticket(TransportLayer::SessionUnknownStatus); - } else if (_sessions[session.id()].ended) { + } else if (_sessions[session->id()].ended) { return Ticket(TransportLayer::TicketSessionClosedStatus); } return Ticket(this, - stdx::make_unique<TransportLayerMock::TicketMock>(&session, message, expiration)); + stdx::make_unique<TransportLayerMock::TicketMock>(session, message, expiration)); } -Ticket TransportLayerMock::sinkMessage(Session& session, +Ticket TransportLayerMock::sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration) { if (inShutdown()) { return Ticket(TransportLayer::ShutdownStatus); - } else if (!owns(session.id())) { + } else if (!owns(session->id())) { return Ticket(TransportLayer::SessionUnknownStatus); - } else if (_sessions[session.id()].ended) { + } else if (_sessions[session->id()].ended) { return Ticket(TransportLayer::TicketSessionClosedStatus); } - return Ticket(this, stdx::make_unique<TransportLayerMock::TicketMock>(&session, expiration)); + return Ticket(this, stdx::make_unique<TransportLayerMock::TicketMock>(session, expiration)); } Status TransportLayerMock::wait(Ticket&& ticket) { @@ -109,52 +111,51 @@ void TransportLayerMock::asyncWait(Ticket&& ticket, TicketCallback callback) { callback(Status::OK()); } -SSLPeerInfo TransportLayerMock::getX509PeerInfo(const Session& session) const { - return _sessions.at(session.id()).peerInfo; +SSLPeerInfo TransportLayerMock::getX509PeerInfo(const ConstSessionHandle& session) const { + return _sessions.at(session->id()).peerInfo; } -void TransportLayerMock::setX509PeerInfo(const Session& session, SSLPeerInfo peerInfo) { - _sessions[session.id()].peerInfo = std::move(peerInfo); +void TransportLayerMock::setX509PeerInfo(const SessionHandle& session, SSLPeerInfo peerInfo) { + _sessions[session->id()].peerInfo = std::move(peerInfo); } TransportLayer::Stats TransportLayerMock::sessionStats() { return Stats(); } -void TransportLayerMock::registerTags(const Session& session) {} +void TransportLayerMock::registerTags(const ConstSessionHandle& session) {} -Session* TransportLayerMock::createSession() { - std::unique_ptr<Session> session = - stdx::make_unique<Session>(HostAndPort(), HostAndPort(), this); +SessionHandle TransportLayerMock::createSession() { + auto session = Session::create(HostAndPort(), HostAndPort(), this); Session::Id sessionId = session->id(); - _sessions[sessionId] = Connection{false, std::move(session), SSLPeerInfo()}; + _sessions[sessionId] = Connection{false, session, SSLPeerInfo()}; - return _sessions[sessionId].session.get(); + return _sessions[sessionId].session; } -Session* TransportLayerMock::get(Session::Id id) { +SessionHandle TransportLayerMock::get(Session::Id id) { if (!owns(id)) return nullptr; - return _sessions[id].session.get(); + return _sessions[id].session; } bool TransportLayerMock::owns(Session::Id id) { return _sessions.count(id) > 0; } -void TransportLayerMock::end(Session& session) { - if (!owns(session.id())) +void TransportLayerMock::end(const SessionHandle& session) { + if (!owns(session->id())) return; - _sessions[session.id()].ended = true; + _sessions[session->id()].ended = true; } void TransportLayerMock::endAllSessions(Session::TagMask tags) { auto it = _sessions.begin(); while (it != _sessions.end()) { - end(*it->second.session.get()); + end(it->second.session); it++; } } diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index a8d37ab7a0d..420360a44de 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -51,12 +51,12 @@ public: class TicketMock : public TicketImpl { public: // Source constructor - TicketMock(const Session* session, + TicketMock(const SessionHandle& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate); // Sink constructor - TicketMock(const Session* session, Date_t expiration = Ticket::kNoExpirationDate); + TicketMock(const SessionHandle& session, Date_t expiration = Ticket::kNoExpirationDate); TicketMock(TicketMock&&) = default; TicketMock& operator=(TicketMock&&) = default; @@ -68,7 +68,7 @@ public: boost::optional<Message*> msg() const; private: - const Session* _session; + const SessionHandle& _session; boost::optional<Message*> _message; Date_t _expiration; }; @@ -76,26 +76,26 @@ public: TransportLayerMock(); ~TransportLayerMock(); - Ticket sourceMessage(Session& session, + Ticket sourceMessage(const SessionHandle& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(Session& session, + Ticket sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) override; Status wait(Ticket&& ticket) override; void asyncWait(Ticket&& ticket, TicketCallback callback) override; - SSLPeerInfo getX509PeerInfo(const Session& session) const override; - void setX509PeerInfo(const Session& session, SSLPeerInfo peerInfo); - void registerTags(const Session& session) override; + SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override; + void setX509PeerInfo(const SessionHandle& session, SSLPeerInfo peerInfo); + void registerTags(const ConstSessionHandle& session) override; Stats sessionStats() override; - Session* createSession(); - Session* get(Session::Id id); + SessionHandle createSession(); + SessionHandle get(Session::Id id); bool owns(Session::Id id); - void end(Session& session) override; + void end(const SessionHandle& session) override; void endAllSessions(Session::TagMask tags) override; Status start() override; @@ -107,7 +107,7 @@ private: struct Connection { bool ended; - std::unique_ptr<Session> session; + SessionHandle session; SSLPeerInfo peerInfo; }; stdx::unordered_map<Session::Id, Connection> _sessions; diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp index a86874b4b00..9876db68d8e 100644 --- a/src/mongo/transport/transport_layer_mock_test.cpp +++ b/src/mongo/transport/transport_layer_mock_test.cpp @@ -54,10 +54,10 @@ private: // sinkMessage() generates a valid Ticket TEST_F(TransportLayerMockTest, SinkMessageGeneratesTicket) { Message msg{}; - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); // call sinkMessage() with no expiration - Ticket ticket = tl()->sinkMessage(*session, msg); + Ticket ticket = tl()->sinkMessage(session, msg); ASSERT(ticket.valid()); ASSERT_OK(ticket.status()); ASSERT_EQUALS(ticket.sessionId(), session->id()); @@ -65,7 +65,7 @@ TEST_F(TransportLayerMockTest, SinkMessageGeneratesTicket) { // call sinkMessage() with an expiration Date_t expiration = Date_t::now() + Hours(1); - ticket = tl()->sinkMessage(*session, msg, expiration); + ticket = tl()->sinkMessage(session, msg, expiration); ASSERT(ticket.valid()); ASSERT_OK(ticket.status()); ASSERT_EQUALS(ticket.sessionId(), session->id()); @@ -75,11 +75,11 @@ TEST_F(TransportLayerMockTest, SinkMessageGeneratesTicket) { // sinkMessage() generates an invalid Ticket if the Session is closed TEST_F(TransportLayerMockTest, SinkMessageSessionClosed) { Message msg{}; - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); - tl()->end(*session); + tl()->end(session); - Ticket ticket = tl()->sinkMessage(*session, msg); + Ticket ticket = tl()->sinkMessage(session, msg); ASSERT_FALSE(ticket.valid()); ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionClosed); } @@ -89,9 +89,9 @@ TEST_F(TransportLayerMockTest, SinkMessageSessionUnknown) { Message msg{}; std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>(); - Session* session = anotherTL->createSession(); + SessionHandle session = anotherTL->createSession(); - Ticket ticket = tl()->sinkMessage(*session, msg); + Ticket ticket = tl()->sinkMessage(session, msg); ASSERT_FALSE(ticket.valid()); ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionUnknown); } @@ -99,11 +99,11 @@ TEST_F(TransportLayerMockTest, SinkMessageSessionUnknown) { // sinkMessage() generates an invalid Ticket if the TransportLayer is in shutdown TEST_F(TransportLayerMockTest, SinkMessageTLShutdown) { Message msg{}; - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); tl()->shutdown(); - Ticket ticket = tl()->sinkMessage(*session, msg); + Ticket ticket = tl()->sinkMessage(session, msg); ASSERT_FALSE(ticket.valid()); ASSERT_EQUALS(ticket.status().code(), ErrorCodes::ShutdownInProgress); } @@ -111,10 +111,10 @@ TEST_F(TransportLayerMockTest, SinkMessageTLShutdown) { // sourceMessage() generates a valid ticket TEST_F(TransportLayerMockTest, SourceMessageGeneratesTicket) { Message msg{}; - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); // call sourceMessage() with no expiration - Ticket ticket = tl()->sourceMessage(*session, &msg); + Ticket ticket = tl()->sourceMessage(session, &msg); ASSERT(ticket.valid()); ASSERT_OK(ticket.status()); ASSERT_EQUALS(ticket.sessionId(), session->id()); @@ -123,7 +123,7 @@ TEST_F(TransportLayerMockTest, SourceMessageGeneratesTicket) { // call sourceMessage() with an expiration Date_t expiration = Date_t::now() + Hours(1); - ticket = tl()->sourceMessage(*session, &msg, expiration); + ticket = tl()->sourceMessage(session, &msg, expiration); ASSERT(ticket.valid()); ASSERT_OK(ticket.status()); ASSERT_EQUALS(ticket.sessionId(), session->id()); @@ -134,11 +134,11 @@ TEST_F(TransportLayerMockTest, SourceMessageGeneratesTicket) { // sourceMessage() generates an invalid ticket if the Session is closed TEST_F(TransportLayerMockTest, SourceMessageSessionClosed) { Message msg{}; - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); - tl()->end(*session); + tl()->end(session); - Ticket ticket = tl()->sourceMessage(*session, &msg); + Ticket ticket = tl()->sourceMessage(session, &msg); ASSERT_FALSE(ticket.valid()); ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionClosed); } @@ -148,9 +148,9 @@ TEST_F(TransportLayerMockTest, SourceMessageSessionUnknown) { Message msg{}; std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>(); - Session* session = anotherTL->createSession(); + SessionHandle session = anotherTL->createSession(); - Ticket ticket = tl()->sourceMessage(*session, &msg); + Ticket ticket = tl()->sourceMessage(session, &msg); ASSERT_FALSE(ticket.valid()); ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionUnknown); } @@ -158,18 +158,18 @@ TEST_F(TransportLayerMockTest, SourceMessageSessionUnknown) { // sourceMessage() generates an invalid ticket if the TransportLayer is in shutdown TEST_F(TransportLayerMockTest, SourceMessageTLShutdown) { Message msg{}; - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); tl()->shutdown(); - Ticket ticket = tl()->sourceMessage(*session, &msg); + Ticket ticket = tl()->sourceMessage(session, &msg); ASSERT_FALSE(ticket.valid()); ASSERT_EQUALS(ticket.status().code(), ErrorCodes::ShutdownInProgress); } // wait() returns an OK status TEST_F(TransportLayerMockTest, Wait) { - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session)); Status status = tl()->wait(std::move(ticket)); @@ -178,7 +178,7 @@ TEST_F(TransportLayerMockTest, Wait) { // wait() returns an TicketExpired error status if the Ticket expired TEST_F(TransportLayerMockTest, WaitExpiredTicket) { - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); Ticket expiredTicket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session, Date_t::now())); @@ -197,10 +197,10 @@ TEST_F(TransportLayerMockTest, WaitInvalidTicket) { // wait() returns a SessionClosed error status if the Ticket's Session is closed TEST_F(TransportLayerMockTest, WaitSessionClosed) { - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session)); - tl()->end(*session); + tl()->end(session); Status status = tl()->wait(std::move(ticket)); ASSERT_EQUALS(status.code(), ErrorCodes::TransportSessionClosed); @@ -210,7 +210,7 @@ TEST_F(TransportLayerMockTest, WaitSessionClosed) { // Session TEST_F(TransportLayerMockTest, WaitSessionUnknown) { std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>(); - Session* session = anotherTL->createSession(); + SessionHandle session = anotherTL->createSession(); Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session)); Status status = tl()->wait(std::move(ticket)); @@ -219,7 +219,7 @@ TEST_F(TransportLayerMockTest, WaitSessionUnknown) { // wait() returns a ShutdownInProgress status if the TransportLayer is in shutdown TEST_F(TransportLayerMockTest, WaitTLShutdown) { - Session* session = tl()->createSession(); + SessionHandle session = tl()->createSession(); Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session)); tl()->shutdown(); @@ -228,18 +228,18 @@ TEST_F(TransportLayerMockTest, WaitTLShutdown) { ASSERT_EQUALS(status.code(), ErrorCodes::ShutdownInProgress); } -std::vector<Session*> createSessions(TransportLayerMock* tl) { +std::vector<SessionHandle> createSessions(TransportLayerMock* tl) { int numSessions = 10; - std::vector<Session*> sessions; + std::vector<SessionHandle> sessions; for (int i = 0; i < numSessions; i++) { - Session* session = tl->createSession(); + SessionHandle session = tl->createSession(); sessions.push_back(session); } return sessions; } void assertEnded(TransportLayer* tl, - std::vector<Session*> sessions, + std::vector<SessionHandle> sessions, ErrorCodes::Error code = ErrorCodes::TransportSessionClosed) { for (auto session : sessions) { Ticket ticket = Ticket(tl, stdx::make_unique<TransportLayerMock::TicketMock>(session)); @@ -250,14 +250,14 @@ void assertEnded(TransportLayer* tl, // endAllSessions() ends all sessions TEST_F(TransportLayerMockTest, EndAllSessions) { - std::vector<Session*> sessions = createSessions(tl()); + std::vector<SessionHandle> sessions = createSessions(tl()); tl()->endAllSessions(Session::kEmptyTagMask); assertEnded(tl(), sessions); } // shutdown() ends all sessions and shuts down TEST_F(TransportLayerMockTest, Shutdown) { - std::vector<Session*> sessions = createSessions(tl()); + std::vector<SessionHandle> sessions = createSessions(tl()); tl()->shutdown(); assertEnded(tl(), sessions, ErrorCodes::ShutdownInProgress); ASSERT(tl()->inShutdown()); |