/* Copyright 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"

#include <memory>
#include <set>
#include <string>
#include <vector>

#include "mongo/client/connpool.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/db/service_context.h"
#include "mongo/db/wire_version.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/rpc/request_interface.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/listen.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
/**
* Tests for ScopedDbConnection, particularly in connection pool management.
* The tests also indirectly test DBClientConnection's failure detection
* logic (hence the use of the dummy server as opposed to mocking the
* connection).
*/
namespace mongo {
// Bring the standard-library names used throughout this file into scope unqualified.
using std::unique_ptr;
using std::string;
using std::vector;
// Forward declarations.
// NOTE(review): neither type is referenced in the visible part of this file -- confirm whether
// these declarations are still needed.
class Client;
class OperationContext;
namespace {
// Port the dummy server listens on.
const int TARGET_PORT = 27017;
// "host:port" address the tests connect to. Built from TARGET_PORT so that the listening port and
// the address used by the clients cannot drift apart.
const std::string TARGET_HOST = "localhost:" + std::to_string(TARGET_PORT);
class DummyServiceEntryPoint : public ServiceEntryPoint {
MONGO_DISALLOW_COPYING(DummyServiceEntryPoint);
public:
DummyServiceEntryPoint() {}
virtual ~DummyServiceEntryPoint() {
for (auto& t : _threads) {
t.join();
}
}
void startSession(transport::SessionHandle session) override {
_threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session));
}
void setReplyDelay(Milliseconds delay) {
_replyDelay = delay;
}
private:
void run(transport::SessionHandle session) {
Message inMessage;
if (!session->sourceMessage(&inMessage).wait().isOK()) {
return;
}
auto request = rpc::makeRequest(&inMessage);
commandRequestHook(request.get());
auto reply = rpc::makeReplyBuilder(request->getProtocol());
BSONObjBuilder commandResponse;
// We need to handle the isMaster received during connection.
if (request->getCommandName() == "isMaster") {
commandResponse.append("maxWireVersion", WireVersion::COMMANDS_ACCEPT_WRITE_CONCERN);
commandResponse.append("minWireVersion", WireVersion::RELEASE_2_4_AND_BEFORE);
}
auto response = reply->setCommandReply(commandResponse.done())
.setMetadata(rpc::makeEmptyMetadata())
.done();
response.header().setResponseToMsgId(inMessage.header().getId());
if (_replyDelay.count() > 0) {
log() << "Delaying response for " << _replyDelay;
sleepFor(_replyDelay);
}
if (!session->sinkMessage(response).wait().isOK()) {
return;
}
}
/**
* Subclasses can override this in order to make assertions about the command request.
*/
virtual void commandRequestHook(const rpc::RequestInterface* request) const {}
std::vector _threads;
Milliseconds _replyDelay{0};
};
// TODO: Take this out and make it as a reusable class in a header file. The only
// thing that is preventing this from happening is the dependency on the inShutdown
// method to terminate the socket listener thread.
/**
 * Very basic server that accepts connections. Note: can only create one instance
 * at a time. Should not create as a static global instance because of dependency
 * with ListeningSockets::_instance.
 *
 * Warning: Not thread-safe
 *
 * NOTE(review): an earlier version of this comment listed shutDownMutex and shuttingDown as
 * external symbols used; neither is referenced by the class as written -- confirm before relying
 * on that.
 */
class DummyServer {
public:
    /**
     * Creates a new server that listens to the given port.
     *
     * @param port the port number to listen to.
     */
    explicit DummyServer(int port) : _port(port) {}

    /**
     * Stops the server if it is still running.
     */
    ~DummyServer() {
        stop();
    }

    /**
     * Starts the server if it is not yet running. Does nothing if it is already running.
     *
     * @param serviceEntryPoint the entry point handed to the transport layer. This class only
     *     forwards the raw pointer and never deletes it, so ownership stays with the caller.
     */
    void run(ServiceEntryPoint* serviceEntryPoint) {
        if (_server) {
            return;
        }

        transport::TransportLayerLegacy::Options options;
        options.port = _port;

        _server = stdx::make_unique<transport::TransportLayerLegacy>(options, serviceEntryPoint);
        _serverThread = stdx::thread(runServer, _server.get());
    }

