summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2016-11-02 18:11:55 -0400
committersamantharitter <samantha.ritter@10gen.com>2016-11-08 16:30:37 -0500
commit1c2c402147d3e6fea734ffd16784cdb1e82da91d (patch)
treed63e852a26e18c0463dc363642433568b187dab1
parent158db044bb21a2ff39c2984a35850a28c9572c8d (diff)
downloadmongo-1c2c402147d3e6fea734ffd16784cdb1e82da91d.tar.gz
SERVER-26674 Remove uses of locks in TransportLayerLegacy
- transport::Session becomes an abstract interface - narrow the TransportLayer interface - LegacySession owns its Connection object for fast access - refactor mock session and ticket classes into one common class
-rw-r--r--src/mongo/s/sharding_test_fixture.cpp5
-rw-r--r--src/mongo/transport/mock_session.h80
-rw-r--r--src/mongo/transport/mock_ticket.h78
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.cpp61
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.h66
-rw-r--r--src/mongo/transport/session.cpp35
-rw-r--r--src/mongo/transport/session.h80
-rw-r--r--src/mongo/transport/ticket_impl.h3
-rw-r--r--src/mongo/transport/transport_layer.h16
-rw-r--r--src/mongo/transport/transport_layer_legacy.cpp191
-rw-r--r--src/mongo/transport/transport_layer_legacy.h122
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp8
-rw-r--r--src/mongo/transport/transport_layer_manager.h3
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp36
-rw-r--r--src/mongo/transport/transport_layer_mock.h28
-rw-r--r--src/mongo/transport/transport_layer_mock_test.cpp13
16 files changed, 462 insertions, 363 deletions
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index 715e6d9bbfe..a6fe9d431d6 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -68,6 +68,7 @@
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/mock_session.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/transport_layer_mock.h"
#include "mongo/util/clock_source_mock.h"
@@ -101,7 +102,7 @@ void ShardingTestFixture::setUp() {
_transportLayer = tlMock.get();
_service->addAndStartTransportLayer(std::move(tlMock));
CollatorFactoryInterface::set(_service.get(), stdx::make_unique<CollatorFactoryMock>());
- _transportSession = transport::Session::create(HostAndPort{}, HostAndPort{}, _transportLayer);
+ _transportSession = transport::MockSession::create(_transportLayer);
_client = _service->makeClient("ShardingTestFixture", _transportSession);
_opCtx = _client->makeOperationContext();
@@ -494,7 +495,7 @@ void ShardingTestFixture::expectCount(const HostAndPort& configHost,
}
void ShardingTestFixture::setRemote(const HostAndPort& remote) {
- _transportSession = transport::Session::create(remote, HostAndPort{}, _transportLayer);
+ _transportSession = transport::MockSession::create(remote, HostAndPort{}, _transportLayer);
}
void ShardingTestFixture::checkReadConcern(const BSONObj& cmdObj,
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
new file mode 100644
index 00000000000..147e13e4f7e
--- /dev/null
+++ b/src/mongo/transport/mock_session.h
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/transport/session.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace mongo {
+namespace transport {
+
+class TransportLayer;
+
+class MockSession : public Session {
+ MONGO_DISALLOW_COPYING(MockSession);
+
+public:
+ static std::shared_ptr<MockSession> create(TransportLayer* tl) {
+ std::shared_ptr<MockSession> handle(new MockSession(tl));
+ return handle;
+ }
+
+ static std::shared_ptr<MockSession> create(HostAndPort remote,
+ HostAndPort local,
+ TransportLayer* tl) {
+ std::shared_ptr<MockSession> handle(
+ new MockSession(std::move(remote), std::move(local), tl));
+ return handle;
+ }
+
+ TransportLayer* getTransportLayer() const override {
+ return _tl;
+ }
+
+ const HostAndPort& remote() const override {
+ return _remote;
+ }
+
+ const HostAndPort& local() const override {
+ return _local;
+ }
+
+protected:
+ explicit MockSession(TransportLayer* tl) : _tl(tl), _remote(), _local() {}
+ explicit MockSession(HostAndPort remote, HostAndPort local, TransportLayer* tl)
+ : _tl(tl), _remote(std::move(remote)), _local(std::move(local)) {}
+
+ TransportLayer* _tl;
+
+ HostAndPort _remote;
+ HostAndPort _local;
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/mock_ticket.h b/src/mongo/transport/mock_ticket.h
new file mode 100644
index 00000000000..437daae23f6
--- /dev/null
+++ b/src/mongo/transport/mock_ticket.h
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/ticket_impl.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class Message;
+
+namespace transport {
+
+/**
+ * A mock ticket class for our test suite.
+ */
+class MockTicket : public TicketImpl {
+ MONGO_DISALLOW_COPYING(MockTicket);
+
+public:
+ // Source constructor
+ MockTicket(const SessionHandle& session,
+ Message* message,
+ Date_t expiration = Ticket::kNoExpirationDate)
+ : _id(session->id()), _message(message), _expiration(expiration) {}
+
+ // Sink constructor
+ MockTicket(const SessionHandle& session, Date_t expiration = Ticket::kNoExpirationDate)
+ : _id(session->id()), _expiration(expiration) {}
+
+ SessionId sessionId() const override {
+ return _id;
+ }
+
+ Date_t expiration() const override {
+ return _expiration;
+ }
+
+ boost::optional<Message*> message() const {
+ return _message;
+ }
+
+private:
+ Session::Id _id;
+ boost::optional<Message*> _message;
+ Date_t _expiration;
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp
index 4587ac8ea8b..5d2945919e3 100644
--- a/src/mongo/transport/service_entry_point_test_suite.cpp
+++ b/src/mongo/transport/service_entry_point_test_suite.cpp
@@ -59,6 +59,7 @@ using namespace transport;
using namespace stdx::placeholders;
using TicketCallback = TransportLayer::TicketCallback;
+using SEPTestSession = ServiceEntryPointTestSuite::SEPTestSession;
namespace {
@@ -91,7 +92,7 @@ void setPingCommand(Message* m) {
// Some default method implementations
const auto kDefaultEnd = [](const SessionHandle& session) { return; };
-const auto kDefaultDestroyHook = [](Session& session) { return; };
+const auto kDefaultDestroyHook = [](SEPTestSession& session) { return; };
const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); };
const auto kNoopFunction = [] { return; };
@@ -100,26 +101,6 @@ const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connectio
} // namespace
-ServiceEntryPointTestSuite::MockTicket::MockTicket(const SessionHandle& session,
- Message* message,
- Date_t expiration)
- : _message(message), _sessionId(session->id()), _expiration(expiration) {}
-
-ServiceEntryPointTestSuite::MockTicket::MockTicket(const SessionHandle& session, Date_t expiration)
- : _sessionId(session->id()), _expiration(expiration) {}
-
-Session::Id ServiceEntryPointTestSuite::MockTicket::sessionId() const {
- return _sessionId;
-}
-
-Date_t ServiceEntryPointTestSuite::MockTicket::expiration() const {
- return _expiration;
-}
-
-boost::optional<Message*> ServiceEntryPointTestSuite::MockTicket::message() {
- return _message;
-}
-
ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness()
: _sourceMessage(
stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3)),
@@ -155,8 +136,6 @@ SSLPeerInfo ServiceEntryPointTestSuite::MockTLHarness::getX509PeerInfo(
return SSLPeerInfo("mock", stdx::unordered_set<RoleName>{});
}
-void ServiceEntryPointTestSuite::MockTLHarness::registerTags(const ConstSessionHandle& session) {}
-
TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() {
return Stats();
}
@@ -197,13 +176,13 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::
Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const SessionHandle& s,
Message* m,
Date_t d) {
- return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d));
+ return Ticket(this, stdx::make_unique<transport::MockTicket>(s, m, d));
}
Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHandle& s,
const Message&,
Date_t d) {
- return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d));
+ return Ticket(this, stdx::make_unique<transport::MockTicket>(s, d));
}
Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s,
@@ -224,12 +203,12 @@ void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() {
_destroy_hook = kDefaultDestroyHook;
}
-ServiceEntryPointTestSuite::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket(
+transport::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket(
const transport::Ticket& ticket) {
- return dynamic_cast<ServiceEntryPointTestSuite::MockTicket*>(getTicketImpl(ticket));
+ return dynamic_cast<transport::MockTicket*>(getTicketImpl(ticket));
}
-void ServiceEntryPointTestSuite::MockTLHarness::_destroy(Session& session) {
+void ServiceEntryPointTestSuite::MockTLHarness::_destroy(SEPTestSession& session) {
return _destroy_hook(session);
}
@@ -253,10 +232,10 @@ void ServiceEntryPointTestSuite::noLifeCycleTest() {
_tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1);
// Step 3: SEP destroys the session, which calls end()
- _tl->_destroy_hook = [&testComplete](Session&) { testComplete.set_value(); };
+ _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
// Kick off the SEP
- auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get());
+ auto s = SEPTestSession::create(_tl.get());
_sep->startSession(std::move(s));
testFuture.wait();
@@ -281,11 +260,11 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() {
return _tl->_defaultSink(session, m, expiration);
};
- // Step 5: SEP destroys the session, which calls end()
- _tl->_destroy_hook = [&testComplete](Session&) { testComplete.set_value(); };
+ // Step 5: SEP destroys the session, which calls _destroy()
+ _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
// Kick off the SEP
- auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get());
+ auto s = SEPTestSession::create(_tl.get());
_sep->startSession(std::move(s));
testFuture.wait();
@@ -307,19 +286,19 @@ void ServiceEntryPointTestSuite::fullLifeCycleTest() {
// Step 4: SEP calls wait() on the ticket and receives Status::OK()
// Step 5: SEP gets a ticket to source a Message
// Step 6: SEP calls wait() on the ticket and receives and error
- // Step 7: SEP destroys the session, which calls end()
- _tl->_destroy_hook = [&testComplete](Session& session) { testComplete.set_value(); };
+ // Step 7: SEP destroys the session, which calls _destroy()
+ _tl->_destroy_hook = [&testComplete](SEPTestSession& session) { testComplete.set_value(); };
// Kick off the SEP
- auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get());
+ auto s = SEPTestSession::create(_tl.get());
_sep->startSession(std::move(s));
testFuture.wait();
}
void ServiceEntryPointTestSuite::interruptingSessionTest() {
- auto sA = Session::create(HostAndPort(), HostAndPort(), _tl.get());
- auto sB = Session::create(HostAndPort(), HostAndPort(), _tl.get());
+ auto sA = SEPTestSession::create(_tl.get());
+ auto sB = SEPTestSession::create(_tl.get());
auto idA = sA->id();
auto idB = sB->id();
int waitCountB = 0;
@@ -368,7 +347,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() {
// Step 7: SEP calls sourceMessage() for B, gets tB3
// Step 8: SEP calls wait() for tB3, gets an error
// Step 9: SEP calls end(B)
- _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](Session& session) {
+ _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](SEPTestSession& session) {
// When end(B) is called, time to resume session A
if (session.id() == idB) {
// Resume session A
@@ -452,14 +431,14 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions,
};
// When we end the last session, end the test.
- _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](Session& session) {
+ _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](SEPTestSession& session) {
if (ended.fetchAndAdd(1) == (numSessions - 1)) {
allSessionsComplete.set_value();
}
};
for (int i = 0; i < numSessions; i++) {
- auto s = Session::create(HostAndPort(), HostAndPort(), _tl.get());
+ auto s = SEPTestSession::create(_tl.get());
{
// This operation may cause a re-hash.
stdx::lock_guard<stdx::mutex> lock(cyclesLock);
diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h
index d1d3042cc5e..b8e6a2f4b19 100644
--- a/src/mongo/transport/service_entry_point_test_suite.h
+++ b/src/mongo/transport/service_entry_point_test_suite.h
@@ -28,9 +28,10 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/ticket_impl.h"
+#include <memory>
+
+#include "mongo/transport/mock_session.h"
+#include "mongo/transport/mock_ticket.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/net/message.h"
@@ -85,32 +86,8 @@ public:
Milliseconds delay = Milliseconds(0));
void longSessionStressTest();
-private:
- /**
- * A mock ticket class for our test suite.
- */
- class MockTicket : public transport::TicketImpl {
- public:
- // Source constructor
- MockTicket(const transport::SessionHandle& session, Message* message, Date_t expiration);
-
- // Sink constructor
- MockTicket(const transport::SessionHandle& session, Date_t expiration);
-
- MockTicket(MockTicket&&) = default;
- MockTicket& operator=(MockTicket&&) = default;
-
- transport::Session::Id sessionId() const override;
-
- Date_t expiration() const override;
-
- boost::optional<Message*> message();
-
- private:
- boost::optional<Message*> _message;
- transport::Session::Id _sessionId;
- Date_t _expiration;
- };
+ class SEPTestSession;
+ using SEPTestSessionHandle = std::shared_ptr<SEPTestSession>;
/**
* This class mocks the TransportLayer and allows us to insert hooks beneath
@@ -118,6 +95,8 @@ private:
*/
class MockTLHarness : public transport::TransportLayer {
public:
+ friend class SEPTestSession;
+
MockTLHarness();
transport::Ticket sourceMessage(
@@ -131,14 +110,14 @@ private:
Status wait(transport::Ticket&& ticket) override;
void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override;
SSLPeerInfo getX509PeerInfo(const transport::ConstSessionHandle& session) const override;
- void registerTags(const transport::ConstSessionHandle& session) override;
+
Stats sessionStats() override;
void end(const transport::SessionHandle& session) override;
void endAllSessions(transport::Session::TagMask tags) override;
Status start() override;
void shutdown() override;
- ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket);
+ transport::MockTicket* getMockTicket(const transport::Ticket& ticket);
// Mocked method hooks
stdx::function<transport::Ticket(const transport::SessionHandle&, Message*, Date_t)>
@@ -148,7 +127,7 @@ private:
stdx::function<Status(transport::Ticket)> _wait;
stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait;
stdx::function<void(const transport::SessionHandle&)> _end;
- stdx::function<void(transport::Session& session)> _destroy_hook;
+ stdx::function<void(SEPTestSession& session)> _destroy_hook;
stdx::function<void(transport::Session::TagMask tags)> _endAllSessions =
[](transport::Session::TagMask tags) {};
stdx::function<Status(void)> _start = [] { return Status::OK(); };
@@ -169,9 +148,30 @@ private:
void _resetHooks();
private:
- void _destroy(transport::Session& session) override;
+ void _destroy(SEPTestSession& session);
+ };
+
+ /**
+ * A light wrapper around the mock session class, to handle our destroy logic.
+ */
+ class SEPTestSession : public transport::MockSession {
+ MockTLHarness* _mockTL;
+
+ public:
+ static std::shared_ptr<SEPTestSession> create(MockTLHarness* tl) {
+ std::shared_ptr<SEPTestSession> handle(new SEPTestSession(tl));
+ return handle;
+ }
+
+ ~SEPTestSession() {
+ _mockTL->_destroy(*this);
+ }
+
+ private:
+ explicit SEPTestSession(MockTLHarness* tl) : transport::MockSession(tl), _mockTL(tl) {}
};
+private:
std::unique_ptr<MockTLHarness> _tl;
std::unique_ptr<ServiceEntryPoint> _sep;
};
diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp
index 4287d62cfd2..2415c9dbb8f 100644
--- a/src/mongo/transport/session.cpp
+++ b/src/mongo/transport/session.cpp
@@ -43,39 +43,30 @@ AtomicUInt64 sessionIdCounter(0);
} // namespace
-Session::Session(HostAndPort remote, HostAndPort local, TransportLayer* tl)
- : _id(sessionIdCounter.addAndFetch(1)),
- _remote(std::move(remote)),
- _local(std::move(local)),
- _tags(kEmptyTagMask),
- _tl(tl) {}
+Session::Session() : _id(sessionIdCounter.addAndFetch(1)), _tags(kEmptyTagMask) {}
-Session::~Session() {
- if (_tl != nullptr) {
- _tl->_destroy(*this);
- }
+Ticket Session::sourceMessage(Message* message, Date_t expiration) {
+ return getTransportLayer()->sourceMessage(shared_from_this(), message, expiration);
}
-SessionHandle Session::create(HostAndPort remote, HostAndPort local, TransportLayer* tl) {
- std::shared_ptr<Session> handle(new Session(std::move(remote), std::move(local), tl));
- return handle;
+Ticket Session::sinkMessage(const Message& message, Date_t expiration) {
+ return getTransportLayer()->sinkMessage(shared_from_this(), message, expiration);
}
-void Session::replaceTags(TagMask tags) {
- _tags = tags;
- _tl->registerTags(shared_from_this());
+SSLPeerInfo Session::getX509PeerInfo() const {
+ return getTransportLayer()->getX509PeerInfo(shared_from_this());
}
-Ticket Session::sourceMessage(Message* message, Date_t expiration) {
- return _tl->sourceMessage(shared_from_this(), message, expiration);
+void Session::replaceTags(TagMask tags) {
+ _tags = tags;
}
-Ticket Session::sinkMessage(const Message& message, Date_t expiration) {
- return _tl->sinkMessage(shared_from_this(), message, expiration);
+Session::TagMask Session::getTags() const {
+ return _tags;
}
-SSLPeerInfo Session::getX509PeerInfo() const {
- return _tl->getX509PeerInfo(shared_from_this());
+MessageCompressorManager& Session::getCompressorManager() {
+ return _messageCompressorManager;
}
} // namespace transport
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 29e54333ceb..3d81543a7cc 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -76,12 +76,7 @@ public:
/**
* Destroys a session, calling end() for this session in its TransportLayer.
*/
- ~Session();
-
- /**
- * A factory for sessions.
- */
- static SessionHandle create(HostAndPort remote, HostAndPort local, TransportLayer* tl);
+ virtual ~Session() = default;
/**
* Return the id for this session.
@@ -91,78 +86,67 @@ public:
}
/**
- * Return the remote host for this session.
+ * The TransportLayer for this Session.
*/
- const HostAndPort& remote() const {
- return _remote;
- }
+ virtual TransportLayer* getTransportLayer() const = 0;
/**
- * Return the local host information for this session.
+ * Source (receive) a new Message for this Session.
+ *
+ * This method will forward to sourceMessage on this Session's transport layer.
*/
- const HostAndPort& local() const {
- return _local;
- }
+ virtual Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate);
/**
- * Return the X509 peer information for this connection (SSL only).
+ * Sink (send) a new Message for this Session. This method should be used
+ * to send replies to a given host.
+ *
+ * This method will forward to sinkMessage on this Session's transport layer.
*/
- SSLPeerInfo getX509PeerInfo() const;
+ virtual Ticket sinkMessage(const Message& message,
+ Date_t expiration = Ticket::kNoExpirationDate);
/**
- * Set this session's tags. This Session will register
- * its new tags with its TransportLayer.
+ * Return the X509 peer information for this connection (SSL only).
*/
- void replaceTags(TagMask tags);
+ virtual SSLPeerInfo getX509PeerInfo() const;
/**
- * Get this session's tags.
+ * Return the remote host for this session.
*/
- TagMask getTags() const {
- return _tags;
- }
+ virtual const HostAndPort& remote() const = 0;
/**
- * Source (receive) a new Message for this Session.
- *
- * This method will forward to sourceMessage on this Session's transport layer.
+ * Return the local host information for this session.
*/
- Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate);
+ virtual const HostAndPort& local() const = 0;
/**
- * Sink (send) a new Message for this Session. This method should be used
- * to send replies to a given host.
- *
- * This method will forward to sinkMessage on this Session's transport layer.
+ * Set this session's tags. This Session will register
+ * its new tags with its TransportLayer.
*/
- Ticket sinkMessage(const Message& message, Date_t expiration = Ticket::kNoExpirationDate);
+ virtual void replaceTags(TagMask tags);
/**
- * The TransportLayer for this Session.
+ * Get this session's tags.
*/
- TransportLayer* getTransportLayer() const {
- return _tl;
- }
+ virtual TagMask getTags() const;
- MessageCompressorManager& getCompressorManager() {
- return _messageCompressorManager;
- }
+ /**
+ * Get the compressor manager for this session.
+ */
+ virtual MessageCompressorManager& getCompressorManager();
-private:
+protected:
/**
* Construct a new session.
*/
- Session(HostAndPort remote, HostAndPort local, TransportLayer* tl);
-
- Id _id;
+ Session();
- HostAndPort _remote;
- HostAndPort _local;
+private:
+ const Id _id;
TagMask _tags;
-
- TransportLayer* _tl;
-
MessageCompressorManager _messageCompressorManager;
};
diff --git a/src/mongo/transport/ticket_impl.h b/src/mongo/transport/ticket_impl.h
index 22adf9414e7..c6d325dbc37 100644
--- a/src/mongo/transport/ticket_impl.h
+++ b/src/mongo/transport/ticket_impl.h
@@ -47,9 +47,6 @@ class TicketImpl {
public:
virtual ~TicketImpl() = default;
- TicketImpl(TicketImpl&&) = default;
- TicketImpl& operator=(TicketImpl&&) = default;
-
/**
* Return this ticket's session id.
*/
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index b09ff8a8eb9..f2c82c678a2 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -148,15 +148,6 @@ public:
virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0;
/**
- * Tag this Session within the TransportLayer with the tags currently assigned to the
- * Session. If endAllSessions() is called with a matching
- * Session::TagMask, this Session will not be ended.
- *
- * Before calling this method, use Session::replaceTags() to set the desired TagMask.
- */
- virtual void registerTags(const ConstSessionHandle& session) = 0;
-
- /**
* Return the stored X509 peer information for this session. If the session does not
* exist in this TransportLayer, returns a default constructed object.
*/
@@ -221,13 +212,6 @@ protected:
TransportLayer* getTicketTransportLayer(const Ticket& ticket) {
return ticket._tl;
}
-
-private:
- /**
- * Destroys any information linked to this Session in the TransportLayer.
- * This should only be called from within Session's destructor.
- */
- virtual void _destroy(Session& session) = 0;
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp
index b5e3c96c6e4..789b41529fa 100644
--- a/src/mongo/transport/transport_layer_legacy.cpp
+++ b/src/mongo/transport/transport_layer_legacy.cpp
@@ -30,6 +30,8 @@
#include "mongo/platform/basic.h"
+#include <memory>
+
#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/base/checked_cast.h"
@@ -64,10 +66,35 @@ TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options&
_running(false),
_options(opts) {}
-TransportLayerLegacy::LegacyTicket::LegacyTicket(const SessionHandle& session,
+std::shared_ptr<TransportLayerLegacy::LegacySession> TransportLayerLegacy::LegacySession::create(
+ std::unique_ptr<AbstractMessagingPort> amp, TransportLayerLegacy* tl) {
+ std::shared_ptr<LegacySession> handle(new LegacySession(std::move(amp), tl));
+ return handle;
+}
+
+TransportLayerLegacy::LegacySession::LegacySession(std::unique_ptr<AbstractMessagingPort> amp,
+ TransportLayerLegacy* tl)
+ : _remote(amp->remote()),
+ _local(amp->localAddr().toString(true)),
+ _tl(tl),
+ _tags(kEmptyTagMask),
+ _connection(stdx::make_unique<Connection>(std::move(amp))) {}
+
+TransportLayerLegacy::LegacySession::~LegacySession() {
+ _tl->_destroy(*this);
+}
+
+TransportLayerLegacy::LegacyTicket::LegacyTicket(const LegacySessionHandle& session,
Date_t expiration,
WorkHandle work)
- : _sessionId(session->id()), _expiration(expiration), _fill(std::move(work)) {}
+ : _session(session),
+ _sessionId(session->id()),
+ _expiration(expiration),
+ _fill(std::move(work)) {}
+
+TransportLayerLegacy::LegacySessionHandle TransportLayerLegacy::LegacyTicket::getSession() {
+ return _session.lock();
+}
Session::Id TransportLayerLegacy::LegacyTicket::sessionId() const {
return _sessionId;
@@ -77,6 +104,10 @@ Date_t TransportLayerLegacy::LegacyTicket::expiration() const {
return _expiration;
}
+Status TransportLayerLegacy::LegacyTicket::fill(AbstractMessagingPort* amp) {
+ return _fill(amp);
+}
+
Status TransportLayerLegacy::setup() {
if (!_listener->setupSockets()) {
error() << "Failed to set up sockets during startup.";
@@ -118,27 +149,22 @@ Ticket TransportLayerLegacy::sourceMessage(const SessionHandle& session,
return Status::OK();
};
- return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sourceCb)));
+ auto legacySession = checked_pointer_cast<LegacySession>(session);
+ return Ticket(
+ this,
+ stdx::make_unique<LegacyTicket>(std::move(legacySession), expiration, std::move(sourceCb)));
}
SSLPeerInfo TransportLayerLegacy::getX509PeerInfo(const ConstSessionHandle& session) const {
- {
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
- auto conn = _connections.find(session->id());
- if (conn == _connections.end()) {
- // Return empty string if the session is not found
- return SSLPeerInfo();
- }
-
- return conn->second.sslPeerInfo.value_or(SSLPeerInfo());
- }
+ auto legacySession = checked_pointer_cast<const LegacySession>(session);
+ return legacySession->conn()->sslPeerInfo.value_or(SSLPeerInfo());
}
TransportLayer::Stats TransportLayerLegacy::sessionStats() {
Stats stats;
{
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
- stats.numOpenSessions = _connections.size();
+ stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
+ stats.numOpenSessions = _sessions.size();
}
stats.numAvailableSessions = Listener::globalTicketHolder.available();
@@ -167,7 +193,10 @@ Ticket TransportLayerLegacy::sinkMessage(const SessionHandle& session,
}
};
- return Ticket(this, stdx::make_unique<LegacyTicket>(session, expiration, std::move(sinkCb)));
+ auto legacySession = checked_pointer_cast<LegacySession>(session);
+ return Ticket(
+ this,
+ stdx::make_unique<LegacyTicket>(std::move(legacySession), expiration, std::move(sinkCb)));
}
Status TransportLayerLegacy::wait(Ticket&& ticket) {
@@ -182,45 +211,32 @@ void TransportLayerLegacy::asyncWait(Ticket&& ticket, TicketCallback callback) {
}
void TransportLayerLegacy::end(const SessionHandle& session) {
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
- auto conn = _connections.find(session->id());
- if (conn != _connections.end()) {
- _endSession_inlock(conn);
- }
+ auto legacySession = checked_pointer_cast<const LegacySession>(session);
+ _closeConnection(legacySession->conn());
}
-void TransportLayerLegacy::registerTags(const ConstSessionHandle& session) {
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
- auto conn = _connections.find(session->id());
- if (conn != _connections.end()) {
- conn->second.tags = session->getTags();
- }
-}
-
-void TransportLayerLegacy::_endSession_inlock(
- decltype(TransportLayerLegacy::_connections.begin()) conn) {
- conn->second.ended = true;
- conn->second.amp->shutdown();
+void TransportLayerLegacy::_closeConnection(Connection* conn) {
+ conn->closed = true;
+ conn->amp->shutdown();
Listener::globalTicketHolder.release();
}
void TransportLayerLegacy::endAllSessions(Session::TagMask tags) {
- log() << "legacy transport layer ending all sessions";
+ log() << "legacy transport layer closing all connections";
{
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
- auto&& conn = _connections.begin();
- while (conn != _connections.end()) {
- // If we erase this connection below, we invalidate our iterator, use a placeholder.
- auto placeholder = conn;
- placeholder++;
-
- if (conn->second.tags & tags) {
- log() << "Skip closing connection for connection # " << conn->second.connectionId;
- } else {
- _endSession_inlock(conn);
+ stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
+ for (auto&& it : _sessions) {
+
+ // Attempt to make our weak_ptr into a shared_ptr
+ auto session = it.lock();
+ if (session) {
+ if (session->getTags() & tags) {
+ log() << "Skip closing connection for connection # "
+ << session->conn()->connectionId;
+ } else {
+ _closeConnection(session->conn());
+ }
}
-
- conn = placeholder;
}
}
}
@@ -232,17 +248,13 @@ void TransportLayerLegacy::shutdown() {
endAllSessions(Session::kEmptyTagMask);
}
-void TransportLayerLegacy::_destroy(Session& session) {
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
- auto conn = _connections.find(session.id());
-
- invariant(conn != _connections.end());
- if (!conn->second.ended) {
- _endSession_inlock(conn);
+void TransportLayerLegacy::_destroy(LegacySession& session) {
+ if (!session.conn()->closed) {
+ _closeConnection(session.conn());
}
- invariant(!conn->second.inUse);
- _connections.erase(conn);
+ stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
+ _sessions.erase(session.getIter());
}
Status TransportLayerLegacy::_runTicket(Ticket ticket) {
@@ -254,53 +266,35 @@ Status TransportLayerLegacy::_runTicket(Ticket ticket) {
return Ticket::ExpiredStatus;
}
- AbstractMessagingPort* amp;
-
- {
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
-
- // Error if we cannot find the session.
- auto conn = _connections.find(ticket.sessionId());
- if (conn == _connections.end()) {
- return TransportLayer::TicketSessionUnknownStatus;
- }
-
- // Error if we find the session but its connection is closed.
- if (conn->second.ended) {
- return TransportLayer::TicketSessionClosedStatus;
- }
+ // get the weak_ptr out of the ticket
+ // attempt to make it into a shared_ptr
+ auto legacyTicket = checked_cast<LegacyTicket*>(getTicketImpl(ticket));
+ auto session = legacyTicket->getSession();
+ if (!session) {
+ return TransportLayer::TicketSessionClosedStatus;
+ }
- // "check out" the port
- conn->second.inUse = true;
- amp = conn->second.amp.get();
+ auto conn = session->conn();
+ if (conn->closed) {
+ return TransportLayer::TicketSessionClosedStatus;
}
- auto legacyTicket = checked_cast<LegacyTicket*>(getTicketImpl(ticket));
Status res = Status::OK();
-
try {
- res = legacyTicket->_fill(amp);
+ res = legacyTicket->fill(conn->amp.get());
} catch (...) {
res = exceptionToStatus();
}
- {
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
-
- auto conn = _connections.find(ticket.sessionId());
- invariant(conn != _connections.end());
-
#ifdef MONGO_CONFIG_SSL
- // If we didn't have an X509 subject name, see if we have one now
- if (!conn->second.sslPeerInfo) {
- auto info = amp->getX509PeerInfo();
- if (info.subjectName != "") {
- conn->second.sslPeerInfo = info;
- }
+ // If we didn't have an X509 subject name, see if we have one now
+ if (!conn->sslPeerInfo) {
+ auto info = conn->amp->getX509PeerInfo();
+ if (info.subjectName != "") {
+ conn->sslPeerInfo = info;
}
-#endif
- conn->second.inUse = false;
}
+#endif
return res;
}
@@ -313,16 +307,17 @@ void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagin
return;
}
- auto session =
- Session::create(amp->remote(), HostAndPort(amp->localAddr().toString(true)), this);
-
amp->setLogLevel(logger::LogSeverity::Debug(1));
+ auto session = LegacySession::create(std::move(amp), this);
+
+ stdx::list<std::weak_ptr<LegacySession>> list;
+ auto it = list.emplace(list.begin(), session);
{
- stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
- _connections.emplace(std::piecewise_construct,
- std::forward_as_tuple(session->id()),
- std::forward_as_tuple(std::move(amp), false, session->getTags()));
+ // Add the new session to our list
+ stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
+ session->setIter(it);
+ _sessions.splice(_sessions.begin(), list, it);
}
invariant(_sep);
diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h
index 3d348076ad7..dab651bb86e 100644
--- a/src/mongo/transport/transport_layer_legacy.h
+++ b/src/mongo/transport/transport_layer_legacy.h
@@ -28,10 +28,10 @@
#pragma once
+#include "mongo/stdx/list.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
-#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/ticket_impl.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/net/listen.h"
@@ -77,7 +77,6 @@ public:
Status wait(Ticket&& ticket) override;
void asyncWait(Ticket&& ticket, TicketCallback callback) override;
- void registerTags(const ConstSessionHandle& session) override;
SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override;
Stats sessionStats() override;
@@ -88,7 +87,12 @@ public:
void shutdown() override;
private:
- void _destroy(Session& session) override;
+ class LegacySession;
+ using LegacySessionHandle = std::shared_ptr<LegacySession>;
+ using ConstLegacySessionHandle = std::shared_ptr<const LegacySession>;
+ using SessionEntry = std::list<std::weak_ptr<LegacySession>>::iterator;
+
+ void _destroy(LegacySession& session);
void _handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp);
@@ -98,6 +102,78 @@ private:
using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>;
/**
+ * Connection object, to associate Sessions with AbstractMessagingPorts.
+ */
+ struct Connection {
+ MONGO_DISALLOW_COPYING(Connection);
+
+ Connection(std::unique_ptr<AbstractMessagingPort> port)
+ : amp(std::move(port)), connectionId(amp->connectionId()) {}
+
+ std::unique_ptr<AbstractMessagingPort> amp;
+
+ const long long connectionId;
+
+ boost::optional<SSLPeerInfo> sslPeerInfo;
+ bool closed = false;
+ };
+
+ /**
+ * An implementation of the Session interface for this TransportLayer.
+ */
+ class LegacySession : public Session {
+ MONGO_DISALLOW_COPYING(LegacySession);
+
+ public:
+ ~LegacySession();
+
+ static std::shared_ptr<LegacySession> create(std::unique_ptr<AbstractMessagingPort> amp,
+ TransportLayerLegacy* tl);
+
+ TransportLayer* getTransportLayer() const override {
+ return _tl;
+ }
+
+ const HostAndPort& remote() const override {
+ return _remote;
+ }
+
+ const HostAndPort& local() const override {
+ return _local;
+ }
+
+ Connection* conn() const {
+ return _connection.get();
+ }
+
+ void setIter(SessionEntry it) {
+ _entry = std::move(it);
+ }
+
+ SessionEntry getIter() const {
+ return _entry;
+ }
+
+ private:
+ explicit LegacySession(std::unique_ptr<AbstractMessagingPort> amp,
+ TransportLayerLegacy* tl);
+
+ HostAndPort _remote;
+ HostAndPort _local;
+
+ TransportLayerLegacy* _tl;
+
+ TagMask _tags;
+
+ MessageCompressorManager _messageCompressorManager;
+
+ std::unique_ptr<Connection> _connection;
+
+ // A handle to this session's entry in the TL's session list
+ SessionEntry _entry;
+ };
+
+ /**
* A TicketImpl implementation for this TransportLayer. WorkHandle is a callable that
* can be invoked to fill this ticket.
*/
@@ -105,11 +181,25 @@ private:
MONGO_DISALLOW_COPYING(LegacyTicket);
public:
- LegacyTicket(const SessionHandle& session, Date_t expiration, WorkHandle work);
+ LegacyTicket(const LegacySessionHandle& session, Date_t expiration, WorkHandle work);
SessionId sessionId() const override;
Date_t expiration() const override;
+ /**
+ * If this ticket's session is still alive, return a shared_ptr. Otherwise,
+ * return nullptr.
+ */
+ LegacySessionHandle getSession();
+
+ /**
+ * Run this ticket's work item.
+ */
+ Status fill(AbstractMessagingPort* amp);
+
+ private:
+ std::weak_ptr<LegacySession> _session;
+
SessionId _sessionId;
Date_t _expiration;
@@ -136,32 +226,16 @@ private:
NewConnectionCb _accepted;
};
- /**
- * Connection object, to associate Session ids with AbstractMessagingPorts.
- */
- struct Connection {
- Connection(std::unique_ptr<AbstractMessagingPort> port, bool ended, Session::TagMask tags)
- : amp(std::move(port)), connectionId(amp->connectionId()), tags(tags) {}
-
- std::unique_ptr<AbstractMessagingPort> amp;
-
- const long long connectionId;
-
- boost::optional<SSLPeerInfo> sslPeerInfo;
- Session::TagMask tags;
- bool inUse = false;
- bool ended = false;
- };
+ void _closeConnection(Connection* conn);
ServiceEntryPoint* _sep;
std::unique_ptr<Listener> _listener;
stdx::thread _listenerThread;
- mutable stdx::mutex _connectionsMutex;
- stdx::unordered_map<Session::Id, Connection> _connections;
-
- void _endSession_inlock(decltype(_connections.begin()) conn);
+ // TransportLayerLegacy holds non-owning pointers to all of its sessions.
+ mutable stdx::mutex _sessionsMutex;
+ stdx::list<std::weak_ptr<LegacySession>> _sessions;
AtomicWord<bool> _running;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index d29c01f54f5..6ca73193804 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -97,10 +97,6 @@ TransportLayer::Stats TransportLayerManager::sessionStats() {
return stats;
}
-void TransportLayerManager::registerTags(const ConstSessionHandle& session) {
- session->getTransportLayer()->registerTags(session);
-}
-
void TransportLayerManager::end(const SessionHandle& session) {
session->getTransportLayer()->end(session);
}
@@ -126,9 +122,5 @@ Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<Transpor
return ptr->start();
}
-void TransportLayerManager::_destroy(Session& session) {
- MONGO_UNREACHABLE;
-}
-
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index deba17146cc..d477fa9b75c 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -65,7 +65,6 @@ public:
void asyncWait(Ticket&& ticket, TicketCallback callback) override;
SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override;
- void registerTags(const ConstSessionHandle& session) override;
Stats sessionStats() override;
@@ -78,8 +77,6 @@ public:
Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl);
private:
- void _destroy(Session& session) override;
-
template <typename Callable>
void _foreach(Callable&& cb);
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index 550c8585f95..67cf2519b83 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -32,9 +32,8 @@
#include "mongo/base/status.h"
#include "mongo/stdx/memory.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/ticket_impl.h"
+#include "mongo/transport/mock_session.h"
+#include "mongo/transport/mock_ticket.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/net/message.h"
#include "mongo/util/time_support.h"
@@ -42,26 +41,6 @@
namespace mongo {
namespace transport {
-TransportLayerMock::TicketMock::TicketMock(const SessionHandle& session,
- Message* message,
- Date_t expiration)
- : _session(session), _message(message), _expiration(expiration) {}
-
-TransportLayerMock::TicketMock::TicketMock(const SessionHandle& session, Date_t expiration)
- : _session(session), _expiration(expiration) {}
-
-Session::Id TransportLayerMock::TicketMock::sessionId() const {
- return _session->id();
-}
-
-Date_t TransportLayerMock::TicketMock::expiration() const {
- return _expiration;
-}
-
-boost::optional<Message*> TransportLayerMock::TicketMock::msg() const {
- return _message;
-}
-
TransportLayerMock::TransportLayerMock() : _shutdown(false) {}
Ticket TransportLayerMock::sourceMessage(const SessionHandle& session,
@@ -75,8 +54,7 @@ Ticket TransportLayerMock::sourceMessage(const SessionHandle& session,
return Ticket(TransportLayer::TicketSessionClosedStatus);
}
- return Ticket(this,
- stdx::make_unique<TransportLayerMock::TicketMock>(session, message, expiration));
+ return Ticket(this, stdx::make_unique<transport::MockTicket>(session, message, expiration));
}
Ticket TransportLayerMock::sinkMessage(const SessionHandle& session,
@@ -90,7 +68,7 @@ Ticket TransportLayerMock::sinkMessage(const SessionHandle& session,
return Ticket(TransportLayer::TicketSessionClosedStatus);
}
- return Ticket(this, stdx::make_unique<TransportLayerMock::TicketMock>(session, expiration));
+ return Ticket(this, stdx::make_unique<transport::MockTicket>(session, expiration));
}
Status TransportLayerMock::wait(Ticket&& ticket) {
@@ -124,10 +102,8 @@ TransportLayer::Stats TransportLayerMock::sessionStats() {
return Stats();
}
-void TransportLayerMock::registerTags(const ConstSessionHandle& session) {}
-
SessionHandle TransportLayerMock::createSession() {
- auto session = Session::create(HostAndPort(), HostAndPort(), this);
+ auto session = MockSession::create(this);
Session::Id sessionId = session->id();
_sessions[sessionId] = Connection{false, session, SSLPeerInfo()};
@@ -179,7 +155,5 @@ TransportLayerMock::~TransportLayerMock() {
shutdown();
}
-void TransportLayerMock::_destroy(Session& session) {}
-
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index 420360a44de..34d6f3c451c 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -48,31 +48,6 @@ class TransportLayerMock : public TransportLayer {
MONGO_DISALLOW_COPYING(TransportLayerMock);
public:
- class TicketMock : public TicketImpl {
- public:
- // Source constructor
- TicketMock(const SessionHandle& session,
- Message* message,
- Date_t expiration = Ticket::kNoExpirationDate);
-
- // Sink constructor
- TicketMock(const SessionHandle& session, Date_t expiration = Ticket::kNoExpirationDate);
-
- TicketMock(TicketMock&&) = default;
- TicketMock& operator=(TicketMock&&) = default;
-
- SessionId sessionId() const override;
-
- Date_t expiration() const override;
-
- boost::optional<Message*> msg() const;
-
- private:
- const SessionHandle& _session;
- boost::optional<Message*> _message;
- Date_t _expiration;
- };
-
TransportLayerMock();
~TransportLayerMock();
@@ -88,7 +63,6 @@ public:
SSLPeerInfo getX509PeerInfo(const ConstSessionHandle& session) const override;
void setX509PeerInfo(const SessionHandle& session, SSLPeerInfo peerInfo);
- void registerTags(const ConstSessionHandle& session) override;
Stats sessionStats() override;
@@ -103,8 +77,6 @@ public:
bool inShutdown() const;
private:
- void _destroy(Session& session) override;
-
struct Connection {
bool ended;
SessionHandle session;
diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp
index 9876db68d8e..1cf90b395ff 100644
--- a/src/mongo/transport/transport_layer_mock_test.cpp
+++ b/src/mongo/transport/transport_layer_mock_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/mock_ticket.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/transport_layer_mock.h"
@@ -170,7 +171,7 @@ TEST_F(TransportLayerMockTest, SourceMessageTLShutdown) {
// wait() returns an OK status
TEST_F(TransportLayerMockTest, Wait) {
SessionHandle session = tl()->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session));
+ Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
Status status = tl()->wait(std::move(ticket));
ASSERT_OK(status);
@@ -180,7 +181,7 @@ TEST_F(TransportLayerMockTest, Wait) {
TEST_F(TransportLayerMockTest, WaitExpiredTicket) {
SessionHandle session = tl()->createSession();
Ticket expiredTicket =
- Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session, Date_t::now()));
+ Ticket(tl(), stdx::make_unique<transport::MockTicket>(session, Date_t::now()));
Status status = tl()->wait(std::move(expiredTicket));
ASSERT_EQUALS(status.code(), ErrorCodes::ExceededTimeLimit);
@@ -198,7 +199,7 @@ TEST_F(TransportLayerMockTest, WaitInvalidTicket) {
// wait() returns a SessionClosed error status if the Ticket's Session is closed
TEST_F(TransportLayerMockTest, WaitSessionClosed) {
SessionHandle session = tl()->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session));
+ Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
tl()->end(session);
@@ -211,7 +212,7 @@ TEST_F(TransportLayerMockTest, WaitSessionClosed) {
TEST_F(TransportLayerMockTest, WaitSessionUnknown) {
std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>();
SessionHandle session = anotherTL->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session));
+ Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
Status status = tl()->wait(std::move(ticket));
ASSERT_EQUALS(status.code(), ErrorCodes::TransportSessionUnknown);
@@ -220,7 +221,7 @@ TEST_F(TransportLayerMockTest, WaitSessionUnknown) {
// wait() returns a ShutdownInProgress status if the TransportLayer is in shutdown
TEST_F(TransportLayerMockTest, WaitTLShutdown) {
SessionHandle session = tl()->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<TransportLayerMock::TicketMock>(session));
+ Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
tl()->shutdown();
@@ -242,7 +243,7 @@ void assertEnded(TransportLayer* tl,
std::vector<SessionHandle> sessions,
ErrorCodes::Error code = ErrorCodes::TransportSessionClosed) {
for (auto session : sessions) {
- Ticket ticket = Ticket(tl, stdx::make_unique<TransportLayerMock::TicketMock>(session));
+ Ticket ticket = Ticket(tl, stdx::make_unique<transport::MockTicket>(session));
Status status = tl->wait(std::move(ticket));
ASSERT_EQUALS(status.code(), code);
}