diff options
author | samantharitter <samantha.ritter@10gen.com> | 2015-10-01 16:45:42 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2015-11-04 09:57:20 -0500 |
commit | a5aecae4e39468fe8d4952f8512b9a34677e2795 (patch) | |
tree | 5c349fae47ce4e02ea1419a88d0ef3be11e5697d /src/mongo/executor | |
parent | 2af435e3a7f28c0a45c89cff882f2033e2ffd7d5 (diff) | |
download | mongo-a5aecae4e39468fe8d4952f8512b9a34677e2795.tar.gz |
SERVER-20546 Abort if connections are returned to connection pool in an unknown state
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 10 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 24 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_asio.cpp | 18 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_asio.h | 8 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 129 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 10 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.h | 7 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio.cpp | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio_command.cpp | 4 |
9 files changed, 158 insertions, 58 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index aa5e995600b..adcf4f7b5d7 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -166,6 +166,9 @@ Milliseconds const ConnectionPool::kDefaultRefreshTimeout = Seconds(20); Milliseconds const ConnectionPool::kDefaultRefreshRequirement = Seconds(60); Milliseconds const ConnectionPool::kDefaultHostTimeout = Minutes(5); +const Status ConnectionPool::kConnectionStateUnknown = + Status(ErrorCodes::InternalError, "Connection is in an unknown state"); + ConnectionPool::ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl, Options options) : _options(std::move(options)), _factory(std::move(impl)) {} @@ -301,6 +304,10 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr updateStateInLock(); + // Users are required to call indicateSuccess() or indicateFailure() before allowing + // a connection to be returned. Otherwise, we have entered an unknown state. + invariant(conn->getStatus() != kConnectionStateUnknown); + if (conn->getGeneration() != _generation) { // If the connection is from an older generation, just return. return; @@ -384,6 +391,8 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk _checkedOutPool[connPtr] = std::move(conn); + connPtr->indicateSuccess(); + returnConnection(connPtr, std::move(lk)); }); @@ -460,6 +469,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex updateStateInLock(); // pass it to the user + connPtr->resetToUnknown(); lk.unlock(); cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent))); lk.lock(); diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index a5d1676bf0e..bc2a5aee0a8 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -69,6 +69,8 @@ public: static const Milliseconds kDefaultRefreshRequirement; static const Milliseconds kDefaultHostTimeout; + static const Status kConnectionStateUnknown; + struct Options { Options() {} @@ -188,16 +190,17 @@ public: virtual ~ConnectionInterface() = default; /** - * Intended to be called whenever a socket is used in a way which indicates - * liveliness. I.e. if an operation is executed over the connection. + * Indicates that the user is now done with this connection. Users MUST call either + * this method or indicateFailure() before returning the connection to its pool. */ - virtual void indicateUsed() = 0; + virtual void indicateSuccess() = 0; /** * Indicates that a connection has failed. This will prevent the connection - * from re-entering the connection pool. + * from re-entering the connection pool. Users MUST call either this method or + * indicateSuccess() before returning connections to the pool. */ - virtual void indicateFailed(Status status) = 0; + virtual void indicateFailure(Status status) = 0; /** * The HostAndPort for the connection. This should be the same as the @@ -215,6 +218,12 @@ protected: private: /** + * This method updates a 'liveness' timestamp to avoid unnecessarily refreshing + * the connection. + */ + virtual void indicateUsed() = 0; + + /** * Returns the last used time point for the connection */ virtual Date_t getLastUsed() const = 0; @@ -232,6 +241,11 @@ private: virtual void setup(Milliseconds timeout, SetupCallback cb) = 0; /** + * Resets the connection's state to kConnectionStateUnknown for the next user. + */ + virtual void resetToUnknown() = 0; + + /** * Refreshes the connection. This should involve a network round trip and * should strongly imply an active connection */ diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index d63ef0488cc..39d70cd6c4e 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -102,11 +102,13 @@ ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation _generation(generation), _impl(makeAsyncOp(this)) {} -void ASIOConnection::indicateUsed() { - _lastUsed = _global->now(); +void ASIOConnection::indicateSuccess() { + indicateUsed(); + _status = Status::OK(); } -void ASIOConnection::indicateFailed(Status status) { +void ASIOConnection::indicateFailure(Status status) { + invariant(!status.isOK()); _status = std::move(status); } @@ -114,6 +116,12 @@ const HostAndPort& ASIOConnection::getHostAndPort() const { return _hostAndPort; } +void ASIOConnection::indicateUsed() { + // It is illegal to attempt to use a connection after calling indicateFailure(). + invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown); + _lastUsed = _global->now(); +} + Date_t ASIOConnection::getLastUsed() const { return _lastUsed; } @@ -163,6 +171,10 @@ void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) { _global->_impl->_connect(_impl.get()); } +void ASIOConnection::resetToUnknown() { + _status = ConnectionPool::kConnectionStateUnknown; +} + void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { auto op = _impl.get(); diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h index 850431a2251..6700819c63d 100644 --- a/src/mongo/executor/connection_pool_asio.h +++ b/src/mongo/executor/connection_pool_asio.h @@ -71,14 +71,15 @@ class ASIOConnection final : public ConnectionPool::ConnectionInterface { public: ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global); - void indicateUsed() override; - void indicateFailed(Status status) override; + void indicateSuccess() override; + void indicateFailure(Status status) override; const HostAndPort& getHostAndPort() const override; std::unique_ptr<NetworkInterfaceASIO::AsyncOp> releaseAsyncOp(); void bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op); private: + void indicateUsed() override; Date_t getLastUsed() const override; const Status& getStatus() const override; @@ -86,6 +87,7 @@ private: void cancelTimeout() override; void setup(Milliseconds timeout, SetupCallback cb) override; + void resetToUnknown() override; void refresh(Milliseconds timeout, RefreshCallback cb) override; size_t getGeneration() const override; @@ -99,7 +101,7 @@ private: ASIOImpl* const _global; ASIOTimer _timer; Date_t _lastUsed; - Status _status = Status::OK(); + Status _status = ConnectionPool::kConnectionStateUnknown; HostAndPort _hostAndPort; size_t _generation; std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 12f59572dd2..24949a142c7 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -48,6 +48,10 @@ protected: TimerImpl::clear(); } + void doneWith(const ConnectionPool::ConnectionHandle& swConn) { + static_cast<ConnectionImpl*>(swConn.get())->indicateSuccess(); + } + private: }; @@ -67,18 +71,22 @@ TEST_F(ConnectionPoolTest, SameConn) { // Grab and stash an id for the first request size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); }); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); // Grab and stash an id for the second request size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); }); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); // Verify that we hit them, and that they're the same ASSERT(conn1Id); @@ -99,16 +107,18 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); - swConn.getValue()->indicateFailed(Status(ErrorCodes::BadValue, "error")); + swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error")); }); // Grab the second id size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); }); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); // Verify that we hit them, and that they're different ASSERT(conn1Id); @@ -126,18 +136,22 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) { // Conn 1 from port 30000 size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort("localhost:30000"), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); }); + pool.get(HostAndPort("localhost:30000"), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); // Conn 2 from port 30001 size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort("localhost:30001"), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); }); + pool.get(HostAndPort("localhost:30001"), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); // Hit them and not the same ASSERT(conn1Id); @@ -173,6 +187,9 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) { // Verify that the two connections are different ASSERT_NE(conn1.get(), conn2.get()); + + doneWith(conn1); + doneWith(conn2); } /** @@ -227,7 +244,10 @@ TEST_F(ConnectionPoolTest, refreshHappens) { ConnectionImpl::pushSetup(Status::OK()); pool.get(HostAndPort(), Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT(swConn.isOK()); }); + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); // After 1 second, one refresh has occurred PoolImpl::setNow(now + Milliseconds(1000)); @@ -260,19 +280,23 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { // Grab a connection and verify it's good ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); }); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); PoolImpl::setNow(now + Milliseconds(500)); size_t conn2Id = 0; // Make sure we still get the first one - pool.get( - HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); }); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); ASSERT_EQ(conn1Id, conn2Id); // This should trigger a refresh, but not time it out. So now we have one @@ -303,6 +327,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_NE(CONN2ID(swConn), conn1Id); reachedB = true; + doneWith(swConn.getValue()); }); ASSERT(reachedA); @@ -326,6 +351,7 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { ASSERT(swConn.isOK()); reachedA = true; + doneWith(swConn.getValue()); }); pool.get(HostAndPort(), @@ -344,6 +370,7 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { ASSERT(reachedB); ASSERT(!reachedA); + doneWith(conn); conn.reset(); // Now that we've returned the connection, we see the second has been @@ -399,10 +426,14 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { // Return 1 ConnectionPool::ConnectionInterface* conn1Ptr = conn1.get(); + doneWith(conn1); conn1.reset(); // Verify that it's the one that pops out for request 3 ASSERT_EQ(conn1Ptr, conn3.get()); + + doneWith(conn2); + doneWith(conn3); } /** @@ -494,15 +525,15 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { // Return each connection over 1, 2 and 3 ms PoolImpl::setNow(now + Milliseconds(1)); - conn1->indicateUsed(); + doneWith(conn1); conn1.reset(); PoolImpl::setNow(now + Milliseconds(2)); - conn2->indicateUsed(); + doneWith(conn2); conn2.reset(); PoolImpl::setNow(now + Milliseconds(3)); - conn3->indicateUsed(); + doneWith(conn3); conn3.reset(); // Jump 5 seconds and verify that refreshes only two refreshes occurred @@ -539,6 +570,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { connId = CONN2ID(swConn); reachedA = true; + doneWith(swConn.getValue()); }); ASSERT(reachedA); @@ -555,6 +587,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_NE(connId, CONN2ID(swConn)); reachedB = true; + doneWith(swConn.getValue()); }); ASSERT(reachedB); @@ -587,6 +620,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { connId = CONN2ID(swConn); reachedA = true; + doneWith(swConn.getValue()); }); ASSERT(reachedA); @@ -600,6 +634,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_EQ(connId, CONN2ID(swConn)); reachedB = true; + doneWith(swConn.getValue()); }); ASSERT(reachedB); @@ -614,6 +649,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_NE(connId, CONN2ID(swConn)); reachedC = true; + doneWith(swConn.getValue()); }); ASSERT(reachedC); @@ -652,10 +688,12 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { // return the second ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); }); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); ASSERT(conn2Id); @@ -670,11 +708,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_EQ(conn2Id, CONN2ID(swConn)); reachedA = true; + doneWith(swConn.getValue()); }); ASSERT(reachedA); // return conn 1 + doneWith(conn1); conn1.reset(); // expire the pool @@ -690,6 +730,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { ASSERT_NE(conn1Id, CONN2ID(swConn)); ASSERT_NE(conn2Id, CONN2ID(swConn)); reachedB = true; + doneWith(swConn.getValue()); }); ASSERT(reachedB); } @@ -712,10 +753,12 @@ TEST_F(ConnectionPoolTest, dropConnections) { // Grab the first connection id size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get( - HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); }); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = CONN2ID(swConn); + doneWith(swConn.getValue()); + }); ASSERT(conn1Id); // Grab it and this time keep it out of the pool @@ -747,6 +790,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { ASSERT(reachedA); // return the connection + doneWith(handle); handle.reset(); // Make sure that a new connection request properly disposed of the gen1 @@ -757,8 +801,8 @@ TEST_F(ConnectionPoolTest, dropConnections) { Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); - ASSERT_NE(conn2Id, conn1Id); + doneWith(swConn.getValue()); }); ASSERT(conn2Id); @@ -780,6 +824,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_NE(CONN2ID(swConn), conn2Id); reachedB = true; + doneWith(swConn.getValue()); }); ASSERT(reachedB); diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index e7c9703984f..3a3b9843fd3 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -81,10 +81,18 @@ void ConnectionImpl::indicateUsed() { _lastUsed = _global->now(); } -void ConnectionImpl::indicateFailed(Status status) { +void ConnectionImpl::indicateSuccess() { + _status = Status::OK(); +} + +void ConnectionImpl::indicateFailure(Status status) { _status = std::move(status); } +void ConnectionImpl::resetToUnknown() { + _status = ConnectionPool::kConnectionStateUnknown; +} + size_t ConnectionImpl::id() const { return _id; } diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index edc83c4c401..37cb4e504ae 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -80,9 +80,10 @@ public: size_t id() const; - void indicateUsed() override; + void indicateSuccess() override; + void indicateFailure(Status status) override; - void indicateFailed(Status status) override; + void resetToUnknown() override; const HostAndPort& getHostAndPort() const override; @@ -98,6 +99,8 @@ public: static void pushRefresh(Status status); private: + void indicateUsed() override; + Date_t getLastUsed() const override; const Status& getStatus() const override; diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 19f38107de6..5f554b4fa94 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -205,8 +205,14 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa // If we didn't find the request, we've been canceled if (eraseCount == 0) { lk.unlock(); + onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); + + // Though we were canceled, we know that the stream is fine, so indicate success. + conn->indicateSuccess(); + signalWorkAvailable(); + return; } diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index 0100c91f50d..80deb8cf0d9 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -329,9 +329,9 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& asioConn->bindAsyncOp(std::move(ownedOp)); if (!resp.isOK()) { - asioConn->indicateFailed(resp.getStatus()); + asioConn->indicateFailure(resp.getStatus()); } else { - asioConn->indicateUsed(); + asioConn->indicateSuccess(); } signalWorkAvailable(); |