summaryrefslogtreecommitdiff
path: root/src/mongo/client/scoped_db_conn_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/scoped_db_conn_test.cpp')
-rw-r--r--src/mongo/client/scoped_db_conn_test.cpp71
1 files changed, 48 insertions, 23 deletions
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp
index 9ee72a63953..ba54e4ec733 100644
--- a/src/mongo/client/scoped_db_conn_test.cpp
+++ b/src/mongo/client/scoped_db_conn_test.cpp
@@ -41,11 +41,14 @@
#include "mongo/rpc/request_interface.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message_server.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/time_support.h"
@@ -72,12 +75,30 @@ namespace {
const string TARGET_HOST = "localhost:27017";
const int TARGET_PORT = 27017;
-class DummyMessageHandler final : public MessageHandler {
+class DummyServiceEntryPoint : public ServiceEntryPoint {
+ MONGO_DISALLOW_COPYING(DummyServiceEntryPoint);
+
public:
- virtual void connected(AbstractMessagingPort* p) {}
+ DummyServiceEntryPoint() {}
+
+ virtual ~DummyServiceEntryPoint() {
+ for (auto& t : _threads) {
+ t.join();
+ }
+ }
+
+ void startSession(transport::Session&& session) override {
+ _threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session));
+ }
+
+private:
+ void run(transport::Session&& session) {
+ Message inMessage;
+ if (!session.sourceMessage(&inMessage).wait().isOK()) {
+ return;
+ }
- virtual void process(Message& m, AbstractMessagingPort* port) {
- auto request = rpc::makeRequest(&m);
+ auto request = rpc::makeRequest(&inMessage);
auto reply = rpc::makeReplyBuilder(request->getProtocol());
BSONObjBuilder commandResponse;
@@ -92,10 +113,14 @@ public:
.setMetadata(rpc::makeEmptyMetadata())
.done();
- port->reply(m, response);
+ response.header().setResponseToMsgId(inMessage.header().getId());
+
+ if (!session.sinkMessage(response).wait().isOK()) {
+ return;
+ }
}
- virtual void close() {}
+ std::vector<stdx::thread> _threads;
};
// TODO: Take this out and make it as a reusable class in a header file. The only
@@ -130,15 +155,15 @@ public:
* @param messageHandler the message handler to use for this server. Ownership
* of this object is passed to this server.
*/
- void run(std::shared_ptr<MessageHandler> messsageHandler) {
- if (_server != NULL) {
+ void run(std::shared_ptr<ServiceEntryPoint> serviceEntryPoint) {
+ if (_server) {
return;
}
- MessageServer::Options options;
+ transport::TransportLayerLegacy::Options options;
options.port = _port;
- _server.reset(createServer(options, std::move(messsageHandler), getGlobalServiceContext()));
+ _server = stdx::make_unique<transport::TransportLayerLegacy>(options, serviceEntryPoint);
_serverThread = stdx::thread(runServer, _server.get());
}
@@ -164,23 +189,23 @@ public:
sleepmillis(500);
connCount = Listener::globalTicketHolder.used();
}
-
+ _server->shutdown();
_server.reset();
}
/**
* Helper method for running the server on a separate thread.
*/
- static void runServer(MessageServer* server) {
- server->setupSockets();
- server->run();
+ static void runServer(transport::TransportLayerLegacy* server) {
+ server->setup();
+ server->start();
}
private:
const int _port;
stdx::thread _serverThread;
- unique_ptr<MessageServer> _server;
+ unique_ptr<transport::TransportLayerLegacy> _server;
};
/**
@@ -190,9 +215,9 @@ class DummyServerFixture : public unittest::Test {
public:
void setUp() {
_maxPoolSizePerHost = globalConnPool.getMaxPoolSize();
- _dummyServer = new DummyServer(TARGET_PORT);
+ _dummyServer = stdx::make_unique<DummyServer>(TARGET_PORT);
- auto dummyHandler = std::make_shared<DummyMessageHandler>();
+ auto dummyHandler = std::make_shared<DummyServiceEntryPoint>();
_dummyServer->run(std::move(dummyHandler));
DBClientConnection conn;
Timer timer;
@@ -212,7 +237,7 @@ public:
void tearDown() {
ScopedDbConnection::clearPool();
- delete _dummyServer;
+ _dummyServer.reset();
globalConnPool.setMaxPoolSize(_maxPoolSizePerHost);
}
@@ -272,12 +297,12 @@ protected:
}
private:
- static void runServer(MessageServer* server) {
- server->setupSockets();
- server->run();
+ static void runServer(transport::TransportLayerLegacy* server) {
+ server->setup();
+ server->start();
}
- DummyServer* _dummyServer;
+ std::unique_ptr<DummyServer> _dummyServer;
uint32_t _maxPoolSizePerHost;
};