    /**
     * Stops the server if it is running: closes the listening sockets, joins the server thread,
     * waits until no connection tickets are in use and finally shuts the transport layer down.
     */
    void stop() {
        if (!_server) {
            return;
        }

        ListeningSockets::get()->closeAll();
        _serverThread.join();

        // Poll every 500 ms until all connections are gone, logging on every 20th poll.
        size_t iterCount = 0;
        for (int connCount = Listener::globalTicketHolder.used(); connCount > 0;
             connCount = Listener::globalTicketHolder.used()) {
            if ((++iterCount % 20) == 0) {
                log() << "DummyServer: Waiting for " << connCount << " connections to close."
                      << std::endl;
            }
            sleepmillis(500);
        }

        _server->shutdown();
        _server.reset();
    }

    /**
     * Helper method for running the server on a separate thread.
     */
    static void runServer(transport::TransportLayerLegacy* server) {
        server->setup();
        server->start();
    }

private:
    const int _port;
    stdx::thread _serverThread;
    unique_ptr<transport::TransportLayerLegacy> _server;
};
/**
* Warning: cannot run in parallel
*/
class DummyServerFixture : public unittest::Test {
public:
void setUp() {
WireSpec::instance().isInternalClient = isInternalClient();
_maxPoolSizePerHost = globalConnPool.getMaxPoolSize();
_dummyServer = stdx::make_unique(TARGET_PORT);
_dummyServiceEntryPoint = makeServiceEntryPoint();
_dummyServer->run(_dummyServiceEntryPoint.get());
DBClientConnection conn;
Timer timer;
// Make sure the dummy server is up and running before proceeding
while (true) {
auto connectStatus = conn.connect(HostAndPort{TARGET_HOST}, StringData());
if (connectStatus.isOK()) {
break;
}
if (timer.seconds() > 20) {
FAIL(str::stream() << "Timed out connecting to dummy server: "
<< connectStatus.toString());
}
}
}
void tearDown() {
ScopedDbConnection::clearPool();
_dummyServer.reset();
_dummyServiceEntryPoint.reset();
globalConnPool.setMaxPoolSize(_maxPoolSizePerHost);
}
protected:
static void assertGreaterThan(uint64_t a, uint64_t b) {
ASSERT_GREATER_THAN(a, b);
}
static void assertNotEqual(uint64_t a, uint64_t b) {
ASSERT_NOT_EQUALS(a, b);
}
void setReplyDelay(Milliseconds delay) {
_dummyServiceEntryPoint->setReplyDelay(delay);
}
/**
* Tries to grab a series of connections from the pool, perform checks on
* them, then put them back into the globalConnPool. After that, it checks these
* connections can be retrieved again from the pool.
*
* @param checkFunc method for comparing new connections and arg2.
* @param arg2 the value to pass as the 2nd parameter of checkFunc.
* @param newConnsToCreate the number of new connections to make.
*/
void checkNewConns(void (*checkFunc)(uint64_t, uint64_t),
uint64_t arg2,
const int newConnsToCreate) {
vector newConnList;
for (int x = 0; x < newConnsToCreate; x++) {
ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST);
checkFunc(newConn->get()->getSockCreationMicroSec(), arg2);
newConnList.push_back(newConn);
}
std::set connIds;
int prevNumBadConns = globalConnPool.getNumBadConns(TARGET_HOST);
for (vector::iterator iter = newConnList.begin();
iter != newConnList.end();
++iter) {
connIds.insert((*iter)->get()->getConnectionId());
// ScopedDbConnection::done() may not successfuly add the connection back to
// the pool if the connection has failed. If the connection is not addded back
// to the pool, we increment the number of bad connections in the pool by one
// (see PoolForHost::done()). We then use the number of bad connections to
// verify the number of connections successfully added back to the pool.
(*iter)->done();
delete *iter;
}
int numBadConns = globalConnPool.getNumBadConns(TARGET_HOST) - prevNumBadConns;
const int numConnsInPool = globalConnPool.getNumAvailableConns(TARGET_HOST);
ASSERT_EQ(numConnsInPool, newConnsToCreate - numBadConns);
newConnList.clear();
// Check that connections created after the purge were put back to the pool.
int numReusedConns = 0;
prevNumBadConns = globalConnPool.getNumBadConns(TARGET_HOST);
for (int x = 0; x < newConnsToCreate; x++) {
// ScopedDBConnection will attempt to reuse a connection from the pool if
// the pool is not empty. It may fail to reuse that connection if that
// connection has gone bad, in that case, it'll try to get another connection
// from the pool and increment the number of bad connections in the pool
// If the pool is empty however, it'll create a new connection.
// See PoolForHost::get() and DBConnectionPool::get().
ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST);
newConnList.push_back(newConn);
if (connIds.count(newConn->get()->getConnectionId())) {
numReusedConns++;
}
}
numBadConns = globalConnPool.getNumBadConns(TARGET_HOST) - prevNumBadConns;
// Each bad connection is not a reused connection. Therefore the number of reused
// connections plus the number of bad ones should be equal to the number of connections
// that were in the pool.
ASSERT_EQ(numConnsInPool, numReusedConns + numBadConns);
ASSERT_EQ(globalConnPool.getNumAvailableConns(TARGET_HOST), 0);
for (vector::iterator iter = newConnList.begin();
iter != newConnList.end();
++iter) {
(*iter)->done();
delete *iter;
}
}
private:
static void runServer(transport::TransportLayerLegacy* server) {
server->setup();
server->start();
}
/**
* Subclasses can override this in order to use a specialized service entry point.
*/
virtual std::unique_ptr makeServiceEntryPoint() const {
return stdx::make_unique();
}
/**
* Subclasses can override this to make the client code behave like an internal client (e.g.
* mongod or mongos) as opposed to an external one (e.g. the shell).
*/
virtual bool isInternalClient() const {
return false;
}
std::unique_ptr _dummyServer;
std::unique_ptr _dummyServiceEntryPoint;
uint32_t _maxPoolSizePerHost;
};
// A connection handed back with done() is the one given out by the next checkout.
TEST_F(DummyServerFixture, BasicScopedDbConnection) {
    ScopedDbConnection first(TARGET_HOST);
    ScopedDbConnection second(TARGET_HOST);

    // Remember the client behind 'first' before it goes back to the pool.
    DBClientBase* returnedClient = first.get();
    first.done();

    ScopedDbConnection third(TARGET_HOST);
    ASSERT_EQUALS(returnedClient, third.get());

    second.done();
    third.done();
}
// Exercises the socket timeout handling of ScopedDbConnection against a server whose replies are
// delayed by 8 seconds.
TEST_F(DummyServerFixture, ScopedDbConnectionWithTimeout) {
    const auto replyDelay = Milliseconds{8000};
    // Slack added to measured durations; see the note on the first timing assertion below.
    const auto slack = Milliseconds{100};
    const auto uriTimeout = Seconds{4};
    const auto overrideTimeout = Seconds{1};

    auto swUri = MongoURI::parse("mongodb://" + TARGET_HOST + "/?socketTimeoutMS=4000");
    ASSERT_OK(swUri.getStatus());
    auto uri = swUri.getValue();

    // Connect once while the server still answers immediately, then turn the delay on.
    ScopedDbConnection conn1(TARGET_HOST);
    setReplyDelay(replyDelay);

    log() << "Testing ConnectionString timeouts";
    const Date_t hostStart = Date_t::now();
    ASSERT_THROWS(ScopedDbConnection conn2(TARGET_HOST, overrideTimeout.count()), SocketException);
    const Date_t hostEnd = Date_t::now();

    // We add 100 milliseconds here because on some platforms the connection might timeout after
    // 997ms instead of >= 1000ms.
    ASSERT_GTE((hostEnd - hostStart) + slack, overrideTimeout);
    ASSERT_LT(hostEnd - hostStart, uriTimeout);

    log() << "Testing MongoURI with explicit timeout";
    const Date_t explicitStart = Date_t::now();
    ASSERT_THROWS(ScopedDbConnection conn4(uri, overrideTimeout.count()), UserException);
    const Date_t explicitEnd = Date_t::now();

    ASSERT_GTE((explicitEnd - explicitStart) + slack, overrideTimeout);
    ASSERT_LT(explicitEnd - explicitStart, uriTimeout);

    log() << "Testing MongoURI doesn't timeout";
    const Date_t uriStart = Date_t::now();
    ScopedDbConnection conn5(uri);
    const Date_t uriEnd = Date_t::now();

    ASSERT_GREATER_THAN((uriEnd - uriStart) + slack, uriTimeout);

    // Switch the delay off again before the fixture is torn down.
    setReplyDelay(Milliseconds{0});
}
// After a checked-out connection hits a socket exception, every connection handed out afterwards
// must have been created later than the moment recorded just before the failure.
TEST_F(DummyServerFixture, InvalidateBadConnInPool) {
    ScopedDbConnection conn1(TARGET_HOST);
    ScopedDbConnection conn2(TARGET_HOST);
    ScopedDbConnection conn3(TARGET_HOST);

    // Two connections go back to the pool; conn2 stays checked out.
    conn1.done();
    conn3.done();

    const uint64_t badCreationTime = curTimeMicros64();

    // Make the query on conn2 fail with a socket exception, then switch the fail point off again.
    auto throwSockExcep = getGlobalFailPointRegistry()->getFailPoint("throwSockExcep");
    throwSockExcep->setMode(FailPoint::alwaysOn);
    try {
        conn2->query("test.user", Query());
    } catch (const SocketException&) {
        // Deliberately ignored.
    }
    throwSockExcep->setMode(FailPoint::off);

    conn2.done();

    checkNewConns(assertGreaterThan, badCreationTime, 10);
}
// After conn3 fails, a connection that is still checked out (conn2) must not be handed out again
// once it is returned: every later connection must be newer than conn3's socket.
TEST_F(DummyServerFixture, DontReturnKnownBadConnToPool) {
    ScopedDbConnection conn1(TARGET_HOST);
    ScopedDbConnection conn2(TARGET_HOST);
    ScopedDbConnection conn3(TARGET_HOST);

    conn1.done();

    // Make the query on conn3 fail with a socket exception, then switch the fail point off again.
    auto throwSockExcep = getGlobalFailPointRegistry()->getFailPoint("throwSockExcep");
    throwSockExcep->setMode(FailPoint::alwaysOn);
    try {
        conn3->query("test.user", Query());
    } catch (const SocketException&) {
        // Deliberately ignored.
    }
    throwSockExcep->setMode(FailPoint::off);

    const uint64_t badCreationTime = conn3->getSockCreationMicroSec();
    conn3.done();

    // attempting to put a 'bad' connection back to the pool
    conn2.done();

    checkNewConns(assertGreaterThan, badCreationTime, 10);
}
// Same scenario as InvalidateBadConnInPool, but with the pool capped at two connections so that it
// is already full when the failed connection is returned.
TEST_F(DummyServerFixture, InvalidateBadConnEvenWhenPoolIsFull) {
    globalConnPool.setMaxPoolSize(2);

    ScopedDbConnection conn1(TARGET_HOST);
    ScopedDbConnection conn2(TARGET_HOST);
    ScopedDbConnection conn3(TARGET_HOST);

    // Fill the pool with the two healthy connections.
    conn1.done();
    conn3.done();

    const uint64_t badCreationTime = curTimeMicros64();

    // Make the query on conn2 fail with a socket exception, then switch the fail point off again.
    auto throwSockExcep = getGlobalFailPointRegistry()->getFailPoint("throwSockExcep");
    throwSockExcep->setMode(FailPoint::alwaysOn);
    try {
        conn2->query("test.user", Query());
    } catch (const SocketException&) {
        // Deliberately ignored.
    }
    throwSockExcep->setMode(FailPoint::off);

    conn2.done();

    checkNewConns(assertGreaterThan, badCreationTime, 2);
}
// A connection destroyed without done() must not reappear from the pool, and must not cause the
// older, properly returned connection to be discarded.
TEST_F(DummyServerFixture, DontReturnConnGoneBadToPool) {
    ScopedDbConnection conn1(TARGET_HOST);
    const uint64_t firstCreationTime = conn1->getSockCreationMicroSec();

    uint64_t abandonedCreationTime = 0;
    {
        ScopedDbConnection abandoned(TARGET_HOST);
        abandonedCreationTime = abandoned->getSockCreationMicroSec();

        conn1.done();
        // 'abandoned' leaves scope here without done() having been called on it.
    }

    // The abandoned connection should not have been put back into the pool but it should
    // also not invalidate older connections since it didn't encounter
    // a socket exception.
    ScopedDbConnection conn1Again(TARGET_HOST);
    ASSERT_EQUALS(firstCreationTime, conn1Again->getSockCreationMicroSec());

    checkNewConns(assertNotEqual, abandonedCreationTime, 10);

    conn1Again.done();
}
/**
 * Service entry point that checks every isMaster request for an "internalClient" sub-document
 * whose min/max wire versions match this process's outgoing WireSpec.
 */
