From b9198150102c2edd2c2d27f0db2a96eaef9b6e38 Mon Sep 17 00:00:00 2001 From: Ben Caimano Date: Sun, 30 Jun 2019 13:35:40 -0400 Subject: SERVER-42026 Lock during ConnectionPool::SpecificPool::spawnConnections() --- src/mongo/executor/connection_pool.cpp | 30 ++++++++++++---------- src/mongo/executor/connection_pool_test.cpp | 30 +++++++++++++++++----- .../executor/connection_pool_test_fixture.cpp | 12 ++++----- src/mongo/executor/connection_pool_test_fixture.h | 6 ++--- 4 files changed, 47 insertions(+), 31 deletions(-) (limited to 'src/mongo/executor') diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index db157a12c7d..9a8ff0f0a96 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -331,9 +331,11 @@ public: template void runOnExecutor(CallableT&& cb) { ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) // - .getAsync([ anchor = shared_from_this(), - cb = std::forward(cb) ](Status && status) mutable { + .getAsync([ this, anchor = shared_from_this(), cb = std::forward(cb) ]( + Status && status) mutable { invariant(status); + + stdx::lock_guard lk(_parent->_mutex); cb(); }); } @@ -489,22 +491,17 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) { if (iter == _pools.end()) return; - auto pool = iter->second; + auto& pool = iter->second; pool->triggerShutdown( Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped")); } void ConnectionPool::dropConnections(transport::Session::TagMask tags) { - // Grab all current pools (under the lock) - auto pools = [&] { - stdx::lock_guard lk(_mutex); - return _pools; - }(); + stdx::lock_guard lk(_mutex); - for (const auto& pair : pools) { + for (const auto& pair : _pools) { auto& pool = pair.second; - stdx::lock_guard lk(_mutex); if (pool->matchesTags(tags)) continue; @@ -664,8 +661,6 @@ Future ConnectionPool::SpecificPool::getConnec auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle { auto deleter = [ this, anchor = shared_from_this() ](ConnectionInterface * connection) { runOnExecutor([this, connection]() { - stdx::lock_guard lk(_parent->_mutex); - returnConnection(connection); _lastActiveTime = _parent->_factory->now(); @@ -837,9 +832,16 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) { // Sets state to shutdown and kicks off the failure protocol to tank existing connections void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) { - _health.isShutdown = true; + auto wasShutdown = std::exchange(_health.isShutdown, true); + if (wasShutdown) { + return; + } LOG(2) << "Delisting connection pool for " << _hostAndPort; + + // Make sure the pool lifetime lasts until the end of this function, + // it could be only in the map of pools + auto anchor = shared_from_this(); _parent->_controller->removeHost(_id); _parent->_pools.erase(_hostAndPort); @@ -1118,7 +1120,7 @@ void ConnectionPool::SpecificPool::updateController() { } } - runOnExecutor([ this, anchor = shared_from_this() ]() { spawnConnections(); }); + runOnExecutor([this]() { spawnConnections(); }); } // Updates our state and manages the request timer diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 048e694af4f..31b27dc9238 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -64,12 +64,28 @@ protected: } auto makePool(ConnectionPool::Options options = {}) { - _pool = - std::make_shared(std::make_shared(), "test pool", options); + _pool = std::make_shared( + std::make_shared(_executor), "test pool", options); return _pool; } + /** + * Get from a pool with out-of-line execution and return the future for a connection + * + * Since the InlineOutOfLineExecutor starts running on the same thread once schedule is called, + * this function allows us to avoid deadlocks with get(), which is the only public function that + * calls schedule while holding a lock. In normal operation, the OutOfLineExecutor is actually + * out of line, and this contrivance isn't necessary. + */ + template + auto getFromPool(Args&&... args) { + return ExecutorFuture(_executor) + .then([ pool = _pool, args... ]() { return pool->get(args...); }) + .semi(); + } + private: + std::shared_ptr _executor = std::make_shared(); std::shared_ptr _pool; }; @@ -1440,7 +1456,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) { size_t connId = 0; // no connections in the pool, our future is not satisfied - auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + auto connFuture = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); ASSERT_FALSE(connFuture.isReady()); // Successfully get a new connection @@ -1463,8 +1479,8 @@ TEST_F(ConnectionPoolTest, AsyncGet) { size_t connId2 = 0; size_t connId3 = 0; - auto connFuture1 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); - auto connFuture2 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10}); + auto connFuture1 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + auto connFuture2 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{10}); // The first future should be immediately ready. The second should be in the queue. ASSERT_TRUE(connFuture1.isReady()); @@ -1475,7 +1491,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) { auto conn1 = std::move(connFuture1).get(); // Grab our third future while our first one is being fulfilled - connFuture3 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + connFuture3 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); connId1 = getId(conn1); doneWith(conn1); @@ -1512,7 +1528,7 @@ TEST_F(ConnectionPoolTest, ReturnAfterShutdown) { auto pool = makePool(); // Grab a connection and hold it to end of scope - auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds(1)); + auto connFuture = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds(1)); ConnectionImpl::pushSetup(Status::OK()); auto conn = std::move(connFuture).get(); doneWith(conn); diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index dd8f4012388..9b28d752157 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -65,13 +65,15 @@ void TimerImpl::clear() { } } -void TimerImpl::fireIfNecessary() { - auto now = PoolImpl().now(); +Date_t TimerImpl::now() { + return _global->now(); +} +void TimerImpl::fireIfNecessary() { auto timers = _timers; for (auto&& x : timers) { - if (_timers.count(x) && (x->_expiration <= now)) { + if (_timers.count(x) && (x->_expiration <= x->now())) { auto execCB = [cb = std::move(x->_cb)](auto&&) mutable { std::move(cb)(); }; @@ -82,10 +84,6 @@ void TimerImpl::fireIfNecessary() { } } -Date_t TimerImpl::now() { - return _global->now(); -} - std::set TimerImpl::_timers; ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global) diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index 89cd65e48ec..8fec73b5953 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -46,7 +46,7 @@ class PoolImpl; */ class TimerImpl final : public ConnectionPool::TimerInterface { public: - TimerImpl(PoolImpl* global); + explicit TimerImpl(PoolImpl* global); ~TimerImpl() override; void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; @@ -174,7 +174,7 @@ class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface { friend class TimerImpl; public: - PoolImpl() = default; + explicit PoolImpl(const std::shared_ptr& executor) : _executor(executor) {} std::shared_ptr makeConnection( const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, @@ -197,7 +197,7 @@ public: private: ConnectionPool* _pool = nullptr; - std::shared_ptr _executor = std::make_shared(); + std::shared_ptr _executor; static boost::optional _now; }; -- cgit v1.2.1