summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-06-30 13:35:40 -0400
committerBen Caimano <ben.caimano@10gen.com>2019-07-01 18:55:43 -0400
commitb9198150102c2edd2c2d27f0db2a96eaef9b6e38 (patch)
treeb27f2e1dba6d6840aee32f30a626a419166782ce /src/mongo/executor
parent6dabe5871fcd280f654e475e7048385b54b1ea64 (diff)
downloadmongo-b9198150102c2edd2c2d27f0db2a96eaef9b6e38.tar.gz
SERVER-42026 Lock during ConnectionPool::SpecificPool::spawnConnections()
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/connection_pool.cpp30
-rw-r--r--src/mongo/executor/connection_pool_test.cpp30
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp12
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.h6
4 files changed, 47 insertions, 31 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index db157a12c7d..9a8ff0f0a96 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -331,9 +331,11 @@ public:
template <typename CallableT>
void runOnExecutor(CallableT&& cb) {
ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) //
- .getAsync([ anchor = shared_from_this(),
- cb = std::forward<CallableT>(cb) ](Status && status) mutable {
+ .getAsync([ this, anchor = shared_from_this(), cb = std::forward<CallableT>(cb) ](
+ Status && status) mutable {
invariant(status);
+
+ stdx::lock_guard lk(_parent->_mutex);
cb();
});
}
@@ -489,22 +491,17 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
if (iter == _pools.end())
return;
- auto pool = iter->second;
+ auto& pool = iter->second;
pool->triggerShutdown(
Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"));
}
void ConnectionPool::dropConnections(transport::Session::TagMask tags) {
- // Grab all current pools (under the lock)
- auto pools = [&] {
- stdx::lock_guard lk(_mutex);
- return _pools;
- }();
+ stdx::lock_guard lk(_mutex);
- for (const auto& pair : pools) {
+ for (const auto& pair : _pools) {
auto& pool = pair.second;
- stdx::lock_guard lk(_mutex);
if (pool->matchesTags(tags))
continue;
@@ -664,8 +661,6 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
auto deleter = [ this, anchor = shared_from_this() ](ConnectionInterface * connection) {
runOnExecutor([this, connection]() {
- stdx::lock_guard lk(_parent->_mutex);
-
returnConnection(connection);
_lastActiveTime = _parent->_factory->now();
@@ -837,9 +832,16 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) {
// Sets state to shutdown and kicks off the failure protocol to tank existing connections
void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) {
- _health.isShutdown = true;
+ auto wasShutdown = std::exchange(_health.isShutdown, true);
+ if (wasShutdown) {
+ return;
+ }
LOG(2) << "Delisting connection pool for " << _hostAndPort;
+
+ // Make sure the pool lifetime lasts until the end of this function,
+ // it could be only in the map of pools
+ auto anchor = shared_from_this();
_parent->_controller->removeHost(_id);
_parent->_pools.erase(_hostAndPort);
@@ -1118,7 +1120,7 @@ void ConnectionPool::SpecificPool::updateController() {
}
}
- runOnExecutor([ this, anchor = shared_from_this() ]() { spawnConnections(); });
+ runOnExecutor([this]() { spawnConnections(); });
}
// Updates our state and manages the request timer
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 048e694af4f..31b27dc9238 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -64,12 +64,28 @@ protected:
}
auto makePool(ConnectionPool::Options options = {}) {
- _pool =
- std::make_shared<ConnectionPool>(std::make_shared<PoolImpl>(), "test pool", options);
+ _pool = std::make_shared<ConnectionPool>(
+ std::make_shared<PoolImpl>(_executor), "test pool", options);
return _pool;
}
+ /**
+ * Get from a pool with out-of-line execution and return the future for a connection
+ *
+ * Since the InlineOutOfLineExecutor starts running on the same thread once schedule is called,
+ * this function allows us to avoid deadlocks with get(), which is the only public function that
+ * calls schedule while holding a lock. In normal operation, the OutOfLineExecutor is actually
+ * out of line, and this contrivance isn't necessary.
+ */
+ template <typename... Args>
+ auto getFromPool(Args&&... args) {
+ return ExecutorFuture(_executor)
+ .then([ pool = _pool, args... ]() { return pool->get(args...); })
+ .semi();
+ }
+
private:
+ std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>();
std::shared_ptr<ConnectionPool> _pool;
};
@@ -1440,7 +1456,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
size_t connId = 0;
// no connections in the pool, our future is not satisfied
- auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ auto connFuture = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
ASSERT_FALSE(connFuture.isReady());
// Successfully get a new connection
@@ -1463,8 +1479,8 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
size_t connId2 = 0;
size_t connId3 = 0;
- auto connFuture1 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
- auto connFuture2 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
+ auto connFuture1 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ auto connFuture2 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
// The first future should be immediately ready. The second should be in the queue.
ASSERT_TRUE(connFuture1.isReady());
@@ -1475,7 +1491,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
auto conn1 = std::move(connFuture1).get();
// Grab our third future while our first one is being fulfilled
- connFuture3 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ connFuture3 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
connId1 = getId(conn1);
doneWith(conn1);
@@ -1512,7 +1528,7 @@ TEST_F(ConnectionPoolTest, ReturnAfterShutdown) {
auto pool = makePool();
// Grab a connection and hold it to end of scope
- auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds(1));
+ auto connFuture = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds(1));
ConnectionImpl::pushSetup(Status::OK());
auto conn = std::move(connFuture).get();
doneWith(conn);
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index dd8f4012388..9b28d752157 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -65,13 +65,15 @@ void TimerImpl::clear() {
}
}
-void TimerImpl::fireIfNecessary() {
- auto now = PoolImpl().now();
+Date_t TimerImpl::now() {
+ return _global->now();
+}
+void TimerImpl::fireIfNecessary() {
auto timers = _timers;
for (auto&& x : timers) {
- if (_timers.count(x) && (x->_expiration <= now)) {
+ if (_timers.count(x) && (x->_expiration <= x->now())) {
auto execCB = [cb = std::move(x->_cb)](auto&&) mutable {
std::move(cb)();
};
@@ -82,10 +84,6 @@ void TimerImpl::fireIfNecessary() {
}
}
-Date_t TimerImpl::now() {
- return _global->now();
-}
-
std::set<TimerImpl*> TimerImpl::_timers;
ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global)
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index 89cd65e48ec..8fec73b5953 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -46,7 +46,7 @@ class PoolImpl;
*/
class TimerImpl final : public ConnectionPool::TimerInterface {
public:
- TimerImpl(PoolImpl* global);
+ explicit TimerImpl(PoolImpl* global);
~TimerImpl() override;
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
@@ -174,7 +174,7 @@ class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
friend class TimerImpl;
public:
- PoolImpl() = default;
+ explicit PoolImpl(const std::shared_ptr<OutOfLineExecutor>& executor) : _executor(executor) {}
std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection(
const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode,
@@ -197,7 +197,7 @@ public:
private:
ConnectionPool* _pool = nullptr;
- std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>();
+ std::shared_ptr<OutOfLineExecutor> _executor;
static boost::optional<Date_t> _now;
};