diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2019-01-16 11:46:03 -0500 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2019-01-23 17:02:16 -0500 |
commit | c1b72f76bee602cd915bc6ea91bdcef10bd0c707 (patch) | |
tree | 374edbdb9c98344ad9a0543bb1d04633a045e3e7 /src/mongo/executor | |
parent | e7b1c689b632610399ab716a98f125605dd8a11c (diff) | |
download | mongo-c1b72f76bee602cd915bc6ea91bdcef10bd0c707.tar.gz |
SERVER-34260 Move bookkeeping functions into ConnectionInterface
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 32 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 58 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 38 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.h | 22 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_tl.cpp | 37 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_tl.h | 18 |
6 files changed, 91 insertions, 114 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 54107e04a2e..45b75798d80 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -28,7 +28,7 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kConnectionPool #include "mongo/platform/basic.h" @@ -54,6 +54,36 @@ namespace mongo { namespace executor { +void ConnectionPool::ConnectionInterface::indicateUsed() { + // It is illegal to attempt to use a connection after calling indicateFailure(). + invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown); + _lastUsed = now(); +} + +void ConnectionPool::ConnectionInterface::indicateSuccess() { + _status = Status::OK(); +} + +void ConnectionPool::ConnectionInterface::indicateFailure(Status status) { + _status = std::move(status); +} + +Date_t ConnectionPool::ConnectionInterface::getLastUsed() const { + return _lastUsed; +} + +const Status& ConnectionPool::ConnectionInterface::getStatus() const { + return _status; +} + +void ConnectionPool::ConnectionInterface::resetToUnknown() { + _status = ConnectionPool::kConnectionStateUnknown; +} + +size_t ConnectionPool::ConnectionInterface::getGeneration() const { + return _generation; +} + /** * A pool for a specific HostAndPort * diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index e6525319571..ed3dac9c261 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -208,6 +208,11 @@ public: * It should be safe to cancel a previously canceled, or never set, timer. */ virtual void cancelTimeout() = 0; + + /** + * Returns the current time for the clock used by the timer + */ + virtual Date_t now() = 0; }; /** @@ -223,21 +228,22 @@ class ConnectionPool::ConnectionInterface : public TimerInterface { friend class ConnectionPool; public: - ConnectionInterface() = default; + explicit ConnectionInterface(size_t generation) : _generation(generation) {} + virtual ~ConnectionInterface() = default; /** * Indicates that the user is now done with this connection. Users MUST call either * this method or indicateFailure() before returning the connection to its pool. */ - virtual void indicateSuccess() = 0; + void indicateSuccess(); /** * Indicates that a connection has failed. This will prevent the connection * from re-entering the connection pool. Users MUST call either this method or * indicateSuccess() before returning connections to the pool. */ - virtual void indicateFailure(Status status) = 0; + void indicateFailure(Status status); /** * This method updates a 'liveness' timestamp to avoid unnecessarily refreshing @@ -248,7 +254,7 @@ public: * back in without use, one would expect an indicateSuccess without an indicateUsed. Only if we * checked it out and did work would we call indicateUsed. */ - virtual void indicateUsed() = 0; + void indicateUsed(); /** * The HostAndPort for the connection. This should be the same as the @@ -262,25 +268,33 @@ public: */ virtual bool isHealthy() = 0; -protected: - /** - * Making these protected makes the definitions available to override in - * children. - */ - using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>; - using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>; - -private: /** * Returns the last used time point for the connection */ - virtual Date_t getLastUsed() const = 0; + Date_t getLastUsed() const; /** * Returns the status associated with the connection. If the status is not * OK, the connection will not be returned to the pool. */ - virtual const Status& getStatus() const = 0; + const Status& getStatus() const; + + /** + * Get the generation of the connection. This is used to track whether to + * continue using a connection after a call to dropConnections() by noting + * if the generation on the specific pool is the same as the generation on + * a connection (if not the connection is from a previous era and should + * not be re-used). + */ + size_t getGeneration() const; + +protected: + /** + * Making these protected makes the definitions available to override in + * children. + */ + using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>; + using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>; /** * Sets up the connection. This should include connection + auth + any @@ -291,7 +305,7 @@ private: /** * Resets the connection's state to kConnectionStateUnknown for the next user. */ - virtual void resetToUnknown() = 0; + void resetToUnknown(); /** * Refreshes the connection. This should involve a network round trip and @@ -299,14 +313,10 @@ private: */ virtual void refresh(Milliseconds timeout, RefreshCallback cb) = 0; - /** - * Get the generation of the connection. This is used to track whether to - * continue using a connection after a call to dropConnections() by noting - * if the generation on the specific pool is the same as the generation on - * a connection (if not the connection is from a previous era and should - * not be re-used). - */ - virtual size_t getGeneration() const = 0; +private: + size_t _generation; + Date_t _lastUsed; + Status _status = ConnectionPool::kConnectionStateUnknown; }; /** diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index f032b81039e..9e5f5338c47 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -77,29 +77,21 @@ void TimerImpl::fireIfNecessary() { } } +Date_t TimerImpl::now() { + return _global->now(); +} + std::set<TimerImpl*> TimerImpl::_timers; ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global) - : _hostAndPort(hostAndPort), + : ConnectionInterface(generation), + _hostAndPort(hostAndPort), _timer(global), _global(global), - _id(_idCounter++), - _generation(generation) {} - -void ConnectionImpl::indicateUsed() { - _lastUsed = _global->now(); -} - -void ConnectionImpl::indicateSuccess() { - _status = Status::OK(); -} - -void ConnectionImpl::indicateFailure(Status status) { - _status = std::move(status); -} + _id(_idCounter++) {} -void ConnectionImpl::resetToUnknown() { - _status = ConnectionPool::kConnectionStateUnknown; +Date_t ConnectionImpl::now() { + return _timer.now(); } size_t ConnectionImpl::id() const { @@ -168,14 +160,6 @@ size_t ConnectionImpl::refreshQueueDepth() { return _refreshQueue.size(); } -Date_t ConnectionImpl::getLastUsed() const { - return _lastUsed; -} - -const Status& ConnectionImpl::getStatus() const { - return _status; -} - void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) { _timer.setTimeout(timeout, cb); } @@ -227,10 +211,6 @@ void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) { } } -size_t ConnectionImpl::getGeneration() const { - return _generation; -} - std::deque<ConnectionImpl::PushSetupCallback> ConnectionImpl::_pushSetupQueue; std::deque<ConnectionImpl::PushRefreshCallback> ConnectionImpl::_pushRefreshQueue; std::deque<ConnectionImpl*> ConnectionImpl::_setupQueue; diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index e9215e3244f..eb3760d7b5b 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -53,6 +53,8 @@ public: void cancelTimeout() override; + Date_t now() override; + // launches all timers for whom now() has passed static void fireIfNecessary(); @@ -83,11 +85,6 @@ public: size_t id() const; - void indicateSuccess() override; - void indicateFailure(Status status) override; - - void resetToUnknown() override; - const HostAndPort& getHostAndPort() const override; transport::ConnectSSLMode getSslMode() const override { return transport::kGlobalSSLMode; @@ -108,32 +105,23 @@ public: static void pushRefresh(Status status); static size_t refreshQueueDepth(); -private: - void indicateUsed() override; - - Date_t getLastUsed() const override; - - const Status& getStatus() const override; - void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; void cancelTimeout() override; + Date_t now() override; + +private: void setup(Milliseconds timeout, SetupCallback cb) override; void refresh(Milliseconds timeout, RefreshCallback cb) override; - size_t getGeneration() const override; - HostAndPort _hostAndPort; - Date_t _lastUsed; - Status _status = Status::OK(); SetupCallback _setupCallback; RefreshCallback _refreshCallback; TimerImpl _timer; PoolImpl* _global; size_t _id; - size_t _generation; // Answer queues static std::deque<PushSetupCallback> _pushSetupQueue; diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp index 344fcac81cc..14c73d50cba 100644 --- a/src/mongo/executor/connection_pool_tl.cpp +++ b/src/mongo/executor/connection_pool_tl.cpp @@ -115,12 +115,8 @@ void TLTimer::cancelTimeout() { _timer->cancel(); } -void TLConnection::indicateSuccess() { - _status = Status::OK(); -} - -void TLConnection::indicateFailure(Status status) { - _status = std::move(status); +Date_t TLTimer::now() { + return _reactor->now(); } const HostAndPort& TLConnection::getHostAndPort() const { @@ -139,20 +135,6 @@ AsyncDBClient* TLConnection::client() { return _client.get(); } -void TLConnection::indicateUsed() { - // It is illegal to attempt to use a connection after calling indicateFailure(). - invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown); - _lastUsed = _reactor->now(); -} - -Date_t TLConnection::getLastUsed() const { - return _lastUsed; -} - -const Status& TLConnection::getStatus() const { - return _status; -} - void TLConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) { auto anchor = shared_from_this(); _timer->setTimeout(timeout, [ cb = std::move(cb), anchor = std::move(anchor) ] { cb(); }); @@ -292,10 +274,6 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) { LOG(2) << "Finished connection setup."; } -void TLConnection::resetToUnknown() { - _status = ConnectionPool::kConnectionStateUnknown; -} - void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) { auto anchor = shared_from_this(); @@ -309,10 +287,10 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) { return; } - _status = {ErrorCodes::HostUnreachable, "Timed out refreshing host"}; + indicateFailure({ErrorCodes::HostUnreachable, "Timed out refreshing host"}); _client->cancel(); - handler->promise.setError(_status); + handler->promise.setError(getStatus()); }); _client @@ -328,17 +306,18 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) { cancelTimeout(); - _status = status; if (status.isOK()) { + indicateSuccess(); handler->promise.emplaceValue(); } else { + indicateFailure(status); handler->promise.setError(status); } }); } -size_t TLConnection::getGeneration() const { - return _generation; +Date_t TLConnection::now() { + return _reactor->now(); } void TLConnection::cancelAsync() { diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h index e97dda4c44e..3c40b62047d 100644 --- a/src/mongo/executor/connection_pool_tl.h +++ b/src/mongo/executor/connection_pool_tl.h @@ -112,6 +112,7 @@ public: void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; void cancelTimeout() override; + Date_t now() override; private: transport::ReactorHandle _reactor; @@ -127,13 +128,13 @@ public: transport::ConnectSSLMode sslMode, size_t generation, NetworkConnectionHook* onConnectHook) - : TLTypeFactory::Type(factory), + : ConnectionInterface(generation), + TLTypeFactory::Type(factory), _reactor(reactor), _serviceContext(serviceContext), _timer(factory->makeTimer()), _peer(std::move(peer)), _sslMode(sslMode), - _generation(generation), _onConnectHook(onConnectHook) {} ~TLConnection() { // Release must be the first expression of this dtor @@ -144,27 +145,19 @@ public: cancelAsync(); } - void indicateSuccess() override; - void indicateFailure(Status status) override; - void indicateUsed() override; const HostAndPort& getHostAndPort() const override; transport::ConnectSSLMode getSslMode() const override; bool isHealthy() override; AsyncDBClient* client(); + Date_t now() override; private: - Date_t getLastUsed() const override; - const Status& getStatus() const override; - void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; void cancelTimeout() override; void setup(Milliseconds timeout, SetupCallback cb) override; - void resetToUnknown() override; void refresh(Milliseconds timeout, RefreshCallback cb) override; void cancelAsync(); - size_t getGeneration() const override; - private: transport::ReactorHandle _reactor; ServiceContext* const _serviceContext; @@ -172,11 +165,8 @@ private: HostAndPort _peer; transport::ConnectSSLMode _sslMode; - size_t _generation; NetworkConnectionHook* const _onConnectHook; AsyncDBClient::Handle _client; - Date_t _lastUsed; - Status _status = ConnectionPool::kConnectionStateUnknown; }; } // namespace connection_pool_asio |