diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2021-02-22 18:35:46 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-23 00:17:20 +0000 |
commit | 137bd47d9138fe9bbb0e1fc81dfbbce2cfd35a96 (patch) | |
tree | 5e5fe10d6e485b0aa92872ad8dcf367150d80c6d /src | |
parent | 99c7dcf5c9630754051e0d6d7ca8604cda4d6646 (diff) | |
download | mongo-137bd47d9138fe9bbb0e1fc81dfbbce2cfd35a96.tar.gz |
SERVER-54139 Use a connection pool for curl handles
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/shell/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/util/net/SConscript | 21 | ||||
-rw-r--r-- | src/mongo/util/net/http_client.cpp | 61 | ||||
-rw-r--r-- | src/mongo/util/net/http_client.h | 39 | ||||
-rw-r--r-- | src/mongo/util/net/http_client_curl.cpp | 673 | ||||
-rw-r--r-- | src/mongo/util/net/http_client_none.cpp | 29 | ||||
-rw-r--r-- | src/mongo/util/net/http_client_winhttp.cpp | 27 | ||||
-rw-r--r-- | src/mongo/util/options_parser/options_parser.cpp | 2 |
10 files changed, 678 insertions, 177 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9770ed687ec..38a5d3dd81c 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2145,6 +2145,7 @@ env.Library( '$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils', '$BUILD_DIR/mongo/util/concurrency/thread_pool', '$BUILD_DIR/mongo/util/latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [], + '$BUILD_DIR/mongo/util/net/http_client_impl', '$BUILD_DIR/mongo/util/net/ssl_manager', '$BUILD_DIR/mongo/util/signal_handlers', '$BUILD_DIR/mongo/watchdog/watchdog_mongod', diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 8cb2e89dbdc..c1b13f8a89e 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -447,6 +447,7 @@ env.Library( '$BUILD_DIR/mongo/util/clock_sources', '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [], + '$BUILD_DIR/mongo/util/net/http_client_impl', '$BUILD_DIR/mongo/util/net/ssl_options_server' if get_option('ssl') == 'on' else '', '$BUILD_DIR/mongo/util/ntservice', '$BUILD_DIR/mongo/util/options_parser/options_parser', diff --git a/src/mongo/shell/SConscript b/src/mongo/shell/SConscript index 99d47a9175a..8a201b1a59e 100644 --- a/src/mongo/shell/SConscript +++ b/src/mongo/shell/SConscript @@ -251,6 +251,7 @@ if not has_option('noshell') and usemozjs: "$BUILD_DIR/mongo/transport/message_compressor", "$BUILD_DIR/mongo/transport/message_compressor_options_client", "$BUILD_DIR/mongo/transport/transport_layer_manager", + "$BUILD_DIR/mongo/util/net/http_client_impl", "$BUILD_DIR/mongo/util/net/network", "$BUILD_DIR/mongo/util/net/ssl_options_client" if get_option('ssl') == 'on' else '', "$BUILD_DIR/mongo/util/options_parser/options_parser_init", diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index 45deefd544c..b8c397f1e93 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -177,9 +177,20 @@ else: ], ) +env.Library( + target='http_client', + source=[ + 'http_client.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], +) + + if http_client == "off": env.Library( - target='http_client', + target='http_client_impl', source=[ 'http_client_none.cpp', ], @@ -189,12 +200,18 @@ if http_client == "off": ) else: env.Library( - target='http_client', + target='http_client_impl', source=[ 'http_client_winhttp.cpp' if env.TargetOSIs('windows') else 'http_client_curl.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + 'http_client', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/executor/connection_pool_executor', + '$BUILD_DIR/mongo/util/alarm', + 'network', ], SYSLIBDEPS=[ 'winhttp' if env.TargetOSIs('windows') else 'curl', diff --git a/src/mongo/util/net/http_client.cpp b/src/mongo/util/net/http_client.cpp new file mode 100644 index 00000000000..903385e3b4e --- /dev/null +++ b/src/mongo/util/net/http_client.cpp @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/util/net/http_client.h" +#include "mongo/base/status.h" + +namespace mongo { + +namespace { +HttpClientProvider* _factory{nullptr}; +} + +HttpClientProvider::~HttpClientProvider() {} + +void registerHTTPClientProvider(HttpClientProvider* factory) { + invariant(_factory == nullptr); + _factory = factory; +} + +std::unique_ptr<HttpClient> HttpClient::create() { + invariant(_factory != nullptr); + return _factory->create(); +} + +std::unique_ptr<HttpClient> HttpClient::createWithoutConnectionPool() { + invariant(_factory != nullptr); + return _factory->createWithoutConnectionPool(); +} + +BSONObj HttpClient::getServerStatus() { + invariant(_factory != nullptr); + return _factory->getServerStatus(); +} + +} // namespace mongo diff --git a/src/mongo/util/net/http_client.h b/src/mongo/util/net/http_client.h index 5c0a1916918..e47d4ea9582 100644 --- a/src/mongo/util/net/http_client.h +++ b/src/mongo/util/net/http_client.h @@ -124,6 +124,15 @@ public: static std::unique_ptr<HttpClient> create(); /** + * Factory method provided by client implementation. + * + * The connection pool requires the ability to spawn threads which is not allowed through + * options parsing. Callers should default to create() unless they are calling into the + * HttpClient before thread spawning is allowed. + */ + static std::unique_ptr<HttpClient> createWithoutConnectionPool(); + + /** * Content for ServerStatus http_client section. */ static BSONObj getServerStatus(); @@ -138,4 +147,34 @@ private: } }; +/** + * HttpClientProvider is the factory behind the HttpClient + * + * This exists as a level-of-indirection to break link graph cycles. + */ +class HttpClientProvider { +public: + virtual ~HttpClientProvider(); + + /** + * Factory method provided by client implementation. + */ + virtual std::unique_ptr<HttpClient> create() = 0; + + /** + * Factory method provided by client implementation. + */ + virtual std::unique_ptr<HttpClient> createWithoutConnectionPool() = 0; + + /** + * Content for ServerStatus http_client section. + */ + virtual BSONObj getServerStatus() = 0; +}; + +/** + * Register HTTP Client provider + */ +void registerHTTPClientProvider(HttpClientProvider* factory); + } // namespace mongo diff --git a/src/mongo/util/net/http_client_curl.cpp b/src/mongo/util/net/http_client_curl.cpp index c134d0a46b1..66d0cbb36b6 100644 --- a/src/mongo/util/net/http_client_curl.cpp +++ b/src/mongo/util/net/http_client_curl.cpp @@ -32,6 +32,7 @@ #include <cstddef> #include <curl/curl.h> #include <curl/easy.h> +#include <memory> #include <string> #include "mongo/base/data_builder.h" @@ -39,16 +40,54 @@ #include "mongo/base/data_range_cursor.h" #include "mongo/base/init.h" #include "mongo/base/status.h" +#include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/executor/connection_pool.h" +#include "mongo/executor/connection_pool_stats.h" #include "mongo/platform/mutex.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/alarm.h" +#include "mongo/util/alarm_runner_background_thread.h" #include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/functional.h" +#include "mongo/util/net/hostandport.h" #include "mongo/util/net/http_client.h" +#include "mongo/util/processinfo.h" +#include "mongo/util/strong_weak_finish_line.h" +#include "mongo/util/system_clock_source.h" +#include "mongo/util/timer.h" + namespace mongo { namespace { +using namespace executor; + +/** + * Curl Protocol configuration supported by HttpClient + */ +enum class Protocols { + // Allow either http or https, unsafe + kHttpOrHttps, + + // Allow https only + kHttpsOnly, +}; + +// Connection pool talk in terms of Mongo's SSL configuration. +// These functions provide a way to map back and forth between them. +transport::ConnectSSLMode mapProtocolToSSLMode(Protocols protocol) { + return (protocol == Protocols::kHttpsOnly) ? transport::kEnableSSL : transport::kDisableSSL; +} + +Protocols mapSSLModeToProtocol(transport::ConnectSSLMode sslMode) { + return (sslMode == transport::kEnableSSL) ? Protocols::kHttpsOnly : Protocols::kHttpOrHttps; +} class CurlLibraryManager { public: @@ -62,9 +101,6 @@ public: CurlLibraryManager() = default; ~CurlLibraryManager() { - if (_share) { - curl_share_cleanup(_share); - } // Ordering matters: curl_global_cleanup() must happen last. if (_initialized) { curl_global_cleanup(); @@ -77,18 +113,9 @@ public: return status; } - status = _initializeShare(); - if (!status.isOK()) { - return status; - } - return Status::OK(); } - CURLSH* getShareHandle() const { - return _share; - } - private: Status _initializeGlobal() { if (_initialized) { @@ -110,73 +137,8 @@ private: return Status::OK(); } - Status _initializeShare() { - invariant(_initialized); - if (_share) { - return Status::OK(); - } - - _share = curl_share_init(); - curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); - curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT); - curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); - curl_share_setopt(_share, CURLSHOPT_USERDATA, this); - curl_share_setopt(_share, CURLSHOPT_LOCKFUNC, _lockShare); - curl_share_setopt(_share, CURLSHOPT_UNLOCKFUNC, _unlockShare); - - return Status::OK(); - } - - static void _lockShare(CURL*, curl_lock_data lock_data, curl_lock_access, void* ctx) { - // curl_lock_access maps to shared and single access, i.e. a read and exclusive lock - // except the unlock method does not have curl_lock_access as a parameter so we map - // all lock requests to regular mutexes - switch (lock_data) { - case CURL_LOCK_DATA_SHARE: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexShare.lock(); - break; - case CURL_LOCK_DATA_DNS: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexDns.lock(); - break; - case CURL_LOCK_DATA_SSL_SESSION: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexSSLSession.lock(); - break; - case CURL_LOCK_DATA_CONNECT: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexConnect.lock(); - break; - default: - fassert(5185801, "Unsupported curl lock type"); - } - } - - - static void _unlockShare(CURL*, curl_lock_data lock_data, void* ctx) { - switch (lock_data) { - case CURL_LOCK_DATA_SHARE: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexShare.unlock(); - break; - case CURL_LOCK_DATA_DNS: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexDns.unlock(); - break; - case CURL_LOCK_DATA_SSL_SESSION: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexSSLSession.unlock(); - break; - case CURL_LOCK_DATA_CONNECT: - reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexConnect.unlock(); - break; - default: - fassert(5185802, "Unsupported curl unlock type"); - } - } - private: bool _initialized = false; - CURLSH* _share = nullptr; - - Mutex _mutexDns = MONGO_MAKE_LATCH("CurlLibraryManager::ShareDns"); - Mutex _mutexConnect = MONGO_MAKE_LATCH("CurlLibraryManager::ShareConnect"); - Mutex _mutexSSLSession = MONGO_MAKE_LATCH("CurlLibraryManager::ShareSSLSession"); - Mutex _mutexShare = MONGO_MAKE_LATCH("CurlLibraryManager::ShareLock"); } curlLibraryManager; @@ -224,7 +186,7 @@ struct CurlEasyCleanup { } } }; -using CurlHandle = std::unique_ptr<CURL, CurlEasyCleanup>; +using CurlEasyHandle = std::unique_ptr<CURL, CurlEasyCleanup>; struct CurlSlistFreeAll { void operator()(curl_slist* list) { @@ -235,65 +197,442 @@ struct CurlSlistFreeAll { }; using CurlSlist = std::unique_ptr<curl_slist, CurlSlistFreeAll>; -class CurlHttpClient final : public HttpClient { -public: - CurlHttpClient() { - // Initialize a base handle with common settings. - // Methods like requireHTTPS() will operate on this - // base handle. - _handle.reset(curl_easy_init()); - uassert(ErrorCodes::InternalError, "Curl initialization failed", _handle); - - curl_easy_setopt(_handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(kConnectionTimeout)); - curl_easy_setopt(_handle.get(), CURLOPT_FOLLOWLOCATION, 0); - curl_easy_setopt(_handle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); - curl_easy_setopt(_handle.get(), CURLOPT_NOSIGNAL, 1); - curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); + +long longSeconds(Seconds tm) { + return static_cast<long>(durationCount<Seconds>(tm)); +} + +CurlEasyHandle createCurlEasyHandle(Protocols protocol) { + CurlEasyHandle handle(curl_easy_init()); + uassert(ErrorCodes::InternalError, "Curl initialization failed", handle); + + curl_easy_setopt(handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(kConnectionTimeout)); + curl_easy_setopt(handle.get(), CURLOPT_FOLLOWLOCATION, 0); + curl_easy_setopt(handle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); + curl_easy_setopt(handle.get(), CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); #ifdef CURLOPT_TCP_KEEPALIVE - curl_easy_setopt(_handle.get(), CURLOPT_TCP_KEEPALIVE, 1); + curl_easy_setopt(handle.get(), CURLOPT_TCP_KEEPALIVE, 1); #endif - curl_easy_setopt(_handle.get(), CURLOPT_TIMEOUT, longSeconds(kTotalRequestTimeout)); + curl_easy_setopt(handle.get(), CURLOPT_TIMEOUT, longSeconds(kTotalRequestTimeout)); #if LIBCURL_VERSION_NUM > 0x072200 - // Requires >= 7.34.0 - curl_easy_setopt(_handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); + // Requires >= 7.34.0 + curl_easy_setopt(handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); +#endif + +#if LIBCURL_VERSION_NUM > 0x073400 + // Requires >= 7.52.0 + curl_easy_setopt(handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_3); #endif - curl_easy_setopt(_handle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback); - curl_easy_setopt(_handle.get(), CURLOPT_HEADERFUNCTION, WriteMemoryCallback); - // TODO: CURLOPT_EXPECT_100_TIMEOUT_MS? - // TODO: consider making this configurable - // curl_easy_setopt(_handle.get(), CURLOPT_VERBOSE, 1); - // curl_easy_setopt(_handle.get(), CURLOPT_DEBUGFUNCTION , ???); + + curl_easy_setopt(handle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback); + curl_easy_setopt(handle.get(), CURLOPT_HEADERFUNCTION, WriteMemoryCallback); + + if (protocol == Protocols::kHttpOrHttps) { + curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP); + } else { + curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); } - void allowInsecureHTTP(bool allow) final { - if (allow) { - curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP); - } else { - curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); + // TODO: CURLOPT_EXPECT_100_TIMEOUT_MS? + // TODO: consider making this configurable, defaults to stderr + // curl_easy_setopt(handle.get(), CURLOPT_VERBOSE, 1); + // curl_easy_setopt(_handle.get(), CURLOPT_DEBUGFUNCTION , ???); + + return handle; +} + +ConnectionPool::Options makePoolOptions(Seconds timeout) { + ConnectionPool::Options opts; + opts.refreshTimeout = timeout; + opts.refreshRequirement = Seconds(60); + opts.hostTimeout = Seconds(120); + return opts; +} + +/* + * This implements the timer interface for the ConnectionPool. + * Timers will be expired in order on a single background thread. + */ +class CurlHandleTimer : public ConnectionPool::TimerInterface { +public: + explicit CurlHandleTimer(ClockSource* clockSource, std::shared_ptr<AlarmScheduler> scheduler) + : _clockSource(clockSource), _scheduler(std::move(scheduler)), _handle(nullptr) {} + + virtual ~CurlHandleTimer() { + if (_handle) { + _handle->cancel().ignore(); + } + } + + void setTimeout(Milliseconds timeout, TimeoutCallback cb) final { + auto res = _scheduler->alarmFromNow(timeout); + _handle = std::move(res.handle); + + std::move(res.future).getAsync([cb](Status status) { + if (status == ErrorCodes::CallbackCanceled) { + return; + } + + fassert(5413901, status); + cb(); + }); + } + + void cancelTimeout() final { + auto handle = std::move(_handle); + if (handle) { + handle->cancel().ignore(); + } + } + + Date_t now() final { + return _clockSource->now(); + } + +private: + ClockSource* const _clockSource; + std::shared_ptr<AlarmScheduler> _scheduler; + AlarmScheduler::SharedHandle _handle; +}; + +/** + * Type factory that manages the curl connection pool + */ +class CurlHandleTypeFactory : public executor::ConnectionPool::DependentTypeFactoryInterface { +public: + CurlHandleTypeFactory() + : _clockSource(SystemClockSource::get()), + _executor(std::make_shared<ThreadPool>(_makeThreadPoolOptions())), + _timerScheduler(std::make_shared<AlarmSchedulerPrecise>(_clockSource)), + _timerRunner({_timerScheduler}) {} + + std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection(const HostAndPort&, + transport::ConnectSSLMode, + size_t generation) final; + + std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() final { + _start(); + return std::make_shared<CurlHandleTimer>(_clockSource, _timerScheduler); + } + + const std::shared_ptr<OutOfLineExecutor>& getExecutor() final { + return _executor; + } + + Date_t now() final { + return _clockSource->now(); + } + + void shutdown() final { + if (!_running) { + return; + } + _timerRunner.shutdown(); + + auto pool = checked_pointer_cast<ThreadPool>(_executor); + pool->shutdown(); + pool->join(); + } + +private: + void _start() { + if (_running) + return; + _timerRunner.start(); + + auto pool = checked_pointer_cast<ThreadPool>(_executor); + pool->startup(); + + _running = true; + } + + static inline ThreadPool::Options _makeThreadPoolOptions() { + ThreadPool::Options opts; + opts.poolName = "CurlConnPool"; + opts.maxThreads = ThreadPool::Options::kUnlimited; + opts.maxIdleThreadAge = Seconds{5}; + + return opts; + } + +private: + ClockSource* const _clockSource; + std::shared_ptr<OutOfLineExecutor> _executor; + std::shared_ptr<AlarmScheduler> _timerScheduler; + bool _running = false; + AlarmRunnerBackgroundThread _timerRunner; +}; + + +/** + * Curl handle that is managed by a connection pool + * + * The connection pool does not manage actual connections, just handles. Curl has automatica + * reconnect logic if it gets disconnected. Also, HTTP connections are cheaper then MongoDB. + */ +class PooledCurlHandle : public ConnectionPool::ConnectionInterface, + public std::enable_shared_from_this<PooledCurlHandle> { +public: + PooledCurlHandle(std::shared_ptr<OutOfLineExecutor> executor, + ClockSource* clockSource, + const std::shared_ptr<AlarmScheduler>& alarmScheduler, + const HostAndPort& host, + Protocols protocol, + size_t generation) + : ConnectionInterface(generation), + _executor(std::move(executor)), + _alarmScheduler(alarmScheduler), + _timer(clockSource, alarmScheduler), + _target(host), + _protocol(protocol) {} + + + virtual ~PooledCurlHandle() = default; + + const HostAndPort& getHostAndPort() const final { + return _target; + } + + // This cannot block under any circumstances because the ConnectionPool is holding + // a mutex while calling isHealthy(). Since we don't have a good way of knowing whether + // the connection is healthy, just return true here. + bool isHealthy() final { + return true; + } + + void setTimeout(Milliseconds timeout, TimeoutCallback cb) final { + _timer.setTimeout(timeout, cb); + } + + void cancelTimeout() final { + _timer.cancelTimeout(); + } + + Date_t now() final { + return _timer.now(); + } + + transport::ConnectSSLMode getSslMode() const final { + return mapProtocolToSSLMode(_protocol); + } + + CURL* get() { + return _handle.get(); + } + +private: + void setup(Milliseconds timeout, SetupCallback cb) final; + + void refresh(Milliseconds timeout, RefreshCallback cb) final; + +private: + std::shared_ptr<OutOfLineExecutor> _executor; + std::shared_ptr<AlarmScheduler> _alarmScheduler; + CurlHandleTimer _timer; + HostAndPort _target; + + Protocols _protocol; + CurlEasyHandle _handle; +}; + +void PooledCurlHandle::setup(Milliseconds timeout, SetupCallback cb) { + auto anchor = shared_from_this(); + _executor->schedule([this, anchor, cb = std::move(cb)](auto execStatus) { + if (!execStatus.isOK()) { + cb(this, execStatus); + return; + } + + _handle = createCurlEasyHandle(_protocol); + + cb(this, Status::OK()); + }); +} + +void PooledCurlHandle::refresh(Milliseconds timeout, RefreshCallback cb) { + auto anchor = shared_from_this(); + _executor->schedule([this, anchor, cb = std::move(cb)](auto execStatus) { + if (!execStatus.isOK()) { + cb(this, execStatus); + return; + } + + // Tell the connection pool that it was a success. Curl reconnects seamlessly behind the + // scenes and there is no reliable way to test if the connection is still alive in a + // connection agnostic way. HTTP verbs like HEAD are not uniformly supported. + // + // The connection pool simply needs to prune handles on a timer for us. + indicateSuccess(); + indicateUsed(); + + cb(this, Status::OK()); + }); +} + +std::shared_ptr<executor::ConnectionPool::ConnectionInterface> +CurlHandleTypeFactory::makeConnection(const HostAndPort& host, + transport::ConnectSSLMode sslMode, + size_t generation) { + _start(); + + return std::make_shared<PooledCurlHandle>( + _executor, _clockSource, _timerScheduler, host, mapSSLModeToProtocol(sslMode), generation); +} + +/** + * Handle that manages connection pool semantics and returns handle to connection pool in + * destructor. + * + * Caller must call indiciateSuccess if they want the handle to be reused. + */ +class CurlHandle { +public: + CurlHandle(executor::ConnectionPool::ConnectionHandle handle, CURL* curlHandle) + : _poolHandle(std::move(handle)), _handle(curlHandle) {} + + ~CurlHandle() { + if (!_success) { + _poolHandle->indicateFailure( + Status(ErrorCodes::HostUnreachable, "unknown curl handle failure")); } } + CURL* get() { + return _handle; + } + + void indicateSuccess() { + _poolHandle->indicateSuccess(); + + // Tell the connection pool that we used the connection otherwise the pool will be believe + // the connection went idle since it is possible to checkout a connection and not actually + // use it. + _poolHandle->indicateUsed(); + + _success = true; + } + +private: + executor::ConnectionPool::ConnectionHandle _poolHandle; + bool _success = false; + + // Owned by _poolHandle + CURL* _handle; +}; + +/** + * Factory that returns curl handles managed in connection pool + */ +class CurlPool { +public: + CurlPool() + : _typeFactory(std::make_shared<CurlHandleTypeFactory>()), + _pool(std::make_shared<executor::ConnectionPool>( + _typeFactory, "Curl", makePoolOptions(Seconds(60)))) {} + + CurlHandle get(HostAndPort server, Protocols protocol); + +private: + std::shared_ptr<CurlHandleTypeFactory> _typeFactory; + std::shared_ptr<executor::ConnectionPool> _pool; +}; + +CurlHandle CurlPool::get(HostAndPort server, Protocols protocol) { + + auto sslMode = mapProtocolToSSLMode(protocol); + + auto semi = _pool->get(server, sslMode, Seconds(60)); + + StatusWith<executor::ConnectionPool::ConnectionHandle> swHandle = std::move(semi).getNoThrow(); + invariant(swHandle.isOK()); + + auto curlHandle = static_cast<PooledCurlHandle*>(swHandle.getValue().get())->get(); + + return CurlHandle(std::move(swHandle.getValue()), curlHandle); +} + +HostAndPort exactHostAndPortFromUrl(StringData url) { + // Treat the URL as a host and port + // URL: http(s)?://(host):(port)/... + // + constexpr StringData slashes = "//"_sd; + auto slashesIndex = url.find(slashes); + uassert(5413902, str::stream() << "//, URL: " << url, slashesIndex != std::string::npos); + + url = url.substr(slashesIndex + slashes.size()); + if (url.find('/') != std::string::npos) { + url = url.substr(0, url.find("/")); + } + + return HostAndPort(url); +} + +/** + * The connection pool requires the ability to spawn threads which is not allowed through + * options parsing. Callers should default to HttpConnectionPool::kUse unless they are calling + * into the HttpClient before thread spawning is allowed. + */ +enum class HttpConnectionPool { + kUse, + kDoNotUse, +}; +class CurlHttpClient final : public HttpClient { +public: + CurlHttpClient(HttpConnectionPool pool) : _pool(pool) {} + + void allowInsecureHTTP(bool allow) final { + _allowInsecure = allow; + } + void setHeaders(const std::vector<std::string>& headers) final { // Can't set on base handle because cURL doesn't deep-dup this field // and we don't want it getting overwritten while another thread is using it. + _headers = headers; } void setTimeout(Seconds timeout) final { - curl_easy_setopt(_handle.get(), CURLOPT_TIMEOUT, longSeconds(timeout)); + _timeout = timeout; } void setConnectTimeout(Seconds timeout) final { - curl_easy_setopt(_handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(timeout)); + _connectTimeout = timeout; } HttpReply request(HttpMethod method, StringData url, ConstDataRange cdr = {nullptr, 0}) const final { - CurlHandle myHandle(curl_easy_duphandle(_handle.get())); - uassert(ErrorCodes::InternalError, "Curl initialization failed", myHandle); + auto protocol = _allowInsecure ? Protocols::kHttpOrHttps : Protocols::kHttpsOnly; + if (_pool == HttpConnectionPool::kUse) { + static CurlPool factory; + + auto server = exactHostAndPortFromUrl(url); + + CurlHandle handle(factory.get(server, protocol)); + auto reply = request(handle.get(), method, url, cdr); + + // indidicateFailure will be called if indicateSuccess is not called. + handle.indicateSuccess(); + return reply; + + } else { + // Make a request with a non-pooled handle. This is needed during server startup when + // thread spawning is not allowed which is required by the thread pool. + auto handle = createCurlEasyHandle(protocol); + return request(handle.get(), method, url, cdr); + } + } + +private: + HttpReply request(CURL* handle, HttpMethod method, StringData url, ConstDataRange cdr) const { + uassert(ErrorCodes::InternalError, "Curl initialization failed", handle); + + curl_easy_setopt(handle, CURLOPT_TIMEOUT, longSeconds(_timeout)); + + curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, longSeconds(_connectTimeout)); ConstDataRangeCursor cdrc(cdr); switch (method) { @@ -301,104 +640,118 @@ public: uassert(ErrorCodes::BadValue, "Request body not permitted with GET requests", cdr.length() == 0); + // Per https://curl.se/libcurl/c/CURLOPT_POST.html + // We need to reset the type of request we want to make when reusing the request + // curl_easy_setopt(handle, CURLOPT_HTTPGET, 1); break; case HttpMethod::kPOST: - curl_easy_setopt(myHandle.get(), CURLOPT_POST, 1); + curl_easy_setopt(handle, CURLOPT_PUT, 0); + curl_easy_setopt(handle, CURLOPT_POST, 1); - curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback); - curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc); - curl_easy_setopt(myHandle.get(), CURLOPT_POSTFIELDSIZE, (long)cdrc.length()); + curl_easy_setopt(handle, CURLOPT_READFUNCTION, ReadMemoryCallback); + curl_easy_setopt(handle, CURLOPT_READDATA, &cdrc); + curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE, (long)cdrc.length()); break; case HttpMethod::kPUT: - curl_easy_setopt(myHandle.get(), CURLOPT_PUT, 1); + curl_easy_setopt(handle, CURLOPT_POST, 0); + curl_easy_setopt(handle, CURLOPT_PUT, 1); - curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback); - curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc); - curl_easy_setopt(myHandle.get(), CURLOPT_INFILESIZE_LARGE, (long)cdrc.length()); + curl_easy_setopt(handle, CURLOPT_READFUNCTION, ReadMemoryCallback); + curl_easy_setopt(handle, CURLOPT_READDATA, &cdrc); + curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE, (long)cdrc.length()); break; default: MONGO_UNREACHABLE; } const auto urlString = url.toString(); - curl_easy_setopt(myHandle.get(), CURLOPT_URL, urlString.c_str()); - curl_easy_setopt(myHandle.get(), CURLOPT_SHARE, curlLibraryManager.getShareHandle()); + curl_easy_setopt(handle, CURLOPT_URL, urlString.c_str()); DataBuilder dataBuilder(4096), headerBuilder(4096); - curl_easy_setopt(myHandle.get(), CURLOPT_WRITEDATA, &dataBuilder); - curl_easy_setopt(myHandle.get(), CURLOPT_HEADERDATA, &headerBuilder); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, &dataBuilder); + curl_easy_setopt(handle, CURLOPT_HEADERDATA, &headerBuilder); curl_slist* chunk = curl_slist_append(nullptr, "Connection: keep-alive"); for (const auto& header : _headers) { chunk = curl_slist_append(chunk, header.c_str()); } - curl_easy_setopt(myHandle.get(), CURLOPT_HTTPHEADER, chunk); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, chunk); CurlSlist _headers(chunk); - CURLcode result = curl_easy_perform(myHandle.get()); + CURLcode result = curl_easy_perform(handle); uassert(ErrorCodes::OperationFailed, str::stream() << "Bad HTTP response from API server: " << curl_easy_strerror(result), result == CURLE_OK); long statusCode; - result = curl_easy_getinfo(myHandle.get(), CURLINFO_RESPONSE_CODE, &statusCode); + result = curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &statusCode); uassert(ErrorCodes::OperationFailed, str::stream() << "Unexpected error retrieving response: " << curl_easy_strerror(result), result == CURLE_OK); + return HttpReply(statusCode, std::move(headerBuilder), std::move(dataBuilder)); } private: - /** - * Helper for use with curl_easy_setopt which takes a vararg list, - * and expects a long, not the long long durationCount() returns. - */ - long longSeconds(Seconds tm) { - return static_cast<long>(durationCount<Seconds>(tm)); - } + std::vector<std::string> _headers; + HttpConnectionPool _pool; -private: - CurlHandle _handle; - std::vector<std::string> _headers; + bool _allowInsecure{false}; + Seconds _timeout; + Seconds _connectTimeout; }; -} // namespace +class HttpClientProviderImpl : public HttpClientProvider { +public: + HttpClientProviderImpl() { + registerHTTPClientProvider(this); + } -// Transitional API used by blockstore to trigger libcurl init -// until it's been migrated to use the HTTPClient API. -Status curlLibraryManager_initialize() { - return curlLibraryManager.initialize(); -} + std::unique_ptr<HttpClient> create() final { + uassertStatusOK(curlLibraryManager.initialize()); + return std::make_unique<CurlHttpClient>(HttpConnectionPool::kUse); + } -std::unique_ptr<HttpClient> HttpClient::create() { - uassertStatusOK(curlLibraryManager.initialize()); - return std::make_unique<CurlHttpClient>(); -} + std::unique_ptr<HttpClient> createWithoutConnectionPool() final { + uassertStatusOK(curlLibraryManager.initialize()); + return std::make_unique<CurlHttpClient>(HttpConnectionPool::kDoNotUse); + } -BSONObj HttpClient::getServerStatus() { + BSONObj getServerStatus() final { - BSONObjBuilder info; - info.append("type", "curl"); + BSONObjBuilder info; + info.append("type", "curl"); - { - BSONObjBuilder v(info.subobjStart("compiled")); - v.append("version", LIBCURL_VERSION); - v.append("version_num", LIBCURL_VERSION_NUM); - } + { + BSONObjBuilder v(info.subobjStart("compiled")); + v.append("version", LIBCURL_VERSION); + v.append("version_num", LIBCURL_VERSION_NUM); + } - { - auto* curl_info = curl_version_info(CURLVERSION_NOW); + { + auto* curl_info = curl_version_info(CURLVERSION_NOW); - BSONObjBuilder v(info.subobjStart("running")); - v.append("version", curl_info->version); - v.append("version_num", static_cast<int>(curl_info->version_num)); + BSONObjBuilder v(info.subobjStart("running")); + v.append("version", curl_info->version); + v.append("version_num", static_cast<int>(curl_info->version_num)); + } + + return info.obj(); } - return info.obj(); +} provider; + +} // namespace + +// Transitional API used by blockstore to trigger libcurl init +// until it's been migrated to use the HTTPClient API. +Status curlLibraryManager_initialize() { + return curlLibraryManager.initialize(); } + } // namespace mongo diff --git a/src/mongo/util/net/http_client_none.cpp b/src/mongo/util/net/http_client_none.cpp index 6a3d11e6b1a..690748807bf 100644 --- a/src/mongo/util/net/http_client_none.cpp +++ b/src/mongo/util/net/http_client_none.cpp @@ -32,12 +32,29 @@ namespace mongo { -std::unique_ptr<HttpClient> HttpClient::create() { - return nullptr; -} +namespace { -BSONObj HttpClient::getServerStatus() { - return BSONObj(); -} +class HttpClientProviderImpl : public HttpClientProvider { +public: + HttpClientProviderImpl() { + registerHTTPClientProvider(this); + } + std::unique_ptr<HttpClient> create() final { + return nullptr; + } + + std::unique_ptr<HttpClient> createWithoutConnectionPool() final { + return nullptr; + } + + /** + * Content for ServerStatus http_client section. + */ + BSONObj getServerStatus() final { + return BSONObj(); + } +} provider; + +} // namespace } // namespace mongo diff --git a/src/mongo/util/net/http_client_winhttp.cpp b/src/mongo/util/net/http_client_winhttp.cpp index 2368f2826c0..9e877966167 100644 --- a/src/mongo/util/net/http_client_winhttp.cpp +++ b/src/mongo/util/net/http_client_winhttp.cpp @@ -322,15 +322,26 @@ private: Seconds _timeout = kTotalRequestTimeout; }; -} // namespace +class HttpClientProviderImpl : public HttpClientProvider { +public: + HttpClientProviderImpl() { + registerHTTPClientProvider(this); + } -std::unique_ptr<HttpClient> HttpClient::create() { - return std::make_unique<WinHttpClient>(); -} + std::unique_ptr<HttpClient> create() final { + return std::make_unique<WinHttpClient>(); + } -BSONObj HttpClient::getServerStatus() { - return BSON("type" - << "winhttp"); -} + std::unique_ptr<HttpClient> createWithoutConnectionPool() final { + return std::make_unique<WinHttpClient>(); + } + + BSONObj getServerStatus() final { + return BSON("type" + << "winhttp"); + } +} provider; + +} // namespace } // namespace mongo diff --git a/src/mongo/util/options_parser/options_parser.cpp b/src/mongo/util/options_parser/options_parser.cpp index a01d229514c..315c62dd448 100644 --- a/src/mongo/util/options_parser/options_parser.cpp +++ b/src/mongo/util/options_parser/options_parser.cpp @@ -525,7 +525,7 @@ private: std::string runYAMLRestExpansion(StringData url, Seconds timeout) { - auto client = HttpClient::create(); + auto client = HttpClient::createWithoutConnectionPool(); uassert( ErrorCodes::OperationFailed, "No HTTP Client available in this build of MongoDB", client); |