From 1c2c402147d3e6fea734ffd16784cdb1e82da91d Mon Sep 17 00:00:00 2001 From: samantharitter Date: Wed, 2 Nov 2016 18:11:55 -0400 Subject: SERVER-26674 Remove uses of locks in TransportLayerLegacy - transport::Session becomes an abstract interface - narrow the TransportLayer interface - LegacySession owns its Connection object for fast access - refactor mock session and ticket classes into one common class --- src/mongo/s/sharding_test_fixture.cpp | 5 +- src/mongo/transport/mock_session.h | 80 +++++++++ src/mongo/transport/mock_ticket.h | 78 +++++++++ .../transport/service_entry_point_test_suite.cpp | 61 +++---- .../transport/service_entry_point_test_suite.h | 66 +++---- src/mongo/transport/session.cpp | 35 ++-- src/mongo/transport/session.h | 80 ++++----- src/mongo/transport/ticket_impl.h | 3 - src/mongo/transport/transport_layer.h | 16 -- src/mongo/transport/transport_layer_legacy.cpp | 191 ++++++++++----------- src/mongo/transport/transport_layer_legacy.h | 122 ++++++++++--- src/mongo/transport/transport_layer_manager.cpp | 8 - src/mongo/transport/transport_layer_manager.h | 3 - src/mongo/transport/transport_layer_mock.cpp | 36 +--- src/mongo/transport/transport_layer_mock.h | 28 --- src/mongo/transport/transport_layer_mock_test.cpp | 13 +- 16 files changed, 462 insertions(+), 363 deletions(-) create mode 100644 src/mongo/transport/mock_session.h create mode 100644 src/mongo/transport/mock_ticket.h diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 715e6d9bbfe..a6fe9d431d6 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -68,6 +68,7 @@ #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/mock_session.h" #include "mongo/transport/transport_layer.h" #include "mongo/transport/transport_layer_mock.h" #include "mongo/util/clock_source_mock.h" @@ -101,7 +102,7 @@ void ShardingTestFixture::setUp() { _transportLayer = tlMock.get(); _service->addAndStartTransportLayer(std::move(tlMock)); CollatorFactoryInterface::set(_service.get(), stdx::make_unique()); - _transportSession = transport::Session::create(HostAndPort{}, HostAndPort{}, _transportLayer); + _transportSession = transport::MockSession::create(_transportLayer); _client = _service->makeClient("ShardingTestFixture", _transportSession); _opCtx = _client->makeOperationContext(); @@ -494,7 +495,7 @@ void ShardingTestFixture::expectCount(const HostAndPort& configHost, } void ShardingTestFixture::setRemote(const HostAndPort& remote) { - _transportSession = transport::Session::create(remote, HostAndPort{}, _transportLayer); + _transportSession = transport::MockSession::create(remote, HostAndPort{}, _transportLayer); } void ShardingTestFixture::checkReadConcern(const BSONObj& cmdObj, diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h new file mode 100644 index 00000000000..147e13e4f7e --- /dev/null +++ b/src/mongo/transport/mock_session.h @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/transport/session.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { +namespace transport { + +class TransportLayer; + +class MockSession : public Session { + MONGO_DISALLOW_COPYING(MockSession); + +public: + static std::shared_ptr create(TransportLayer* tl) { + std::shared_ptr handle(new MockSession(tl)); + return handle; + } + + static std::shared_ptr create(HostAndPort remote, + HostAndPort local, + TransportLayer* tl) { + std::shared_ptr handle( + new MockSession(std::move(remote), std::move(local), tl)); + return handle; + } + + TransportLayer* getTransportLayer() const override { + return _tl; + } + + const HostAndPort& remote() const override { + return _remote; + } + + const HostAndPort& local() const override { + return _local; + } + +protected: + explicit MockSession(TransportLayer* tl) : _tl(tl), _remote(), _local() {} + explicit MockSession(HostAndPort remote, HostAndPort local, TransportLayer* tl) + : _tl(tl), _remote(std::move(remote)), _local(std::move(local)) {} + + TransportLayer* _tl; + + HostAndPort _remote; + HostAndPort _local; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/mock_ticket.h b/src/mongo/transport/mock_ticket.h new file mode 100644 index 00000000000..437daae23f6 --- /dev/null +++ b/src/mongo/transport/mock_ticket.h @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/session.h" +#include "mongo/transport/ticket_impl.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class Message; + +namespace transport { + +/** + * A mock ticket class for our test suite. + */ +class MockTicket : public TicketImpl { + MONGO_DISALLOW_COPYING(MockTicket); + +public: + // Source constructor + MockTicket(const SessionHandle& session, + Message* message, + Date_t expiration = Ticket::kNoExpirationDate) + : _id(session->id()), _message(message), _expiration(expiration) {} + + // Sink constructor + MockTicket(const SessionHandle& session, Date_t expiration = Ticket::kNoExpirationDate) + : _id(session->id()), _expiration(expiration) {} + + SessionId sessionId() const override { + return _id; + } + + Date_t expiration() const override { + return _expiration; + } + + boost::optional message() const { + return _message; + } + +private: + Session::Id _id; + boost::optional _message; + Date_t _expiration; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index 4587ac8ea8b..5d2945919e3 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -59,6 +59,7 @@ using namespace transport; using namespace stdx::placeholders; using TicketCallback = TransportLayer::TicketCallback; +using SEPTestSession = ServiceEntryPointTestSuite::SEPTestSession; namespace { @@ -91,7 +92,7 @@ void setPingCommand(Message* m) { // Some default method implementations const auto kDefaultEnd = [](const SessionHandle& session) { return; }; -const auto kDefaultDestroyHook = [](Session& session) { return; }; +const auto kDefaultDestroyHook = [](SEPTestSession& session) { return; }; const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); }; const auto kNoopFunction = [] { return; }; @@ -100,26 +101,6 @@ const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connectio } // namespace -ServiceEntryPointTestSuite::MockTicket::MockTicket(const SessionHandle& session, - Message* message, - Date_t expiration) - : _message(message), _sessionId(session->id()), _expiration(expiration) {} - -ServiceEntryPointTestSuite::MockTicket::MockTicket(const SessionHandle& session, Date_t expiration) - : _sessionId(session->id()), _expiration(expiration) {} - -Session::Id ServiceEntryPointTestSuite::MockTicket::sessionId() const { - return _sessionId; -} - -Date_t ServiceEntryPointTestSuite::MockTicket::expiration() const { - return _expiration; -} - -boost::optional ServiceEntryPointTestSuite::MockTicket::message() { - return _message; -} - ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness() : _sourceMessage( stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3)), @@ -155,8 +136,6 @@ SSLPeerInfo ServiceEntryPointTestSuite::MockTLHarness::getX509PeerInfo( return SSLPeerInfo("mock", stdx::unordered_set{}); } -void ServiceEntryPointTestSuite::MockTLHarness::registerTags(const ConstSessionHandle& session) {} - TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() { return Stats(); } @@ -197,13 +176,13 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport:: Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const SessionHandle& s, Message* m, Date_t d) { - return Ticket(this, stdx::make_unique(s, m, d)); + return Ticket(this, stdx::make_unique(s, m, d)); } Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHandle& s, const Message&, Date_t d) { - return Ticket(this, stdx::make_unique(s, d)); + return Ticket(this, stdx::make_unique(s, d)); } Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s, @@ -224,12 +203,12 @@ void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() { _destroy_hook = kDefaultDestroyHook; } -ServiceEntryPointTestSuite::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket( +transport::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket( const transport::Ticket& ticket) { - return dynamic_cast(getTicketImpl(ticket)); + return dynamic_cast(getTicketImpl(ticket)); } -void ServiceEntryPointTestSuite::MockTLHarness::_destroy(Session& session) { +void ServiceEntryPointTestSuite::MockTLHarness::_destroy(SEPTestSession& session) { return _destroy_hook(session); } @@ -253,10 +232,10 @@ void ServiceEntryPointTestSuite::noLifeCycleTest() { _tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1); // Step 3: SEP destroys the session, which calls end() - _tl->_destroy_hook = [&testComplete](Session&) { testComplete.set_value(); }; + _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); }; // Kick off the SEP - auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); + auto s = SEPTestSession::create(_tl.get()); _sep->startSession(std::move(s)); testFuture.wait(); @@ -281,11 +260,11 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() { return _tl->_defaultSink(session, m, expiration); }; - // Step 5: SEP destroys the session, which calls end() - _tl->_destroy_hook = [&testComplete](Session&) { testComplete.set_value(); }; + // Step 5: SEP destroys the session, which calls _destroy() + _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); }; // Kick off the SEP - auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); + auto s = SEPTestSession::create(_tl.get()); _sep->startSession(std::move(s)); testFuture.wait(); @@ -307,19 +286,19 @@ void ServiceEntryPointTestSuite::fullLifeCycleTest() { // Step 4: SEP calls wait() on the ticket and receives Status::OK() // Step 5: SEP gets a ticket to source a Message // Step 6: SEP calls wait() on the ticket and receives and error - // Step 7: SEP destroys the session, which calls end() - _tl->_destroy_hook = [&testComplete](Session& session) { testComplete.set_value(); }; + // Step 7: SEP destroys the session, which calls _destroy() + _tl->_destroy_hook = [&testComplete](SEPTestSession& session) { testComplete.set_value(); }; // Kick off the SEP - auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); + auto s = SEPTestSession::create(_tl.get()); _sep->startSession(std::move(s)); testFuture.wait(); } void ServiceEntryPointTestSuite::interruptingSessionTest() { - auto sA = Session::create(HostAndPort(), HostAndPort(), _tl.get()); - auto sB = Session::create(HostAndPort(), HostAndPort(), _tl.get()); + auto sA = SEPTestSession::create(_tl.get()); + auto sB = SEPTestSession::create(_tl.get()); auto idA = sA->id(); auto idB = sB->id(); int waitCountB = 0; @@ -368,7 +347,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() { // Step 7: SEP calls sourceMessage() for B, gets tB3 // Step 8: SEP calls wait() for tB3, gets an error // Step 9: SEP calls end(B) - _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](Session& session) { + _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](SEPTestSession& session) { // When end(B) is called, time to resume session A if (session.id() == idB) { // Resume session A @@ -452,14 +431,14 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions, }; // When we end the last session, end the test. - _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](Session& session) { + _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](SEPTestSession& session) { if (ended.fetchAndAdd(1) == (numSessions - 1)) { allSessionsComplete.set_value(); } }; for (int i = 0; i < numSessions; i++) { - auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get()); + auto s = SEPTestSession::create(_tl.get()); { // This operation may cause a re-hash. stdx::lock_guard lock(cyclesLock); diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index d1d3042cc5e..b8e6a2f4b19 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -28,9 +28,10 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault -#include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/ticket_impl.h" +#include + +#include "mongo/transport/mock_session.h" +#include "mongo/transport/mock_ticket.h" #include "mongo/transport/transport_layer.h" #include "mongo/unittest/unittest.h" #include "mongo/util/net/message.h" @@ -85,32 +86,8 @@ public: Milliseconds delay = Milliseconds(0)); void longSessionStressTest(); -private: - /** - * A mock ticket class for our test suite. - */ - class MockTicket : public transport::TicketImpl { - public: - // Source constructor - MockTicket(const transport::SessionHandle& session, Message* message, Date_t expiration); - - // Sink constructor - MockTicket(const transport::SessionHandle& session, Date_t expiration); - - MockTicket(MockTicket&&) = default; - MockTicket& operator=(MockTicket&&) = default; - - transport::Session::Id sessionId() const override; - - Date_t expiration() const override; - - boost::optional message(); - - private: - boost::optional _message; - transport::Session::Id _sessionId; - Date_t _expiration; - }; + class SEPTestSession; + using SEPTestSessionHandle = std::shared_ptr; /** * This class mocks the TransportLayer and allows us to insert hooks beneath @@ -118,6 +95,8 @@ private: */ class MockTLHarness : public transport::TransportLayer { public: + friend class SEPTestSession; + MockTLHarness(); transport::Ticket sourceMessage( @@ -131,14 +110,14 @@ private: Status wait(transport::Ticket&& ticket) override; void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override; SSLPeerInfo getX509PeerInfo(const transport::ConstSessionHandle& session) const override; - void registerTags(const transport::ConstSessionHandle& session) override; + Stats sessionStats() override; void end(const transport::SessionHandle& session) override; void endAllSessions(transport::Session::TagMask tags) override; Status start() override; void shutdown() override; - ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket); + transport::MockTicket* getMockTicket(const transport::Ticket& ticket); // Mocked method hooks stdx::function @@ -148,7 +127,7 @@ private: stdx::function _wait; stdx::function _asyncWait; stdx::function _end; - stdx::function _destroy_hook; + stdx::function _destroy_hook; stdx::function _endAllSessions = [](transport::Session::TagMask tags) {}; stdx::function _start = [] { return Status::OK(); }; @@ -169,9 +148,30 @@ private: void _resetHooks(); private: - void _destroy(transport::Session& session) override; + void _destroy(SEPTestSession& session); + }; + + /** + * A light wrapper around the mock session class, to handle our destroy logic. + */ + class SEPTestSession : public transport::MockSession { + MockTLHarness* _mockTL; + + public: + static std::shared_ptr create(MockTLHarness* tl) { + std::shared_ptr handle(new SEPTestSession(tl)); + return handle; + } + + ~SEPTestSession() { + _mockTL->_destroy(*this); + } + + private: + explicit SEPTestSession(MockTLHarness* tl) : transport::MockSession(tl), _mockTL(tl) {} }; +private: std::unique_ptr _tl; std::unique_ptr _sep; }; diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp index 4287d62cfd2..2415c9dbb8f 100644 --- a/src/mongo/transport/session.cpp +++ b/src/mongo/transport/session.cpp @@ -43,39 +43,30 @@ AtomicUInt64 sessionIdCounter(0); } // namespace -Session::Session(HostAndPort remote, HostAndPort local, TransportLayer* tl) - : _id(sessionIdCounter.addAndFetch(1)), - _remote(std::move(remote)), - _local(std::move(local)), - _tags(kEmptyTagMask), - _tl(tl) {} +Session::Session() : _id(sessionIdCounter.addAndFetch(1)), _tags(kEmptyTagMask) {} -Session::~Session() { - if (_tl != nullptr) { - _tl->_destroy(*this); - } +Ticket Session::sourceMessage(Message* message, Date_t expiration) { + return getTransportLayer()->sourceMessage(shared_from_this(), message, expiration); } -SessionHandle Session::create(HostAndPort remote, HostAndPort local, TransportLayer* tl) { - std::shared_ptr handle(new Session(std::move(remote), std::move(local), tl)); - return handle; +Ticket Session::sinkMessage(const Message& message, Date_t expiration) { + return getTransportLayer()->sinkMessage(shared_from_this(), message, expiration); } -void Session::replaceTags(TagMask tags) { - _tags = tags; - _tl->registerTags(shared_from_this()); +SSLPeerInfo Session::getX509PeerInfo() const { + return getTransportLayer()->getX509PeerInfo(shared_from_this()); } -Ticket Session::sourceMessage(Message* message, Date_t expiration) { - return _tl->sourceMessage(shared_from_this(), message, expiration); +void Session::replaceTags(TagMask tags) { + _tags = tags; } -Ticket Session::sinkMessage(const Message& message, Date_t expiration) { - return _tl->sinkMessage(shared_from_this(), message, expiration); +Session::TagMask Session::getTags() const { + return _tags; } -SSLPeerInfo Session::getX509PeerInfo() const { - return _tl->getX509PeerInfo(shared_from_this()); +MessageCompressorManager& Session::getCompressorManager() { + return _messageCompressorManager; } } // namespace transport diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 29e54333ceb..3d81543a7cc 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -76,12 +76,7 @@ public: /** * Destroys a session, calling end() for this session in its TransportLayer. */ - ~Session(); - - /** - * A factory for sessions. - */ - static SessionHandle create(HostAndPort remote, HostAndPort local, TransportLayer* tl); + virtual ~Session() = default; /** * Return the id for this session. @@ -91,78 +86,67 @@ public: } /** - * Return the remote host for this session. + * The TransportLayer for this Session. */ - const HostAndPort& remote() const { - return _remote; - } + virtual TransportLayer* getTransportLayer() const = 0; /** - * Return the local host information for this session. + * Source (receive) a new Message for this Session. + * + * This method will forward to sourceMessage on this Session's transport layer. */ - const HostAndPort& local() const { - return _local; - } + virtual Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate); /** - * Return the X509 peer information for this connection (SSL only). + * Sink (send) a new Message for this Session. This method should be used + * to send replies to a given host. + * + * This method will forward to sinkMessage on this Session's transport layer. */ - SSLPeerInfo getX509PeerInfo() const; + virtual Ticket sinkMessage(const Message& message, + Date_t expiration = Ticket::kNoExpirationDate); /** - * Set this session's tags. This Session will register - * its new tags with its TransportLayer. + * Return the X509 peer information for this connection (SSL only). */ - void replaceTags(TagMask tags); + virtual SSLPeerInfo getX509PeerInfo() const; /** - * Get this session's tags. + * Return the remote host for this session. */ - TagMask getTags() const { - return _tags; - } + virtual const HostAndPort& remote() const = 0; /** - * Source (receive) a new Message for this Session. - * - * This method will forward to sourceMessage on this Session's transport layer. + * Return the local host information for this session. */ - Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate); + virtual const HostAndPort& local() const = 0; /** - * Sink (send) a new Message for this Session. This method should be used - * to send replies to a given host. - * - * This method will forward to sinkMessage on this Session's transport layer. + * Set this session's tags. This Session will register + * its new tags with its TransportLayer. */ - Ticket sinkMessage(const Message& message, Date_t expiration = Ticket::kNoExpirationDate); + virtual void replaceTags(TagMask tags); /** - * The TransportLayer for this Session. + * Get this session's tags. */ - TransportLayer* getTransportLayer() const { - return _tl; - } + virtual TagMask getTags() const; - MessageCompressorManager& getCompressorManager() { - return _messageCompressorManager; - } + /** + * Get the compressor manager for this session. + */ + virtual MessageCompressorManager& getCompressorManager(); -private: +protected: /** * Construct a new session. */ - Session(HostAndPort remote, HostAndPort local, TransportLayer* tl); - - Id _id; + Session(); - HostAndPort _remote; - HostAndPort _local; +private: + const Id _id; TagMask _tags; - - TransportLayer* _tl; - MessageCompressorManager _messageCompressorManager; }; diff --git a/src/mongo/transport/ticket_impl.h b/src/mongo/transport/ticket_impl.h index 22adf9414e7..c6d325dbc37 100644 --- a/src/mongo/transport/ticket_impl.h +++ b/src/mongo/transport/ticket_impl.h @@ -47,9 +47,6 @@ class TicketImpl { public: virtual ~TicketImpl() = default; - TicketImpl(TicketImpl&&) = default; - TicketImpl& operator=(TicketImpl&&) = default; - /** * Return this ticket's session id. */ diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index b09ff8a8eb9..f2c82c678a2 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -147,15 +147,6 @@ public: */ virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0; - /** - * Tag this Session within the TransportLayer with the tags currently assigned to the - * Session. If endAllSessions() is called with a matching - * Session::TagMask, this Session will not be ended. - * - * Before calling this method, use Session::replaceTags() to set the desired TagMask. - */ - virtual void registerTags(const ConstSessionHandle& session) = 0; - /** * Return the stored X509 peer information for this session. If the session does not * exist in this TransportLayer, returns a default constructed object. @@ -221,13 +212,6 @@ protected: TransportLayer* getTicketTransportLayer(const Ticket& ticket) { return ticket._tl; } - -private: - /** - * Destroys any information linked to this Session in the TransportLayer. - * This should only be called from within Session's destructor. - */ - virtual void _destroy(Session& session) = 0; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp index b5e3c96c6e4..789b41529fa 100644 --- a/src/mongo/transport/transport_layer_legacy.cpp +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -30,6 +30,8 @@ #include "mongo/platform/basic.h" +#include + #include "mongo/transport/transport_layer_legacy.h" #include "mongo/base/checked_cast.h" @@ -64,10 +66,35 @@ TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& _running(false), _options(opts) {} -TransportLayerLegacy::LegacyTicket::LegacyTicket(const SessionHandle& session, +std::shared_ptr TransportLayerLegacy::LegacySession::create( + std::unique_ptr amp, TransportLayerLegacy* tl) { + std::shared_ptr handle(new LegacySession(std::move(amp), tl)); + return handle; +} + +TransportLayerLegacy::LegacySession::LegacySession(std::unique_ptr amp, + TransportLayerLegacy* tl) + : _remote(amp->remote()), + _local(amp->localAddr().toString(true)), + _tl(tl), + _tags(kEmptyTagMask), + _connection(stdx::make_unique(std::move(amp))) {} + +TransportLayerLegacy::LegacySession::~LegacySession() { + _tl->_destroy(*this); +} + +TransportLayerLegacy::LegacyTicket::LegacyTicket(const LegacySessionHandle& session, Date_t expiration, WorkHandle work) - : _sessionId(session->id()), _expiration(expiration), _fill(std::move(work)) {} + : _session(session), + _sessionId(session->id()), + _expiration(expiration), + _fill(std::move(work)) {} + +TransportLayerLegacy::LegacySessionHandle TransportLayerLegacy::LegacyTicket::getSession() { + return _session.lock(); +} Session::Id TransportLayerLegacy::LegacyTicket::sessionId() const { return _sessionId; @@ -77,6 +104,10 @@ Date_t TransportLayerLegacy::LegacyTicket::expiration() const { return _expiration; } +Status TransportLayerLegacy::LegacyTicket::fill(AbstractMessagingPort* amp) { + return _fill(amp); +} + Status TransportLayerLegacy::setup() { if (!_listener->setupSockets()) { error() << "Failed to set up sockets during startup."; @@ -118,27 +149,22 @@ Ticket TransportLayerLegacy::sourceMessage(const SessionHandle& session, return Status::OK(); }; - return Ticket(this, stdx::make_unique(session, expiration, std::move(sourceCb))); + auto legacySession = checked_pointer_cast(session); + return Ticket( + this, + stdx::make_unique(std::move(legacySession), expiration, std::move(sourceCb))); } SSLPeerInfo TransportLayerLegacy::getX509PeerInfo(const ConstSessionHandle& session) const { - { - stdx::lock_guard lk(_connectionsMutex); - auto conn = _connections.find(session->id()); - if (conn == _connections.end()) { - // Return empty string if the session is not found - return SSLPeerInfo(); - } - - return conn->second.sslPeerInfo.value_or(SSLPeerInfo()); - } + auto legacySession = checked_pointer_cast(session); + return legacySession->conn()->sslPeerInfo.value_or(SSLPeerInfo()); } TransportLayer::Stats TransportLayerLegacy::sessionStats() { Stats stats; { - stdx::lock_guard lk(_connectionsMutex); - stats.numOpenSessions = _connections.size(); + stdx::lock_guard lk(_sessionsMutex); + stats.numOpenSessions = _sessions.size(); } stats.numAvailableSessions = Listener::globalTicketHolder.available(); @@ -167,7 +193,10 @@ Ticket TransportLayerLegacy::sinkMessage(const SessionHandle& session, } }; - return Ticket(this, stdx::make_unique(session, expiration, std::move(sinkCb))); + auto legacySession = checked_pointer_cast(session); + return Ticket( + this, + stdx::make_unique(std::move(legacySession), expiration, std::move(sinkCb))); } Status TransportLayerLegacy::wait(Ticket&& ticket) { @@ -182,45 +211,32 @@ void TransportLayerLegacy::asyncWait(Ticket&& ticket, TicketCallback callback) { } void TransportLayerLegacy::end(const SessionHandle& session) { - stdx::lock_guard lk(_connectionsMutex); - auto conn = _connections.find(session->id()); - if (conn != _connections.end()) { - _endSession_inlock(conn); - } + auto legacySession = checked_pointer_cast(session); + _closeConnection(legacySession->conn()); } -void TransportLayerLegacy::registerTags(const ConstSessionHandle& session) { - stdx::lock_guard lk(_connectionsMutex); - auto conn = _connections.find(session->id()); - if (conn != _connections.end()) { - conn->second.tags = session->getTags(); - } -} - -void TransportLayerLegacy::_endSession_inlock( - decltype(TransportLayerLegacy::_connections.begin()) conn) { - conn->second.ended = true; - conn->second.amp->shutdown(); +void TransportLayerLegacy::_closeConnection(Connection* conn) { + conn->closed = true; + conn->amp->shutdown(); Listener::globalTicketHolder.release(); } void TransportLayerLegacy::endAllSessions(Session::TagMask tags) { - log() << "legacy transport layer ending all sessions"; + log() << "legacy transport layer closing all connections"; { - stdx::lock_guard lk(_connectionsMutex); - auto&& conn = _connections.begin(); - while (conn != _connections.end()) { - // If we erase this connection below, we invalidate our iterator, use a placeholder. - auto placeholder = conn; - placeholder++; - - if (conn->second.tags & tags) { - log() << "Skip closing connection for connection # " << conn->second.connectionId; - } else { - _endSession_inlock(conn); + stdx::lock_guard lk(_sessionsMutex); + for (auto&& it : _sessions) { + + // Attempt to make our weak_ptr into a shared_ptr + auto session = it.lock(); + if (session) { + if (session->getTags() & tags) { + log() << "Skip closing connection for connection # " + << session->conn()->connectionId; + } else { + _closeConnection(session->conn()); + } } - - conn = placeholder; } } } @@ -232,17 +248,13 @@ void TransportLayerLegacy::shutdown() { endAllSessions(Session::kEmptyTagMask); } -void TransportLayerLegacy::_destroy(Session& session) { - stdx::lock_guard lk(_connectionsMutex); - auto conn = _connections.find(session.id()); - - invariant(conn != _connections.end()); - if (!conn->second.ended) { - _endSession_inlock(conn); +void TransportLayerLegacy::_destroy(LegacySession& session) { + if (!session.conn()->closed) { + _closeConnection(session.conn()); } - invariant(!conn->second.inUse); - _connections.erase(conn); + stdx::lock_guard lk(_sessionsMutex); + _sessions.erase(session.getIter()); } Status TransportLayerLegacy::_runTicket(Ticket ticket) { @@ -254,53 +266,35 @@ Status TransportLayerLegacy::_runTicket(Ticket ticket) { return Ticket::ExpiredStatus; } - AbstractMessagingPort* amp; - - { - stdx::lock_guard lk(_connectionsMutex); - - // Error if we cannot find the session. - auto conn = _connections.find(ticket.sessionId()); - if (conn == _connections.end()) { - return TransportLayer::TicketSessionUnknownStatus; - } - - // Error if we find the session but its connection is closed. - if (conn->second.ended) { - return TransportLayer::TicketSessionClosedStatus; - } + // get the weak_ptr out of the ticket + // attempt to make it into a shared_ptr + auto legacyTicket = checked_cast(getTicketImpl(ticket)); + auto session = legacyTicket->getSession(); + if (!session) { + return TransportLayer::TicketSessionClosedStatus; + } - // "check out" the port - conn->second.inUse = true; - amp = conn->second.amp.get(); + auto conn = session->conn(); + if (conn->closed) { + return TransportLayer::TicketSessionClosedStatus; } - auto legacyTicket = checked_cast(getTicketImpl(ticket)); Status res = Status::OK(); - try { - res = legacyTicket->_fill(amp); + res = legacyTicket->fill(conn->amp.get()); } catch (...) { res = exceptionToStatus(); } - { - stdx::lock_guard lk(_connectionsMutex); - - auto conn = _connections.find(ticket.sessionId()); - invariant(conn != _connections.end()); - #ifdef MONGO_CONFIG_SSL - // If we didn't have an X509 subject name, see if we have one now - if (!conn->second.sslPeerInfo) { - auto info = amp->getX509PeerInfo(); - if (info.subjectName != "") { - conn->second.sslPeerInfo = info; - } + // If we didn't have an X509 subject name, see if we have one now + if (!conn->sslPeerInfo) { + auto info = conn->amp->getX509PeerInfo(); + if (info.subjectName != "") { + conn->sslPeerInfo = info; } -#endif - conn->second.inUse = false; } +#endif return res; } @@ -313,16 +307,17 @@ void TransportLayerLegacy::_handleNewConnection(std::unique_ptrremote(), HostAndPort(amp->localAddr().toString(true)), this); - amp->setLogLevel(logger::LogSeverity::Debug(1)); + auto session = LegacySession::create(std::move(amp), this); + + stdx::list> list; + auto it = list.emplace(list.begin(), session); { - stdx::lock_guard lk(_connectionsMutex); - _connections.emplace(std::piecewise_construct, - std::forward_as_tuple(session->id()), - std::forward_as_tuple(std::move(amp), false, session->getTags())); + // Add the new session to our list + stdx::lock_guard lk(_sessionsMutex); + session->setIter(it); + _sessions.splice(_sessions.begin(), list, it); } invariant(_sep); diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h index 3d348076ad7..dab651bb86e 100644 --- a/src/mongo/transport/transport_layer_legacy.h +++ b/src/mongo/transport/transport_layer_legacy.h @@ -28,10 +28,10 @@ #pragma once +#include "mongo/stdx/list.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" -#include "mongo/stdx/unordered_map.h" #include "mongo/transport/ticket_impl.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/net/listen.h" @@ -77,7 +77,6 @@ public: Status wait(Ticket&& ticket) override; void asyncWait(Ticket&& ticket, TicketCallback callback) override; - void registerTags(const ConstSessionHandle& session) override; SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override; Stats sessionStats() override; @@ -88,7 +87,12 @@ public: void shutdown() override; private: - void _destroy(Session& session) override; + class LegacySession; + using LegacySessionHandle = std::shared_ptr; + using ConstLegacySessionHandle = std::shared_ptr; + using SessionEntry = std::list>::iterator; + + void _destroy(LegacySession& session); void _handleNewConnection(std::unique_ptr amp); @@ -97,6 +101,78 @@ private: using NewConnectionCb = stdx::function)>; using WorkHandle = stdx::function; + /** + * Connection object, to associate Sessions with AbstractMessagingPorts. + */ + struct Connection { + MONGO_DISALLOW_COPYING(Connection); + + Connection(std::unique_ptr port) + : amp(std::move(port)), connectionId(amp->connectionId()) {} + + std::unique_ptr amp; + + const long long connectionId; + + boost::optional sslPeerInfo; + bool closed = false; + }; + + /** + * An implementation of the Session interface for this TransportLayer. + */ + class LegacySession : public Session { + MONGO_DISALLOW_COPYING(LegacySession); + + public: + ~LegacySession(); + + static std::shared_ptr create(std::unique_ptr amp, + TransportLayerLegacy* tl); + + TransportLayer* getTransportLayer() const override { + return _tl; + } + + const HostAndPort& remote() const override { + return _remote; + } + + const HostAndPort& local() const override { + return _local; + } + + Connection* conn() const { + return _connection.get(); + } + + void setIter(SessionEntry it) { + _entry = std::move(it); + } + + SessionEntry getIter() const { + return _entry; + } + + private: + explicit LegacySession(std::unique_ptr amp, + TransportLayerLegacy* tl); + + HostAndPort _remote; + HostAndPort _local; + + TransportLayerLegacy* _tl; + + TagMask _tags; + + MessageCompressorManager _messageCompressorManager; + + std::unique_ptr _connection; + + // A handle to this session's entry in the TL's session list + SessionEntry _entry; + }; + /** * A TicketImpl implementation for this TransportLayer. WorkHandle is a callable that * can be invoked to fill this ticket. @@ -105,11 +181,25 @@ private: MONGO_DISALLOW_COPYING(LegacyTicket); public: - LegacyTicket(const SessionHandle& session, Date_t expiration, WorkHandle work); + LegacyTicket(const LegacySessionHandle& session, Date_t expiration, WorkHandle work); SessionId sessionId() const override; Date_t expiration() const override; + /** + * If this ticket's session is still alive, return a shared_ptr. Otherwise, + * return nullptr. + */ + LegacySessionHandle getSession(); + + /** + * Run this ticket's work item. + */ + Status fill(AbstractMessagingPort* amp); + + private: + std::weak_ptr _session; + SessionId _sessionId; Date_t _expiration; @@ -136,32 +226,16 @@ private: NewConnectionCb _accepted; }; - /** - * Connection object, to associate Session ids with AbstractMessagingPorts. - */ - struct Connection { - Connection(std::unique_ptr port, bool ended, Session::TagMask tags) - : amp(std::move(port)), connectionId(amp->connectionId()), tags(tags) {} - - std::unique_ptr amp; - - const long long connectionId; - - boost::optional sslPeerInfo; - Session::TagMask tags; - bool inUse = false; - bool ended = false; - }; + void _closeConnection(Connection* conn); ServiceEntryPoint* _sep; std::unique_ptr _listener; stdx::thread _listenerThread; - mutable stdx::mutex _connectionsMutex; - stdx::unordered_map _connections; - - void _endSession_inlock(decltype(_connections.begin()) conn); + // TransportLayerLegacy holds non-owning pointers to all of its sessions. + mutable stdx::mutex _sessionsMutex; + stdx::list> _sessions; AtomicWord _running; diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index d29c01f54f5..6ca73193804 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -97,10 +97,6 @@ TransportLayer::Stats TransportLayerManager::sessionStats() { return stats; } -void TransportLayerManager::registerTags(const ConstSessionHandle& session) { - session->getTransportLayer()->registerTags(session); -} - void TransportLayerManager::end(const SessionHandle& session) { session->getTransportLayer()->end(session); } @@ -126,9 +122,5 @@ Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptrstart(); } -void TransportLayerManager::_destroy(Session& session) { - MONGO_UNREACHABLE; -} - } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index deba17146cc..d477fa9b75c 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -65,7 +65,6 @@ public: void asyncWait(Ticket&& ticket, TicketCallback callback) override; SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override; - void registerTags(const ConstSessionHandle& session) override; Stats sessionStats() override; @@ -78,8 +77,6 @@ public: Status addAndStartTransportLayer(std::unique_ptr tl); private: - void _destroy(Session& session) override; - template void _foreach(Callable&& cb); diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index 550c8585f95..67cf2519b83 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -32,9 +32,8 @@ #include "mongo/base/status.h" #include "mongo/stdx/memory.h" -#include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/ticket_impl.h" +#include "mongo/transport/mock_session.h" +#include "mongo/transport/mock_ticket.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/net/message.h" #include "mongo/util/time_support.h" @@ -42,26 +41,6 @@ namespace mongo { namespace transport { -TransportLayerMock::TicketMock::TicketMock(const SessionHandle& session, - Message* message, - Date_t expiration) - : _session(session), _message(message), _expiration(expiration) {} - -TransportLayerMock::TicketMock::TicketMock(const SessionHandle& session, Date_t expiration) - : _session(session), _expiration(expiration) {} - -Session::Id TransportLayerMock::TicketMock::sessionId() const { - return _session->id(); -} - -Date_t TransportLayerMock::TicketMock::expiration() const { - return _expiration; -} - -boost::optional TransportLayerMock::TicketMock::msg() const { - return _message; -} - TransportLayerMock::TransportLayerMock() : _shutdown(false) {} Ticket TransportLayerMock::sourceMessage(const SessionHandle& session, @@ -75,8 +54,7 @@ Ticket TransportLayerMock::sourceMessage(const SessionHandle& session, return Ticket(TransportLayer::TicketSessionClosedStatus); } - return Ticket(this, - stdx::make_unique(session, message, expiration)); + return Ticket(this, stdx::make_unique(session, message, expiration)); } Ticket TransportLayerMock::sinkMessage(const SessionHandle& session, @@ -90,7 +68,7 @@ Ticket TransportLayerMock::sinkMessage(const SessionHandle& session, return Ticket(TransportLayer::TicketSessionClosedStatus); } - return Ticket(this, stdx::make_unique(session, expiration)); + return Ticket(this, stdx::make_unique(session, expiration)); } Status TransportLayerMock::wait(Ticket&& ticket) { @@ -124,10 +102,8 @@ TransportLayer::Stats TransportLayerMock::sessionStats() { return Stats(); } -void TransportLayerMock::registerTags(const ConstSessionHandle& session) {} - SessionHandle TransportLayerMock::createSession() { - auto session = Session::create(HostAndPort(), HostAndPort(), this); + auto session = MockSession::create(this); Session::Id sessionId = session->id(); _sessions[sessionId] = Connection{false, session, SSLPeerInfo()}; @@ -179,7 +155,5 @@ TransportLayerMock::~TransportLayerMock() { shutdown(); } -void TransportLayerMock::_destroy(Session& session) {} - } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index 420360a44de..34d6f3c451c 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -48,31 +48,6 @@ class TransportLayerMock : public TransportLayer { MONGO_DISALLOW_COPYING(TransportLayerMock); public: - class TicketMock : public TicketImpl { - public: - // Source constructor - TicketMock(const SessionHandle& session, - Message* message, - Date_t expiration = Ticket::kNoExpirationDate); - - // Sink constructor - TicketMock(const SessionHandle& session, Date_t expiration = Ticket::kNoExpirationDate); - - TicketMock(TicketMock&&) = default; - TicketMock& operator=(TicketMock&&) = default; - - SessionId sessionId() const override; - - Date_t expiration() const override; - - boost::optional msg() const; - - private: - const SessionHandle& _session; - boost::optional _message; - Date_t _expiration; - }; - TransportLayerMock(); ~TransportLayerMock(); @@ -88,7 +63,6 @@ public: SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override; void setX509PeerInfo(const SessionHandle& session, SSLPeerInfo peerInfo); - void registerTags(const ConstSessionHandle& session) override; Stats sessionStats() override; @@ -103,8 +77,6 @@ public: bool inShutdown() const; private: - void _destroy(Session& session) override; - struct Connection { bool ended; SessionHandle session; diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp index 9876db68d8e..1cf90b395ff 100644 --- a/src/mongo/transport/transport_layer_mock_test.cpp +++ b/src/mongo/transport/transport_layer_mock_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/mock_ticket.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer.h" #include "mongo/transport/transport_layer_mock.h" @@ -170,7 +171,7 @@ TEST_F(TransportLayerMockTest, SourceMessageTLShutdown) { // wait() returns an OK status TEST_F(TransportLayerMockTest, Wait) { SessionHandle session = tl()->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique(session)); + Ticket ticket = Ticket(tl(), stdx::make_unique(session)); Status status = tl()->wait(std::move(ticket)); ASSERT_OK(status); @@ -180,7 +181,7 @@ TEST_F(TransportLayerMockTest, Wait) { TEST_F(TransportLayerMockTest, WaitExpiredTicket) { SessionHandle session = tl()->createSession(); Ticket expiredTicket = - Ticket(tl(), stdx::make_unique(session, Date_t::now())); + Ticket(tl(), stdx::make_unique(session, Date_t::now())); Status status = tl()->wait(std::move(expiredTicket)); ASSERT_EQUALS(status.code(), ErrorCodes::ExceededTimeLimit); @@ -198,7 +199,7 @@ TEST_F(TransportLayerMockTest, WaitInvalidTicket) { // wait() returns a SessionClosed error status if the Ticket's Session is closed TEST_F(TransportLayerMockTest, WaitSessionClosed) { SessionHandle session = tl()->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique(session)); + Ticket ticket = Ticket(tl(), stdx::make_unique(session)); tl()->end(session); @@ -211,7 +212,7 @@ TEST_F(TransportLayerMockTest, WaitSessionClosed) { TEST_F(TransportLayerMockTest, WaitSessionUnknown) { std::unique_ptr anotherTL = stdx::make_unique(); SessionHandle session = anotherTL->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique(session)); + Ticket ticket = Ticket(tl(), stdx::make_unique(session)); Status status = tl()->wait(std::move(ticket)); ASSERT_EQUALS(status.code(), ErrorCodes::TransportSessionUnknown); @@ -220,7 +221,7 @@ TEST_F(TransportLayerMockTest, WaitSessionUnknown) { // wait() returns a ShutdownInProgress status if the TransportLayer is in shutdown TEST_F(TransportLayerMockTest, WaitTLShutdown) { SessionHandle session = tl()->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique(session)); + Ticket ticket = Ticket(tl(), stdx::make_unique(session)); tl()->shutdown(); @@ -242,7 +243,7 @@ void assertEnded(TransportLayer* tl, std::vector sessions, ErrorCodes::Error code = ErrorCodes::TransportSessionClosed) { for (auto session : sessions) { - Ticket ticket = Ticket(tl, stdx::make_unique(session)); + Ticket ticket = Ticket(tl, stdx::make_unique(session)); Status status = tl->wait(std::move(ticket)); ASSERT_EQUALS(status.code(), code); } -- cgit v1.2.1