diff options
Diffstat (limited to 'src/mongo/client/scoped_db_conn_test.cpp')
-rw-r--r-- | src/mongo/client/scoped_db_conn_test.cpp | 71 |
1 files changed, 48 insertions, 23 deletions
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index 9ee72a63953..ba54e4ec733 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -41,11 +41,14 @@ #include "mongo/rpc/request_interface.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_legacy.h" #include "mongo/unittest/unittest.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/listen.h" -#include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/quick_exit.h" #include "mongo/util/time_support.h" @@ -72,12 +75,30 @@ namespace { const string TARGET_HOST = "localhost:27017"; const int TARGET_PORT = 27017; -class DummyMessageHandler final : public MessageHandler { +class DummyServiceEntryPoint : public ServiceEntryPoint { + MONGO_DISALLOW_COPYING(DummyServiceEntryPoint); + public: - virtual void connected(AbstractMessagingPort* p) {} + DummyServiceEntryPoint() {} + + virtual ~DummyServiceEntryPoint() { + for (auto& t : _threads) { + t.join(); + } + } + + void startSession(transport::Session&& session) override { + _threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session)); + } + +private: + void run(transport::Session&& session) { + Message inMessage; + if (!session.sourceMessage(&inMessage).wait().isOK()) { + return; + } - virtual void process(Message& m, AbstractMessagingPort* port) { - auto request = rpc::makeRequest(&m); + auto request = rpc::makeRequest(&inMessage); auto reply = rpc::makeReplyBuilder(request->getProtocol()); BSONObjBuilder commandResponse; @@ -92,10 +113,14 @@ public: .setMetadata(rpc::makeEmptyMetadata()) .done(); - port->reply(m, response); + response.header().setResponseToMsgId(inMessage.header().getId()); + + if (!session.sinkMessage(response).wait().isOK()) { + return; + } } - virtual void close() {} + std::vector<stdx::thread> _threads; }; // TODO: Take this out and make it as a reusable class in a header file. The only @@ -130,15 +155,15 @@ public: * @param messageHandler the message handler to use for this server. Ownership * of this object is passed to this server. */ - void run(std::shared_ptr<MessageHandler> messsageHandler) { - if (_server != NULL) { + void run(std::shared_ptr<ServiceEntryPoint> serviceEntryPoint) { + if (_server) { return; } - MessageServer::Options options; + transport::TransportLayerLegacy::Options options; options.port = _port; - _server.reset(createServer(options, std::move(messsageHandler), getGlobalServiceContext())); + _server = stdx::make_unique<transport::TransportLayerLegacy>(options, serviceEntryPoint); _serverThread = stdx::thread(runServer, _server.get()); } @@ -164,23 +189,23 @@ public: sleepmillis(500); connCount = Listener::globalTicketHolder.used(); } - + _server->shutdown(); _server.reset(); } /** * Helper method for running the server on a separate thread. */ - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); + static void runServer(transport::TransportLayerLegacy* server) { + server->setup(); + server->start(); } private: const int _port; stdx::thread _serverThread; - unique_ptr<MessageServer> _server; + unique_ptr<transport::TransportLayerLegacy> _server; }; /** @@ -190,9 +215,9 @@ class DummyServerFixture : public unittest::Test { public: void setUp() { _maxPoolSizePerHost = globalConnPool.getMaxPoolSize(); - _dummyServer = new DummyServer(TARGET_PORT); + _dummyServer = stdx::make_unique<DummyServer>(TARGET_PORT); - auto dummyHandler = std::make_shared<DummyMessageHandler>(); + auto dummyHandler = std::make_shared<DummyServiceEntryPoint>(); _dummyServer->run(std::move(dummyHandler)); DBClientConnection conn; Timer timer; @@ -212,7 +237,7 @@ public: void tearDown() { ScopedDbConnection::clearPool(); - delete _dummyServer; + _dummyServer.reset(); globalConnPool.setMaxPoolSize(_maxPoolSizePerHost); } @@ -272,12 +297,12 @@ protected: } private: - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); + static void runServer(transport::TransportLayerLegacy* server) { + server->setup(); + server->start(); } - DummyServer* _dummyServer; + std::unique_ptr<DummyServer> _dummyServer; uint32_t _maxPoolSizePerHost; }; |