summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2018-02-09 14:18:11 -0500
committerMathias Stearn <mathias@10gen.com>2018-02-13 19:04:39 -0500
commited15b99846db007f06d74d1cb5f8d37f954aa244 (patch)
treec6613360f29cc52a955ade71534dfdef077a1588
parent66d2a03579bb1a259aec36038f9250e681ede08c (diff)
downloadmongo-ed15b99846db007f06d74d1cb5f8d37f954aa244.tar.gz
SERVER-33255 clean up TransportLayer API
-rw-r--r--src/mongo/client/scoped_db_conn_test.cpp10
-rw-r--r--src/mongo/db/session.cpp3
-rw-r--r--src/mongo/transport/SConscript45
-rw-r--r--src/mongo/transport/ingress_header_test.cpp43
-rw-r--r--src/mongo/transport/mock_session.h53
-rw-r--r--src/mongo/transport/mock_ticket.h83
-rw-r--r--src/mongo/transport/service_entry_point_mock.cpp121
-rw-r--r--src/mongo/transport/service_entry_point_mock.h91
-rw-r--r--src/mongo/transport/service_entry_point_mock_test.cpp79
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.cpp456
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.h175
-rw-r--r--src/mongo/transport/service_state_machine.cpp36
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp84
-rw-r--r--src/mongo/transport/session.cpp8
-rw-r--r--src/mongo/transport/session.h51
-rw-r--r--src/mongo/transport/session_asio.h188
-rw-r--r--src/mongo/transport/ticket.cpp93
-rw-r--r--src/mongo/transport/ticket.h154
-rw-r--r--src/mongo/transport/ticket_asio.cpp168
-rw-r--r--src/mongo/transport/ticket_asio.h103
-rw-r--r--src/mongo/transport/ticket_impl.h65
-rw-r--r--src/mongo/transport/transport_layer.h92
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp44
-rw-r--r--src/mongo/transport/transport_layer_asio.h18
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp24
-rw-r--r--src/mongo/transport/transport_layer_manager.h14
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp55
-rw-r--r--src/mongo/transport/transport_layer_mock.h18
-rw-r--r--src/mongo/transport/transport_layer_mock_test.cpp261
29 files changed, 300 insertions, 2335 deletions
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp
index e9dca46cf23..50c00521947 100644
--- a/src/mongo/client/scoped_db_conn_test.cpp
+++ b/src/mongo/client/scoped_db_conn_test.cpp
@@ -118,10 +118,11 @@ public:
private:
void run(transport::SessionHandle session) {
- Message inMessage;
- if (!session->sourceMessage(&inMessage).wait().isOK()) {
+ auto swInMessage = session->sourceMessage();
+ if (!swInMessage.isOK()) {
return;
}
+ Message inMessage = swInMessage.getValue();
auto request = rpc::opMsgRequestFromAnyProtocol(inMessage);
commandRequestHook(request);
@@ -146,9 +147,8 @@ private:
log() << "Delaying response for " << _replyDelay;
sleepFor(_replyDelay);
}
- if (!session->sinkMessage(response).wait().isOK()) {
- return;
- }
+
+ session->sinkMessage(response).ignore();
}
/**
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index cf2519588d8..7c31274ee11 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -551,8 +551,7 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
const auto closeConnectionElem = data["closeConnection"];
if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
- auto transportSession = opCtx->getClient()->session();
- transportSession->getTransportLayer()->end(transportSession);
+ opCtx->getClient()->session()->end();
}
const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index ab1e4de73ef..75c342876f7 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -4,22 +4,11 @@ Import('env')
env = env.Clone()
-env.CppUnitTest(
- target='ingress_header_test',
- source=[
- 'ingress_header_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- ],
-)
-
env.Library(
target='transport_layer_common',
source=[
'service_entry_point_utils.cpp',
'session.cpp',
- 'ticket.cpp',
'transport_layer.cpp',
],
LIBDEPS=[
@@ -58,7 +47,6 @@ tlEnv.Library(
tlEnv.Library(
target='transport_layer',
source=[
- 'ticket_asio.cpp',
'transport_layer_asio.cpp',
],
LIBDEPS=[
@@ -121,17 +109,6 @@ tlEnv.CppUnitTest(
# )
env.Library(
- target='service_entry_point_test_suite',
- source=[
- 'service_entry_point_test_suite.cpp',
- ],
- LIBDEPS=[
- 'transport_layer_common',
- '$BUILD_DIR/mongo/unittest/unittest',
- ],
-)
-
-env.Library(
target='service_entry_point',
source=[
'service_entry_point_impl.cpp',
@@ -148,17 +125,6 @@ env.Library(
)
env.CppUnitTest(
- target='service_entry_point_mock_test',
- source=[
- 'service_entry_point_mock.cpp',
- 'service_entry_point_mock_test.cpp',
- ],
- LIBDEPS=[
- 'service_entry_point_test_suite',
- ],
-)
-
-env.CppUnitTest(
target='service_state_machine_test',
source=[
'service_state_machine_test.cpp',
@@ -177,17 +143,6 @@ env.CppUnitTest(
],
)
-env.CppUnitTest(
- target='transport_layer_mock_test',
- source=[
- 'transport_layer_mock_test.cpp',
- ],
- LIBDEPS=[
- 'transport_layer_mock',
- ],
-)
-
-
zlibEnv = env.Clone()
zlibEnv.InjectThirdPartyIncludePaths(libraries=['zlib', 'snappy'])
zlibEnv.Library(
diff --git a/src/mongo/transport/ingress_header_test.cpp b/src/mongo/transport/ingress_header_test.cpp
deleted file mode 100644
index fae28ea3a77..00000000000
--- a/src/mongo/transport/ingress_header_test.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/unittest/unittest.h"
-
-namespace {
-
-TEST(IngressHeaderTest, TestHeaders) {
- // No-op.
-}
-
-} // namespace
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index 147e13e4f7e..2f091854193 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -28,14 +28,14 @@
#pragma once
+#include "mongo/base/checked_cast.h"
#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer_mock.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
namespace transport {
-class TransportLayer;
-
class MockSession : public Session {
MONGO_DISALLOW_COPYING(MockSession);
@@ -65,12 +65,53 @@ public:
return _local;
}
-protected:
- explicit MockSession(TransportLayer* tl) : _tl(tl), _remote(), _local() {}
+ void end() override {
+ if (!_tl->owns(id()))
+ return;
+ _tl->_sessions[id()].ended = true;
+ }
+
+ StatusWith<Message> sourceMessage() override {
+ if (_tl->inShutdown()) {
+ return TransportLayer::ShutdownStatus;
+ } else if (!_tl->owns(id())) {
+ return TransportLayer::SessionUnknownStatus;
+ } else if (_tl->_sessions[id()].ended) {
+ return TransportLayer::TicketSessionClosedStatus;
+ }
+
+ return Message(); // Subclasses can do something different.
+ }
+
+ void asyncSourceMessage(std::function<void(StatusWith<Message>)> cb) override {
+ cb(sourceMessage());
+ }
+
+ Status sinkMessage(Message message) override {
+ if (_tl->inShutdown()) {
+ return TransportLayer::ShutdownStatus;
+ } else if (!_tl->owns(id())) {
+ return TransportLayer::SessionUnknownStatus;
+ } else if (_tl->_sessions[id()].ended) {
+ return TransportLayer::TicketSessionClosedStatus;
+ }
+
+ return Status::OK();
+ }
+
+ void asyncSinkMessage(Message message, std::function<void(Status)> cb) override {
+ cb(sinkMessage(message));
+ }
+
+ explicit MockSession(TransportLayer* tl)
+ : _tl(checked_cast<TransportLayerMock*>(tl)), _remote(), _local() {}
explicit MockSession(HostAndPort remote, HostAndPort local, TransportLayer* tl)
- : _tl(tl), _remote(std::move(remote)), _local(std::move(local)) {}
+ : _tl(checked_cast<TransportLayerMock*>(tl)),
+ _remote(std::move(remote)),
+ _local(std::move(local)) {}
- TransportLayer* _tl;
+protected:
+ TransportLayerMock* _tl;
HostAndPort _remote;
HostAndPort _local;
diff --git a/src/mongo/transport/mock_ticket.h b/src/mongo/transport/mock_ticket.h
deleted file mode 100644
index bbf4eaa9df4..00000000000
--- a/src/mongo/transport/mock_ticket.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/ticket_impl.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-class Message;
-
-namespace transport {
-
-/**
- * A mock ticket class for our test suite.
- */
-class MockTicket : public TicketImpl {
- MONGO_DISALLOW_COPYING(MockTicket);
-
-public:
- // Source constructor
- MockTicket(const SessionHandle& session,
- Message* message,
- Date_t expiration = Ticket::kNoExpirationDate)
- : _session(session), _id(session->id()), _message(message), _expiration(expiration) {}
-
- // Sink constructor
- MockTicket(const SessionHandle& session, Date_t expiration = Ticket::kNoExpirationDate)
- : _session(session), _id(session->id()), _expiration(expiration) {}
-
- SessionId sessionId() const override {
- return _id;
- }
-
- Date_t expiration() const override {
- return _expiration;
- }
-
- boost::optional<Message*> message() const {
- return _message;
- }
-
- SessionHandle session() const {
- return _session.lock();
- }
-
-private:
- std::weak_ptr<Session> _session;
- Session::Id _id;
- boost::optional<Message*> _message;
- Date_t _expiration;
-};
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp
deleted file mode 100644
index eb70533ce27..00000000000
--- a/src/mongo/transport/service_entry_point_mock.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_entry_point_mock.h"
-
-#include <vector>
-
-#include "mongo/bson/bsonmisc.h"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/transport_layer.h"
-
-namespace mongo {
-
-using namespace transport;
-
-ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl)
- : _tl(tl), _inShutdown(false) {}
-
-ServiceEntryPointMock::~ServiceEntryPointMock() {
- endAllSessions(transport::Session::kEmptyTagMask);
-}
-
-void ServiceEntryPointMock::startSession(transport::SessionHandle session) {
- _threads.emplace_back(&ServiceEntryPointMock::run, this, std::move(session));
-}
-
-void ServiceEntryPointMock::run(transport::SessionHandle session) {
- Message inMessage;
- while (true) {
- {
- stdx::lock_guard<stdx::mutex> lk(_shutdownLock);
- if (_inShutdown)
- break;
- }
-
- // sourceMessage()
- if (!session->sourceMessage(&inMessage).wait().isOK()) {
- break;
- }
-
- auto resp = handleRequest(nullptr, inMessage);
-
- // sinkMessage()
- if (!session->sinkMessage(resp.response).wait().isOK()) {
- break;
- }
- }
-}
-
-DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx, const Message& request) {
- // Need to set up our { ok : 1 } response.
- BufBuilder b{};
-
- // Leave room for the message header
- b.skip(mongo::MsgData::MsgDataHeaderSize);
-
- // Add our response
- auto okObj = BSON("ok" << 1.0);
- okObj.appendSelfToBufBuilder(b);
-
- // Add some metadata
- auto metadata = BSONObj();
- metadata.appendSelfToBufBuilder(b);
-
- // Set Message header fields
- MsgData::View msg = b.buf();
- msg.setLen(b.len());
- msg.setOperation(dbCommandReply);
-
- return {Message(b.release()), ""};
-}
-
-void ServiceEntryPointMock::endAllSessions(transport::Session::TagMask) {
- {
- stdx::lock_guard<stdx::mutex> lk(_shutdownLock);
- _inShutdown = true;
- }
-
- for (auto& t : _threads) {
- t.join();
- }
-}
-
-ServiceEntryPoint::Stats ServiceEntryPointMock::sessionStats() const {
- return {};
-}
-
-size_t ServiceEntryPointMock::numOpenSessions() const {
- return 0ULL;
-}
-
-} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h
deleted file mode 100644
index 7c13757562f..00000000000
--- a/src/mongo/transport/service_entry_point_mock.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <vector>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/session.h"
-#include "mongo/util/net/message.h"
-
-namespace mongo {
-
-namespace transport {
-
-class TransportLayer;
-
-} // namespace transport
-
-class ServiceEntryPointMock : public ServiceEntryPoint {
- MONGO_DISALLOW_COPYING(ServiceEntryPointMock);
-
-public:
- ServiceEntryPointMock(transport::TransportLayer* tl);
-
- virtual ~ServiceEntryPointMock();
-
- /**
- * This method will spawn a thread that will do the following:
- *
- * - call tl->sourceMessage()
- * - call tl->wait()
- * - call tl->sinkMessage() with { ok : 1 }
- * - call tl->wait()
- *
- * ...repeat until wait() returns an error.
- */
- void startSession(transport::SessionHandle session) override;
-
- void endAllSessions(transport::Session::TagMask tags) override;
-
- bool shutdown(Milliseconds timeout) override {
- return true;
- }
-
- Stats sessionStats() const override;
-
- size_t numOpenSessions() const override;
-
- DbResponse handleRequest(OperationContext* opCtx, const Message& request) override;
-
-private:
- void run(transport::SessionHandle session);
-
- transport::TransportLayer* _tl;
-
- stdx::mutex _shutdownLock;
- bool _inShutdown;
-
- std::vector<stdx::thread> _threads;
-};
-
-} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock_test.cpp b/src/mongo/transport/service_entry_point_mock_test.cpp
deleted file mode 100644
index 66806111980..00000000000
--- a/src/mongo/transport/service_entry_point_mock_test.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/stdx/memory.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_entry_point_mock.h"
-#include "mongo/transport/service_entry_point_test_suite.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-
-namespace {
-
-std::unique_ptr<ServiceEntryPoint> mockSEPFactory(transport::TransportLayer* tl) {
- return stdx::make_unique<ServiceEntryPointMock>(tl);
-}
-
-} // namespace
-
-TEST_F(ServiceEntryPointTestSuite, NoLifeCycleTest) {
- setServiceEntryPoint(&mockSEPFactory);
- noLifeCycleTest();
-}
-
-TEST_F(ServiceEntryPointTestSuite, HalfLifeCycleTest) {
- setServiceEntryPoint(&mockSEPFactory);
- halfLifeCycleTest();
-}
-
-TEST_F(ServiceEntryPointTestSuite, FullLifeCycleTest) {
- setServiceEntryPoint(&mockSEPFactory);
- fullLifeCycleTest();
-}
-
-TEST_F(ServiceEntryPointTestSuite, InterruptingSessionTest) {
- setServiceEntryPoint(&mockSEPFactory);
- interruptingSessionTest();
-}
-
-TEST_F(ServiceEntryPointTestSuite, BurstStressTest) {
- setServiceEntryPoint(&mockSEPFactory);
- burstStressTest();
-}
-
-TEST_F(ServiceEntryPointTestSuite, LongSessionStressTest) {
- setServiceEntryPoint(&mockSEPFactory);
- longSessionStressTest();
-}
-
-} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp
deleted file mode 100644
index 027d7baa2b1..00000000000
--- a/src/mongo/transport/service_entry_point_test_suite.cpp
+++ /dev/null
@@ -1,456 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_entry_point_test_suite.h"
-
-#include <boost/optional.hpp>
-
-#include "mongo/bson/bsonmisc.h"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/future.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/unordered_map.h"
-#include "mongo/stdx/unordered_set.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/ticket_impl.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/net/message.h"
-#include "mongo/util/net/ssl_types.h"
-
-namespace mongo {
-
-using namespace transport;
-
-using TicketCallback = TransportLayer::TicketCallback;
-using SEPTestSession = ServiceEntryPointTestSuite::SEPTestSession;
-
-namespace {
-
-// Helper function to populate a message with { ping : 1 } command
-void setPingCommand(Message* m) {
- BufBuilder b{};
-
- // Leave room for the message header.
- b.skip(mongo::MsgData::MsgDataHeaderSize);
-
- b.appendStr("admin");
- b.appendStr("ping");
-
- auto commandObj = BSON("ping" << 1);
- commandObj.appendSelfToBufBuilder(b);
-
- auto metadata = BSONObj();
- metadata.appendSelfToBufBuilder(b);
-
- // Set Message header fields.
- MsgData::View msg = b.buf();
- msg.setLen(b.len());
- msg.setOperation(dbCommand);
-
- m->reset();
-
- // Transfer buffer ownership to the Message.
- m->setData(b.release());
-}
-
-// Some default method implementations
-const auto kDefaultEnd = [](const SessionHandle& session) { return; };
-const auto kDefaultDestroyHook = [](SEPTestSession& session) { return; };
-const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); };
-const auto kNoopFunction = [] { return; };
-
-// "End connection" error status
-const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connection closed");
-
-} // namespace
-
-ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness()
- : _sourceMessage([this](const transport::SessionHandle& h, Message * m, Date_t d) {
- return _defaultSource(h, m, d);
- }),
- _sinkMessage([this](const transport::SessionHandle& h, const Message& m, Date_t d) {
- return _defaultSink(h, m, d);
- }),
- _wait([this](transport::Ticket t) { return _defaultWait(std::move(t)); }),
- _asyncWait(kDefaultAsyncWait),
- _end(kDefaultEnd) {}
-
-Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration) {
- return _sourceMessage(session, message, expiration);
-}
-
-Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration) {
- return _sinkMessage(session, message, expiration);
-}
-
-Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket&& ticket) {
- return _wait(std::move(ticket));
-}
-
-void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket,
- TicketCallback callback) {
- return _asyncWait(std::move(ticket), std::move(callback));
-}
-
-void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session) {
- return _end(session);
-}
-
-Status ServiceEntryPointTestSuite::MockTLHarness::setup() {
- return Status::OK();
-}
-
-Status ServiceEntryPointTestSuite::MockTLHarness::start() {
- return _start();
-}
-
-void ServiceEntryPointTestSuite::MockTLHarness::shutdown() {
- return _shutdown();
-}
-
-Status ServiceEntryPointTestSuite::MockTLHarness::_defaultWait(transport::Ticket ticket) {
- auto mockTicket = getMockTicket(ticket);
- if (mockTicket->message()) {
- setPingCommand(*(mockTicket->message()));
- }
- return Status::OK();
-}
-
-Status ServiceEntryPointTestSuite::MockTLHarness::_waitError(transport::Ticket ticket) {
- return kEndConnectionStatus;
-}
-
-Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::Ticket ticket) {
- _wait = [this](transport::Ticket t) { return _waitError(std::move(t)); };
- return _defaultWait(std::move(ticket));
-}
-
-Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const SessionHandle& s,
- Message* m,
- Date_t d) {
- return Ticket(this, stdx::make_unique<transport::MockTicket>(s, m, d));
-}
-
-Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHandle& s,
- const Message&,
- Date_t d) {
- return Ticket(this, stdx::make_unique<transport::MockTicket>(s, d));
-}
-
-Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s,
- const Message& m,
- Date_t d) {
- _wait = [=](transport::Ticket t) { return _waitOnceThenError(std::move(t)); };
- return _defaultSink(s, m, d);
-}
-
-void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() {
- _sourceMessage = [this](const transport::SessionHandle& h, Message* m, Date_t d) {
- return _defaultSource(h, m, d);
- };
- _sinkMessage = [this](const transport::SessionHandle& h, const Message& m, Date_t d) {
- return _defaultSink(h, m, d);
- };
- _wait = [this](transport::Ticket t) { return _defaultWait(std::move(t)); };
- _asyncWait = kDefaultAsyncWait;
- _end = kDefaultEnd;
- _destroy_hook = kDefaultDestroyHook;
-}
-
-transport::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket(
- const transport::Ticket& ticket) {
- return dynamic_cast<transport::MockTicket*>(getTicketImpl(ticket));
-}
-
-void ServiceEntryPointTestSuite::MockTLHarness::_destroy(SEPTestSession& session) {
- return _destroy_hook(session);
-}
-
-void ServiceEntryPointTestSuite::setUp() {
- _tl = stdx::make_unique<MockTLHarness>();
-}
-
-void ServiceEntryPointTestSuite::setServiceEntryPoint(ServiceEntryPointFactory factory) {
- _sep = factory(_tl.get());
-}
-
-// Start a Session and error on get-Message
-void ServiceEntryPointTestSuite::noLifeCycleTest() {
- stdx::promise<void> testComplete;
- auto testFuture = testComplete.get_future();
-
- _tl->_resetHooks();
-
- // Step 1: SEP gets a ticket to source a Message
- // Step 2: SEP calls wait() on the ticket and receives an error
- _tl->_wait = [tlp = _tl.get()](transport::Ticket t) {
- return tlp->_waitError(std::move(t));
- };
-
- // Step 3: SEP destroys the session, which calls end()
- _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
-
- // Kick off the SEP
- auto s = SEPTestSession::create(_tl.get());
- _sep->startSession(std::move(s));
-
- testFuture.wait();
-}
-
-// Partial cycle: get-Message, handle-Message, error on send-Message
-void ServiceEntryPointTestSuite::halfLifeCycleTest() {
- stdx::promise<void> testComplete;
- auto testFuture = testComplete.get_future();
-
- _tl->_resetHooks();
-
- // Step 1: SEP gets a ticket to source a Message
- // Step 2: SEP calls wait() on the ticket and receives a Message
- // Step 3: SEP gets a ticket to sink a Message
- _tl->_sinkMessage = [this](const SessionHandle& session, const Message& m, Date_t expiration) {
-
- // Step 4: SEP calls wait() on the ticket and receives an error
- _tl->_wait = [tlp = _tl.get()](transport::Ticket t) {
- return tlp->_waitError(std::move(t));
- };
-
- return _tl->_defaultSink(session, m, expiration);
- };
-
- // Step 5: SEP destroys the session, which calls _destroy()
- _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); };
-
- // Kick off the SEP
- auto s = SEPTestSession::create(_tl.get());
- _sep->startSession(std::move(s));
-
- testFuture.wait();
-}
-
-// Perform a full get-Message, handle-Message, send-Message cycle
-void ServiceEntryPointTestSuite::fullLifeCycleTest() {
- stdx::promise<void> testComplete;
- auto testFuture = testComplete.get_future();
-
- _tl->_resetHooks();
-
- // Step 1: SEP gets a ticket to source a Message
- // Step 2: SEP calls wait() on the ticket and receives a Message
- _tl->_sinkMessage = [tlp = _tl.get()](auto&& a1, auto&& a2, auto&& a3) {
- return tlp->_sinkThenErrorOnWait(a1, a2, a3);
- };
-
- // Step 3: SEP gets a ticket to sink a Message
- // Step 4: SEP calls wait() on the ticket and receives Status::OK()
- // Step 5: SEP gets a ticket to source a Message
- // Step 6: SEP calls wait() on the ticket and receives and error
- // Step 7: SEP destroys the session, which calls _destroy()
- _tl->_destroy_hook = [&testComplete](SEPTestSession& session) { testComplete.set_value(); };
-
- // Kick off the SEP
- auto s = SEPTestSession::create(_tl.get());
- _sep->startSession(std::move(s));
-
- testFuture.wait();
-}
-
-void ServiceEntryPointTestSuite::interruptingSessionTest() {
- auto sA = SEPTestSession::create(_tl.get());
- auto sB = SEPTestSession::create(_tl.get());
- auto idA = sA->id();
- auto idB = sB->id();
- int waitCountB = 0;
-
- stdx::promise<void> startB;
- auto startBFuture = startB.get_future();
-
- stdx::promise<void> resumeA;
- auto resumeAFuture = resumeA.get_future();
-
- stdx::promise<void> testComplete;
- auto testFuture = testComplete.get_future();
-
- _tl->_resetHooks();
-
- // Start Session A
- // Step 1: SEP calls sourceMessage() for A
- // Step 2: SEP calls wait() for A and we block...
- // Start Session B
- _tl->_wait = [this, idA, &startB, &resumeAFuture, &waitCountB](Ticket t) -> Status {
- // If we're handling B, just do a default wait
- if (t.sessionId() != idA) {
- if (waitCountB < 2) {
- ++waitCountB;
- return _tl->_defaultWait(std::move(t));
- } else {
- // If we've done a full round trip, time to end session B
- return kEndConnectionStatus;
- }
- }
-
- // Otherwise, we need to start B and block A
- startB.set_value();
- resumeAFuture.wait();
-
- _tl->_wait = [tlp = _tl.get()](transport::Ticket t) {
- return tlp->_waitOnceThenError(std::move(t));
- };
-
- return Status::OK();
- };
-
- // Step 3: SEP calls sourceMessage() for B, gets tB
- // Step 4: SEP calls wait() for tB, gets { ping : 1 }
- // Step 5: SEP calls sinkMessage() for B, gets tB2
- // Step 6: SEP calls wait() for tB2, gets Status::OK()
- // Step 7: SEP calls sourceMessage() for B, gets tB3
- // Step 8: SEP calls wait() for tB3, gets an error
- // Step 9: SEP calls end(B)
- _tl->_destroy_hook = [idA, idB, &resumeA, &testComplete](SEPTestSession& session) {
- // When end(B) is called, time to resume session A
- if (session.id() == idB) {
- // Resume session A
- resumeA.set_value();
- } else {
- // Else our test is over when end(A) is called
- invariant(session.id() == idA);
- testComplete.set_value();
- }
- };
-
- // Resume Session A
- // Step 10: SEP calls sinkMessage() for A, gets tA
- // Step 11: SEP calls wait() for tA, gets Status::OK()
- // Step 12: SEP calls sourceMessage() for A, get tA2
- // Step 13: SEP calls wait() for tA2, receives an error
- // Step 14: SEP calls end(A)
-
- // Kick off the test
- _sep->startSession(std::move(sA));
-
- startBFuture.wait();
- _sep->startSession(std::move(sB));
-
- testFuture.wait();
-}
-
-void ServiceEntryPointTestSuite::burstStressTest(int numSessions,
- int numCycles,
- Milliseconds delay) {
- AtomicWord<int> ended{0};
- stdx::promise<void> allSessionsComplete;
-
- auto allCompleteFuture = allSessionsComplete.get_future();
-
- stdx::mutex cyclesLock;
- stdx::unordered_map<Session::Id, int> completedCycles;
-
- _tl->_resetHooks();
-
- // Same wait() callback for all sessions.
- _tl->_wait = [this, &completedCycles, &cyclesLock, numCycles, &delay](Ticket ticket) -> Status {
- auto id = ticket.sessionId();
- int cycleCount;
-
- {
- stdx::lock_guard<stdx::mutex> lock(cyclesLock);
- auto item = completedCycles.find(id);
- invariant(item != completedCycles.end());
- cycleCount = item->second;
- }
-
- auto mockTicket = _tl->getMockTicket(ticket);
- // If we are sourcing:
- if (mockTicket->message()) {
- // If we've completed enough cycles, done.
- if (cycleCount == numCycles) {
- return kEndConnectionStatus;
- }
-
- // Otherwise, source another { ping : 1 }
- invariant(mockTicket->message());
- setPingCommand(*(mockTicket->message()));
-
- // Wait a bit before returning
- sleepmillis(delay.count());
-
- return Status::OK();
- }
-
- // We are sinking, increment numCycles and return OK.
- {
- stdx::lock_guard<stdx::mutex> lock(cyclesLock);
- auto item = completedCycles.find(id);
- invariant(item != completedCycles.end());
- ++(item->second);
- }
-
- return Status::OK();
- };
-
- // When we end the last session, end the test.
- _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](SEPTestSession& session) {
- if (ended.fetchAndAdd(1) == (numSessions - 1)) {
- allSessionsComplete.set_value();
- }
- };
-
- for (int i = 0; i < numSessions; i++) {
- auto s = SEPTestSession::create(_tl.get());
- {
- // This operation may cause a re-hash.
- stdx::lock_guard<stdx::mutex> lock(cyclesLock);
- completedCycles.emplace(s->id(), 0);
- }
- _sep->startSession(std::move(s));
- }
-
- // Block and wait for all sessions to finish.
- allCompleteFuture.wait();
-}
-
-void ServiceEntryPointTestSuite::longSessionStressTest() {
- return burstStressTest(1000, 100, Milliseconds(100));
-}
-
-} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h
deleted file mode 100644
index 96461584b83..00000000000
--- a/src/mongo/transport/service_entry_point_test_suite.h
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
-#include <memory>
-
-#include "mongo/transport/mock_session.h"
-#include "mongo/transport/mock_ticket.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/net/message.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-class ServiceEntryPoint;
-struct SSLPeerInfo;
-
-/**
- * Test class. Uses a mock TransportLayer to test that the ServiceEntryPoint
- * calls the expected methods on the TransportLayer in the expected order,
- * and with the expected parameters.
- *
- * Usage:
- *
- * TEST_F(ServiceEntryPointTestSuite, ServiceEntryPointImplTest) {
- * // Set up our ServiceEntryPoint
- * auto sepFactory = [](TransportLayer* tl){
- * return stdx::make_unique<ServiceEntryPointImpl>(tl);
- * };
- *
- * setServiceEntryPoint(sepFactory);
- *
- * // Run some tests
- * fullLifeCycleTest();
- * }
- */
-class ServiceEntryPointTestSuite : public mongo::unittest::Test {
-public:
- // Need a function that takes a TransportLayer* and returns a new
- // ServiceEntryPoint
- using ServiceEntryPointFactory =
- stdx::function<std::unique_ptr<ServiceEntryPoint>(transport::TransportLayer*)>;
-
- void setUp() override;
-
- void setServiceEntryPoint(ServiceEntryPointFactory factory);
-
- // Lifecycle Tests
- void noLifeCycleTest();
- void halfLifeCycleTest();
- void fullLifeCycleTest();
-
- // Concurrent Session Tests
- void interruptingSessionTest();
-
- // Stress Tests
- void burstStressTest(int numSessions = 1000,
- int numCycles = 1,
- Milliseconds delay = Milliseconds(0));
- void longSessionStressTest();
-
- class SEPTestSession;
- using SEPTestSessionHandle = std::shared_ptr<SEPTestSession>;
-
- /**
- * This class mocks the TransportLayer and allows us to insert hooks beneath
- * its methods.
- */
- class MockTLHarness : public transport::TransportLayer {
- public:
- friend class SEPTestSession;
-
- MockTLHarness();
-
- transport::Ticket sourceMessage(
- const transport::SessionHandle& session,
- Message* message,
- Date_t expiration = transport::Ticket::kNoExpirationDate) override;
- transport::Ticket sinkMessage(
- const transport::SessionHandle& session,
- const Message& message,
- Date_t expiration = transport::Ticket::kNoExpirationDate) override;
- Status wait(transport::Ticket&& ticket) override;
- void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override;
-
- void end(const transport::SessionHandle& session) override;
- Status setup() override;
- Status start() override;
- void shutdown() override;
-
- transport::MockTicket* getMockTicket(const transport::Ticket& ticket);
-
- // Mocked method hooks
- stdx::function<transport::Ticket(const transport::SessionHandle&, Message*, Date_t)>
- _sourceMessage;
- stdx::function<transport::Ticket(const transport::SessionHandle&, const Message&, Date_t)>
- _sinkMessage;
- stdx::function<Status(transport::Ticket)> _wait;
- stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait;
- stdx::function<void(const transport::SessionHandle&)> _end;
- stdx::function<void(SEPTestSession& session)> _destroy_hook;
- stdx::function<Status(void)> _start = [] { return Status::OK(); };
- stdx::function<void(void)> _shutdown = [] {};
-
- // Pre-set hook methods
- transport::Ticket _defaultSource(const transport::SessionHandle& s, Message* m, Date_t d);
- transport::Ticket _defaultSink(const transport::SessionHandle& s, const Message&, Date_t d);
- transport::Ticket _sinkThenErrorOnWait(const transport::SessionHandle& s,
- const Message& m,
- Date_t d);
-
- Status _defaultWait(transport::Ticket ticket);
- Status _waitError(transport::Ticket ticket);
- Status _waitOnceThenError(transport::Ticket ticket);
-
- // Reset all hooks to their defaults
- void _resetHooks();
-
- private:
- void _destroy(SEPTestSession& session);
- };
-
- /**
- * A light wrapper around the mock session class, to handle our destroy logic.
- */
- class SEPTestSession : public transport::MockSession {
- MockTLHarness* _mockTL;
-
- public:
- static std::shared_ptr<SEPTestSession> create(MockTLHarness* tl) {
- std::shared_ptr<SEPTestSession> handle(new SEPTestSession(tl));
- return handle;
- }
-
- ~SEPTestSession() {
- _mockTL->_destroy(*this);
- }
-
- private:
- explicit SEPTestSession(MockTLHarness* tl) : transport::MockSession(tl), _mockTL(tl) {}
- };
-
-private:
- std::unique_ptr<MockTLHarness> _tl;
- std::unique_ptr<ServiceEntryPoint> _sep;
-};
-
-} // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 18c5156657e..abeca256d51 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -41,7 +41,6 @@
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_task_names.h"
#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/idle_thread_block.h"
@@ -239,34 +238,43 @@ const transport::SessionHandle& ServiceStateMachine::_session() const {
void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
invariant(_inMessage.empty());
- auto ticket = _session()->sourceMessage(&_inMessage);
-
+ invariant(_state.load() == State::Source);
_state.store(State::SourceWait);
guard.release();
if (_transportMode == transport::Mode::kSynchronous) {
- _sourceCallback([this](auto ticket) {
+ auto msg = [&] {
MONGO_IDLE_THREAD_BLOCK;
- return _session()->getTransportLayer()->wait(std::move(ticket));
- }(std::move(ticket)));
+ return _session()->sourceMessage();
+ }();
+
+ if (msg.isOK()) {
+ _inMessage = std::move(msg.getValue());
+ invariant(!_inMessage.empty());
+ }
+ _sourceCallback(msg.getStatus());
} else if (_transportMode == transport::Mode::kAsynchronous) {
- _session()->getTransportLayer()->asyncWait(
- std::move(ticket), [this](Status status) { _sourceCallback(status); });
+ _session()->asyncSourceMessage([this](StatusWith<Message> msg) {
+ if (msg.isOK()) {
+ _inMessage = std::move(msg.getValue());
+ invariant(!_inMessage.empty());
+ }
+ _sourceCallback(msg.getStatus());
+ });
}
}
void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {
// Sink our response to the client
- auto ticket = _session()->sinkMessage(toSink);
-
+ invariant(_state.load() == State::Process);
_state.store(State::SinkWait);
guard.release();
if (_transportMode == transport::Mode::kSynchronous) {
- _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket)));
+ _sinkCallback(_session()->sinkMessage(std::move(toSink)));
} else if (_transportMode == transport::Mode::kAsynchronous) {
- _session()->getTransportLayer()->asyncWait(
- std::move(ticket), [this](Status status) { _sinkCallback(status); });
+ _session()->asyncSinkMessage(std::move(toSink),
+ [this](Status status) { _sinkCallback(std::move(status)); });
}
}
@@ -487,7 +495,7 @@ void ServiceStateMachine::terminate() {
if (state() == State::Ended)
return;
- _session()->getTransportLayer()->end(_session());
+ _session()->end();
}
void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) {
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index d74fcbcdfc7..0cc4aa0634a 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/service_context_noop.h"
#include "mongo/stdx/memory.h"
#include "mongo/transport/mock_session.h"
-#include "mongo/transport/mock_ticket.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_executor_task_names.h"
@@ -115,62 +114,59 @@ private:
using namespace transport;
class MockTL : public TransportLayerMock {
public:
- ~MockTL() = default;
+ class Session : public MockSession {
+ public:
+ using MockSession::MockSession;
- Ticket sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration = Ticket::kNoExpirationDate) override {
- ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Source);
- _lastTicketSource = true;
+ StatusWith<Message> sourceMessage() override {
+ auto tl = checked_cast<MockTL*>(getTransportLayer());
+ ASSERT_EQ(tl->_ssm->state(), ServiceStateMachine::State::SourceWait);
+ tl->_lastTicketSource = true;
- _ranSource = true;
- log() << "In sourceMessage";
+ tl->_ranSource = true;
+ log() << "In sourceMessage";
- if (_nextShouldFail & Source) {
- return TransportLayer::TicketSessionClosedStatus;
- }
+ if (tl->_waitHook)
+ tl->_waitHook();
- OpMsgBuilder builder;
- builder.setBody(BSON("ping" << 1));
- *message = builder.finish();
+ if (tl->_nextShouldFail & Source) {
+ return TransportLayer::TicketSessionClosedStatus;
+ }
- return TransportLayerMock::sourceMessage(session, message, expiration);
- }
+ auto out = MockSession::sourceMessage();
+ if (out.isOK()) {
+ OpMsgBuilder builder;
+ builder.setBody(BSON("ping" << 1));
+ out.getValue() = builder.finish();
+ }
+ return out;
+ }
- Ticket sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration = Ticket::kNoExpirationDate) override {
- ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Process);
- _lastTicketSource = false;
+ Status sinkMessage(Message message) override {
+ auto tl = checked_cast<MockTL*>(getTransportLayer());
+ ASSERT_EQ(tl->_ssm->state(), ServiceStateMachine::State::SinkWait);
+ tl->_lastTicketSource = false;
- log() << "In sinkMessage";
- _ranSink = true;
+ log() << "In sinkMessage";
+ tl->_ranSink = true;
- if (_nextShouldFail & Sink) {
- return TransportLayer::TicketSessionClosedStatus;
- }
+ if (tl->_waitHook)
+ tl->_waitHook();
- _lastSunk = message;
+ if (tl->_nextShouldFail & Sink) {
+ return TransportLayer::TicketSessionClosedStatus;
+ }
- return TransportLayerMock::sinkMessage(session, message, expiration);
- }
+ auto out = MockSession::sinkMessage(message);
+ if (out.isOK())
+ tl->_lastSunk = message;
- Status wait(Ticket&& ticket) override {
- if (!ticket.valid()) {
- return ticket.status();
+ return out;
}
- ASSERT_EQ(_ssm->state(),
- _lastTicketSource ? ServiceStateMachine::State::SourceWait
- : ServiceStateMachine::State::SinkWait);
-
- log() << "In wait. ssm state: " << stateToString(_ssm->state());
- if (_waitHook)
- _waitHook();
- return TransportLayerMock::wait(std::move(ticket));
- }
+ };
- void asyncWait(Ticket&& ticket, TicketCallback callback) override {
- MONGO_UNREACHABLE;
+ MockTL() {
+ createSessionHook = [](TransportLayer* tl) { return std::make_unique<Session>(tl); };
}
void setSSM(ServiceStateMachine* ssm) {
diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp
index ad2eceae039..b6d78673e10 100644
--- a/src/mongo/transport/session.cpp
+++ b/src/mongo/transport/session.cpp
@@ -45,14 +45,6 @@ AtomicUInt64 sessionIdCounter(0);
Session::Session() : _id(sessionIdCounter.addAndFetch(1)), _tags(kPending) {}
-Ticket Session::sourceMessage(Message* message, Date_t expiration) {
- return getTransportLayer()->sourceMessage(shared_from_this(), message, expiration);
-}
-
-Ticket Session::sinkMessage(const Message& message, Date_t expiration) {
- return getTransportLayer()->sinkMessage(shared_from_this(), message, expiration);
-}
-
void Session::setTags(TagMask tagsToSet) {
mutateTags([tagsToSet](TagMask originalTags) { return (originalTags | tagsToSet); });
}
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index b2160117807..d25d32b9027 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -33,7 +33,6 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/transport/session_id.h"
-#include "mongo/transport/ticket.h"
#include "mongo/util/decorable.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/message.h"
@@ -75,47 +74,45 @@ public:
static constexpr TagMask kExternalClientKeepOpen = 8;
static constexpr TagMask kPending = 1 << 31;
- /**
- * Destroys a session, calling end() for this session in its TransportLayer.
- */
virtual ~Session() = default;
- /**
- * Return the id for this session.
- */
Id id() const {
return _id;
}
- /**
- * The TransportLayer for this Session.
- */
virtual TransportLayer* getTransportLayer() const = 0;
/**
- * Source (receive) a new Message for this Session.
+ * Ends this Session.
*
- * This method will forward to sourceMessage on this Session's transport layer.
- */
- virtual Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate);
-
- /**
- * Sink (send) a new Message for this Session. This method should be used
- * to send replies to a given host.
+ * Operations on this Session that have already been started via wait() or asyncWait() will
+ * complete, but may return a failed Status. Future operations on this Session will fail. If
+ * this TransportLayer implementation is networked, any connections for this Session will be
+ * closed.
*
- * This method will forward to sinkMessage on this Session's transport layer.
+ * This method is idempotent and synchronous.
+ *
+ * Destructors of derived classes will close the session automatically if needed. This method
+ * should only be called explicitly if the session should be closed separately from destruction,
+ * eg due to some outside event.
*/
- virtual Ticket sinkMessage(const Message& message,
- Date_t expiration = Ticket::kNoExpirationDate);
+ virtual void end() = 0;
/**
- * Return the remote host for this session.
+ * Source (receive) a new Message from the remote host for this Session.
*/
- virtual const HostAndPort& remote() const = 0;
+ virtual StatusWith<Message> sourceMessage() = 0;
+ virtual void asyncSourceMessage(std::function<void(StatusWith<Message>)> cb) = 0;
/**
- * Return the local host information for this session.
+ * Sink (send) a Message to the remote host for this Session.
+ *
+ * Async version will keep the buffer alive until the operation completes.
*/
+ virtual Status sinkMessage(Message message) = 0;
+ virtual void asyncSinkMessage(Message message, std::function<void(Status)> cb) = 0;
+
+ virtual const HostAndPort& remote() const = 0;
virtual const HostAndPort& local() const = 0;
/**
@@ -147,15 +144,9 @@ public:
*/
virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc);
- /**
- * Get this session's tags.
- */
virtual TagMask getTags() const;
protected:
- /**
- * Construct a new session.
- */
Session();
private:
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 352dc6cf102..5cdd8105870 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -32,6 +32,7 @@
#include "mongo/base/system_error.h"
#include "mongo/config.h"
+#include "mongo/db/stats/counters.h"
#include "mongo/transport/asio_utils.h"
#include "mongo/transport/transport_layer_asio.h"
#include "mongo/util/net/sock.h"
@@ -50,7 +51,7 @@ namespace transport {
using GenericSocket = asio::generic::stream_protocol::socket;
-class TransportLayerASIO::ASIOSession : public Session {
+class TransportLayerASIO::ASIOSession final : public Session {
MONGO_DISALLOW_COPYING(ASIOSession);
public:
@@ -58,9 +59,6 @@ public:
: _socket(std::move(socket)), _tl(tl) {
std::error_code ec;
- _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec);
- fassert(40490, ec.value() == 0);
-
auto family = endpointToSockAddr(_socket.local_endpoint()).getType();
if (family == AF_INET || family == AF_INET6) {
_socket.set_option(asio::ip::tcp::no_delay(true));
@@ -76,7 +74,7 @@ public:
}
~ASIOSession() {
- shutdown();
+ end();
}
TransportLayer* getTransportLayer() const override {
@@ -91,16 +89,7 @@ public:
return _local;
}
- GenericSocket& getSocket() {
-#ifdef MONGO_CONFIG_SSL
- if (_sslSocket) {
- return static_cast<GenericSocket&>(_sslSocket->lowest_layer());
- }
-#endif
- return _socket;
- }
-
- void shutdown() {
+ void end() override {
if (getSocket().is_open()) {
std::error_code ec;
getSocket().cancel();
@@ -111,6 +100,76 @@ public:
}
}
+ StatusWith<Message> sourceMessage() override {
+ ensureSync();
+ auto out = StatusWith<Message>(ErrorCodes::InternalError, "uninitialized...");
+ bool called = false;
+ sourceMessageImpl(true, [&](StatusWith<Message> in) {
+ out = std::move(in);
+ called = true;
+ });
+ invariant(called);
+ return out;
+ }
+
+ void asyncSourceMessage(std::function<void(StatusWith<Message>)> cb) override {
+ ensureAsync();
+ sourceMessageImpl(false, std::move(cb));
+ }
+
+ Status sinkMessage(Message message) override {
+ ensureSync();
+
+ std::error_code ec;
+ size_t size;
+ bool called = false;
+
+ write(true,
+ asio::buffer(message.buf(), message.size()),
+ [&](const std::error_code& ec_, size_t size_) {
+ ec = ec_;
+ size = size_;
+ called = true;
+ });
+ invariant(called);
+
+ if (ec)
+ return errorCodeToStatus(ec);
+
+ invariant(size == size_t(message.size()));
+ networkCounter.hitPhysicalOut(message.size());
+ return Status::OK();
+ }
+
+ void asyncSinkMessage(Message message, std::function<void(Status)> cb) override {
+ ensureAsync();
+
+ write(false, asio::buffer(message.buf(), message.size()), [
+ message, // keep the buffer alive.
+ cb = std::move(cb),
+ this
+ ](const std::error_code& ec, size_t size) {
+ if (ec) {
+ cb(errorCodeToStatus(ec));
+ return;
+ }
+ invariant(size == size_t(message.size()));
+ networkCounter.hitPhysicalOut(message.size());
+ cb(Status::OK());
+ });
+ };
+
+
+private:
+ GenericSocket& getSocket() {
+#ifdef MONGO_CONFIG_SSL
+ if (_sslSocket) {
+ return static_cast<GenericSocket&>(_sslSocket->lowest_layer());
+ }
+#endif
+ return _socket;
+ }
+
bool isOpen() const {
#ifdef MONGO_CONFIG_SSL
return _sslSocket ? _sslSocket->lowest_layer().is_open() : _socket.is_open();
@@ -119,6 +178,55 @@ public:
#endif
}
+ template <typename Callback>
+ void sourceMessageImpl(bool sync, Callback&& cb) {
+ static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);
+
+ auto headerBuffer = SharedBuffer::allocate(kHeaderSize);
+ auto ptr = headerBuffer.get();
+ read(sync,
+ asio::buffer(ptr, kHeaderSize),
+ [ cb = std::forward<Callback>(cb), headerBuffer = std::move(headerBuffer), this ](
+ const std::error_code& ec, size_t size) mutable {
+
+ if (ec)
+ return cb(errorCodeToStatus(ec));
+ invariant(size == kHeaderSize);
+
+ const auto msgLen = size_t(MSGHEADER::View(headerBuffer.get()).getMessageLength());
+ if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {
+ StringBuilder sb;
+ sb << "recv(): message msgLen " << msgLen << " is invalid. "
+ << "Min " << kHeaderSize << " Max: " << MaxMessageSizeBytes;
+ const auto str = sb.str();
+ LOG(0) << str;
+
+ return cb(Status(ErrorCodes::ProtocolError, str));
+ }
+
+ if (msgLen == size) {
+ // This probably isn't a real case since all (current) messages have bodies.
+ networkCounter.hitPhysicalIn(msgLen);
+ return cb(Message(std::move(headerBuffer)));
+ }
+
+ auto buffer = SharedBuffer::allocate(msgLen);
+ memcpy(buffer.get(), headerBuffer.get(), kHeaderSize);
+
+ MsgData::View msgView(buffer.get());
+ read(true,
+ asio::buffer(msgView.data(), msgView.dataLen()),
+ [ cb = std::move(cb), buffer = std::move(buffer), msgLen, this ](
+ const std::error_code& ec, size_t size) mutable {
+ if (ec)
+ return cb(errorCodeToStatus(ec));
+ networkCounter.hitPhysicalIn(msgLen);
+ return cb(Message(std::move(buffer)));
+ });
+ });
+ }
+
+
template <typename MutableBufferSequence, typename CompleteHandler>
void read(bool sync, const MutableBufferSequence& buffers, CompleteHandler&& handler) {
#ifdef MONGO_CONFIG_SSL
@@ -127,7 +235,8 @@ public:
sync, *_sslSocket, buffers, std::forward<CompleteHandler>(handler));
} else if (!_ranHandshake) {
invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value));
- auto postHandshakeCb = [this, sync, buffers, handler](Status status, bool needsRead) {
+ auto postHandshakeCb = [this, sync, buffers, handler](Status status,
+ bool needsRead) mutable {
if (status.isOK()) {
if (needsRead) {
read(sync, buffers, handler);
@@ -142,7 +251,7 @@ public:
auto handshakeRecvCb =
[ this, postHandshakeCb = std::move(postHandshakeCb), sync, buffers ](
- const std::error_code& ec, size_t size) {
+ const std::error_code& ec, size_t size) mutable {
_ranHandshake = true;
if (ec) {
postHandshakeCb(errorCodeToStatus(ec), size);
@@ -152,30 +261,41 @@ public:
maybeHandshakeSSL(sync, buffers, std::move(postHandshakeCb));
};
- opportunisticRead(sync, _socket, buffers, std::move(handshakeRecvCb));
- } else {
-
-#endif
- opportunisticRead(sync, _socket, buffers, std::forward<CompleteHandler>(handler));
-#ifdef MONGO_CONFIG_SSL
+ return opportunisticRead(sync, _socket, buffers, std::move(handshakeRecvCb));
}
#endif
+ return opportunisticRead(sync, _socket, buffers, std::forward<CompleteHandler>(handler));
}
template <typename ConstBufferSequence, typename CompleteHandler>
void write(bool sync, const ConstBufferSequence& buffers, CompleteHandler&& handler) {
#ifdef MONGO_CONFIG_SSL
if (_sslSocket) {
- opportunisticWrite(sync, *_sslSocket, buffers, std::forward<CompleteHandler>(handler));
- } else {
-#endif
- opportunisticWrite(sync, _socket, buffers, std::forward<CompleteHandler>(handler));
-#ifdef MONGO_CONFIG_SSL
+ return opportunisticWrite(
+ sync, *_sslSocket, buffers, std::forward<CompleteHandler>(handler));
}
#endif
+ return opportunisticWrite(sync, _socket, buffers, std::forward<CompleteHandler>(handler));
+ }
+
+ void ensureSync() {
+ if (_blockingMode == Sync)
+ return;
+ asio::error_code ec;
+ getSocket().non_blocking(false, ec);
+ fassertStatusOK(40490, errorCodeToStatus(ec));
+ _blockingMode = Sync;
+ }
+
+ void ensureAsync() {
+ if (_blockingMode == Async)
+ return;
+ asio::error_code ec;
+ getSocket().non_blocking(true, ec);
+ fassertStatusOK(50706, errorCodeToStatus(ec));
+ _blockingMode = Async;
}
-private:
template <typename Stream, typename MutableBufferSequence, typename CompleteHandler>
void opportunisticRead(bool sync,
Stream& stream,
@@ -243,7 +363,7 @@ private:
_sslSocket.emplace(std::move(_socket), *_tl->_sslContext);
auto handshakeCompleteCb = [ this, onComplete = std::move(onComplete) ](
- const std::error_code& ec, size_t size) {
+ const std::error_code& ec, size_t size) mutable {
auto& sslPeerInfo = SSLPeerInfo::forSession(shared_from_this());
if (!ec && sslPeerInfo.subjectName.empty()) {
@@ -297,6 +417,14 @@ private:
}
#endif
+ enum BlockingMode {
+ Unknown,
+ Sync,
+ Async,
+ };
+
+ BlockingMode _blockingMode = Unknown;
+
HostAndPort _remote;
HostAndPort _local;
diff --git a/src/mongo/transport/ticket.cpp b/src/mongo/transport/ticket.cpp
deleted file mode 100644
index d1003957b80..00000000000
--- a/src/mongo/transport/ticket.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/base/status.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/ticket_impl.h"
-#include "mongo/transport/transport_layer.h"
-
-namespace mongo {
-namespace transport {
-
-const Date_t Ticket::kNoExpirationDate{Date_t::max()};
-
-Status Ticket::ExpiredStatus = Status(ErrorCodes::ExceededTimeLimit, "Ticket has expired.");
-
-Status Ticket::SessionClosedStatus =
- Status(ErrorCodes::TransportSessionClosed, "Ticket's Session is closed.");
-
-Ticket::Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket)
- : _tl(tl), _ticket(std::move(ticket)) {}
-
-Ticket::Ticket(Status status) : _status(status) {}
-
-Ticket::~Ticket() = default;
-
-Ticket::Ticket(Ticket&&) = default;
-Ticket& Ticket::operator=(Ticket&&) = default;
-
-Status Ticket::wait()&& {
- // If the ticket is invalid then _tl is a nullptr and we should return early.
- if (!valid())
- return status();
-
- invariant(_tl);
- return _tl->wait(std::move(*this));
-}
-
-void Ticket::asyncWait(TicketCallback cb)&& {
- // If the ticket is invalid then _tl is a nullptr and we should return early.
- if (!valid()) {
- cb(status());
- return;
- }
-
- invariant(_tl);
- return _tl->asyncWait(std::move(*this), std::move(cb));
-}
-
-bool Ticket::valid() {
- return _status == Status::OK() && !expired();
-}
-
-Status Ticket::status() const {
- return _status;
-}
-
-bool Ticket::expired() {
- bool expired = expiration() <= Date_t::now();
- if (_status == Status::OK() && expired) {
- _status = Status(ErrorCodes::ExceededTimeLimit, "Ticket has expired.");
- }
- return expired;
-}
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/ticket.h b/src/mongo/transport/ticket.h
deleted file mode 100644
index d4ffae468da..00000000000
--- a/src/mongo/transport/ticket.h
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/base/status.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/transport/ticket_impl.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace transport {
-
-class Session;
-class TransportLayer;
-
-/**
- * A Ticket represents some work to be done within the TransportLayer.
- * Run Tickets by passing them in a call to either TransportLayer::wait()
- * or TransportLayer::asyncWait().
- */
-class Ticket {
- MONGO_DISALLOW_COPYING(Ticket);
-
-public:
- using TicketCallback = stdx::function<void(Status)>;
-
- friend class TransportLayer;
-
- /**
- * Indicates that there is no expiration time by when a ticket needs to complete.
- */
- static const Date_t kNoExpirationDate;
-
- static Status ExpiredStatus;
- static Status SessionClosedStatus;
-
- Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket);
-
- /**
- * An invalid ticket and a Status for why it's invalid.
- */
- Ticket(Status status);
-
- ~Ticket();
-
- /**
- * Move constructor and assignment operator.
- */
- Ticket(Ticket&&);
- Ticket& operator=(Ticket&&);
-
- /**
- * Return this ticket's session id.
- */
- SessionId sessionId() const {
- return _ticket->sessionId();
- }
-
- /**
- * Return this ticket's expiration date.
- */
- Date_t expiration() const {
- return _ticket->expiration();
- }
-
- /**
- * Wait for this ticket to be filled.
- *
- * This is this-rvalue qualified because it consumes the ticket
- */
- Status wait() &&;
-
- /**
- * Asynchronously wait for this ticket to be filled.
- *
- * This is this-rvalue qualified because it consumes the ticket
- *
- * If the ticket has expired or is not valid when asyncWait is called, cb will be called
- * immediately and inline with the error status.
- */
- void asyncWait(TicketCallback cb) &&;
-
- /*
- * Return's the Status of the ticket.
- */
- Status status() const;
-
- /*
- * Returns true if the ticket has expired, false otherwise.
- *
- * If the ticket has expired, changes the Status of the ticket to TicketExpired but
- * only if the current Status is Status::OK().
- */
- bool expired();
-
- /*
- * Return true if the ticket is usable, false otherwise.
- *
- * If the ticket has expired, changes the Status of the ticket to TicketExpired but
- * only if the current Status is Status::OK().
- */
- bool valid();
-
- /**
- * Return a non-owning pointer to the underlying TicketImpl type
- */
- TicketImpl* impl() const {
- return _ticket.get();
- }
-
- /**
- * Return an owning pointer to the underlying TicketImpl type. This consumes the ticket
- */
- std::unique_ptr<TicketImpl> releaseImpl() && {
- return std::move(_ticket);
- }
-
-private:
- TransportLayer* _tl = nullptr;
- Status _status = Status::OK();
- std::unique_ptr<TicketImpl> _ticket;
-};
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/ticket_asio.cpp b/src/mongo/transport/ticket_asio.cpp
deleted file mode 100644
index 78102f6b5a8..00000000000
--- a/src/mongo/transport/ticket_asio.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/base/system_error.h"
-#include "mongo/db/stats/counters.h"
-#include "mongo/transport/asio_utils.h"
-#include "mongo/transport/ticket_asio.h"
-#include "mongo/util/log.h"
-
-#include "mongo/transport/session_asio.h"
-
-namespace mongo {
-namespace transport {
-namespace {
-constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);
-
-} // namespace
-
-
-std::shared_ptr<TransportLayerASIO::ASIOSession> TransportLayerASIO::ASIOTicket::getSession() {
- auto session = _session.lock();
- if (!session || !session->isOpen()) {
- finishFill(Ticket::SessionClosedStatus);
- return nullptr;
- }
- return session;
-}
-
-bool TransportLayerASIO::ASIOTicket::isSync() const {
- return _fillSync;
-}
-
-TransportLayerASIO::ASIOTicket::ASIOTicket(const ASIOSessionHandle& session, Date_t expiration)
- : _session(session), _sessionId(session->id()), _expiration(expiration) {}
-
-TransportLayerASIO::ASIOSourceTicket::ASIOSourceTicket(const ASIOSessionHandle& session,
- Date_t expiration,
- Message* msg)
- : ASIOTicket(session, expiration), _target(msg) {}
-
-TransportLayerASIO::ASIOSinkTicket::ASIOSinkTicket(const ASIOSessionHandle& session,
- Date_t expiration,
- const Message& msg)
- : ASIOTicket(session, expiration), _msgToSend(msg) {}
-
-void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) {
- if (ec) {
- finishFill(errorCodeToStatus(ec));
- return;
- }
-
- _target->setData(std::move(_buffer));
- networkCounter.hitPhysicalIn(_target->size());
- finishFill(Status::OK());
-}
-
-void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) {
- if (ec) {
- finishFill(errorCodeToStatus(ec));
- return;
- }
-
- auto session = getSession();
- if (!session)
- return;
-
- MSGHEADER::View headerView(_buffer.get());
- auto msgLen = static_cast<size_t>(headerView.getMessageLength());
- if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {
- StringBuilder sb;
- sb << "recv(): message msgLen " << msgLen << " is invalid. "
- << "Min " << kHeaderSize << " Max: " << MaxMessageSizeBytes;
- const auto str = sb.str();
- LOG(0) << str;
- finishFill(Status(ErrorCodes::ProtocolError, str));
- return;
- }
-
- if (msgLen == size) {
- finishFill(Status::OK());
- return;
- }
-
- _buffer.realloc(msgLen);
- MsgData::View msgView(_buffer.get());
-
- session->read(isSync(),
- asio::buffer(msgView.data(), msgView.dataLen()),
- [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); });
-}
-
-void TransportLayerASIO::ASIOSourceTicket::fillImpl() {
- auto session = getSession();
- if (!session)
- return;
-
- const auto initBufSize = kHeaderSize;
- _buffer = SharedBuffer::allocate(initBufSize);
-
- session->read(isSync(),
- asio::buffer(_buffer.get(), initBufSize),
- [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); });
-}
-
-void TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const std::error_code& ec, size_t size) {
- networkCounter.hitPhysicalOut(_msgToSend.size());
- finishFill(ec ? errorCodeToStatus(ec) : Status::OK());
-}
-
-void TransportLayerASIO::ASIOSinkTicket::fillImpl() {
- auto session = getSession();
- if (!session)
- return;
-
- session->write(isSync(),
- asio::buffer(_msgToSend.buf(), _msgToSend.size()),
- [this](const std::error_code& ec, size_t size) { _sinkCallback(ec, size); });
-}
-
-void TransportLayerASIO::ASIOTicket::finishFill(Status status) {
- // We want to make sure that a Ticket can only be filled once; filling a ticket invalidates it.
- // So we check that the _fillCallback is set, then move it out of the ticket and into a local
- // variable, and then call that. It's illegal to interact with the ticket after calling the
- // fillCallback, so we have to move it out of _fillCallback so there are no writes to any
- // variables in ASIOTicket after it gets called.
- invariant(_fillCallback);
- auto fillCallback = std::move(_fillCallback);
- fillCallback(status);
-}
-
-void TransportLayerASIO::ASIOTicket::fill(bool sync, TicketCallback&& cb) {
- _fillSync = sync;
- dassert(!_fillCallback);
- _fillCallback = std::move(cb);
- fillImpl();
-}
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/ticket_asio.h b/src/mongo/transport/ticket_asio.h
deleted file mode 100644
index faa687a943e..00000000000
--- a/src/mongo/transport/ticket_asio.h
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/transport/transport_layer_asio.h"
-
-#include "asio.hpp"
-
-namespace mongo {
-namespace transport {
-
-class TransportLayerASIO::ASIOTicket : public TicketImpl {
- MONGO_DISALLOW_COPYING(ASIOTicket);
-
-public:
- explicit ASIOTicket(const ASIOSessionHandle& session, Date_t expiration);
-
- SessionId sessionId() const final {
- return _sessionId;
- }
-
- Date_t expiration() const final {
- return _expiration;
- }
-
- /**
- * Run this ticket's work item.
- */
- void fill(bool sync, TicketCallback&& cb);
-
-protected:
- void finishFill(Status status);
- std::shared_ptr<ASIOSession> getSession();
- bool isSync() const;
-
- // This must be implemented by the Source/Sink subclasses as the actual implementation
- // of filling the ticket.
- virtual void fillImpl() = 0;
-
-private:
- std::weak_ptr<ASIOSession> _session;
- const SessionId _sessionId;
- const Date_t _expiration;
-
- TicketCallback _fillCallback;
- bool _fillSync;
-};
-
-class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket {
-public:
- ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg);
-
-protected:
- void fillImpl() final;
-
-private:
- void _headerCallback(const std::error_code& ec, size_t size);
- void _bodyCallback(const std::error_code& ec, size_t size);
-
- SharedBuffer _buffer;
- Message* _target;
-};
-
-class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket {
-public:
- ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg);
-
-protected:
- void fillImpl() final;
-
-private:
- void _sinkCallback(const std::error_code& ec, size_t size);
- Message _msgToSend;
-};
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/ticket_impl.h b/src/mongo/transport/ticket_impl.h
deleted file mode 100644
index c6d325dbc37..00000000000
--- a/src/mongo/transport/ticket_impl.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/transport/session_id.h"
-#include "mongo/util/net/message.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace transport {
-
-/**
- * Interface representing implementations of Ticket.
- *
- * Ticket implementations are specific to a TransportLayer implementation.
- */
-class TicketImpl {
- MONGO_DISALLOW_COPYING(TicketImpl);
-
-public:
- virtual ~TicketImpl() = default;
-
- /**
- * Return this ticket's session id.
- */
- virtual SessionId sessionId() const = 0;
-
- /**
- * Return this ticket's expiration date.
- */
- virtual Date_t expiration() const = 0;
-
-protected:
- TicketImpl() = default;
-};
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 383e3bd0cd5..053aa5f6c15 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -31,15 +31,12 @@
#include "mongo/base/status.h"
#include "mongo/stdx/functional.h"
#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
#include "mongo/util/net/message.h"
#include "mongo/util/time_support.h"
namespace mongo {
namespace transport {
-class TicketImpl;
-
/**
* The TransportLayer moves Messages between transport::Endpoints and the database.
* This class owns an Acceptor that generates new endpoints from which it can
@@ -67,77 +64,6 @@ public:
virtual ~TransportLayer() = default;
/**
- * Source (receive) a new Message for this Session.
- *
- * This method returns a work Ticket. The caller must complete the Ticket by
- * passing it to either TransportLayer::wait() or TransportLayer::asyncWait().
- *
- * If an expiration date is given, the returned Ticket will expire at that time.
- *
- * When run, the returned Ticket will be exchanged for a Status. If the
- * TransportLayer is unable to source a Message, this will be a failed status,
- * and the passed-in Message buffer may be left in an invalid state.
- */
- virtual Ticket sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration = Ticket::kNoExpirationDate) = 0;
-
- /**
- * Sink (send) a new Message for this Session. This method should be used
- * to send replies to a given host.
- *
- * This method returns a work Ticket. The caller must complete the Ticket by
- * passing it to either TransportLayer::wait() or TransportLayer::asyncWait().
- *
- * If an expiration date is given, the returned Ticket will expire at that time.
- *
- * When run, the returned Ticket will be exchanged for a Status. If the
- * TransportLayer is unable to sink the Message, this will be a failed status.
- *
- * This method does NOT take ownership of the sunk Message, which must be cleaned
- * up by the caller.
- */
- virtual Ticket sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration = Ticket::kNoExpirationDate) = 0;
-
- /**
- * Perform a synchronous wait on the given work Ticket. When this call returns,
- * the Ticket will have been completed. A call to wait() consumes the Ticket.
- *
- * This thread may be used by the TransportLayer to run other Tickets that were
- * enqueued prior to this call.
- */
- virtual Status wait(Ticket&& ticket) = 0;
-
- /**
- * Callback for Tickets that are run via asyncWait().
- */
- using TicketCallback = stdx::function<void(Status)>;
-
- /**
- * Perform an asynchronous wait on the given work Ticket. Once the Ticket has been
- * completed, the passed-in callback will be invoked.
- *
- * This thread will not be used by the TransportLayer to perform work. The callback
- * passed to asyncWait() may be run on any thread.
- */
- virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0;
-
- /**
- * End the given Session. Tickets for this Session that have already been
- * started via wait() or asyncWait() will complete, but may return a failed Status.
- * Future calls to wait() or asyncWait() for this Session will fail. If this
- * TransportLayer implementation is networked, any connections for this Session will
- * be closed.
- *
- * ~Session() will automatically call end() with itself.
- *
- * This method is idempotent and synchronous.
- */
- virtual void end(const SessionHandle& session) = 0;
-
- /**
* Start the TransportLayer. After this point, the TransportLayer will begin accepting active
* sessions from new transport::Endpoints.
*/
@@ -160,24 +86,6 @@ public:
protected:
TransportLayer() = default;
-
- /**
- * Return the implementation of this Ticket.
- */
- TicketImpl* getTicketImpl(const Ticket& ticket) {
- return ticket.impl();
- }
-
- std::unique_ptr<TicketImpl> getOwnedTicketImpl(Ticket&& ticket) {
- return std::move(ticket).releaseImpl();
- }
-
- /**
- * Return the transport layer of this Ticket.
- */
- TransportLayer* getTicketTransportLayer(const Ticket& ticket) {
- return ticket._tl;
- }
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index b1c9499eb22..df0319a9b14 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -46,8 +46,6 @@
#include "mongo/db/service_context.h"
#include "mongo/transport/asio_utils.h"
#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/ticket_asio.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/message.h"
@@ -85,48 +83,6 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
TransportLayerASIO::~TransportLayerASIO() = default;
-Ticket TransportLayerASIO::sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration) {
- auto asioSession = checked_pointer_cast<ASIOSession>(session);
- auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message);
- return {this, std::move(ticket)};
-}
-
-Ticket TransportLayerASIO::sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration) {
- auto asioSession = checked_pointer_cast<ASIOSession>(session);
- auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message);
- return {this, std::move(ticket)};
-}
-
-Status TransportLayerASIO::wait(Ticket&& ticket) {
- auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket));
- auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());
-
- Status waitStatus = Status::OK();
- asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; });
-
- return waitStatus;
-}
-
-void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {
- auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));
- auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());
-
- asioTicket->fill(
- false,
- [ callback = std::move(callback),
- ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); });
-}
-
-// Must not be called while holding the TransportLayerASIO mutex.
-void TransportLayerASIO::end(const SessionHandle& session) {
- auto asioSession = checked_pointer_cast<ASIOSession>(session);
- asioSession->shutdown();
-}
-
Status TransportLayerASIO::setup() {
std::vector<std::string> listenAddrs;
if (_listenerOptions.ipList.empty()) {
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index f04881debd8..62c0e02edae 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -38,7 +38,6 @@
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
-#include "mongo/transport/ticket_impl.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/transport_mode.h"
#include "mongo/util/net/hostandport.h"
@@ -93,20 +92,6 @@ public:
virtual ~TransportLayerASIO();
- Ticket sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration = Ticket::kNoExpirationDate) final;
-
- Ticket sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration = Ticket::kNoExpirationDate) final;
-
- Status wait(Ticket&& ticket) final;
-
- void asyncWait(Ticket&& ticket, TicketCallback callback) final;
-
- void end(const SessionHandle& session) final;
-
Status setup() final;
Status start() final;
@@ -120,9 +105,6 @@ public:
private:
class ASIOSession;
- class ASIOTicket;
- class ASIOSourceTicket;
- class ASIOSinkTicket;
using ASIOSessionHandle = std::shared_ptr<ASIOSession>;
using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 9df17130142..28bcf351e51 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -49,26 +49,6 @@ namespace transport {
TransportLayerManager::TransportLayerManager() = default;
-Ticket TransportLayerManager::sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration) {
- return session->getTransportLayer()->sourceMessage(session, message, expiration);
-}
-
-Ticket TransportLayerManager::sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration) {
- return session->getTransportLayer()->sinkMessage(session, message, expiration);
-}
-
-Status TransportLayerManager::wait(Ticket&& ticket) {
- return getTicketTransportLayer(ticket)->wait(std::move(ticket));
-}
-
-void TransportLayerManager::asyncWait(Ticket&& ticket, TicketCallback callback) {
- return getTicketTransportLayer(ticket)->asyncWait(std::move(ticket), std::move(callback));
-}
-
template <typename Callable>
void TransportLayerManager::_foreach(Callable&& cb) const {
{
@@ -79,10 +59,6 @@ void TransportLayerManager::_foreach(Callable&& cb) const {
}
}
-void TransportLayerManager::end(const SessionHandle& session) {
- session->getTransportLayer()->end(session);
-}
-
// TODO Right now this and setup() leave TLs started if there's an error. In practice the server
// exits with an error and this isn't an issue, but we should make this more robust.
Status TransportLayerManager::start() {
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 8349fc56f32..7c80935f00a 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -33,8 +33,6 @@
#include "mongo/base/status.h"
#include "mongo/stdx/mutex.h"
#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/ticket_impl.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/net/message.h"
#include "mongo/util/time_support.h"
@@ -59,18 +57,6 @@ public:
: _tls(std::move(tls)) {}
TransportLayerManager();
- Ticket sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration = Ticket::kNoExpirationDate) override;
- Ticket sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration = Ticket::kNoExpirationDate) override;
-
- Status wait(Ticket&& ticket) override;
- void asyncWait(Ticket&& ticket, TicketCallback callback) override;
-
- void end(const SessionHandle& session) override;
-
Status start() override;
void shutdown() override;
Status setup() override;
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index 44c43b2d236..ec1a28e6777 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -33,7 +33,6 @@
#include "mongo/base/status.h"
#include "mongo/stdx/memory.h"
#include "mongo/transport/mock_session.h"
-#include "mongo/transport/mock_ticket.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/net/message.h"
#include "mongo/util/time_support.h"
@@ -43,54 +42,8 @@ namespace transport {
TransportLayerMock::TransportLayerMock() : _shutdown(false) {}
-Ticket TransportLayerMock::sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration) {
- if (inShutdown()) {
- return Ticket(TransportLayer::ShutdownStatus);
- } else if (!owns(session->id())) {
- return Ticket(TransportLayer::SessionUnknownStatus);
- } else if (_sessions[session->id()].ended) {
- return Ticket(TransportLayer::TicketSessionClosedStatus);
- }
-
- return Ticket(this, stdx::make_unique<transport::MockTicket>(session, message, expiration));
-}
-
-Ticket TransportLayerMock::sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration) {
- if (inShutdown()) {
- return Ticket(TransportLayer::ShutdownStatus);
- } else if (!owns(session->id())) {
- return Ticket(TransportLayer::SessionUnknownStatus);
- } else if (_sessions[session->id()].ended) {
- return Ticket(TransportLayer::TicketSessionClosedStatus);
- }
-
- return Ticket(this, stdx::make_unique<transport::MockTicket>(session, expiration));
-}
-
-Status TransportLayerMock::wait(Ticket&& ticket) {
- if (inShutdown()) {
- return ShutdownStatus;
- } else if (!ticket.valid()) {
- return ticket.status();
- } else if (!owns(ticket.sessionId())) {
- return TicketSessionUnknownStatus;
- } else if (_sessions[ticket.sessionId()].ended) {
- return TransportLayer::TicketSessionClosedStatus;
- }
-
- return Status::OK();
-}
-
-void TransportLayerMock::asyncWait(Ticket&& ticket, TicketCallback callback) {
- callback(wait(std::move(ticket)));
-}
-
SessionHandle TransportLayerMock::createSession() {
- auto session = MockSession::create(this);
+ auto session = createSessionHook ? createSessionHook(this) : MockSession::create(this);
Session::Id sessionId = session->id();
_sessions[sessionId] = Connection{false, session, SSLPeerInfo()};
@@ -109,12 +62,6 @@ bool TransportLayerMock::owns(Session::Id id) {
return _sessions.count(id) > 0;
}
-void TransportLayerMock::end(const SessionHandle& session) {
- if (!owns(session->id()))
- return;
- _sessions[session->id()].ended = true;
-}
-
Status TransportLayerMock::setup() {
return Status::OK();
}
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index fd8670818ad..7a861df129a 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -31,8 +31,6 @@
#include "mongo/base/status.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/session.h"
-#include "mongo/transport/ticket.h"
-#include "mongo/transport/ticket_impl.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/ssl_types.h"
@@ -51,27 +49,21 @@ public:
TransportLayerMock();
~TransportLayerMock();
- Ticket sourceMessage(const SessionHandle& session,
- Message* message,
- Date_t expiration = Ticket::kNoExpirationDate) override;
- Ticket sinkMessage(const SessionHandle& session,
- const Message& message,
- Date_t expiration = Ticket::kNoExpirationDate) override;
-
- Status wait(Ticket&& ticket) override;
- void asyncWait(Ticket&& ticket, TicketCallback callback) override;
-
SessionHandle createSession();
SessionHandle get(Session::Id id);
bool owns(Session::Id id);
- void end(const SessionHandle& session) override;
Status setup() override;
Status start() override;
void shutdown() override;
bool inShutdown() const;
+ // Set to a factory function to use your own session type.
+ std::function<SessionHandle(TransportLayer*)> createSessionHook;
+
private:
+ friend class MockSession;
+
struct Connection {
bool ended;
SessionHandle session;
diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp
deleted file mode 100644
index 2234095c806..00000000000
--- a/src/mongo/transport/transport_layer_mock_test.cpp
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/stdx/memory.h"
-#include "mongo/transport/mock_ticket.h"
-#include "mongo/transport/session.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/transport/transport_layer_mock.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace transport {
-
-class TransportLayerMockTest : public mongo::unittest::Test {
-public:
- void setUp() {
- _transportLayer = stdx::make_unique<TransportLayerMock>();
- }
-
- TransportLayerMock* tl() {
- return _transportLayer.get();
- }
-
-private:
- std::unique_ptr<TransportLayerMock> _transportLayer;
-};
-
-// sinkMessage() generates a valid Ticket
-TEST_F(TransportLayerMockTest, SinkMessageGeneratesTicket) {
- Message msg{};
- SessionHandle session = tl()->createSession();
-
- // call sinkMessage() with no expiration
- Ticket ticket = tl()->sinkMessage(session, msg);
- ASSERT(ticket.valid());
- ASSERT_OK(ticket.status());
- ASSERT_EQUALS(ticket.sessionId(), session->id());
- ASSERT_EQUALS(ticket.expiration(), Ticket::kNoExpirationDate);
-
- // call sinkMessage() with an expiration
- Date_t expiration = Date_t::now() + Hours(1);
- ticket = tl()->sinkMessage(session, msg, expiration);
- ASSERT(ticket.valid());
- ASSERT_OK(ticket.status());
- ASSERT_EQUALS(ticket.sessionId(), session->id());
- ASSERT_EQUALS(ticket.expiration(), expiration);
-}
-
-// sinkMessage() generates an invalid Ticket if the Session is closed
-TEST_F(TransportLayerMockTest, SinkMessageSessionClosed) {
- Message msg{};
- SessionHandle session = tl()->createSession();
-
- tl()->end(session);
-
- Ticket ticket = tl()->sinkMessage(session, msg);
- ASSERT_FALSE(ticket.valid());
- ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionClosed);
-}
-
-// sinkMessage() generates an invalid Ticket if the TransportLayer does not own the Session
-TEST_F(TransportLayerMockTest, SinkMessageSessionUnknown) {
- Message msg{};
-
- std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>();
- SessionHandle session = anotherTL->createSession();
-
- Ticket ticket = tl()->sinkMessage(session, msg);
- ASSERT_FALSE(ticket.valid());
- ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionUnknown);
-}
-
-// sinkMessage() generates an invalid Ticket if the TransportLayer is in shutdown
-TEST_F(TransportLayerMockTest, SinkMessageTLShutdown) {
- Message msg{};
- SessionHandle session = tl()->createSession();
-
- tl()->shutdown();
-
- Ticket ticket = tl()->sinkMessage(session, msg);
- ASSERT_FALSE(ticket.valid());
- ASSERT_EQUALS(ticket.status().code(), ErrorCodes::ShutdownInProgress);
-}
-
-// sourceMessage() generates a valid ticket
-TEST_F(TransportLayerMockTest, SourceMessageGeneratesTicket) {
- Message msg{};
- SessionHandle session = tl()->createSession();
-
- // call sourceMessage() with no expiration
- Ticket ticket = tl()->sourceMessage(session, &msg);
- ASSERT(ticket.valid());
- ASSERT_OK(ticket.status());
- ASSERT_EQUALS(ticket.sessionId(), session->id());
- ASSERT(msg.empty());
- ASSERT_EQUALS(ticket.expiration(), Ticket::kNoExpirationDate);
-
- // call sourceMessage() with an expiration
- Date_t expiration = Date_t::now() + Hours(1);
- ticket = tl()->sourceMessage(session, &msg, expiration);
- ASSERT(ticket.valid());
- ASSERT_OK(ticket.status());
- ASSERT_EQUALS(ticket.sessionId(), session->id());
- ASSERT(msg.empty());
- ASSERT_EQUALS(ticket.expiration(), expiration);
-}
-
-// sourceMessage() generates an invalid ticket if the Session is closed
-TEST_F(TransportLayerMockTest, SourceMessageSessionClosed) {
- Message msg{};
- SessionHandle session = tl()->createSession();
-
- tl()->end(session);
-
- Ticket ticket = tl()->sourceMessage(session, &msg);
- ASSERT_FALSE(ticket.valid());
- ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionClosed);
-}
-
-// sourceMessage() generates an invalid ticket if the TransportLayer does not own the Session
-TEST_F(TransportLayerMockTest, SourceMessageSessionUnknown) {
- Message msg{};
-
- std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>();
- SessionHandle session = anotherTL->createSession();
-
- Ticket ticket = tl()->sourceMessage(session, &msg);
- ASSERT_FALSE(ticket.valid());
- ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionUnknown);
-}
-
-// sourceMessage() generates an invalid ticket if the TransportLayer is in shutdown
-TEST_F(TransportLayerMockTest, SourceMessageTLShutdown) {
- Message msg{};
- SessionHandle session = tl()->createSession();
-
- tl()->shutdown();
-
- Ticket ticket = tl()->sourceMessage(session, &msg);
- ASSERT_FALSE(ticket.valid());
- ASSERT_EQUALS(ticket.status().code(), ErrorCodes::ShutdownInProgress);
-}
-
-// wait() returns an OK status
-TEST_F(TransportLayerMockTest, Wait) {
- SessionHandle session = tl()->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
-
- Status status = tl()->wait(std::move(ticket));
- ASSERT_OK(status);
-}
-
-// wait() returns an TicketExpired error status if the Ticket expired
-TEST_F(TransportLayerMockTest, WaitExpiredTicket) {
- SessionHandle session = tl()->createSession();
- Ticket expiredTicket =
- Ticket(tl(), stdx::make_unique<transport::MockTicket>(session, Date_t::now()));
-
- Status status = tl()->wait(std::move(expiredTicket));
- ASSERT_EQUALS(status.code(), ErrorCodes::ExceededTimeLimit);
-}
-
-// wait() returns the invalid Ticket's Status
-TEST_F(TransportLayerMockTest, WaitInvalidTicket) {
- Ticket invalidTicket = Ticket(Status(ErrorCodes::UnknownError, ""));
- ASSERT_FALSE(invalidTicket.valid());
-
- Status status = tl()->wait(std::move(invalidTicket));
- ASSERT_EQUALS(status.code(), ErrorCodes::UnknownError);
-}
-
-// wait() returns a SessionClosed error status if the Ticket's Session is closed
-TEST_F(TransportLayerMockTest, WaitSessionClosed) {
- SessionHandle session = tl()->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
-
- tl()->end(session);
-
- Status status = tl()->wait(std::move(ticket));
- ASSERT_EQUALS(status.code(), ErrorCodes::TransportSessionClosed);
-}
-
-// wait() returns a SessionUnknown error status if the TransportLayer does not own the Ticket's
-// Session
-TEST_F(TransportLayerMockTest, WaitSessionUnknown) {
- std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>();
- SessionHandle session = anotherTL->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
-
- Status status = tl()->wait(std::move(ticket));
- ASSERT_EQUALS(status.code(), ErrorCodes::TransportSessionUnknown);
-}
-
-// wait() returns a ShutdownInProgress status if the TransportLayer is in shutdown
-TEST_F(TransportLayerMockTest, WaitTLShutdown) {
- SessionHandle session = tl()->createSession();
- Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session));
-
- tl()->shutdown();
-
- Status status = tl()->wait(std::move(ticket));
- ASSERT_EQUALS(status.code(), ErrorCodes::ShutdownInProgress);
-}
-
-std::vector<SessionHandle> createSessions(TransportLayerMock* tl) {
- int numSessions = 10;
- std::vector<SessionHandle> sessions;
- for (int i = 0; i < numSessions; i++) {
- SessionHandle session = tl->createSession();
- sessions.push_back(session);
- }
- return sessions;
-}
-
-void assertEnded(TransportLayer* tl,
- std::vector<SessionHandle> sessions,
- ErrorCodes::Error code = ErrorCodes::TransportSessionClosed) {
- for (auto session : sessions) {
- Ticket ticket = Ticket(tl, stdx::make_unique<transport::MockTicket>(session));
- Status status = tl->wait(std::move(ticket));
- ASSERT_EQUALS(status.code(), code);
- }
-}
-
-// shutdown() ends all sessions and shuts down
-TEST_F(TransportLayerMockTest, Shutdown) {
- std::vector<SessionHandle> sessions = createSessions(tl());
- tl()->shutdown();
- assertEnded(tl(), sessions, ErrorCodes::ShutdownInProgress);
- ASSERT(tl()->inShutdown());
-}
-
-} // namespace transport
-} // namespace mongo