diff options
author | Mathias Stearn <mathias@10gen.com> | 2018-02-09 14:18:11 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2018-02-13 19:04:39 -0500 |
commit | ed15b99846db007f06d74d1cb5f8d37f954aa244 (patch) | |
tree | c6613360f29cc52a955ade71534dfdef077a1588 | |
parent | 66d2a03579bb1a259aec36038f9250e681ede08c (diff) | |
download | mongo-ed15b99846db007f06d74d1cb5f8d37f954aa244.tar.gz |
SERVER-33255 clean up TransportLayer API
29 files changed, 300 insertions, 2335 deletions
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index e9dca46cf23..50c00521947 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -118,10 +118,11 @@ public: private: void run(transport::SessionHandle session) { - Message inMessage; - if (!session->sourceMessage(&inMessage).wait().isOK()) { + auto swInMessage = session->sourceMessage(); + if (!swInMessage.isOK()) { return; } + Message inMessage = swInMessage.getValue(); auto request = rpc::opMsgRequestFromAnyProtocol(inMessage); commandRequestHook(request); @@ -146,9 +147,8 @@ private: log() << "Delaying response for " << _replyDelay; sleepFor(_replyDelay); } - if (!session->sinkMessage(response).wait().isOK()) { - return; - } + + session->sinkMessage(response).ignore(); } /** diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index cf2519588d8..7c31274ee11 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -551,8 +551,7 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, const auto closeConnectionElem = data["closeConnection"]; if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) { - auto transportSession = opCtx->getClient()->session(); - transportSession->getTransportLayer()->end(transportSession); + opCtx->getClient()->session()->end(); } const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"]; diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index ab1e4de73ef..75c342876f7 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -4,22 +4,11 @@ Import('env') env = env.Clone() -env.CppUnitTest( - target='ingress_header_test', - source=[ - 'ingress_header_test.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - ], -) - env.Library( target='transport_layer_common', source=[ 'service_entry_point_utils.cpp', 'session.cpp', - 'ticket.cpp', 'transport_layer.cpp', ], LIBDEPS=[ @@ -58,7 +47,6 @@ tlEnv.Library( tlEnv.Library( target='transport_layer', source=[ - 'ticket_asio.cpp', 'transport_layer_asio.cpp', ], LIBDEPS=[ @@ -121,17 +109,6 @@ tlEnv.CppUnitTest( # ) env.Library( - target='service_entry_point_test_suite', - source=[ - 'service_entry_point_test_suite.cpp', - ], - LIBDEPS=[ - 'transport_layer_common', - '$BUILD_DIR/mongo/unittest/unittest', - ], -) - -env.Library( target='service_entry_point', source=[ 'service_entry_point_impl.cpp', @@ -148,17 +125,6 @@ env.Library( ) env.CppUnitTest( - target='service_entry_point_mock_test', - source=[ - 'service_entry_point_mock.cpp', - 'service_entry_point_mock_test.cpp', - ], - LIBDEPS=[ - 'service_entry_point_test_suite', - ], -) - -env.CppUnitTest( target='service_state_machine_test', source=[ 'service_state_machine_test.cpp', @@ -177,17 +143,6 @@ env.CppUnitTest( ], ) -env.CppUnitTest( - target='transport_layer_mock_test', - source=[ - 'transport_layer_mock_test.cpp', - ], - LIBDEPS=[ - 'transport_layer_mock', - ], -) - - zlibEnv = env.Clone() zlibEnv.InjectThirdPartyIncludePaths(libraries=['zlib', 'snappy']) zlibEnv.Library( diff --git a/src/mongo/transport/ingress_header_test.cpp b/src/mongo/transport/ingress_header_test.cpp deleted file mode 100644 index fae28ea3a77..00000000000 --- a/src/mongo/transport/ingress_header_test.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_entry_point.h" -#include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/unittest/unittest.h" - -namespace { - -TEST(IngressHeaderTest, TestHeaders) { - // No-op. -} - -} // namespace diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index 147e13e4f7e..2f091854193 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -28,14 +28,14 @@ #pragma once +#include "mongo/base/checked_cast.h" #include "mongo/transport/session.h" +#include "mongo/transport/transport_layer_mock.h" #include "mongo/util/net/hostandport.h" namespace mongo { namespace transport { -class TransportLayer; - class MockSession : public Session { MONGO_DISALLOW_COPYING(MockSession); @@ -65,12 +65,53 @@ public: return _local; } -protected: - explicit MockSession(TransportLayer* tl) : _tl(tl), _remote(), _local() {} + void end() override { + if (!_tl->owns(id())) + return; + _tl->_sessions[id()].ended = true; + } + + StatusWith<Message> sourceMessage() override { + if (_tl->inShutdown()) { + return TransportLayer::ShutdownStatus; + } else if (!_tl->owns(id())) { + return TransportLayer::SessionUnknownStatus; + } else if (_tl->_sessions[id()].ended) { + return TransportLayer::TicketSessionClosedStatus; + } + + return Message(); // Subclasses can do something different. + } + + void asyncSourceMessage(std::function<void(StatusWith<Message>)> cb) override { + cb(sourceMessage()); + } + + Status sinkMessage(Message message) override { + if (_tl->inShutdown()) { + return TransportLayer::ShutdownStatus; + } else if (!_tl->owns(id())) { + return TransportLayer::SessionUnknownStatus; + } else if (_tl->_sessions[id()].ended) { + return TransportLayer::TicketSessionClosedStatus; + } + + return Status::OK(); + } + + void asyncSinkMessage(Message message, std::function<void(Status)> cb) override { + cb(sinkMessage(message)); + } + + explicit MockSession(TransportLayer* tl) + : _tl(checked_cast<TransportLayerMock*>(tl)), _remote(), _local() {} explicit MockSession(HostAndPort remote, HostAndPort local, TransportLayer* tl) - : _tl(tl), _remote(std::move(remote)), _local(std::move(local)) {} + : _tl(checked_cast<TransportLayerMock*>(tl)), + _remote(std::move(remote)), + _local(std::move(local)) {} - TransportLayer* _tl; +protected: + TransportLayerMock* _tl; HostAndPort _remote; HostAndPort _local; diff --git a/src/mongo/transport/mock_ticket.h b/src/mongo/transport/mock_ticket.h deleted file mode 100644 index bbf4eaa9df4..00000000000 --- a/src/mongo/transport/mock_ticket.h +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/base/disallow_copying.h" -#include "mongo/transport/session.h" -#include "mongo/transport/ticket_impl.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -class Message; - -namespace transport { - -/** - * A mock ticket class for our test suite. - */ -class MockTicket : public TicketImpl { - MONGO_DISALLOW_COPYING(MockTicket); - -public: - // Source constructor - MockTicket(const SessionHandle& session, - Message* message, - Date_t expiration = Ticket::kNoExpirationDate) - : _session(session), _id(session->id()), _message(message), _expiration(expiration) {} - - // Sink constructor - MockTicket(const SessionHandle& session, Date_t expiration = Ticket::kNoExpirationDate) - : _session(session), _id(session->id()), _expiration(expiration) {} - - SessionId sessionId() const override { - return _id; - } - - Date_t expiration() const override { - return _expiration; - } - - boost::optional<Message*> message() const { - return _message; - } - - SessionHandle session() const { - return _session.lock(); - } - -private: - std::weak_ptr<Session> _session; - Session::Id _id; - boost::optional<Message*> _message; - Date_t _expiration; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp deleted file mode 100644 index eb70533ce27..00000000000 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_entry_point_mock.h" - -#include <vector> - -#include "mongo/bson/bsonmisc.h" -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/transport_layer.h" - -namespace mongo { - -using namespace transport; - -ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl) - : _tl(tl), _inShutdown(false) {} - -ServiceEntryPointMock::~ServiceEntryPointMock() { - endAllSessions(transport::Session::kEmptyTagMask); -} - -void ServiceEntryPointMock::startSession(transport::SessionHandle session) { - _threads.emplace_back(&ServiceEntryPointMock::run, this, std::move(session)); -} - -void ServiceEntryPointMock::run(transport::SessionHandle session) { - Message inMessage; - while (true) { - { - stdx::lock_guard<stdx::mutex> lk(_shutdownLock); - if (_inShutdown) - break; - } - - // sourceMessage() - if (!session->sourceMessage(&inMessage).wait().isOK()) { - break; - } - - auto resp = handleRequest(nullptr, inMessage); - - // sinkMessage() - if (!session->sinkMessage(resp.response).wait().isOK()) { - break; - } - } -} - -DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx, const Message& request) { - // Need to set up our { ok : 1 } response. - BufBuilder b{}; - - // Leave room for the message header - b.skip(mongo::MsgData::MsgDataHeaderSize); - - // Add our response - auto okObj = BSON("ok" << 1.0); - okObj.appendSelfToBufBuilder(b); - - // Add some metadata - auto metadata = BSONObj(); - metadata.appendSelfToBufBuilder(b); - - // Set Message header fields - MsgData::View msg = b.buf(); - msg.setLen(b.len()); - msg.setOperation(dbCommandReply); - - return {Message(b.release()), ""}; -} - -void ServiceEntryPointMock::endAllSessions(transport::Session::TagMask) { - { - stdx::lock_guard<stdx::mutex> lk(_shutdownLock); - _inShutdown = true; - } - - for (auto& t : _threads) { - t.join(); - } -} - -ServiceEntryPoint::Stats ServiceEntryPointMock::sessionStats() const { - return {}; -} - -size_t ServiceEntryPointMock::numOpenSessions() const { - return 0ULL; -} - -} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h deleted file mode 100644 index 7c13757562f..00000000000 --- a/src/mongo/transport/service_entry_point_mock.h +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <vector> - -#include "mongo/base/disallow_copying.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_entry_point.h" -#include "mongo/transport/session.h" -#include "mongo/util/net/message.h" - -namespace mongo { - -namespace transport { - -class TransportLayer; - -} // namespace transport - -class ServiceEntryPointMock : public ServiceEntryPoint { - MONGO_DISALLOW_COPYING(ServiceEntryPointMock); - -public: - ServiceEntryPointMock(transport::TransportLayer* tl); - - virtual ~ServiceEntryPointMock(); - - /** - * This method will spawn a thread that will do the following: - * - * - call tl->sourceMessage() - * - call tl->wait() - * - call tl->sinkMessage() with { ok : 1 } - * - call tl->wait() - * - * ...repeat until wait() returns an error. - */ - void startSession(transport::SessionHandle session) override; - - void endAllSessions(transport::Session::TagMask tags) override; - - bool shutdown(Milliseconds timeout) override { - return true; - } - - Stats sessionStats() const override; - - size_t numOpenSessions() const override; - - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; - -private: - void run(transport::SessionHandle session); - - transport::TransportLayer* _tl; - - stdx::mutex _shutdownLock; - bool _inShutdown; - - std::vector<stdx::thread> _threads; -}; - -} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock_test.cpp b/src/mongo/transport/service_entry_point_mock_test.cpp deleted file mode 100644 index 66806111980..00000000000 --- a/src/mongo/transport/service_entry_point_mock_test.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/stdx/memory.h" -#include "mongo/transport/service_entry_point.h" -#include "mongo/transport/service_entry_point_mock.h" -#include "mongo/transport/service_entry_point_test_suite.h" -#include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { - -namespace { - -std::unique_ptr<ServiceEntryPoint> mockSEPFactory(transport::TransportLayer* tl) { - return stdx::make_unique<ServiceEntryPointMock>(tl); -} - -} // namespace - -TEST_F(ServiceEntryPointTestSuite, NoLifeCycleTest) { - setServiceEntryPoint(&mockSEPFactory); - noLifeCycleTest(); -} - -TEST_F(ServiceEntryPointTestSuite, HalfLifeCycleTest) { - setServiceEntryPoint(&mockSEPFactory); - halfLifeCycleTest(); -} - -TEST_F(ServiceEntryPointTestSuite, FullLifeCycleTest) { - setServiceEntryPoint(&mockSEPFactory); - fullLifeCycleTest(); -} - -TEST_F(ServiceEntryPointTestSuite, InterruptingSessionTest) { - setServiceEntryPoint(&mockSEPFactory); - interruptingSessionTest(); -} - -TEST_F(ServiceEntryPointTestSuite, BurstStressTest) { - setServiceEntryPoint(&mockSEPFactory); - burstStressTest(); -} - -TEST_F(ServiceEntryPointTestSuite, LongSessionStressTest) { - setServiceEntryPoint(&mockSEPFactory); - longSessionStressTest(); -} - -} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp deleted file mode 100644 index 027d7baa2b1..00000000000 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ /dev/null @@ -1,456 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_entry_point_test_suite.h" - -#include <boost/optional.hpp> - -#include "mongo/bson/bsonmisc.h" -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/future.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/unordered_map.h" -#include "mongo/stdx/unordered_set.h" -#include "mongo/transport/service_entry_point.h" -#include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/ticket_impl.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/ssl_types.h" - -namespace mongo { - -using namespace transport; - -using TicketCallback = TransportLayer::TicketCallback; -using SEPTestSession = ServiceEntryPointTestSuite::SEPTestSession; - -namespace { - -// Helper function to populate a message with { ping : 1 } command -void setPingCommand(Message* m) { - BufBuilder b{}; - - // Leave room for the message header. - b.skip(mongo::MsgData::MsgDataHeaderSize); - - b.appendStr("admin"); - b.appendStr("ping"); - - auto commandObj = BSON("ping" << 1); - commandObj.appendSelfToBufBuilder(b); - - auto metadata = BSONObj(); - metadata.appendSelfToBufBuilder(b); - - // Set Message header fields. - MsgData::View msg = b.buf(); - msg.setLen(b.len()); - msg.setOperation(dbCommand); - - m->reset(); - - // Transfer buffer ownership to the Message. - m->setData(b.release()); -} - -// Some default method implementations -const auto kDefaultEnd = [](const SessionHandle& session) { return; }; -const auto kDefaultDestroyHook = [](SEPTestSession& session) { return; }; -const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); }; -const auto kNoopFunction = [] { return; }; - -// "End connection" error status -const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connection closed"); - -} // namespace - -ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness() - : _sourceMessage([this](const transport::SessionHandle& h, Message * m, Date_t d) { - return _defaultSource(h, m, d); - }), - _sinkMessage([this](const transport::SessionHandle& h, const Message& m, Date_t d) { - return _defaultSink(h, m, d); - }), - _wait([this](transport::Ticket t) { return _defaultWait(std::move(t)); }), - _asyncWait(kDefaultAsyncWait), - _end(kDefaultEnd) {} - -Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration) { - return _sourceMessage(session, message, expiration); -} - -Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration) { - return _sinkMessage(session, message, expiration); -} - -Status ServiceEntryPointTestSuite::MockTLHarness::wait(Ticket&& ticket) { - return _wait(std::move(ticket)); -} - -void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket, - TicketCallback callback) { - return _asyncWait(std::move(ticket), std::move(callback)); -} - -void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session) { - return _end(session); -} - -Status ServiceEntryPointTestSuite::MockTLHarness::setup() { - return Status::OK(); -} - -Status ServiceEntryPointTestSuite::MockTLHarness::start() { - return _start(); -} - -void ServiceEntryPointTestSuite::MockTLHarness::shutdown() { - return _shutdown(); -} - -Status ServiceEntryPointTestSuite::MockTLHarness::_defaultWait(transport::Ticket ticket) { - auto mockTicket = getMockTicket(ticket); - if (mockTicket->message()) { - setPingCommand(*(mockTicket->message())); - } - return Status::OK(); -} - -Status ServiceEntryPointTestSuite::MockTLHarness::_waitError(transport::Ticket ticket) { - return kEndConnectionStatus; -} - -Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::Ticket ticket) { - _wait = [this](transport::Ticket t) { return _waitError(std::move(t)); }; - return _defaultWait(std::move(ticket)); -} - -Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const SessionHandle& s, - Message* m, - Date_t d) { - return Ticket(this, stdx::make_unique<transport::MockTicket>(s, m, d)); -} - -Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHandle& s, - const Message&, - Date_t d) { - return Ticket(this, stdx::make_unique<transport::MockTicket>(s, d)); -} - -Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s, - const Message& m, - Date_t d) { - _wait = [=](transport::Ticket t) { return _waitOnceThenError(std::move(t)); }; - return _defaultSink(s, m, d); -} - -void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() { - _sourceMessage = [this](const transport::SessionHandle& h, Message* m, Date_t d) { - return _defaultSource(h, m, d); - }; - _sinkMessage = [this](const transport::SessionHandle& h, const Message& m, Date_t d) { - return _defaultSink(h, m, d); - }; - _wait = [this](transport::Ticket t) { return _defaultWait(std::move(t)); }; - _asyncWait = kDefaultAsyncWait; - _end = kDefaultEnd; - _destroy_hook = kDefaultDestroyHook; -} - -transport::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket( - const transport::Ticket& ticket) { - return dynamic_cast<transport::MockTicket*>(getTicketImpl(ticket)); -} - -void ServiceEntryPointTestSuite::MockTLHarness::_destroy(SEPTestSession& session) { - return _destroy_hook(session); -} - -void ServiceEntryPointTestSuite::setUp() { - _tl = stdx::make_unique<MockTLHarness>(); -} - -void ServiceEntryPointTestSuite::setServiceEntryPoint(ServiceEntryPointFactory factory) { - _sep = factory(_tl.get()); -} - -// Start a Session and error on get-Message -void ServiceEntryPointTestSuite::noLifeCycleTest() { - stdx::promise<void> testComplete; - auto testFuture = testComplete.get_future(); - - _tl->_resetHooks(); - - // Step 1: SEP gets a ticket to source a Message - // Step 2: SEP calls wait() on the ticket and receives an error - _tl->_wait = [tlp = _tl.get()](transport::Ticket t) { - return tlp->_waitError(std::move(t)); - }; - - // Step 3: SEP destroys the session, which calls end() - _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); }; - - // Kick off the SEP - auto s = SEPTestSession::create(_tl.get()); - _sep->startSession(std::move(s)); - - testFuture.wait(); -} - -// Partial cycle: get-Message, handle-Message, error on send-Message -void ServiceEntryPointTestSuite::halfLifeCycleTest() { - stdx::promise<void> testComplete; - auto testFuture = testComplete.get_future(); - - _tl->_resetHooks(); - - // Step 1: SEP gets a ticket to source a Message - // Step 2: SEP calls wait() on the ticket and receives a Message - // Step 3: SEP gets a ticket to sink a Message - _tl->_sinkMessage = [this](const SessionHandle& session, const Message& m, Date_t expiration) { - - // Step 4: SEP calls wait() on the ticket and receives an error - _tl->_wait = [tlp = _tl.get()](transport::Ticket t) { - return tlp->_waitError(std::move(t)); - }; - - return _tl->_defaultSink(session, m, expiration); - }; - - // Step 5: SEP destroys the session, which calls _destroy() - _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); }; - - // Kick off the SEP - auto s = SEPTestSession::create(_tl.get()); - _sep->startSession(std::move(s)); - - testFuture.wait(); -} - -// Perform a full get-Message, handle-Message, send-Message cycle -void ServiceEntryPointTestSuite::fullLifeCycleTest() { - stdx::promise<void> testComplete; - auto testFuture = testComplete.get_future(); - - _tl->_resetHooks(); - - // Step 1: SEP gets a ticket to source a Message - // Step 2: SEP calls wait() on the ticket and receives a Message - _tl->_sinkMessage = [tlp = _tl.get()](auto&& a1, auto&& a2, auto&& a3) { - return tlp->_sinkThenErrorOnWait(a1, a2, a3); - }; - - // Step 3: SEP gets a ticket to sink a Message - // Step 4: SEP calls wait() on the ticket and receives Status::OK() - // Step 5: SEP gets a ticket to source a Message - // Step 6: SEP calls wait() on the ticket and receives and error - // Step 7: SEP destroys the session, which calls _destroy() - _tl->_destroy_hook = [&testComplete](SEPTestSession& session) { testComplete.set_value(); }; - - // Kick off the SEP - auto s = SEPTestSession::create(_tl.get()); - _sep->startSession(std::move(s)); - - testFuture.wait(); -} - -void ServiceEntryPointTestSuite::interruptingSessionTest() { - auto sA = SEPTestSession::create(_tl.get()); - auto sB = SEPTestSession::create(_tl.get()); - auto idA = sA->id(); - auto idB = sB->id(); - int waitCountB = 0; - - stdx::promise<void> startB; - auto startBFuture = startB.get_future(); - - stdx::promise<void> resumeA; - auto resumeAFuture = resumeA.get_future(); - - stdx::promise<void> testComplete; - auto testFuture = testComplete.get_future(); - - _tl->_resetHooks(); - - // Start Session A - // Step 1: SEP calls sourceMessage() for A - // Step 2: SEP calls wait() for A and we block... - // Start Session B - _tl->_wait = [this, idA, &startB, &resumeAFuture, &waitCountB](Ticket t) -> Status { - // If we're handling B, just do a default wait - if (t.sessionId() != idA) { - if (waitCountB < 2) { - ++waitCountB; - return _tl->_defaultWait(std::move(t)); - } else { - // If we've done a full round trip, time to end session B - return kEndConnectionStatus; - } - } - - // Otherwise, we need to start B and block A - startB.set_value(); - resumeAFuture.wait(); - - _tl->_wait = [tlp = _tl.get()](transport::Ticket t) { - return tlp->_waitOnceThenError(std::move(t)); - }; - - return Status::OK(); - }; - - // Step 3: SEP calls sourceMessage() for B, gets tB - // Step 4: SEP calls wait() for tB, gets { ping : 1 } - // Step 5: SEP calls sinkMessage() for B, gets tB2 - // Step 6: SEP calls wait() for tB2, gets Status::OK() - // Step 7: SEP calls sourceMessage() for B, gets tB3 - // Step 8: SEP calls wait() for tB3, gets an error - // Step 9: SEP calls end(B) - _tl->_destroy_hook = [idA, idB, &resumeA, &testComplete](SEPTestSession& session) { - // When end(B) is called, time to resume session A - if (session.id() == idB) { - // Resume session A - resumeA.set_value(); - } else { - // Else our test is over when end(A) is called - invariant(session.id() == idA); - testComplete.set_value(); - } - }; - - // Resume Session A - // Step 10: SEP calls sinkMessage() for A, gets tA - // Step 11: SEP calls wait() for tA, gets Status::OK() - // Step 12: SEP calls sourceMessage() for A, get tA2 - // Step 13: SEP calls wait() for tA2, receives an error - // Step 14: SEP calls end(A) - - // Kick off the test - _sep->startSession(std::move(sA)); - - startBFuture.wait(); - _sep->startSession(std::move(sB)); - - testFuture.wait(); -} - -void ServiceEntryPointTestSuite::burstStressTest(int numSessions, - int numCycles, - Milliseconds delay) { - AtomicWord<int> ended{0}; - stdx::promise<void> allSessionsComplete; - - auto allCompleteFuture = allSessionsComplete.get_future(); - - stdx::mutex cyclesLock; - stdx::unordered_map<Session::Id, int> completedCycles; - - _tl->_resetHooks(); - - // Same wait() callback for all sessions. - _tl->_wait = [this, &completedCycles, &cyclesLock, numCycles, &delay](Ticket ticket) -> Status { - auto id = ticket.sessionId(); - int cycleCount; - - { - stdx::lock_guard<stdx::mutex> lock(cyclesLock); - auto item = completedCycles.find(id); - invariant(item != completedCycles.end()); - cycleCount = item->second; - } - - auto mockTicket = _tl->getMockTicket(ticket); - // If we are sourcing: - if (mockTicket->message()) { - // If we've completed enough cycles, done. - if (cycleCount == numCycles) { - return kEndConnectionStatus; - } - - // Otherwise, source another { ping : 1 } - invariant(mockTicket->message()); - setPingCommand(*(mockTicket->message())); - - // Wait a bit before returning - sleepmillis(delay.count()); - - return Status::OK(); - } - - // We are sinking, increment numCycles and return OK. - { - stdx::lock_guard<stdx::mutex> lock(cyclesLock); - auto item = completedCycles.find(id); - invariant(item != completedCycles.end()); - ++(item->second); - } - - return Status::OK(); - }; - - // When we end the last session, end the test. - _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](SEPTestSession& session) { - if (ended.fetchAndAdd(1) == (numSessions - 1)) { - allSessionsComplete.set_value(); - } - }; - - for (int i = 0; i < numSessions; i++) { - auto s = SEPTestSession::create(_tl.get()); - { - // This operation may cause a re-hash. - stdx::lock_guard<stdx::mutex> lock(cyclesLock); - completedCycles.emplace(s->id(), 0); - } - _sep->startSession(std::move(s)); - } - - // Block and wait for all sessions to finish. - allCompleteFuture.wait(); -} - -void ServiceEntryPointTestSuite::longSessionStressTest() { - return burstStressTest(1000, 100, Milliseconds(100)); -} - -} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h deleted file mode 100644 index 96461584b83..00000000000 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include <memory> - -#include "mongo/transport/mock_session.h" -#include "mongo/transport/mock_ticket.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/net/message.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -class ServiceEntryPoint; -struct SSLPeerInfo; - -/** - * Test class. Uses a mock TransportLayer to test that the ServiceEntryPoint - * calls the expected methods on the TransportLayer in the expected order, - * and with the expected parameters. - * - * Usage: - * - * TEST_F(ServiceEntryPointTestSuite, ServiceEntryPointImplTest) { - * // Set up our ServiceEntryPoint - * auto sepFactory = [](TransportLayer* tl){ - * return stdx::make_unique<ServiceEntryPointImpl>(tl); - * }; - * - * setServiceEntryPoint(sepFactory); - * - * // Run some tests - * fullLifeCycleTest(); - * } - */ -class ServiceEntryPointTestSuite : public mongo::unittest::Test { -public: - // Need a function that takes a TransportLayer* and returns a new - // ServiceEntryPoint - using ServiceEntryPointFactory = - stdx::function<std::unique_ptr<ServiceEntryPoint>(transport::TransportLayer*)>; - - void setUp() override; - - void setServiceEntryPoint(ServiceEntryPointFactory factory); - - // Lifecycle Tests - void noLifeCycleTest(); - void halfLifeCycleTest(); - void fullLifeCycleTest(); - - // Concurrent Session Tests - void interruptingSessionTest(); - - // Stress Tests - void burstStressTest(int numSessions = 1000, - int numCycles = 1, - Milliseconds delay = Milliseconds(0)); - void longSessionStressTest(); - - class SEPTestSession; - using SEPTestSessionHandle = std::shared_ptr<SEPTestSession>; - - /** - * This class mocks the TransportLayer and allows us to insert hooks beneath - * its methods. - */ - class MockTLHarness : public transport::TransportLayer { - public: - friend class SEPTestSession; - - MockTLHarness(); - - transport::Ticket sourceMessage( - const transport::SessionHandle& session, - Message* message, - Date_t expiration = transport::Ticket::kNoExpirationDate) override; - transport::Ticket sinkMessage( - const transport::SessionHandle& session, - const Message& message, - Date_t expiration = transport::Ticket::kNoExpirationDate) override; - Status wait(transport::Ticket&& ticket) override; - void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override; - - void end(const transport::SessionHandle& session) override; - Status setup() override; - Status start() override; - void shutdown() override; - - transport::MockTicket* getMockTicket(const transport::Ticket& ticket); - - // Mocked method hooks - stdx::function<transport::Ticket(const transport::SessionHandle&, Message*, Date_t)> - _sourceMessage; - stdx::function<transport::Ticket(const transport::SessionHandle&, const Message&, Date_t)> - _sinkMessage; - stdx::function<Status(transport::Ticket)> _wait; - stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait; - stdx::function<void(const transport::SessionHandle&)> _end; - stdx::function<void(SEPTestSession& session)> _destroy_hook; - stdx::function<Status(void)> _start = [] { return Status::OK(); }; - stdx::function<void(void)> _shutdown = [] {}; - - // Pre-set hook methods - transport::Ticket _defaultSource(const transport::SessionHandle& s, Message* m, Date_t d); - transport::Ticket _defaultSink(const transport::SessionHandle& s, const Message&, Date_t d); - transport::Ticket _sinkThenErrorOnWait(const transport::SessionHandle& s, - const Message& m, - Date_t d); - - Status _defaultWait(transport::Ticket ticket); - Status _waitError(transport::Ticket ticket); - Status _waitOnceThenError(transport::Ticket ticket); - - // Reset all hooks to their defaults - void _resetHooks(); - - private: - void _destroy(SEPTestSession& session); - }; - - /** - * A light wrapper around the mock session class, to handle our destroy logic. - */ - class SEPTestSession : public transport::MockSession { - MockTLHarness* _mockTL; - - public: - static std::shared_ptr<SEPTestSession> create(MockTLHarness* tl) { - std::shared_ptr<SEPTestSession> handle(new SEPTestSession(tl)); - return handle; - } - - ~SEPTestSession() { - _mockTL->_destroy(*this); - } - - private: - explicit SEPTestSession(MockTLHarness* tl) : transport::MockSession(tl), _mockTL(tl) {} - }; - -private: - std::unique_ptr<MockTLHarness> _tl; - std::unique_ptr<ServiceEntryPoint> _sep; -}; - -} // namespace mongo diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 18c5156657e..abeca256d51 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -41,7 +41,6 @@ #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor_task_names.h" #include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/idle_thread_block.h" @@ -239,34 +238,43 @@ const transport::SessionHandle& ServiceStateMachine::_session() const { void ServiceStateMachine::_sourceMessage(ThreadGuard guard) { invariant(_inMessage.empty()); - auto ticket = _session()->sourceMessage(&_inMessage); - + invariant(_state.load() == State::Source); _state.store(State::SourceWait); guard.release(); if (_transportMode == transport::Mode::kSynchronous) { - _sourceCallback([this](auto ticket) { + auto msg = [&] { MONGO_IDLE_THREAD_BLOCK; - return _session()->getTransportLayer()->wait(std::move(ticket)); - }(std::move(ticket))); + return _session()->sourceMessage(); + }(); + + if (msg.isOK()) { + _inMessage = std::move(msg.getValue()); + invariant(!_inMessage.empty()); + } + _sourceCallback(msg.getStatus()); } else if (_transportMode == transport::Mode::kAsynchronous) { - _session()->getTransportLayer()->asyncWait( - std::move(ticket), [this](Status status) { _sourceCallback(status); }); + _session()->asyncSourceMessage([this](StatusWith<Message> msg) { + if (msg.isOK()) { + _inMessage = std::move(msg.getValue()); + invariant(!_inMessage.empty()); + } + _sourceCallback(msg.getStatus()); + }); } } void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) { // Sink our response to the client - auto ticket = _session()->sinkMessage(toSink); - + invariant(_state.load() == State::Process); _state.store(State::SinkWait); guard.release(); if (_transportMode == transport::Mode::kSynchronous) { - _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket))); + _sinkCallback(_session()->sinkMessage(std::move(toSink))); } else if (_transportMode == transport::Mode::kAsynchronous) { - _session()->getTransportLayer()->asyncWait( - std::move(ticket), [this](Status status) { _sinkCallback(status); }); + _session()->asyncSinkMessage(std::move(toSink), + [this](Status status) { _sinkCallback(std::move(status)); }); } } @@ -487,7 +495,7 @@ void ServiceStateMachine::terminate() { if (state() == State::Ended) return; - _session()->getTransportLayer()->end(_session()); + _session()->end(); } void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) { diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index d74fcbcdfc7..0cc4aa0634a 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -38,7 +38,6 @@ #include "mongo/db/service_context_noop.h" #include "mongo/stdx/memory.h" #include "mongo/transport/mock_session.h" -#include "mongo/transport/mock_ticket.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/service_executor_task_names.h" @@ -115,62 +114,59 @@ private: using namespace transport; class MockTL : public TransportLayerMock { public: - ~MockTL() = default; + class Session : public MockSession { + public: + using MockSession::MockSession; - Ticket sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration = Ticket::kNoExpirationDate) override { - ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Source); - _lastTicketSource = true; + StatusWith<Message> sourceMessage() override { + auto tl = checked_cast<MockTL*>(getTransportLayer()); + ASSERT_EQ(tl->_ssm->state(), ServiceStateMachine::State::SourceWait); + tl->_lastTicketSource = true; - _ranSource = true; - log() << "In sourceMessage"; + tl->_ranSource = true; + log() << "In sourceMessage"; - if (_nextShouldFail & Source) { - return TransportLayer::TicketSessionClosedStatus; - } + if (tl->_waitHook) + tl->_waitHook(); - OpMsgBuilder builder; - builder.setBody(BSON("ping" << 1)); - *message = builder.finish(); + if (tl->_nextShouldFail & Source) { + return TransportLayer::TicketSessionClosedStatus; + } - return TransportLayerMock::sourceMessage(session, message, expiration); - } + auto out = MockSession::sourceMessage(); + if (out.isOK()) { + OpMsgBuilder builder; + builder.setBody(BSON("ping" << 1)); + out.getValue() = builder.finish(); + } + return out; + } - Ticket sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration = Ticket::kNoExpirationDate) override { - ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Process); - _lastTicketSource = false; + Status sinkMessage(Message message) override { + auto tl = checked_cast<MockTL*>(getTransportLayer()); + ASSERT_EQ(tl->_ssm->state(), ServiceStateMachine::State::SinkWait); + tl->_lastTicketSource = false; - log() << "In sinkMessage"; - _ranSink = true; + log() << "In sinkMessage"; + tl->_ranSink = true; - if (_nextShouldFail & Sink) { - return TransportLayer::TicketSessionClosedStatus; - } + if (tl->_waitHook) + tl->_waitHook(); - _lastSunk = message; + if (tl->_nextShouldFail & Sink) { + return TransportLayer::TicketSessionClosedStatus; + } - return TransportLayerMock::sinkMessage(session, message, expiration); - } + auto out = MockSession::sinkMessage(message); + if (out.isOK()) + tl->_lastSunk = message; - Status wait(Ticket&& ticket) override { - if (!ticket.valid()) { - return ticket.status(); + return out; } - ASSERT_EQ(_ssm->state(), - _lastTicketSource ? ServiceStateMachine::State::SourceWait - : ServiceStateMachine::State::SinkWait); - - log() << "In wait. ssm state: " << stateToString(_ssm->state()); - if (_waitHook) - _waitHook(); - return TransportLayerMock::wait(std::move(ticket)); - } + }; - void asyncWait(Ticket&& ticket, TicketCallback callback) override { - MONGO_UNREACHABLE; + MockTL() { + createSessionHook = [](TransportLayer* tl) { return std::make_unique<Session>(tl); }; } void setSSM(ServiceStateMachine* ssm) { diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp index ad2eceae039..b6d78673e10 100644 --- a/src/mongo/transport/session.cpp +++ b/src/mongo/transport/session.cpp @@ -45,14 +45,6 @@ AtomicUInt64 sessionIdCounter(0); Session::Session() : _id(sessionIdCounter.addAndFetch(1)), _tags(kPending) {} -Ticket Session::sourceMessage(Message* message, Date_t expiration) { - return getTransportLayer()->sourceMessage(shared_from_this(), message, expiration); -} - -Ticket Session::sinkMessage(const Message& message, Date_t expiration) { - return getTransportLayer()->sinkMessage(shared_from_this(), message, expiration); -} - void Session::setTags(TagMask tagsToSet) { mutateTags([tagsToSet](TagMask originalTags) { return (originalTags | tagsToSet); }); } diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index b2160117807..d25d32b9027 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -33,7 +33,6 @@ #include "mongo/base/disallow_copying.h" #include "mongo/platform/atomic_word.h" #include "mongo/transport/session_id.h" -#include "mongo/transport/ticket.h" #include "mongo/util/decorable.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/message.h" @@ -75,47 +74,45 @@ public: static constexpr TagMask kExternalClientKeepOpen = 8; static constexpr TagMask kPending = 1 << 31; - /** - * Destroys a session, calling end() for this session in its TransportLayer. - */ virtual ~Session() = default; - /** - * Return the id for this session. - */ Id id() const { return _id; } - /** - * The TransportLayer for this Session. - */ virtual TransportLayer* getTransportLayer() const = 0; /** - * Source (receive) a new Message for this Session. + * Ends this Session. * - * This method will forward to sourceMessage on this Session's transport layer. - */ - virtual Ticket sourceMessage(Message* message, Date_t expiration = Ticket::kNoExpirationDate); - - /** - * Sink (send) a new Message for this Session. This method should be used - * to send replies to a given host. + * Operations on this Session that have already been started via wait() or asyncWait() will + * complete, but may return a failed Status. Future operations on this Session will fail. If + * this TransportLayer implementation is networked, any connections for this Session will be + * closed. * - * This method will forward to sinkMessage on this Session's transport layer. + * This method is idempotent and synchronous. + * + * Destructors of derived classes will close the session automatically if needed. This method + * should only be called explicitly if the session should be closed separately from destruction, + * eg due to some outside event. */ - virtual Ticket sinkMessage(const Message& message, - Date_t expiration = Ticket::kNoExpirationDate); + virtual void end() = 0; /** - * Return the remote host for this session. + * Source (receive) a new Message from the remote host for this Session. */ - virtual const HostAndPort& remote() const = 0; + virtual StatusWith<Message> sourceMessage() = 0; + virtual void asyncSourceMessage(std::function<void(StatusWith<Message>)> cb) = 0; /** - * Return the local host information for this session. + * Sink (send) a Message to the remote host for this Session. + * + * Async version will keep the buffer alive until the operation completes. */ + virtual Status sinkMessage(Message message) = 0; + virtual void asyncSinkMessage(Message message, std::function<void(Status)> cb) = 0; + + virtual const HostAndPort& remote() const = 0; virtual const HostAndPort& local() const = 0; /** @@ -147,15 +144,9 @@ public: */ virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc); - /** - * Get this session's tags. - */ virtual TagMask getTags() const; protected: - /** - * Construct a new session. - */ Session(); private: diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 352dc6cf102..5cdd8105870 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -32,6 +32,7 @@ #include "mongo/base/system_error.h" #include "mongo/config.h" +#include "mongo/db/stats/counters.h" #include "mongo/transport/asio_utils.h" #include "mongo/transport/transport_layer_asio.h" #include "mongo/util/net/sock.h" @@ -50,7 +51,7 @@ namespace transport { using GenericSocket = asio::generic::stream_protocol::socket; -class TransportLayerASIO::ASIOSession : public Session { +class TransportLayerASIO::ASIOSession final : public Session { MONGO_DISALLOW_COPYING(ASIOSession); public: @@ -58,9 +59,6 @@ public: : _socket(std::move(socket)), _tl(tl) { std::error_code ec; - _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec); - fassert(40490, ec.value() == 0); - auto family = endpointToSockAddr(_socket.local_endpoint()).getType(); if (family == AF_INET || family == AF_INET6) { _socket.set_option(asio::ip::tcp::no_delay(true)); @@ -76,7 +74,7 @@ public: } ~ASIOSession() { - shutdown(); + end(); } TransportLayer* getTransportLayer() const override { @@ -91,16 +89,7 @@ public: return _local; } - GenericSocket& getSocket() { -#ifdef MONGO_CONFIG_SSL - if (_sslSocket) { - return static_cast<GenericSocket&>(_sslSocket->lowest_layer()); - } -#endif - return _socket; - } - - void shutdown() { + void end() override { if (getSocket().is_open()) { std::error_code ec; getSocket().cancel(); @@ -111,6 +100,76 @@ public: } } + StatusWith<Message> sourceMessage() override { + ensureSync(); + auto out = StatusWith<Message>(ErrorCodes::InternalError, "uninitialized..."); + bool called = false; + sourceMessageImpl(true, [&](StatusWith<Message> in) { + out = std::move(in); + called = true; + }); + invariant(called); + return out; + } + + void asyncSourceMessage(std::function<void(StatusWith<Message>)> cb) override { + ensureAsync(); + sourceMessageImpl(false, std::move(cb)); + } + + Status sinkMessage(Message message) override { + ensureSync(); + + std::error_code ec; + size_t size; + bool called = false; + + write(true, + asio::buffer(message.buf(), message.size()), + [&](const std::error_code& ec_, size_t size_) { + ec = ec_; + size = size_; + called = true; + }); + invariant(called); + + if (ec) + return errorCodeToStatus(ec); + + invariant(size == size_t(message.size())); + networkCounter.hitPhysicalOut(message.size()); + return Status::OK(); + } + + void asyncSinkMessage(Message message, std::function<void(Status)> cb) override { + ensureAsync(); + + write(false, asio::buffer(message.buf(), message.size()), [ + message, // keep the buffer alive. + cb = std::move(cb), + this + ](const std::error_code& ec, size_t size) { + if (ec) { + cb(errorCodeToStatus(ec)); + return; + } + invariant(size == size_t(message.size())); + networkCounter.hitPhysicalOut(message.size()); + cb(Status::OK()); + }); + }; + + +private: + GenericSocket& getSocket() { +#ifdef MONGO_CONFIG_SSL + if (_sslSocket) { + return static_cast<GenericSocket&>(_sslSocket->lowest_layer()); + } +#endif + return _socket; + } + bool isOpen() const { #ifdef MONGO_CONFIG_SSL return _sslSocket ? _sslSocket->lowest_layer().is_open() : _socket.is_open(); @@ -119,6 +178,55 @@ public: #endif } + template <typename Callback> + void sourceMessageImpl(bool sync, Callback&& cb) { + static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value); + + auto headerBuffer = SharedBuffer::allocate(kHeaderSize); + auto ptr = headerBuffer.get(); + read(sync, + asio::buffer(ptr, kHeaderSize), + [ cb = std::forward<Callback>(cb), headerBuffer = std::move(headerBuffer), this ]( + const std::error_code& ec, size_t size) mutable { + + if (ec) + return cb(errorCodeToStatus(ec)); + invariant(size == kHeaderSize); + + const auto msgLen = size_t(MSGHEADER::View(headerBuffer.get()).getMessageLength()); + if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) { + StringBuilder sb; + sb << "recv(): message msgLen " << msgLen << " is invalid. " + << "Min " << kHeaderSize << " Max: " << MaxMessageSizeBytes; + const auto str = sb.str(); + LOG(0) << str; + + return cb(Status(ErrorCodes::ProtocolError, str)); + } + + if (msgLen == size) { + // This probably isn't a real case since all (current) messages have bodies. + networkCounter.hitPhysicalIn(msgLen); + return cb(Message(std::move(headerBuffer))); + } + + auto buffer = SharedBuffer::allocate(msgLen); + memcpy(buffer.get(), headerBuffer.get(), kHeaderSize); + + MsgData::View msgView(buffer.get()); + read(true, + asio::buffer(msgView.data(), msgView.dataLen()), + [ cb = std::move(cb), buffer = std::move(buffer), msgLen, this ]( + const std::error_code& ec, size_t size) mutable { + if (ec) + return cb(errorCodeToStatus(ec)); + networkCounter.hitPhysicalIn(msgLen); + return cb(Message(std::move(buffer))); + }); + }); + } + + template <typename MutableBufferSequence, typename CompleteHandler> void read(bool sync, const MutableBufferSequence& buffers, CompleteHandler&& handler) { #ifdef MONGO_CONFIG_SSL @@ -127,7 +235,8 @@ public: sync, *_sslSocket, buffers, std::forward<CompleteHandler>(handler)); } else if (!_ranHandshake) { invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value)); - auto postHandshakeCb = [this, sync, buffers, handler](Status status, bool needsRead) { + auto postHandshakeCb = [this, sync, buffers, handler](Status status, + bool needsRead) mutable { if (status.isOK()) { if (needsRead) { read(sync, buffers, handler); @@ -142,7 +251,7 @@ public: auto handshakeRecvCb = [ this, postHandshakeCb = std::move(postHandshakeCb), sync, buffers ]( - const std::error_code& ec, size_t size) { + const std::error_code& ec, size_t size) mutable { _ranHandshake = true; if (ec) { postHandshakeCb(errorCodeToStatus(ec), size); @@ -152,30 +261,41 @@ public: maybeHandshakeSSL(sync, buffers, std::move(postHandshakeCb)); }; - opportunisticRead(sync, _socket, buffers, std::move(handshakeRecvCb)); - } else { - -#endif - opportunisticRead(sync, _socket, buffers, std::forward<CompleteHandler>(handler)); -#ifdef MONGO_CONFIG_SSL + return opportunisticRead(sync, _socket, buffers, std::move(handshakeRecvCb)); } #endif + return opportunisticRead(sync, _socket, buffers, std::forward<CompleteHandler>(handler)); } template <typename ConstBufferSequence, typename CompleteHandler> void write(bool sync, const ConstBufferSequence& buffers, CompleteHandler&& handler) { #ifdef MONGO_CONFIG_SSL if (_sslSocket) { - opportunisticWrite(sync, *_sslSocket, buffers, std::forward<CompleteHandler>(handler)); - } else { -#endif - opportunisticWrite(sync, _socket, buffers, std::forward<CompleteHandler>(handler)); -#ifdef MONGO_CONFIG_SSL + return opportunisticWrite( + sync, *_sslSocket, buffers, std::forward<CompleteHandler>(handler)); } #endif + return opportunisticWrite(sync, _socket, buffers, std::forward<CompleteHandler>(handler)); + } + + void ensureSync() { + if (_blockingMode == Sync) + return; + asio::error_code ec; + getSocket().non_blocking(false, ec); + fassertStatusOK(40490, errorCodeToStatus(ec)); + _blockingMode = Sync; + } + + void ensureAsync() { + if (_blockingMode == Async) + return; + asio::error_code ec; + getSocket().non_blocking(true, ec); + fassertStatusOK(50706, errorCodeToStatus(ec)); + _blockingMode = Async; } -private: template <typename Stream, typename MutableBufferSequence, typename CompleteHandler> void opportunisticRead(bool sync, Stream& stream, @@ -243,7 +363,7 @@ private: _sslSocket.emplace(std::move(_socket), *_tl->_sslContext); auto handshakeCompleteCb = [ this, onComplete = std::move(onComplete) ]( - const std::error_code& ec, size_t size) { + const std::error_code& ec, size_t size) mutable { auto& sslPeerInfo = SSLPeerInfo::forSession(shared_from_this()); if (!ec && sslPeerInfo.subjectName.empty()) { @@ -297,6 +417,14 @@ private: } #endif + enum BlockingMode { + Unknown, + Sync, + Async, + }; + + BlockingMode _blockingMode = Unknown; + HostAndPort _remote; HostAndPort _local; diff --git a/src/mongo/transport/ticket.cpp b/src/mongo/transport/ticket.cpp deleted file mode 100644 index d1003957b80..00000000000 --- a/src/mongo/transport/ticket.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/base/status.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/ticket_impl.h" -#include "mongo/transport/transport_layer.h" - -namespace mongo { -namespace transport { - -const Date_t Ticket::kNoExpirationDate{Date_t::max()}; - -Status Ticket::ExpiredStatus = Status(ErrorCodes::ExceededTimeLimit, "Ticket has expired."); - -Status Ticket::SessionClosedStatus = - Status(ErrorCodes::TransportSessionClosed, "Ticket's Session is closed."); - -Ticket::Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket) - : _tl(tl), _ticket(std::move(ticket)) {} - -Ticket::Ticket(Status status) : _status(status) {} - -Ticket::~Ticket() = default; - -Ticket::Ticket(Ticket&&) = default; -Ticket& Ticket::operator=(Ticket&&) = default; - -Status Ticket::wait()&& { - // If the ticket is invalid then _tl is a nullptr and we should return early. - if (!valid()) - return status(); - - invariant(_tl); - return _tl->wait(std::move(*this)); -} - -void Ticket::asyncWait(TicketCallback cb)&& { - // If the ticket is invalid then _tl is a nullptr and we should return early. - if (!valid()) { - cb(status()); - return; - } - - invariant(_tl); - return _tl->asyncWait(std::move(*this), std::move(cb)); -} - -bool Ticket::valid() { - return _status == Status::OK() && !expired(); -} - -Status Ticket::status() const { - return _status; -} - -bool Ticket::expired() { - bool expired = expiration() <= Date_t::now(); - if (_status == Status::OK() && expired) { - _status = Status(ErrorCodes::ExceededTimeLimit, "Ticket has expired."); - } - return expired; -} - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/ticket.h b/src/mongo/transport/ticket.h deleted file mode 100644 index d4ffae468da..00000000000 --- a/src/mongo/transport/ticket.h +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <memory> - -#include "mongo/base/disallow_copying.h" -#include "mongo/base/status.h" -#include "mongo/stdx/functional.h" -#include "mongo/transport/ticket_impl.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace transport { - -class Session; -class TransportLayer; - -/** - * A Ticket represents some work to be done within the TransportLayer. - * Run Tickets by passing them in a call to either TransportLayer::wait() - * or TransportLayer::asyncWait(). - */ -class Ticket { - MONGO_DISALLOW_COPYING(Ticket); - -public: - using TicketCallback = stdx::function<void(Status)>; - - friend class TransportLayer; - - /** - * Indicates that there is no expiration time by when a ticket needs to complete. - */ - static const Date_t kNoExpirationDate; - - static Status ExpiredStatus; - static Status SessionClosedStatus; - - Ticket(TransportLayer* tl, std::unique_ptr<TicketImpl> ticket); - - /** - * An invalid ticket and a Status for why it's invalid. - */ - Ticket(Status status); - - ~Ticket(); - - /** - * Move constructor and assignment operator. - */ - Ticket(Ticket&&); - Ticket& operator=(Ticket&&); - - /** - * Return this ticket's session id. - */ - SessionId sessionId() const { - return _ticket->sessionId(); - } - - /** - * Return this ticket's expiration date. - */ - Date_t expiration() const { - return _ticket->expiration(); - } - - /** - * Wait for this ticket to be filled. - * - * This is this-rvalue qualified because it consumes the ticket - */ - Status wait() &&; - - /** - * Asynchronously wait for this ticket to be filled. - * - * This is this-rvalue qualified because it consumes the ticket - * - * If the ticket has expired or is not valid when asyncWait is called, cb will be called - * immediately and inline with the error status. - */ - void asyncWait(TicketCallback cb) &&; - - /* - * Return's the Status of the ticket. - */ - Status status() const; - - /* - * Returns true if the ticket has expired, false otherwise. - * - * If the ticket has expired, changes the Status of the ticket to TicketExpired but - * only if the current Status is Status::OK(). - */ - bool expired(); - - /* - * Return true if the ticket is usable, false otherwise. - * - * If the ticket has expired, changes the Status of the ticket to TicketExpired but - * only if the current Status is Status::OK(). - */ - bool valid(); - - /** - * Return a non-owning pointer to the underlying TicketImpl type - */ - TicketImpl* impl() const { - return _ticket.get(); - } - - /** - * Return an owning pointer to the underlying TicketImpl type. This consumes the ticket - */ - std::unique_ptr<TicketImpl> releaseImpl() && { - return std::move(_ticket); - } - -private: - TransportLayer* _tl = nullptr; - Status _status = Status::OK(); - std::unique_ptr<TicketImpl> _ticket; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/ticket_asio.cpp b/src/mongo/transport/ticket_asio.cpp deleted file mode 100644 index 78102f6b5a8..00000000000 --- a/src/mongo/transport/ticket_asio.cpp +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/base/system_error.h" -#include "mongo/db/stats/counters.h" -#include "mongo/transport/asio_utils.h" -#include "mongo/transport/ticket_asio.h" -#include "mongo/util/log.h" - -#include "mongo/transport/session_asio.h" - -namespace mongo { -namespace transport { -namespace { -constexpr auto kHeaderSize = sizeof(MSGHEADER::Value); - -} // namespace - - -std::shared_ptr<TransportLayerASIO::ASIOSession> TransportLayerASIO::ASIOTicket::getSession() { - auto session = _session.lock(); - if (!session || !session->isOpen()) { - finishFill(Ticket::SessionClosedStatus); - return nullptr; - } - return session; -} - -bool TransportLayerASIO::ASIOTicket::isSync() const { - return _fillSync; -} - -TransportLayerASIO::ASIOTicket::ASIOTicket(const ASIOSessionHandle& session, Date_t expiration) - : _session(session), _sessionId(session->id()), _expiration(expiration) {} - -TransportLayerASIO::ASIOSourceTicket::ASIOSourceTicket(const ASIOSessionHandle& session, - Date_t expiration, - Message* msg) - : ASIOTicket(session, expiration), _target(msg) {} - -TransportLayerASIO::ASIOSinkTicket::ASIOSinkTicket(const ASIOSessionHandle& session, - Date_t expiration, - const Message& msg) - : ASIOTicket(session, expiration), _msgToSend(msg) {} - -void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) { - if (ec) { - finishFill(errorCodeToStatus(ec)); - return; - } - - _target->setData(std::move(_buffer)); - networkCounter.hitPhysicalIn(_target->size()); - finishFill(Status::OK()); -} - -void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) { - if (ec) { - finishFill(errorCodeToStatus(ec)); - return; - } - - auto session = getSession(); - if (!session) - return; - - MSGHEADER::View headerView(_buffer.get()); - auto msgLen = static_cast<size_t>(headerView.getMessageLength()); - if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) { - StringBuilder sb; - sb << "recv(): message msgLen " << msgLen << " is invalid. " - << "Min " << kHeaderSize << " Max: " << MaxMessageSizeBytes; - const auto str = sb.str(); - LOG(0) << str; - finishFill(Status(ErrorCodes::ProtocolError, str)); - return; - } - - if (msgLen == size) { - finishFill(Status::OK()); - return; - } - - _buffer.realloc(msgLen); - MsgData::View msgView(_buffer.get()); - - session->read(isSync(), - asio::buffer(msgView.data(), msgView.dataLen()), - [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); }); -} - -void TransportLayerASIO::ASIOSourceTicket::fillImpl() { - auto session = getSession(); - if (!session) - return; - - const auto initBufSize = kHeaderSize; - _buffer = SharedBuffer::allocate(initBufSize); - - session->read(isSync(), - asio::buffer(_buffer.get(), initBufSize), - [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); }); -} - -void TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const std::error_code& ec, size_t size) { - networkCounter.hitPhysicalOut(_msgToSend.size()); - finishFill(ec ? errorCodeToStatus(ec) : Status::OK()); -} - -void TransportLayerASIO::ASIOSinkTicket::fillImpl() { - auto session = getSession(); - if (!session) - return; - - session->write(isSync(), - asio::buffer(_msgToSend.buf(), _msgToSend.size()), - [this](const std::error_code& ec, size_t size) { _sinkCallback(ec, size); }); -} - -void TransportLayerASIO::ASIOTicket::finishFill(Status status) { - // We want to make sure that a Ticket can only be filled once; filling a ticket invalidates it. - // So we check that the _fillCallback is set, then move it out of the ticket and into a local - // variable, and then call that. It's illegal to interact with the ticket after calling the - // fillCallback, so we have to move it out of _fillCallback so there are no writes to any - // variables in ASIOTicket after it gets called. - invariant(_fillCallback); - auto fillCallback = std::move(_fillCallback); - fillCallback(status); -} - -void TransportLayerASIO::ASIOTicket::fill(bool sync, TicketCallback&& cb) { - _fillSync = sync; - dassert(!_fillCallback); - _fillCallback = std::move(cb); - fillImpl(); -} - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/ticket_asio.h b/src/mongo/transport/ticket_asio.h deleted file mode 100644 index faa687a943e..00000000000 --- a/src/mongo/transport/ticket_asio.h +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/transport/transport_layer_asio.h" - -#include "asio.hpp" - -namespace mongo { -namespace transport { - -class TransportLayerASIO::ASIOTicket : public TicketImpl { - MONGO_DISALLOW_COPYING(ASIOTicket); - -public: - explicit ASIOTicket(const ASIOSessionHandle& session, Date_t expiration); - - SessionId sessionId() const final { - return _sessionId; - } - - Date_t expiration() const final { - return _expiration; - } - - /** - * Run this ticket's work item. - */ - void fill(bool sync, TicketCallback&& cb); - -protected: - void finishFill(Status status); - std::shared_ptr<ASIOSession> getSession(); - bool isSync() const; - - // This must be implemented by the Source/Sink subclasses as the actual implementation - // of filling the ticket. - virtual void fillImpl() = 0; - -private: - std::weak_ptr<ASIOSession> _session; - const SessionId _sessionId; - const Date_t _expiration; - - TicketCallback _fillCallback; - bool _fillSync; -}; - -class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket { -public: - ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg); - -protected: - void fillImpl() final; - -private: - void _headerCallback(const std::error_code& ec, size_t size); - void _bodyCallback(const std::error_code& ec, size_t size); - - SharedBuffer _buffer; - Message* _target; -}; - -class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket { -public: - ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg); - -protected: - void fillImpl() final; - -private: - void _sinkCallback(const std::error_code& ec, size_t size); - Message _msgToSend; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/ticket_impl.h b/src/mongo/transport/ticket_impl.h deleted file mode 100644 index c6d325dbc37..00000000000 --- a/src/mongo/transport/ticket_impl.h +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/base/disallow_copying.h" -#include "mongo/transport/session_id.h" -#include "mongo/util/net/message.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace transport { - -/** - * Interface representing implementations of Ticket. - * - * Ticket implementations are specific to a TransportLayer implementation. - */ -class TicketImpl { - MONGO_DISALLOW_COPYING(TicketImpl); - -public: - virtual ~TicketImpl() = default; - - /** - * Return this ticket's session id. - */ - virtual SessionId sessionId() const = 0; - - /** - * Return this ticket's expiration date. - */ - virtual Date_t expiration() const = 0; - -protected: - TicketImpl() = default; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 383e3bd0cd5..053aa5f6c15 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -31,15 +31,12 @@ #include "mongo/base/status.h" #include "mongo/stdx/functional.h" #include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" #include "mongo/util/net/message.h" #include "mongo/util/time_support.h" namespace mongo { namespace transport { -class TicketImpl; - /** * The TransportLayer moves Messages between transport::Endpoints and the database. * This class owns an Acceptor that generates new endpoints from which it can @@ -67,77 +64,6 @@ public: virtual ~TransportLayer() = default; /** - * Source (receive) a new Message for this Session. - * - * This method returns a work Ticket. The caller must complete the Ticket by - * passing it to either TransportLayer::wait() or TransportLayer::asyncWait(). - * - * If an expiration date is given, the returned Ticket will expire at that time. - * - * When run, the returned Ticket will be exchanged for a Status. If the - * TransportLayer is unable to source a Message, this will be a failed status, - * and the passed-in Message buffer may be left in an invalid state. - */ - virtual Ticket sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration = Ticket::kNoExpirationDate) = 0; - - /** - * Sink (send) a new Message for this Session. This method should be used - * to send replies to a given host. - * - * This method returns a work Ticket. The caller must complete the Ticket by - * passing it to either TransportLayer::wait() or TransportLayer::asyncWait(). - * - * If an expiration date is given, the returned Ticket will expire at that time. - * - * When run, the returned Ticket will be exchanged for a Status. If the - * TransportLayer is unable to sink the Message, this will be a failed status. - * - * This method does NOT take ownership of the sunk Message, which must be cleaned - * up by the caller. - */ - virtual Ticket sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration = Ticket::kNoExpirationDate) = 0; - - /** - * Perform a synchronous wait on the given work Ticket. When this call returns, - * the Ticket will have been completed. A call to wait() consumes the Ticket. - * - * This thread may be used by the TransportLayer to run other Tickets that were - * enqueued prior to this call. - */ - virtual Status wait(Ticket&& ticket) = 0; - - /** - * Callback for Tickets that are run via asyncWait(). - */ - using TicketCallback = stdx::function<void(Status)>; - - /** - * Perform an asynchronous wait on the given work Ticket. Once the Ticket has been - * completed, the passed-in callback will be invoked. - * - * This thread will not be used by the TransportLayer to perform work. The callback - * passed to asyncWait() may be run on any thread. - */ - virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0; - - /** - * End the given Session. Tickets for this Session that have already been - * started via wait() or asyncWait() will complete, but may return a failed Status. - * Future calls to wait() or asyncWait() for this Session will fail. If this - * TransportLayer implementation is networked, any connections for this Session will - * be closed. - * - * ~Session() will automatically call end() with itself. - * - * This method is idempotent and synchronous. - */ - virtual void end(const SessionHandle& session) = 0; - - /** * Start the TransportLayer. After this point, the TransportLayer will begin accepting active * sessions from new transport::Endpoints. */ @@ -160,24 +86,6 @@ public: protected: TransportLayer() = default; - - /** - * Return the implementation of this Ticket. - */ - TicketImpl* getTicketImpl(const Ticket& ticket) { - return ticket.impl(); - } - - std::unique_ptr<TicketImpl> getOwnedTicketImpl(Ticket&& ticket) { - return std::move(ticket).releaseImpl(); - } - - /** - * Return the transport layer of this Ticket. - */ - TransportLayer* getTicketTransportLayer(const Ticket& ticket) { - return ticket._tl; - } }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index b1c9499eb22..df0319a9b14 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -46,8 +46,6 @@ #include "mongo/db/service_context.h" #include "mongo/transport/asio_utils.h" #include "mongo/transport/service_entry_point.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/ticket_asio.h" #include "mongo/util/log.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/message.h" @@ -85,48 +83,6 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts, TransportLayerASIO::~TransportLayerASIO() = default; -Ticket TransportLayerASIO::sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration) { - auto asioSession = checked_pointer_cast<ASIOSession>(session); - auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message); - return {this, std::move(ticket)}; -} - -Ticket TransportLayerASIO::sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration) { - auto asioSession = checked_pointer_cast<ASIOSession>(session); - auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message); - return {this, std::move(ticket)}; -} - -Status TransportLayerASIO::wait(Ticket&& ticket) { - auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket)); - auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get()); - - Status waitStatus = Status::OK(); - asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; }); - - return waitStatus; -} - -void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) { - auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket))); - auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get()); - - asioTicket->fill( - false, - [ callback = std::move(callback), - ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); }); -} - -// Must not be called while holding the TransportLayerASIO mutex. -void TransportLayerASIO::end(const SessionHandle& session) { - auto asioSession = checked_pointer_cast<ASIOSession>(session); - asioSession->shutdown(); -} - Status TransportLayerASIO::setup() { std::vector<std::string> listenAddrs; if (_listenerOptions.ipList.empty()) { diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index f04881debd8..62c0e02edae 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -38,7 +38,6 @@ #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" -#include "mongo/transport/ticket_impl.h" #include "mongo/transport/transport_layer.h" #include "mongo/transport/transport_mode.h" #include "mongo/util/net/hostandport.h" @@ -93,20 +92,6 @@ public: virtual ~TransportLayerASIO(); - Ticket sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration = Ticket::kNoExpirationDate) final; - - Ticket sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration = Ticket::kNoExpirationDate) final; - - Status wait(Ticket&& ticket) final; - - void asyncWait(Ticket&& ticket, TicketCallback callback) final; - - void end(const SessionHandle& session) final; - Status setup() final; Status start() final; @@ -120,9 +105,6 @@ public: private: class ASIOSession; - class ASIOTicket; - class ASIOSourceTicket; - class ASIOSinkTicket; using ASIOSessionHandle = std::shared_ptr<ASIOSession>; using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>; diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index 9df17130142..28bcf351e51 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -49,26 +49,6 @@ namespace transport { TransportLayerManager::TransportLayerManager() = default; -Ticket TransportLayerManager::sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration) { - return session->getTransportLayer()->sourceMessage(session, message, expiration); -} - -Ticket TransportLayerManager::sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration) { - return session->getTransportLayer()->sinkMessage(session, message, expiration); -} - -Status TransportLayerManager::wait(Ticket&& ticket) { - return getTicketTransportLayer(ticket)->wait(std::move(ticket)); -} - -void TransportLayerManager::asyncWait(Ticket&& ticket, TicketCallback callback) { - return getTicketTransportLayer(ticket)->asyncWait(std::move(ticket), std::move(callback)); -} - template <typename Callable> void TransportLayerManager::_foreach(Callable&& cb) const { { @@ -79,10 +59,6 @@ void TransportLayerManager::_foreach(Callable&& cb) const { } } -void TransportLayerManager::end(const SessionHandle& session) { - session->getTransportLayer()->end(session); -} - // TODO Right now this and setup() leave TLs started if there's an error. In practice the server // exits with an error and this isn't an issue, but we should make this more robust. Status TransportLayerManager::start() { diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 8349fc56f32..7c80935f00a 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -33,8 +33,6 @@ #include "mongo/base/status.h" #include "mongo/stdx/mutex.h" #include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/ticket_impl.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/net/message.h" #include "mongo/util/time_support.h" @@ -59,18 +57,6 @@ public: : _tls(std::move(tls)) {} TransportLayerManager(); - Ticket sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration = Ticket::kNoExpirationDate) override; - - Status wait(Ticket&& ticket) override; - void asyncWait(Ticket&& ticket, TicketCallback callback) override; - - void end(const SessionHandle& session) override; - Status start() override; void shutdown() override; Status setup() override; diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index 44c43b2d236..ec1a28e6777 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -33,7 +33,6 @@ #include "mongo/base/status.h" #include "mongo/stdx/memory.h" #include "mongo/transport/mock_session.h" -#include "mongo/transport/mock_ticket.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/net/message.h" #include "mongo/util/time_support.h" @@ -43,54 +42,8 @@ namespace transport { TransportLayerMock::TransportLayerMock() : _shutdown(false) {} -Ticket TransportLayerMock::sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration) { - if (inShutdown()) { - return Ticket(TransportLayer::ShutdownStatus); - } else if (!owns(session->id())) { - return Ticket(TransportLayer::SessionUnknownStatus); - } else if (_sessions[session->id()].ended) { - return Ticket(TransportLayer::TicketSessionClosedStatus); - } - - return Ticket(this, stdx::make_unique<transport::MockTicket>(session, message, expiration)); -} - -Ticket TransportLayerMock::sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration) { - if (inShutdown()) { - return Ticket(TransportLayer::ShutdownStatus); - } else if (!owns(session->id())) { - return Ticket(TransportLayer::SessionUnknownStatus); - } else if (_sessions[session->id()].ended) { - return Ticket(TransportLayer::TicketSessionClosedStatus); - } - - return Ticket(this, stdx::make_unique<transport::MockTicket>(session, expiration)); -} - -Status TransportLayerMock::wait(Ticket&& ticket) { - if (inShutdown()) { - return ShutdownStatus; - } else if (!ticket.valid()) { - return ticket.status(); - } else if (!owns(ticket.sessionId())) { - return TicketSessionUnknownStatus; - } else if (_sessions[ticket.sessionId()].ended) { - return TransportLayer::TicketSessionClosedStatus; - } - - return Status::OK(); -} - -void TransportLayerMock::asyncWait(Ticket&& ticket, TicketCallback callback) { - callback(wait(std::move(ticket))); -} - SessionHandle TransportLayerMock::createSession() { - auto session = MockSession::create(this); + auto session = createSessionHook ? createSessionHook(this) : MockSession::create(this); Session::Id sessionId = session->id(); _sessions[sessionId] = Connection{false, session, SSLPeerInfo()}; @@ -109,12 +62,6 @@ bool TransportLayerMock::owns(Session::Id id) { return _sessions.count(id) > 0; } -void TransportLayerMock::end(const SessionHandle& session) { - if (!owns(session->id())) - return; - _sessions[session->id()].ended = true; -} - Status TransportLayerMock::setup() { return Status::OK(); } diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index fd8670818ad..7a861df129a 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -31,8 +31,6 @@ #include "mongo/base/status.h" #include "mongo/stdx/unordered_map.h" #include "mongo/transport/session.h" -#include "mongo/transport/ticket.h" -#include "mongo/transport/ticket_impl.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/net/message.h" #include "mongo/util/net/ssl_types.h" @@ -51,27 +49,21 @@ public: TransportLayerMock(); ~TransportLayerMock(); - Ticket sourceMessage(const SessionHandle& session, - Message* message, - Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(const SessionHandle& session, - const Message& message, - Date_t expiration = Ticket::kNoExpirationDate) override; - - Status wait(Ticket&& ticket) override; - void asyncWait(Ticket&& ticket, TicketCallback callback) override; - SessionHandle createSession(); SessionHandle get(Session::Id id); bool owns(Session::Id id); - void end(const SessionHandle& session) override; Status setup() override; Status start() override; void shutdown() override; bool inShutdown() const; + // Set to a factory function to use your own session type. + std::function<SessionHandle(TransportLayer*)> createSessionHook; + private: + friend class MockSession; + struct Connection { bool ended; SessionHandle session; diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp deleted file mode 100644 index 2234095c806..00000000000 --- a/src/mongo/transport/transport_layer_mock_test.cpp +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/stdx/memory.h" -#include "mongo/transport/mock_ticket.h" -#include "mongo/transport/session.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/transport/transport_layer_mock.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace transport { - -class TransportLayerMockTest : public mongo::unittest::Test { -public: - void setUp() { - _transportLayer = stdx::make_unique<TransportLayerMock>(); - } - - TransportLayerMock* tl() { - return _transportLayer.get(); - } - -private: - std::unique_ptr<TransportLayerMock> _transportLayer; -}; - -// sinkMessage() generates a valid Ticket -TEST_F(TransportLayerMockTest, SinkMessageGeneratesTicket) { - Message msg{}; - SessionHandle session = tl()->createSession(); - - // call sinkMessage() with no expiration - Ticket ticket = tl()->sinkMessage(session, msg); - ASSERT(ticket.valid()); - ASSERT_OK(ticket.status()); - ASSERT_EQUALS(ticket.sessionId(), session->id()); - ASSERT_EQUALS(ticket.expiration(), Ticket::kNoExpirationDate); - - // call sinkMessage() with an expiration - Date_t expiration = Date_t::now() + Hours(1); - ticket = tl()->sinkMessage(session, msg, expiration); - ASSERT(ticket.valid()); - ASSERT_OK(ticket.status()); - ASSERT_EQUALS(ticket.sessionId(), session->id()); - ASSERT_EQUALS(ticket.expiration(), expiration); -} - -// sinkMessage() generates an invalid Ticket if the Session is closed -TEST_F(TransportLayerMockTest, SinkMessageSessionClosed) { - Message msg{}; - SessionHandle session = tl()->createSession(); - - tl()->end(session); - - Ticket ticket = tl()->sinkMessage(session, msg); - ASSERT_FALSE(ticket.valid()); - ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionClosed); -} - -// sinkMessage() generates an invalid Ticket if the TransportLayer does not own the Session -TEST_F(TransportLayerMockTest, SinkMessageSessionUnknown) { - Message msg{}; - - std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>(); - SessionHandle session = anotherTL->createSession(); - - Ticket ticket = tl()->sinkMessage(session, msg); - ASSERT_FALSE(ticket.valid()); - ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionUnknown); -} - -// sinkMessage() generates an invalid Ticket if the TransportLayer is in shutdown -TEST_F(TransportLayerMockTest, SinkMessageTLShutdown) { - Message msg{}; - SessionHandle session = tl()->createSession(); - - tl()->shutdown(); - - Ticket ticket = tl()->sinkMessage(session, msg); - ASSERT_FALSE(ticket.valid()); - ASSERT_EQUALS(ticket.status().code(), ErrorCodes::ShutdownInProgress); -} - -// sourceMessage() generates a valid ticket -TEST_F(TransportLayerMockTest, SourceMessageGeneratesTicket) { - Message msg{}; - SessionHandle session = tl()->createSession(); - - // call sourceMessage() with no expiration - Ticket ticket = tl()->sourceMessage(session, &msg); - ASSERT(ticket.valid()); - ASSERT_OK(ticket.status()); - ASSERT_EQUALS(ticket.sessionId(), session->id()); - ASSERT(msg.empty()); - ASSERT_EQUALS(ticket.expiration(), Ticket::kNoExpirationDate); - - // call sourceMessage() with an expiration - Date_t expiration = Date_t::now() + Hours(1); - ticket = tl()->sourceMessage(session, &msg, expiration); - ASSERT(ticket.valid()); - ASSERT_OK(ticket.status()); - ASSERT_EQUALS(ticket.sessionId(), session->id()); - ASSERT(msg.empty()); - ASSERT_EQUALS(ticket.expiration(), expiration); -} - -// sourceMessage() generates an invalid ticket if the Session is closed -TEST_F(TransportLayerMockTest, SourceMessageSessionClosed) { - Message msg{}; - SessionHandle session = tl()->createSession(); - - tl()->end(session); - - Ticket ticket = tl()->sourceMessage(session, &msg); - ASSERT_FALSE(ticket.valid()); - ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionClosed); -} - -// sourceMessage() generates an invalid ticket if the TransportLayer does not own the Session -TEST_F(TransportLayerMockTest, SourceMessageSessionUnknown) { - Message msg{}; - - std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>(); - SessionHandle session = anotherTL->createSession(); - - Ticket ticket = tl()->sourceMessage(session, &msg); - ASSERT_FALSE(ticket.valid()); - ASSERT_EQUALS(ticket.status().code(), ErrorCodes::TransportSessionUnknown); -} - -// sourceMessage() generates an invalid ticket if the TransportLayer is in shutdown -TEST_F(TransportLayerMockTest, SourceMessageTLShutdown) { - Message msg{}; - SessionHandle session = tl()->createSession(); - - tl()->shutdown(); - - Ticket ticket = tl()->sourceMessage(session, &msg); - ASSERT_FALSE(ticket.valid()); - ASSERT_EQUALS(ticket.status().code(), ErrorCodes::ShutdownInProgress); -} - -// wait() returns an OK status -TEST_F(TransportLayerMockTest, Wait) { - SessionHandle session = tl()->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session)); - - Status status = tl()->wait(std::move(ticket)); - ASSERT_OK(status); -} - -// wait() returns an TicketExpired error status if the Ticket expired -TEST_F(TransportLayerMockTest, WaitExpiredTicket) { - SessionHandle session = tl()->createSession(); - Ticket expiredTicket = - Ticket(tl(), stdx::make_unique<transport::MockTicket>(session, Date_t::now())); - - Status status = tl()->wait(std::move(expiredTicket)); - ASSERT_EQUALS(status.code(), ErrorCodes::ExceededTimeLimit); -} - -// wait() returns the invalid Ticket's Status -TEST_F(TransportLayerMockTest, WaitInvalidTicket) { - Ticket invalidTicket = Ticket(Status(ErrorCodes::UnknownError, "")); - ASSERT_FALSE(invalidTicket.valid()); - - Status status = tl()->wait(std::move(invalidTicket)); - ASSERT_EQUALS(status.code(), ErrorCodes::UnknownError); -} - -// wait() returns a SessionClosed error status if the Ticket's Session is closed -TEST_F(TransportLayerMockTest, WaitSessionClosed) { - SessionHandle session = tl()->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session)); - - tl()->end(session); - - Status status = tl()->wait(std::move(ticket)); - ASSERT_EQUALS(status.code(), ErrorCodes::TransportSessionClosed); -} - -// wait() returns a SessionUnknown error status if the TransportLayer does not own the Ticket's -// Session -TEST_F(TransportLayerMockTest, WaitSessionUnknown) { - std::unique_ptr<TransportLayerMock> anotherTL = stdx::make_unique<TransportLayerMock>(); - SessionHandle session = anotherTL->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session)); - - Status status = tl()->wait(std::move(ticket)); - ASSERT_EQUALS(status.code(), ErrorCodes::TransportSessionUnknown); -} - -// wait() returns a ShutdownInProgress status if the TransportLayer is in shutdown -TEST_F(TransportLayerMockTest, WaitTLShutdown) { - SessionHandle session = tl()->createSession(); - Ticket ticket = Ticket(tl(), stdx::make_unique<transport::MockTicket>(session)); - - tl()->shutdown(); - - Status status = tl()->wait(std::move(ticket)); - ASSERT_EQUALS(status.code(), ErrorCodes::ShutdownInProgress); -} - -std::vector<SessionHandle> createSessions(TransportLayerMock* tl) { - int numSessions = 10; - std::vector<SessionHandle> sessions; - for (int i = 0; i < numSessions; i++) { - SessionHandle session = tl->createSession(); - sessions.push_back(session); - } - return sessions; -} - -void assertEnded(TransportLayer* tl, - std::vector<SessionHandle> sessions, - ErrorCodes::Error code = ErrorCodes::TransportSessionClosed) { - for (auto session : sessions) { - Ticket ticket = Ticket(tl, stdx::make_unique<transport::MockTicket>(session)); - Status status = tl->wait(std::move(ticket)); - ASSERT_EQUALS(status.code(), code); - } -} - -// shutdown() ends all sessions and shuts down -TEST_F(TransportLayerMockTest, Shutdown) { - std::vector<SessionHandle> sessions = createSessions(tl()); - tl()->shutdown(); - assertEnded(tl(), sessions, ErrorCodes::ShutdownInProgress); - ASSERT(tl()->inShutdown()); -} - -} // namespace transport -} // namespace mongo |