class DummyServiceEntryPointWithInternalClientInfoCheck final : public DummyServiceEntryPoint {
private:
    // NOTE(review): this hook is invoked from DummyServiceEntryPoint::run(), i.e. on a session
    // thread rather than the test thread -- confirm that a failed assertion here is reported the
    // way the test expects.
    void commandRequestHook(const rpc::RequestInterface* request) const final {
        const bool isHandshake = (request->getCommandName() == "isMaster");
        if (!isHandshake) {
            // Only the isMaster request is inspected.
            return;
        }

        BSONObj commandArgs = request->getCommandArgs();

        auto internalClientElem = commandArgs["internalClient"];
        ASSERT_EQ(internalClientElem.type(), BSONType::Object);

        // Look up both wire-version bounds in the sub-document.
        BSONObj internalClient = internalClientElem.Obj();
        auto minWireVersionElem = internalClient["minWireVersion"];
        auto maxWireVersionElem = internalClient["maxWireVersion"];

        ASSERT_EQ(minWireVersionElem.type(), BSONType::NumberInt);
        ASSERT_EQ(maxWireVersionElem.type(), BSONType::NumberInt);

        const auto& outgoing = WireSpec::instance().outgoing;
        ASSERT_EQ(minWireVersionElem.numberInt(), outgoing.minWireVersion);
        ASSERT_EQ(maxWireVersionElem.numberInt(), outgoing.maxWireVersion);
    }
};
class DummyServerFixtureWithInternalClientInfoCheck : public DummyServerFixture {
private:
std::unique_ptr makeServiceEntryPoint() const final {
return stdx::make_unique();
}
bool isInternalClient() const final {
return true;
}
};
TEST_F(DummyServerFixtureWithInternalClientInfoCheck, VerifyIsMasterRequestOnConnectionOpen) {
    // Opening a connection triggers the isMaster handshake; the service entry point installed by
    // the fixture is what inspects that request, so the test only opens and returns a connection.
    ScopedDbConnection handshakeConn(TARGET_HOST);
    handshakeConn.done();
}
/**
 * Service entry point that checks that isMaster requests carry no "internalClient" field.
 */
class DummyServiceEntryPointWithInternalClientMissingCheck final : public DummyServiceEntryPoint {
private:
    void commandRequestHook(const rpc::RequestInterface* request) const final {
        const bool isHandshake = (request->getCommandName() == "isMaster");
        if (!isHandshake) {
            // Only the isMaster request is inspected.
            return;
        }

        BSONObj handshakeArgs = request->getCommandArgs();
        ASSERT_FALSE(handshakeArgs["internalClient"]);
    }
};
class DummyServerFixtureWithInternalClientMissingCheck : public DummyServerFixture {
private:
std::unique_ptr makeServiceEntryPoint() const final {
return stdx::make_unique();
}
};
TEST_F(DummyServerFixtureWithInternalClientMissingCheck, VerifyIsMasterRequestOnConnectionOpen) {
    // Opening a connection triggers the isMaster handshake; the service entry point installed by
    // the fixture is what inspects that request, so the test only opens and returns a connection.
    ScopedDbConnection handshakeConn(TARGET_HOST);
    handshakeConn.done();
}
} // namespace
} // namespace mongo