/** * Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <memory>
#include <string>

#include "mongo/base/disallow_copying.h"
#include "mongo/stdx/chrono.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
namespace mongo {
class BSONObjBuilder;
namespace executor {
struct ConnectionPoolStats;
/**
* The actual user visible connection pool.
*
* This pool is constructed with a DependentTypeFactoryInterface which provides the tools it
* needs to generate connections and manage them over time.
*
* The overall workflow here is to manage separate pools for each unique
* HostAndPort. See comments on the various Options for how the pool operates.
*/
class ConnectionPool {
    // NOTE(review): the template argument lists in this class (ConnectionHandle,
    // GetConnectionCallback, the factory pointers and _pools) were missing from the
    // text and have been reconstructed from the surrounding declarations. Confirm
    // them against the implementation file.
    class ConnectionHandleDeleter;
    class SpecificPool;

public:
    class ConnectionInterface;
    class DependentTypeFactoryInterface;
    class TimerInterface;

    /**
     * Owning handle to a connection obtained from the pool. When the handle is
     * destroyed, ConnectionHandleDeleter passes the connection to
     * returnConnection() rather than deleting it.
     */
    using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;

    /**
     * Callback type accepted by get().
     *
     * NOTE(review): presumably invoked with either a usable ConnectionHandle or
     * the error explaining why none could be produced - confirm.
     */
    using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;

    static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000);  // 5mins
    static const size_t kDefaultMaxConns;
    static const size_t kDefaultMinConns;
    static const size_t kDefaultMaxConnecting;
    static constexpr Milliseconds kDefaultRefreshRequirement = Milliseconds(60000);  // 1min
    static constexpr Milliseconds kDefaultRefreshTimeout = Milliseconds(20000);      // 20secs

    /**
     * Status that ConnectionInterface::resetToUnknown() resets a connection to
     * before it is handed to the next user.
     */
    static const Status kConnectionStateUnknown;

    /**
     * Tunables for the pool. Every field defaults to the matching kDefault*
     * constant declared above.
     */
    struct Options {
        Options() {}

        /**
         * The minimum number of connections to keep alive while the pool is in
         * operation
         */
        size_t minConnections = kDefaultMinConns;

        /**
         * The maximum number of connections to spawn for a host. This includes
         * pending connections in setup and connections checked out of the pool
         * as well as the obvious live connections in the pool.
         */
        size_t maxConnections = kDefaultMaxConns;

        /**
         * The maximum number of processing connections for a host. This includes pending
         * connections in setup/refresh. It's designed to rate limit connection storms rather than
         * steady state processing (as maxConnections does).
         */
        size_t maxConnecting = kDefaultMaxConnecting;

        /**
         * Amount of time to wait before timing out a refresh attempt
         */
        Milliseconds refreshTimeout = kDefaultRefreshTimeout;

        /**
         * Amount of time a connection may be idle before it cannot be returned
         * for a user request and must instead be checked out and refreshed
         * before handing to a user.
         */
        Milliseconds refreshRequirement = kDefaultRefreshRequirement;

        /**
         * Amount of time to keep a specific pool around without any checked
         * out connections or new requests
         */
        Milliseconds hostTimeout = kDefaultHostTimeout;
    };

    /**
     * Builds a pool.
     *
     * 'impl' is the factory the pool uses to generate connections and manage them
     * over time; the pool takes ownership of it. 'name' is stored as the pool's
     * name. 'options' is copied and never changed afterwards.
     */
    explicit ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl,
                            std::string name,
                            Options options = Options{});

    ~ConnectionPool();

    /**
     * Drops the connections associated with 'hostAndPort'. Connections record the
     * generation they were created under (see ConnectionInterface::getGeneration())
     * so that ones from before a drop are not re-used.
     */
    void dropConnections(const HostAndPort& hostAndPort);

    /**
     * Requests a connection to 'hostAndPort'; the outcome is delivered through 'cb'.
     *
     * NOTE(review): 'timeout' presumably bounds how long the request may wait for
     * a connection - confirm against the implementation.
     */
    void get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb);

    /**
     * Appends this pool's connection statistics to 'stats'.
     */
    void appendConnectionStats(ConnectionPoolStats* stats) const;

    /**
     * Returns the number of connections the pool currently tracks for 'hostAndPort'.
     *
     * NOTE(review): which connection states are counted is decided by the
     * implementation - confirm before relying on the exact figure.
     */
    size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;

private:
    // Entry point used by ConnectionHandleDeleter when a ConnectionHandle is destroyed.
    void returnConnection(ConnectionInterface* connection);

    std::string _name;

    // Options are set at startup and never changed at run time, so these are
    // accessed outside the lock
    const Options _options;

    const std::unique_ptr<DependentTypeFactoryInterface> _factory;

    // The global mutex for specific pool access and the generation counter
    mutable stdx::mutex _mutex;

    // One SpecificPool per unique HostAndPort.
    stdx::unordered_map<HostAndPort, std::unique_ptr<SpecificPool>> _pools;
};
class ConnectionPool::ConnectionHandleDeleter {
public:
    // A default-constructed deleter has no pool and therefore does nothing when invoked.
    ConnectionHandleDeleter() = default;

