diff options
Diffstat (limited to 'src/mongo/client/scoped_db_conn_test.cpp')
-rw-r--r-- | src/mongo/client/scoped_db_conn_test.cpp | 548 |
1 files changed, 268 insertions, 280 deletions
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index 6efb32a4a98..bfd57ecbb77 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -55,378 +55,366 @@ namespace mongo { - using std::unique_ptr; - using std::string; - using std::vector; +using std::unique_ptr; +using std::string; +using std::vector; - class Client; - class OperationContext; +class Client; +class OperationContext; namespace { - std::mutex shutDownMutex; - bool shuttingDown = false; +std::mutex shutDownMutex; +bool shuttingDown = false; -} // namespace +} // namespace - // Symbols defined to build the binary correctly. - bool inShutdown() { - std::lock_guard<std::mutex> sl(shutDownMutex); - return shuttingDown; - } - - void signalShutdown() { } +// Symbols defined to build the binary correctly. +bool inShutdown() { + std::lock_guard<std::mutex> sl(shutDownMutex); + return shuttingDown; +} - DBClientBase* createDirectClient(OperationContext* txn) { - return NULL; - } +void signalShutdown() {} - void dbexit(ExitCode rc, const char *why) { - { - std::lock_guard<std::mutex> sl(shutDownMutex); - shuttingDown = true; - } +DBClientBase* createDirectClient(OperationContext* txn) { + return NULL; +} - quickExit(rc); +void dbexit(ExitCode rc, const char* why) { + { + std::lock_guard<std::mutex> sl(shutDownMutex); + shuttingDown = true; } - void exitCleanly(ExitCode rc) { - dbexit(rc, ""); - } + quickExit(rc); +} - bool haveLocalShardingInfo(Client* client, const string& ns) { - return false; - } +void exitCleanly(ExitCode rc) { + dbexit(rc, ""); +} + +bool haveLocalShardingInfo(Client* client, const string& ns) { + return false; +} namespace { - const string TARGET_HOST = "localhost:27017"; - const int TARGET_PORT = 27017; +const string TARGET_HOST = "localhost:27017"; +const int TARGET_PORT = 27017; - class DummyMessageHandler final : public MessageHandler { - public: - virtual void connected(AbstractMessagingPort* p) { +class DummyMessageHandler final : public MessageHandler { +public: + virtual void connected(AbstractMessagingPort* p) {} - } + virtual void process(Message& m, AbstractMessagingPort* por) {} - virtual void process(Message& m, AbstractMessagingPort* por) { +} dummyHandler; - } +// TODO: Take this out and make it as a reusable class in a header file. The only +// thing that is preventing this from happening is the dependency on the inShutdown +// method to terminate the socket listener thread. +/** + * Very basic server that accepts connections. Note: can only create one instance + * at a time. Should not create as a static global instance because of dependency + * with ListeningSockets::_instance. + * + * Warning: Not thread-safe + * + * Note: external symbols used: + * shutDownMutex, shuttingDown + */ +class DummyServer { +public: + /** + * Creates a new server that listens to the given port. + * + * @param port the port number to listen to. + */ + DummyServer(int port) : _port(port) {} - } dummyHandler; + ~DummyServer() { + stop(); + } - // TODO: Take this out and make it as a reusable class in a header file. The only - // thing that is preventing this from happening is the dependency on the inShutdown - // method to terminate the socket listener thread. /** - * Very basic server that accepts connections. Note: can only create one instance - * at a time. Should not create as a static global instance because of dependency - * with ListeningSockets::_instance. - * - * Warning: Not thread-safe + * Starts the server if it is not yet running. * - * Note: external symbols used: - * shutDownMutex, shuttingDown + * @param messageHandler the message handler to use for this server. Ownership + * of this object is passed to this server. */ - class DummyServer { - public: - /** - * Creates a new server that listens to the given port. - * - * @param port the port number to listen to. - */ - DummyServer(int port) : _port(port) { + void run(MessageHandler* messsageHandler) { + if (_server != NULL) { + return; + } + MessageServer::Options options; + options.port = _port; + + { + std::lock_guard<std::mutex> sl(shutDownMutex); + shuttingDown = false; } - ~DummyServer() { - stop(); + _server.reset(createServer(options, messsageHandler)); + _serverThread = std::thread(runServer, _server.get()); + } + + /** + * Stops the server if it is running. + */ + void stop() { + if (!_server) { + return; } - /** - * Starts the server if it is not yet running. - * - * @param messageHandler the message handler to use for this server. Ownership - * of this object is passed to this server. - */ - void run(MessageHandler* messsageHandler) { - if (_server != NULL) { - return; - } + { + std::lock_guard<std::mutex> sl(shutDownMutex); + shuttingDown = true; + } - MessageServer::Options options; - options.port = _port; + ListeningSockets::get()->closeAll(); + _serverThread.join(); - { - std::lock_guard<std::mutex> sl(shutDownMutex); - shuttingDown = false; + int connCount = Listener::globalTicketHolder.used(); + size_t iterCount = 0; + while (connCount > 0) { + if ((++iterCount % 20) == 0) { + log() << "DummyServer: Waiting for " << connCount << " connections to close." + << std::endl; } - _server.reset(createServer(options, messsageHandler)); - _serverThread = std::thread(runServer, _server.get()); + sleepmillis(500); + connCount = Listener::globalTicketHolder.used(); } - /** - * Stops the server if it is running. - */ - void stop() { - if (!_server) { - return; - } + _server.reset(); + } - { - std::lock_guard<std::mutex> sl(shutDownMutex); - shuttingDown = true; - } + /** + * Helper method for running the server on a separate thread. + */ + static void runServer(MessageServer* server) { + server->setupSockets(); + server->run(); + } - ListeningSockets::get()->closeAll(); - _serverThread.join(); +private: + const int _port; - int connCount = Listener::globalTicketHolder.used(); - size_t iterCount = 0; - while (connCount > 0) { - if ((++iterCount % 20) == 0) { - log() << "DummyServer: Waiting for " << connCount - << " connections to close." << std::endl; - } + std::thread _serverThread; + unique_ptr<MessageServer> _server; +}; - sleepmillis(500); - connCount = Listener::globalTicketHolder.used(); +/** + * Warning: cannot run in parallel + */ +class DummyServerFixture : public unittest::Test { +public: + void setUp() { + _maxPoolSizePerHost = globalConnPool.getMaxPoolSize(); + _dummyServer = new DummyServer(TARGET_PORT); + + _dummyServer->run(&dummyHandler); + DBClientConnection conn; + Timer timer; + + // Make sure the dummy server is up and running before proceeding + while (true) { + try { + conn.connect(TARGET_HOST); + break; + } catch (const ConnectException&) { + if (timer.seconds() > 20) { + FAIL("Timed out connecting to dummy server"); + } } - - _server.reset(); } + } - /** - * Helper method for running the server on a separate thread. - */ - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); - } + void tearDown() { + ScopedDbConnection::clearPool(); + delete _dummyServer; + + globalConnPool.setMaxPoolSize(_maxPoolSizePerHost); + } - private: - const int _port; +protected: + static void assertGreaterThan(uint64_t a, uint64_t b) { + ASSERT_GREATER_THAN(a, b); + } - std::thread _serverThread; - unique_ptr<MessageServer> _server; - }; + static void assertNotEqual(uint64_t a, uint64_t b) { + ASSERT_NOT_EQUALS(a, b); + } /** - * Warning: cannot run in parallel + * Tries to grab a series of connections from the pool, perform checks on + * them, then put them back into the globalConnPool. After that, it checks these + * connections can be retrieved again from the pool. + * + * @param checkFunc method for comparing new connections and arg2. + * @param arg2 the value to pass as the 2nd parameter of checkFunc. + * @param newConnsToCreate the number of new connections to make. */ - class DummyServerFixture: public unittest::Test { - public: - void setUp() { - _maxPoolSizePerHost = globalConnPool.getMaxPoolSize(); - _dummyServer = new DummyServer(TARGET_PORT); - - _dummyServer->run(&dummyHandler); - DBClientConnection conn; - Timer timer; - - // Make sure the dummy server is up and running before proceeding - while (true) { - try { - conn.connect(TARGET_HOST); - break; - } catch (const ConnectException&) { - if (timer.seconds() > 20) { - FAIL("Timed out connecting to dummy server"); - } - } - } + void checkNewConns(void (*checkFunc)(uint64_t, uint64_t), + uint64_t arg2, + size_t newConnsToCreate) { + vector<ScopedDbConnection*> newConnList; + for (size_t x = 0; x < newConnsToCreate; x++) { + ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST); + checkFunc(newConn->get()->getSockCreationMicroSec(), arg2); + newConnList.push_back(newConn); } - void tearDown() { - ScopedDbConnection::clearPool(); - delete _dummyServer; + const uint64_t oldCreationTime = curTimeMicros64(); - globalConnPool.setMaxPoolSize(_maxPoolSizePerHost); + for (vector<ScopedDbConnection*>::iterator iter = newConnList.begin(); + iter != newConnList.end(); + ++iter) { + (*iter)->done(); + delete *iter; } - protected: - static void assertGreaterThan(uint64_t a, uint64_t b) { - ASSERT_GREATER_THAN(a, b); - } + newConnList.clear(); - static void assertNotEqual(uint64_t a, uint64_t b) { - ASSERT_NOT_EQUALS(a, b); + // Check that connections created after the purge was put back to the pool. + for (size_t x = 0; x < newConnsToCreate; x++) { + ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST); + ASSERT_LESS_THAN(newConn->get()->getSockCreationMicroSec(), oldCreationTime); + newConnList.push_back(newConn); } - /** - * Tries to grab a series of connections from the pool, perform checks on - * them, then put them back into the globalConnPool. After that, it checks these - * connections can be retrieved again from the pool. - * - * @param checkFunc method for comparing new connections and arg2. - * @param arg2 the value to pass as the 2nd parameter of checkFunc. - * @param newConnsToCreate the number of new connections to make. - */ - void checkNewConns(void (*checkFunc)(uint64_t, uint64_t), uint64_t arg2, - size_t newConnsToCreate) { - vector<ScopedDbConnection*> newConnList; - for (size_t x = 0; x < newConnsToCreate; x++) { - ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST); - checkFunc(newConn->get()->getSockCreationMicroSec(), arg2); - newConnList.push_back(newConn); - } + for (vector<ScopedDbConnection*>::iterator iter = newConnList.begin(); + iter != newConnList.end(); + ++iter) { + (*iter)->done(); + delete *iter; + } + } - const uint64_t oldCreationTime = curTimeMicros64(); +private: + static void runServer(MessageServer* server) { + server->setupSockets(); + server->run(); + } - for (vector<ScopedDbConnection*>::iterator iter = newConnList.begin(); - iter != newConnList.end(); ++iter) { - (*iter)->done(); - delete *iter; - } + DummyServer* _dummyServer; + uint32_t _maxPoolSizePerHost; +}; - newConnList.clear(); +TEST_F(DummyServerFixture, BasicScopedDbConnection) { + ScopedDbConnection conn1(TARGET_HOST); + ScopedDbConnection conn2(TARGET_HOST); - // Check that connections created after the purge was put back to the pool. - for (size_t x = 0; x < newConnsToCreate; x++) { - ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST); - ASSERT_LESS_THAN(newConn->get()->getSockCreationMicroSec(), oldCreationTime); - newConnList.push_back(newConn); - } + DBClientBase* conn1Ptr = conn1.get(); + conn1.done(); - for (vector<ScopedDbConnection*>::iterator iter = newConnList.begin(); - iter != newConnList.end(); ++iter) { - (*iter)->done(); - delete *iter; - } - } + ScopedDbConnection conn3(TARGET_HOST); + ASSERT_EQUALS(conn1Ptr, conn3.get()); - private: - static void runServer(MessageServer* server) { - server->setupSockets(); - server->run(); - } + conn2.done(); + conn3.done(); +} - DummyServer* _dummyServer; - uint32_t _maxPoolSizePerHost; - }; +TEST_F(DummyServerFixture, InvalidateBadConnInPool) { + ScopedDbConnection conn1(TARGET_HOST); + ScopedDbConnection conn2(TARGET_HOST); + ScopedDbConnection conn3(TARGET_HOST); - TEST_F(DummyServerFixture, BasicScopedDbConnection) { - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); + conn1.done(); + conn3.done(); - DBClientBase* conn1Ptr = conn1.get(); - conn1.done(); + const uint64_t badCreationTime = curTimeMicros64(); - ScopedDbConnection conn3(TARGET_HOST); - ASSERT_EQUALS(conn1Ptr, conn3.get()); + getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::alwaysOn); - conn2.done(); - conn3.done(); + try { + conn2->query("test.user", Query()); + } catch (const SocketException&) { } - TEST_F(DummyServerFixture, InvalidateBadConnInPool) { - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); - ScopedDbConnection conn3(TARGET_HOST); + getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::off); + conn2.done(); - conn1.done(); - conn3.done(); + checkNewConns(assertGreaterThan, badCreationTime, 10); +} - const uint64_t badCreationTime = curTimeMicros64(); +TEST_F(DummyServerFixture, DontReturnKnownBadConnToPool) { + ScopedDbConnection conn1(TARGET_HOST); + ScopedDbConnection conn2(TARGET_HOST); + ScopedDbConnection conn3(TARGET_HOST); - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")-> - setMode(FailPoint::alwaysOn); + conn1.done(); - try { - conn2->query("test.user", Query()); - } - catch (const SocketException&) { - } - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")-> - setMode(FailPoint::off); - conn2.done(); + getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::alwaysOn); - checkNewConns(assertGreaterThan, badCreationTime, 10); + try { + conn3->query("test.user", Query()); + } catch (const SocketException&) { } - TEST_F(DummyServerFixture, DontReturnKnownBadConnToPool) { - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); - ScopedDbConnection conn3(TARGET_HOST); - - conn1.done(); - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")-> - setMode(FailPoint::alwaysOn); + getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::off); - try { - conn3->query("test.user", Query()); - } - catch (const SocketException&) { - } - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")-> - setMode(FailPoint::off); + const uint64_t badCreationTime = conn3->getSockCreationMicroSec(); + conn3.done(); + // attempting to put a 'bad' connection back to the pool + conn2.done(); - const uint64_t badCreationTime = conn3->getSockCreationMicroSec(); - conn3.done(); - // attempting to put a 'bad' connection back to the pool - conn2.done(); + checkNewConns(assertGreaterThan, badCreationTime, 10); +} - checkNewConns(assertGreaterThan, badCreationTime, 10); - } +TEST_F(DummyServerFixture, InvalidateBadConnEvenWhenPoolIsFull) { + globalConnPool.setMaxPoolSize(2); - TEST_F(DummyServerFixture, InvalidateBadConnEvenWhenPoolIsFull) { - globalConnPool.setMaxPoolSize(2); + ScopedDbConnection conn1(TARGET_HOST); + ScopedDbConnection conn2(TARGET_HOST); + ScopedDbConnection conn3(TARGET_HOST); - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); - ScopedDbConnection conn3(TARGET_HOST); + conn1.done(); + conn3.done(); - conn1.done(); - conn3.done(); + const uint64_t badCreationTime = curTimeMicros64(); - const uint64_t badCreationTime = curTimeMicros64(); + getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::alwaysOn); - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")-> - setMode(FailPoint::alwaysOn); - - try { - conn2->query("test.user", Query()); - } - catch (const SocketException&) { - } + try { + conn2->query("test.user", Query()); + } catch (const SocketException&) { + } - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")-> - setMode(FailPoint::off); - conn2.done(); + getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::off); + conn2.done(); - checkNewConns(assertGreaterThan, badCreationTime, 2); - } + checkNewConns(assertGreaterThan, badCreationTime, 2); +} - TEST_F(DummyServerFixture, DontReturnConnGoneBadToPool) { - ScopedDbConnection conn1(TARGET_HOST); +TEST_F(DummyServerFixture, DontReturnConnGoneBadToPool) { + ScopedDbConnection conn1(TARGET_HOST); - const uint64_t conn1CreationTime = conn1->getSockCreationMicroSec(); + const uint64_t conn1CreationTime = conn1->getSockCreationMicroSec(); - uint64_t conn2CreationTime = 0; + uint64_t conn2CreationTime = 0; - { - ScopedDbConnection conn2(TARGET_HOST); - conn2CreationTime = conn2->getSockCreationMicroSec(); + { + ScopedDbConnection conn2(TARGET_HOST); + conn2CreationTime = conn2->getSockCreationMicroSec(); - conn1.done(); - // conn2 gets out of scope without calling done() - } + conn1.done(); + // conn2 gets out of scope without calling done() + } - // conn2 should not have been put back into the pool but it should - // also not invalidate older connections since it didn't encounter - // a socket exception. + // conn2 should not have been put back into the pool but it should + // also not invalidate older connections since it didn't encounter + // a socket exception. - ScopedDbConnection conn1Again(TARGET_HOST); - ASSERT_EQUALS(conn1CreationTime, conn1Again->getSockCreationMicroSec()); + ScopedDbConnection conn1Again(TARGET_HOST); + ASSERT_EQUALS(conn1CreationTime, conn1Again->getSockCreationMicroSec()); - checkNewConns(assertNotEqual, conn2CreationTime, 10); + checkNewConns(assertNotEqual, conn2CreationTime, 10); - conn1Again.done(); - } + conn1Again.done(); +} -} // namespace -} // namespace mongo +} // namespace +} // namespace mongo |