summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2019-01-16 11:46:03 -0500
committerJonathan Reams <jbreams@mongodb.com>2019-01-23 17:02:16 -0500
commitc1b72f76bee602cd915bc6ea91bdcef10bd0c707 (patch)
tree374edbdb9c98344ad9a0543bb1d04633a045e3e7 /src/mongo/executor
parente7b1c689b632610399ab716a98f125605dd8a11c (diff)
downloadmongo-c1b72f76bee602cd915bc6ea91bdcef10bd0c707.tar.gz
SERVER-34260 Move bookkeeping functions into ConnectionInterface
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/connection_pool.cpp32
-rw-r--r--src/mongo/executor/connection_pool.h58
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp38
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.h22
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp37
-rw-r--r--src/mongo/executor/connection_pool_tl.h18
6 files changed, 91 insertions, 114 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 54107e04a2e..45b75798d80 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -28,7 +28,7 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kConnectionPool
#include "mongo/platform/basic.h"
@@ -54,6 +54,36 @@
namespace mongo {
namespace executor {
+void ConnectionPool::ConnectionInterface::indicateUsed() {
+ // It is illegal to attempt to use a connection after calling indicateFailure().
+ invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown);
+ _lastUsed = now();
+}
+
+void ConnectionPool::ConnectionInterface::indicateSuccess() {
+ _status = Status::OK();
+}
+
+void ConnectionPool::ConnectionInterface::indicateFailure(Status status) {
+ _status = std::move(status);
+}
+
+Date_t ConnectionPool::ConnectionInterface::getLastUsed() const {
+ return _lastUsed;
+}
+
+const Status& ConnectionPool::ConnectionInterface::getStatus() const {
+ return _status;
+}
+
+void ConnectionPool::ConnectionInterface::resetToUnknown() {
+ _status = ConnectionPool::kConnectionStateUnknown;
+}
+
+size_t ConnectionPool::ConnectionInterface::getGeneration() const {
+ return _generation;
+}
+
/**
* A pool for a specific HostAndPort
*
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index e6525319571..ed3dac9c261 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -208,6 +208,11 @@ public:
* It should be safe to cancel a previously canceled, or never set, timer.
*/
virtual void cancelTimeout() = 0;
+
+ /**
+ * Returns the current time for the clock used by the timer
+ */
+ virtual Date_t now() = 0;
};
/**
@@ -223,21 +228,22 @@ class ConnectionPool::ConnectionInterface : public TimerInterface {
friend class ConnectionPool;
public:
- ConnectionInterface() = default;
+ explicit ConnectionInterface(size_t generation) : _generation(generation) {}
+
virtual ~ConnectionInterface() = default;
/**
* Indicates that the user is now done with this connection. Users MUST call either
* this method or indicateFailure() before returning the connection to its pool.
*/
- virtual void indicateSuccess() = 0;
+ void indicateSuccess();
/**
* Indicates that a connection has failed. This will prevent the connection
* from re-entering the connection pool. Users MUST call either this method or
* indicateSuccess() before returning connections to the pool.
*/
- virtual void indicateFailure(Status status) = 0;
+ void indicateFailure(Status status);
/**
* This method updates a 'liveness' timestamp to avoid unnecessarily refreshing
@@ -248,7 +254,7 @@ public:
* back in without use, one would expect an indicateSuccess without an indicateUsed. Only if we
* checked it out and did work would we call indicateUsed.
*/
- virtual void indicateUsed() = 0;
+ void indicateUsed();
/**
* The HostAndPort for the connection. This should be the same as the
@@ -262,25 +268,33 @@ public:
*/
virtual bool isHealthy() = 0;
-protected:
- /**
- * Making these protected makes the definitions available to override in
- * children.
- */
- using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>;
- using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>;
-
-private:
/**
* Returns the last used time point for the connection
*/
- virtual Date_t getLastUsed() const = 0;
+ Date_t getLastUsed() const;
/**
* Returns the status associated with the connection. If the status is not
* OK, the connection will not be returned to the pool.
*/
- virtual const Status& getStatus() const = 0;
+ const Status& getStatus() const;
+
+ /**
+ * Get the generation of the connection. This is used to track whether to
+ * continue using a connection after a call to dropConnections() by noting
+ * if the generation on the specific pool is the same as the generation on
+ * a connection (if not the connection is from a previous era and should
+ * not be re-used).
+ */
+ size_t getGeneration() const;
+
+protected:
+ /**
+ * Making these protected makes the definitions available to override in
+ * children.
+ */
+ using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>;
+ using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>;
/**
* Sets up the connection. This should include connection + auth + any
@@ -291,7 +305,7 @@ private:
/**
* Resets the connection's state to kConnectionStateUnknown for the next user.
*/
- virtual void resetToUnknown() = 0;
+ void resetToUnknown();
/**
* Refreshes the connection. This should involve a network round trip and
@@ -299,14 +313,10 @@ private:
*/
virtual void refresh(Milliseconds timeout, RefreshCallback cb) = 0;
- /**
- * Get the generation of the connection. This is used to track whether to
- * continue using a connection after a call to dropConnections() by noting
- * if the generation on the specific pool is the same as the generation on
- * a connection (if not the connection is from a previous era and should
- * not be re-used).
- */
- virtual size_t getGeneration() const = 0;
+private:
+ size_t _generation;
+ Date_t _lastUsed;
+ Status _status = ConnectionPool::kConnectionStateUnknown;
};
/**
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index f032b81039e..9e5f5338c47 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -77,29 +77,21 @@ void TimerImpl::fireIfNecessary() {
}
}
+Date_t TimerImpl::now() {
+ return _global->now();
+}
+
std::set<TimerImpl*> TimerImpl::_timers;
ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global)
- : _hostAndPort(hostAndPort),
+ : ConnectionInterface(generation),
+ _hostAndPort(hostAndPort),
_timer(global),
_global(global),
- _id(_idCounter++),
- _generation(generation) {}
-
-void ConnectionImpl::indicateUsed() {
- _lastUsed = _global->now();
-}
-
-void ConnectionImpl::indicateSuccess() {
- _status = Status::OK();
-}
-
-void ConnectionImpl::indicateFailure(Status status) {
- _status = std::move(status);
-}
+ _id(_idCounter++) {}
-void ConnectionImpl::resetToUnknown() {
- _status = ConnectionPool::kConnectionStateUnknown;
+Date_t ConnectionImpl::now() {
+ return _timer.now();
}
size_t ConnectionImpl::id() const {
@@ -168,14 +160,6 @@ size_t ConnectionImpl::refreshQueueDepth() {
return _refreshQueue.size();
}
-Date_t ConnectionImpl::getLastUsed() const {
- return _lastUsed;
-}
-
-const Status& ConnectionImpl::getStatus() const {
- return _status;
-}
-
void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
_timer.setTimeout(timeout, cb);
}
@@ -227,10 +211,6 @@ void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
}
}
-size_t ConnectionImpl::getGeneration() const {
- return _generation;
-}
-
std::deque<ConnectionImpl::PushSetupCallback> ConnectionImpl::_pushSetupQueue;
std::deque<ConnectionImpl::PushRefreshCallback> ConnectionImpl::_pushRefreshQueue;
std::deque<ConnectionImpl*> ConnectionImpl::_setupQueue;
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index e9215e3244f..eb3760d7b5b 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -53,6 +53,8 @@ public:
void cancelTimeout() override;
+ Date_t now() override;
+
// launches all timers for whom now() has passed
static void fireIfNecessary();
@@ -83,11 +85,6 @@ public:
size_t id() const;
- void indicateSuccess() override;
- void indicateFailure(Status status) override;
-
- void resetToUnknown() override;
-
const HostAndPort& getHostAndPort() const override;
transport::ConnectSSLMode getSslMode() const override {
return transport::kGlobalSSLMode;
@@ -108,32 +105,23 @@ public:
static void pushRefresh(Status status);
static size_t refreshQueueDepth();
-private:
- void indicateUsed() override;
-
- Date_t getLastUsed() const override;
-
- const Status& getStatus() const override;
-
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
void cancelTimeout() override;
+ Date_t now() override;
+
+private:
void setup(Milliseconds timeout, SetupCallback cb) override;
void refresh(Milliseconds timeout, RefreshCallback cb) override;
- size_t getGeneration() const override;
-
HostAndPort _hostAndPort;
- Date_t _lastUsed;
- Status _status = Status::OK();
SetupCallback _setupCallback;
RefreshCallback _refreshCallback;
TimerImpl _timer;
PoolImpl* _global;
size_t _id;
- size_t _generation;
// Answer queues
static std::deque<PushSetupCallback> _pushSetupQueue;
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 344fcac81cc..14c73d50cba 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -115,12 +115,8 @@ void TLTimer::cancelTimeout() {
_timer->cancel();
}
-void TLConnection::indicateSuccess() {
- _status = Status::OK();
-}
-
-void TLConnection::indicateFailure(Status status) {
- _status = std::move(status);
+Date_t TLTimer::now() {
+ return _reactor->now();
}
const HostAndPort& TLConnection::getHostAndPort() const {
@@ -139,20 +135,6 @@ AsyncDBClient* TLConnection::client() {
return _client.get();
}
-void TLConnection::indicateUsed() {
- // It is illegal to attempt to use a connection after calling indicateFailure().
- invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown);
- _lastUsed = _reactor->now();
-}
-
-Date_t TLConnection::getLastUsed() const {
- return _lastUsed;
-}
-
-const Status& TLConnection::getStatus() const {
- return _status;
-}
-
void TLConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
auto anchor = shared_from_this();
_timer->setTimeout(timeout, [ cb = std::move(cb), anchor = std::move(anchor) ] { cb(); });
@@ -292,10 +274,6 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
LOG(2) << "Finished connection setup.";
}
-void TLConnection::resetToUnknown() {
- _status = ConnectionPool::kConnectionStateUnknown;
-}
-
void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
auto anchor = shared_from_this();
@@ -309,10 +287,10 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
return;
}
- _status = {ErrorCodes::HostUnreachable, "Timed out refreshing host"};
+ indicateFailure({ErrorCodes::HostUnreachable, "Timed out refreshing host"});
_client->cancel();
- handler->promise.setError(_status);
+ handler->promise.setError(getStatus());
});
_client
@@ -328,17 +306,18 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
cancelTimeout();
- _status = status;
if (status.isOK()) {
+ indicateSuccess();
handler->promise.emplaceValue();
} else {
+ indicateFailure(status);
handler->promise.setError(status);
}
});
}
-size_t TLConnection::getGeneration() const {
- return _generation;
+Date_t TLConnection::now() {
+ return _reactor->now();
}
void TLConnection::cancelAsync() {
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index e97dda4c44e..3c40b62047d 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -112,6 +112,7 @@ public:
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
void cancelTimeout() override;
+ Date_t now() override;
private:
transport::ReactorHandle _reactor;
@@ -127,13 +128,13 @@ public:
transport::ConnectSSLMode sslMode,
size_t generation,
NetworkConnectionHook* onConnectHook)
- : TLTypeFactory::Type(factory),
+ : ConnectionInterface(generation),
+ TLTypeFactory::Type(factory),
_reactor(reactor),
_serviceContext(serviceContext),
_timer(factory->makeTimer()),
_peer(std::move(peer)),
_sslMode(sslMode),
- _generation(generation),
_onConnectHook(onConnectHook) {}
~TLConnection() {
// Release must be the first expression of this dtor
@@ -144,27 +145,19 @@ public:
cancelAsync();
}
- void indicateSuccess() override;
- void indicateFailure(Status status) override;
- void indicateUsed() override;
const HostAndPort& getHostAndPort() const override;
transport::ConnectSSLMode getSslMode() const override;
bool isHealthy() override;
AsyncDBClient* client();
+ Date_t now() override;
private:
- Date_t getLastUsed() const override;
- const Status& getStatus() const override;
-
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
void cancelTimeout() override;
void setup(Milliseconds timeout, SetupCallback cb) override;
- void resetToUnknown() override;
void refresh(Milliseconds timeout, RefreshCallback cb) override;
void cancelAsync();
- size_t getGeneration() const override;
-
private:
transport::ReactorHandle _reactor;
ServiceContext* const _serviceContext;
@@ -172,11 +165,8 @@ private:
HostAndPort _peer;
transport::ConnectSSLMode _sslMode;
- size_t _generation;
NetworkConnectionHook* const _onConnectHook;
AsyncDBClient::Handle _client;
- Date_t _lastUsed;
- Status _status = ConnectionPool::kConnectionStateUnknown;
};
} // namespace connection_pool_asio