    // Binds the deleter to the pool that connections should be handed back to.
    ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}

    /**
     * Hands 'connection' back to the bound pool through returnConnection().
     * Nothing happens when there is no bound pool or 'connection' is null.
     */
    void operator()(ConnectionInterface* connection) {
        if (!_pool || !connection) {
            return;
        }
        _pool->returnConnection(connection);
    }

private:
    // Non-owning pointer to the pool; null for a default-constructed deleter.
    ConnectionPool* _pool = nullptr;
};
/**
* Interface for a basic timer
*
* Minimal interface sets a timer with a callback and cancels the timer.
*/
class ConnectionPool::TimerInterface {
    MONGO_DISALLOW_COPYING(TimerInterface);

public:
    TimerInterface() = default;

    /**
     * Callback supplied to setTimeout().
     *
     * NOTE(review): the signature was missing from the text and has been
     * reconstructed as a nullary, void-returning callable - confirm.
     */
    using TimeoutCallback = stdx::function<void()>;

    virtual ~TimerInterface() = default;

    /**
     * Sets the timeout for the timer. Setting an already set timer should
     * override the previous timer.
     */
    virtual void setTimeout(Milliseconds timeout, TimeoutCallback cb) = 0;

    /**
     * It should be safe to cancel a previously canceled, or never set, timer.
     */
    virtual void cancelTimeout() = 0;
};
/**
* Interface for connection pool connections
*
* Provides a minimal interface to manipulate connections within the pool,
* specifically callbacks to set them up (connect + auth + whatever else),
* refresh them (issue some kind of ping) and manage a timer.
*/
class ConnectionPool::ConnectionInterface : public TimerInterface {
    MONGO_DISALLOW_COPYING(ConnectionInterface);

    // ConnectionPool drives the private lifecycle hooks declared below.
    friend class ConnectionPool;

public:
    ConnectionInterface() = default;
    virtual ~ConnectionInterface() = default;

    /**
     * Indicates that the user is now done with this connection. Users MUST call either
     * this method or indicateFailure() before returning the connection to its pool.
     */
    virtual void indicateSuccess() = 0;

    /**
     * Indicates that a connection has failed. This will prevent the connection
     * from re-entering the connection pool. Users MUST call either this method or
     * indicateSuccess() before returning connections to the pool.
     */
    virtual void indicateFailure(Status status) = 0;

    /**
     * The HostAndPort for the connection. This should be the same as the
     * HostAndPort passed to DependentTypeFactoryInterface::makeConnection.
     */
    virtual const HostAndPort& getHostAndPort() const = 0;

    /**
     * Check if the connection is healthy using some implementation defined condition.
     */
    virtual bool isHealthy() = 0;

protected:
    /**
     * Making these protected makes the definitions available to override in
     * children.
     *
     * NOTE(review): the callback signatures were missing from the text and have
     * been reconstructed as (connection, resulting status) - confirm against the
     * implementations of setup() and refresh().
     */
    using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>;
    using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>;

private:
    /**
     * This method updates a 'liveness' timestamp to avoid unnecessarily refreshing
     * the connection.
     */
    virtual void indicateUsed() = 0;

    /**
     * Returns the last used time point for the connection
     */
    virtual Date_t getLastUsed() const = 0;

    /**
     * Returns the status associated with the connection. If the status is not
     * OK, the connection will not be returned to the pool.
     */
    virtual const Status& getStatus() const = 0;

    /**
     * Sets up the connection. This should include connection + auth + any
     * other associated hooks.
     */
    virtual void setup(Milliseconds timeout, SetupCallback cb) = 0;

    /**
     * Resets the connection's state to kConnectionStateUnknown for the next user.
     */
    virtual void resetToUnknown() = 0;

    /**
     * Refreshes the connection. This should involve a network round trip and
     * should strongly imply an active connection
     */
    virtual void refresh(Milliseconds timeout, RefreshCallback cb) = 0;

    /**
     * Get the generation of the connection. This is used to track whether to
     * continue using a connection after a call to dropConnections() by noting
     * if the generation on the specific pool is the same as the generation on
     * a connection (if not the connection is from a previous era and should
     * not be re-used).
     */
    virtual size_t getGeneration() const = 0;
};
/**
* Implementation interface for the connection pool
*
* This factory provides generators for connections, timers and a clock for the
* connection pool.
*/
class ConnectionPool::DependentTypeFactoryInterface {
    MONGO_DISALLOW_COPYING(DependentTypeFactoryInterface);

public:
    DependentTypeFactoryInterface() = default;
    virtual ~DependentTypeFactoryInterface() = default;

    /**
     * Makes a new connection given a host and port. 'generation' is the value the
     * connection is expected to report from ConnectionInterface::getGeneration().
     *
     * NOTE(review): the element type of the returned pointer was missing from the
     * text and has been reconstructed as ConnectionInterface - confirm.
     */
    virtual std::unique_ptr<ConnectionInterface> makeConnection(const HostAndPort& hostAndPort,
                                                                size_t generation) = 0;

    /**
     * Makes a new timer
     *
     * NOTE(review): the element type of the returned pointer was missing from the
     * text and has been reconstructed as TimerInterface - confirm.
     */
    virtual std::unique_ptr<TimerInterface> makeTimer() = 0;

    /**
     * Returns the current time point
     */
    virtual Date_t now() = 0;
};
} // namespace executor
} // namespace mongo