summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2015-09-02 16:19:04 -0400
committerJason Carey <jcarey@argv.me>2015-09-09 13:28:53 -0400
commit08557952c61958166265817ca8bd0c991c01dd59 (patch)
tree5095403149b1f1109bd20977429a2c3afb954968
parent80eff9c7d945b0c447261de8733a0fd154e32d27 (diff)
downloadmongo-08557952c61958166265817ca8bd0c991c01dd59.tar.gz
SERVER-19769 Fixups for Connection Pooling
Various fixes for connection pooling
-rw-r--r--src/mongo/executor/connection_pool.cpp84
-rw-r--r--src/mongo/executor/connection_pool.h8
-rw-r--r--src/mongo/executor/connection_pool_asio.cpp52
-rw-r--r--src/mongo/executor/connection_pool_asio.h10
-rw-r--r--src/mongo/executor/connection_pool_asio_integration_test.cpp2
-rw-r--r--src/mongo/executor/connection_pool_test.cpp2
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp8
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.h6
-rw-r--r--src/mongo/executor/network_interface_asio.cpp9
-rw-r--r--src/mongo/executor/network_interface_asio.h4
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp8
11 files changed, 116 insertions, 77 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index a5e2d9e2e91..7afa474a5e4 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -56,8 +56,6 @@ public:
SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
~SpecificPool();
- void dropConnections(stdx::unique_lock<stdx::mutex> lk);
-
/**
* Get's a connection from the specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
@@ -68,6 +66,13 @@ public:
GetConnectionCallback cb);
/**
+ * Cascade a failure across existing connections and requests. Invoking
+ * this function drops all current connections and fails all current
+ * requests with the passed status.
+ */
+ void processFailure(const Status& status, stdx::unique_lock<stdx::mutex> lk);
+
+ /**
* Returns a connection to a specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
*/
@@ -85,8 +90,6 @@ private:
void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
- void failAllRequests(const Status& status, stdx::unique_lock<stdx::mutex> lk);
-
void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
void spawnConnections(stdx::unique_lock<stdx::mutex>& lk, const HostAndPort& hostAndPort);
@@ -143,8 +146,8 @@ private:
// TODO: revisit these durations when we come up with a more pervasive solution
// for NetworkInterfaceASIO's timers
-Milliseconds const ConnectionPool::kDefaultRefreshTimeout = Seconds(30);
-Milliseconds const ConnectionPool::kDefaultRefreshRequirement = Minutes(1);
+Milliseconds const ConnectionPool::kDefaultRefreshTimeout = Minutes(5);
+Milliseconds const ConnectionPool::kDefaultRefreshRequirement = Minutes(5);
Milliseconds const ConnectionPool::kDefaultHostTimeout = Minutes(5);
ConnectionPool::ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl, Options options)
@@ -160,7 +163,8 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
if (iter == _pools.end())
return;
- iter->second.get()->dropConnections(std::move(lk));
+ iter->second.get()->processFailure(
+ Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk));
}
void ConnectionPool::get(const HostAndPort& hostAndPort,
@@ -207,21 +211,6 @@ ConnectionPool::SpecificPool::~SpecificPool() {
DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
}
-void ConnectionPool::SpecificPool::dropConnections(stdx::unique_lock<stdx::mutex> lk) {
- _generation++;
-
- _readyPool.clear();
-
- for (auto&& x : _processingPool) {
- _droppedProcessingPool[x.first] = std::move(x.second);
- }
-
- _processingPool.clear();
-
- failAllRequests(Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
- std::move(lk));
-}
-
void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort,
Milliseconds timeout,
stdx::unique_lock<stdx::mutex> lk,
@@ -239,15 +228,25 @@ void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort,
void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
stdx::unique_lock<stdx::mutex> lk) {
auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement;
- auto now = _parent->_factory->now();
auto conn = takeFromPool(_checkedOutPool, connPtr);
- if (conn->isFailed() || conn->getGeneration() != _generation) {
- // If the connection failed or has been dropped, simply let it lapse
+ updateStateInLock();
+
+ if (conn->getGeneration() != _generation) {
+ // If the connection is from an older generation, just return.
+ return;
+ }
+
+ if (!conn->getStatus().isOK()) {
+ // If the connection failed cascade to all callers.
//
// TODO: alert via some callback if the host is bad
- } else if (needsRefreshTP <= now) {
+ return processFailure(conn->getStatus(), std::move(lk));
+ }
+
+ auto now = _parent->_factory->now();
+ if (needsRefreshTP <= now) {
// If we need to refresh this connection
if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
@@ -284,7 +283,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
}
// Otherwise pass the failure on through
- failAllRequests(status, std::move(lk));
+ processFailure(status, std::move(lk));
});
lk.lock();
} else {
@@ -295,9 +294,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
updateStateInLock();
}
-/**
- * Adds a live connection to the ready pool
- */
+// Adds a live connection to the ready pool
void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk,
OwnedConnection conn) {
auto connPtr = conn.get();
@@ -327,12 +324,25 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
fulfillRequests(lk);
}
-void ConnectionPool::SpecificPool::failAllRequests(const Status& status,
- stdx::unique_lock<stdx::mutex> lk) {
+// Drop connections and fail all requests
+void ConnectionPool::SpecificPool::processFailure(const Status& status,
+ stdx::unique_lock<stdx::mutex> lk) {
+ // Bump the generation so we don't reuse any pending or checked out
+ // connections
+ _generation++;
+
+ // Drop ready connections
+ _readyPool.clear();
+
+ // Migrate processing connections to the dropped pool
+ for (auto&& x : _processingPool) {
+ _droppedProcessingPool[x.first] = std::move(x.second);
+ }
+ _processingPool.clear();
+
// Move the requests out so they aren't visible
// in other threads
decltype(_requests) requestsToFail;
-
{
using std::swap;
swap(requestsToFail, _requests);
@@ -423,11 +433,9 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// connection lapse
} else if (status.isOK()) {
addToReady(lk, std::move(conn));
- } else if (_requests.size()) {
- // If the setup request failed, immediately fail
- // all other pending requests.
-
- failAllRequests(status, std::move(lk));
+ } else {
+ // If the setup failed, cascade the failure edge
+ processFailure(status, std::move(lk));
}
});
// Note that this assumes that the refreshTimeout is sound for the
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index d5a0332b5cb..d46b18b9812 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -196,7 +196,7 @@ public:
* Indicates that a connection has failed. This will prevent the connection
* from re-entering the connection pool.
*/
- virtual void indicateFailed() = 0;
+ virtual void indicateFailed(Status status) = 0;
/**
* The HostAndPort for the connection. This should be the same as the
@@ -219,10 +219,10 @@ private:
virtual Date_t getLastUsed() const = 0;
/**
- * Returns true if the connection is failed. This implies that it should
- * not be returned to the pool.
+ * Returns the status associated with the connection. If the status is not
+ * OK, the connection will not be returned to the pool.
*/
- virtual bool isFailed() const = 0;
+ virtual const Status& getStatus() const = 0;
/**
* Sets up the connection. This should include connection + auth + any
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
index 803d0096ddd..62a21cd40d6 100644
--- a/src/mongo/executor/connection_pool_asio.cpp
+++ b/src/mongo/executor/connection_pool_asio.cpp
@@ -34,6 +34,7 @@
#include "mongo/executor/async_stream_factory_interface.h"
#include "mongo/rpc/factory.h"
+#include "mongo/rpc/legacy_request_builder.h"
#include "mongo/rpc/reply_interface.h"
#include "mongo/stdx/memory.h"
@@ -41,22 +42,37 @@ namespace mongo {
namespace executor {
namespace connection_pool_asio {
-ASIOTimer::ASIOTimer(asio::io_service* io_service) : _io_service(io_service), _impl(*io_service) {}
+ASIOTimer::ASIOTimer(asio::io_service* io_service)
+ : _io_service(io_service), _impl(*io_service), _canceled(true) {}
+
+ASIOTimer::~ASIOTimer() {
+ cancelTimeout();
+}
void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
_cb = std::move(cb);
cancelTimeout();
_impl.expires_after(timeout);
+ _canceled = false;
+
_impl.async_wait([this](const asio::error_code& error) {
- auto cb = std::move(_cb);
+ if (error == asio::error::operation_aborted) {
+ return;
+ }
+
+ bool wasCanceled = _canceled;
+ _canceled = true;
- if (error != asio::error::operation_aborted)
+ if (!wasCanceled) {
+ auto cb = std::move(_cb);
cb();
+ }
});
}
void ASIOTimer::cancelTimeout() {
+ _canceled = true;
_impl.cancel();
}
@@ -71,8 +87,8 @@ void ASIOConnection::indicateUsed() {
_lastUsed = _global->now();
}
-void ASIOConnection::indicateFailed() {
- _isFailed = true;
+void ASIOConnection::indicateFailed(Status status) {
+ _status = std::move(status);
}
const HostAndPort& ASIOConnection::getHostAndPort() const {
@@ -83,8 +99,8 @@ Date_t ASIOConnection::getLastUsed() const {
return _lastUsed;
}
-bool ASIOConnection::isFailed() const {
- return _isFailed;
+const Status& ASIOConnection::getStatus() const {
+ return _status;
}
size_t ASIOConnection::getGeneration() const {
@@ -95,7 +111,8 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC
return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>(
conn->_global->_impl,
TaskExecutor::CallbackHandle(),
- makeIsMasterRequest(conn),
+ RemoteCommandRequest{
+ conn->getHostAndPort(), std::string("admin"), BSON("isMaster" << 1), BSONObj()},
[conn](const TaskExecutor::ResponseStatus& status) {
auto cb = std::move(conn->_setupCallback);
cb(conn, status.isOK() ? Status::OK() : status.getStatus());
@@ -103,8 +120,14 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC
conn->_global->now());
}
-RemoteCommandRequest ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) {
- return {conn->_hostAndPort, std::string("admin"), BSON("isMaster" << 1), BSONObj()};
+Message ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) {
+ rpc::LegacyRequestBuilder requestBuilder{};
+ requestBuilder.setDatabase("admin");
+ requestBuilder.setCommandName("isMaster");
+ requestBuilder.setMetadata(rpc::makeEmptyMetadata());
+ requestBuilder.setCommandArgs(BSON("isMaster" << 1));
+
+ return std::move(*(requestBuilder.done()));
}
void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
@@ -138,6 +161,7 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
if (!beginStatus.isOK()) {
auto cb = std::move(_refreshCallback);
cb(this, beginStatus);
+ return;
}
_global->_impl->_asyncRunCommand(
@@ -145,11 +169,11 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
[this, op](std::error_code ec, size_t bytes) {
cancelTimeout();
- if (ec) {
- return _refreshCallback(this, Status(ErrorCodes::HostUnreachable, ec.message()));
- }
-
auto cb = std::move(_refreshCallback);
+
+ if (ec)
+ return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+
cb(this, Status::OK());
});
}
diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h
index 296a5d20859..b99db55aa31 100644
--- a/src/mongo/executor/connection_pool_asio.h
+++ b/src/mongo/executor/connection_pool_asio.h
@@ -42,6 +42,7 @@ namespace connection_pool_asio {
class ASIOTimer final : public ConnectionPool::TimerInterface {
public:
ASIOTimer(asio::io_service* service);
+ ~ASIOTimer();
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
void cancelTimeout() override;
@@ -50,6 +51,7 @@ private:
TimeoutCallback _cb;
asio::io_service* const _io_service;
asio::steady_timer _impl;
+ bool _canceled;
};
/**
@@ -62,7 +64,7 @@ public:
ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global);
void indicateUsed() override;
- void indicateFailed() override;
+ void indicateFailed(Status status) override;
const HostAndPort& getHostAndPort() const override;
std::unique_ptr<NetworkInterfaceASIO::AsyncOp> releaseAsyncOp();
@@ -70,7 +72,7 @@ public:
private:
Date_t getLastUsed() const override;
- bool isFailed() const override;
+ const Status& getStatus() const override;
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
void cancelTimeout() override;
@@ -81,7 +83,7 @@ private:
size_t getGeneration() const override;
static std::unique_ptr<NetworkInterfaceASIO::AsyncOp> makeAsyncOp(ASIOConnection* conn);
- static RemoteCommandRequest makeIsMasterRequest(ASIOConnection* conn);
+ static Message makeIsMasterRequest(ASIOConnection* conn);
private:
SetupCallback _setupCallback;
@@ -89,7 +91,7 @@ private:
ASIOImpl* const _global;
ASIOTimer _timer;
Date_t _lastUsed;
- bool _isFailed = false;
+ Status _status = Status::OK();
HostAndPort _hostAndPort;
size_t _generation;
std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl;
diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp
index 782e00aa28b..9c82a7e13e4 100644
--- a/src/mongo/executor/connection_pool_asio_integration_test.cpp
+++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp
@@ -91,7 +91,7 @@ TEST(ConnectionPoolASIO, TestPing) {
net.startup();
auto guard = MakeGuard([&] { net.shutdown(); });
- const int N = 500;
+ const int N = 50;
std::array<stdx::thread, N> threads;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index be951ccbafc..12f59572dd2 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -99,7 +99,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
conn1Id = CONN2ID(swConn);
- swConn.getValue()->indicateFailed();
+ swConn.getValue()->indicateFailed(Status(ErrorCodes::BadValue, "error"));
});
// Grab the second id
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index 860275ebf39..e7c9703984f 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -81,8 +81,8 @@ void ConnectionImpl::indicateUsed() {
_lastUsed = _global->now();
}
-void ConnectionImpl::indicateFailed() {
- _isFailed = true;
+void ConnectionImpl::indicateFailed(Status status) {
+ _status = std::move(status);
}
size_t ConnectionImpl::id() const {
@@ -132,8 +132,8 @@ Date_t ConnectionImpl::getLastUsed() const {
return _lastUsed;
}
-bool ConnectionImpl::isFailed() const {
- return _isFailed;
+const Status& ConnectionImpl::getStatus() const {
+ return _status;
}
void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index 684b458fd28..edc83c4c401 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -82,7 +82,7 @@ public:
void indicateUsed() override;
- void indicateFailed() override;
+ void indicateFailed(Status status) override;
const HostAndPort& getHostAndPort() const override;
@@ -100,7 +100,7 @@ public:
private:
Date_t getLastUsed() const override;
- bool isFailed() const override;
+ const Status& getStatus() const override;
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
@@ -114,7 +114,7 @@ private:
HostAndPort _hostAndPort;
Date_t _lastUsed;
- bool _isFailed = false;
+ Status _status = Status::OK();
SetupCallback _setupCallback;
RefreshCallback _refreshCallback;
TimerImpl _timer;
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index c4c31596049..01b36857900 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -61,9 +61,9 @@ NetworkInterfaceASIO::NetworkInterfaceASIO(
_resolver(_io_service),
_state(State::kReady),
_streamFactory(std::move(streamFactory)),
- _isExecutorRunnable(false),
_connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this),
- _options.connectionPoolOptions) {}
+ _options.connectionPoolOptions),
+ _isExecutorRunnable(false) {}
std::string NetworkInterfaceASIO::getDiagnosticString() {
str::stream output;
@@ -152,6 +152,7 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
}
onFinish(swConn.getStatus());
+ signalWorkAvailable();
return;
}
@@ -182,11 +183,11 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
_beginCommunication(op);
};
- // TODO: thread some higher level timeout through, rather than 10 seconds,
+ // TODO: thread some higher level timeout through, rather than 5 minutes,
// once we make timeouts pervasive in this api.
asio::post(
_io_service,
- [this, request, nextStep] { _connectionPool.get(request.target, Seconds(10), nextStep); });
+ [this, request, nextStep] { _connectionPool.get(request.target, Minutes(5), nextStep); });
}
void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index c2f01806b10..0577728e741 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -319,6 +319,8 @@ private:
std::unique_ptr<AsyncStreamFactoryInterface> _streamFactory;
+ ConnectionPool _connectionPool;
+
stdx::mutex _inProgressMutex;
std::unordered_map<AsyncOp*, std::unique_ptr<AsyncOp>> _inProgress;
std::vector<TaskExecutor::CallbackHandle> _inGetConnection;
@@ -326,8 +328,6 @@ private:
stdx::mutex _executorMutex;
bool _isExecutorRunnable;
stdx::condition_variable _isExecutorRunnableCondition;
-
- ConnectionPool _connectionPool;
};
template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs>
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index 889ba5c11d5..8d20c9ac62e 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -215,7 +215,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
// codepath.
if (op->_inSetup) {
op->_inSetup = false;
- op->finish(op->command()->response(rpc::Protocol::kOpQuery, now()));
+ op->finish(RemoteCommandResponse());
return;
}
@@ -274,7 +274,11 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus&
auto asioConn = static_cast<connection_pool_asio::ASIOConnection*>(conn.get());
asioConn->bindAsyncOp(std::move(ownedOp));
- asioConn->indicateUsed();
+ if (!resp.isOK()) {
+ asioConn->indicateFailed(resp.getStatus());
+ } else {
+ asioConn->indicateUsed();
+ }
signalWorkAvailable();
}