From 1e3803204cad03f72438df0613bfd87506755695 Mon Sep 17 00:00:00 2001 From: Mark Benvenuto Date: Mon, 22 Feb 2021 18:35:46 -0500 Subject: SERVER-54139 Use a connection pool for curl handles (cherry picked from commit 137bd47d9138fe9bbb0e1fc81dfbbce2cfd35a96) --- src/mongo/SConscript | 3 + src/mongo/util/net/SConscript | 22 +- src/mongo/util/net/http_client.cpp | 61 ++ src/mongo/util/net/http_client.h | 39 ++ src/mongo/util/net/http_client_curl.cpp | 692 +++++++++++++++++------ src/mongo/util/net/http_client_none.cpp | 29 +- src/mongo/util/net/http_client_winhttp.cpp | 27 +- src/mongo/util/options_parser/options_parser.cpp | 2 +- 8 files changed, 697 insertions(+), 178 deletions(-) create mode 100644 src/mongo/util/net/http_client.cpp diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 6c78164c965..69fae0e0156 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -436,6 +436,7 @@ mongod = env.Program( 'util/elapsed_tracker', 'util/fail_point', 'util/latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [], + 'util/net/http_client_impl', 'util/net/network', 'util/ntservice', 'util/options_parser/options_parser_init', @@ -541,6 +542,7 @@ mongos = env.Program( 'util/clock_sources', 'util/fail_point', 'util/latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [], + 'util/net/http_client_impl', 'util/net/ssl_options_server' if get_option('ssl') == 'on' else '', 'util/ntservice', 'util/version_impl', @@ -650,6 +652,7 @@ if not has_option('noshell') and usemozjs: 'shell/shell_options_register', 'transport/message_compressor_options_client', "$BUILD_DIR/mongo/client/connection_string", + '$BUILD_DIR/mongo/util/net/http_client_impl', '$BUILD_DIR/mongo/util/net/ssl_options_client' if get_option('ssl') == 'on' else '', ], INSTALL_ALIAS=[ diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index ed24016b30d..24412313a97 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ 
-170,9 +170,20 @@ else: ], ) +env.Library( + target='http_client', + source=[ + 'http_client.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], +) + + if http_client == "off": env.Library( - target='http_client', + target='http_client_impl', source=[ 'http_client_none.cpp', ], @@ -182,12 +193,19 @@ if http_client == "off": ) else: env.Library( - target='http_client', + target='http_client_impl', source=[ 'http_client_winhttp.cpp' if env.TargetOSIs('windows') else 'http_client_curl.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + 'http_client', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/executor/connection_pool_executor', + '$BUILD_DIR/mongo/util/alarm', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', + 'network', ], SYSLIBDEPS=[ 'winhttp' if env.TargetOSIs('windows') else 'curl', diff --git a/src/mongo/util/net/http_client.cpp b/src/mongo/util/net/http_client.cpp new file mode 100644 index 00000000000..903385e3b4e --- /dev/null +++ b/src/mongo/util/net/http_client.cpp @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/util/net/http_client.h" +#include "mongo/base/status.h" + +namespace mongo { + +namespace { +HttpClientProvider* _factory{nullptr}; +} + +HttpClientProvider::~HttpClientProvider() {} + +void registerHTTPClientProvider(HttpClientProvider* factory) { + invariant(_factory == nullptr); + _factory = factory; +} + +std::unique_ptr HttpClient::create() { + invariant(_factory != nullptr); + return _factory->create(); +} + +std::unique_ptr HttpClient::createWithoutConnectionPool() { + invariant(_factory != nullptr); + return _factory->createWithoutConnectionPool(); +} + +BSONObj HttpClient::getServerStatus() { + invariant(_factory != nullptr); + return _factory->getServerStatus(); +} + +} // namespace mongo diff --git a/src/mongo/util/net/http_client.h b/src/mongo/util/net/http_client.h index 9f5767b51fd..1301e5e133f 100644 --- a/src/mongo/util/net/http_client.h +++ b/src/mongo/util/net/http_client.h @@ -89,10 +89,49 @@ public: */ static std::unique_ptr create(); + /** + * Factory method provided by client implementation. + * + * The connection pool requires the ability to spawn threads which is not allowed through + * options parsing. Callers should default to create() unless they are calling into the + * HttpClient before thread spawning is allowed. + */ + static std::unique_ptr createWithoutConnectionPool(); + /** * Content for ServerStatus http_client section. 
*/ static BSONObj getServerStatus(); }; +/** + * HttpClientProvider is the factory behind the HttpClient + * + * This exists as a level-of-indirection to break link graph cycles. + */ +class HttpClientProvider { +public: + virtual ~HttpClientProvider(); + + /** + * Factory method provided by client implementation. + */ + virtual std::unique_ptr create() = 0; + + /** + * Factory method provided by client implementation. + */ + virtual std::unique_ptr createWithoutConnectionPool() = 0; + + /** + * Content for ServerStatus http_client section. + */ + virtual BSONObj getServerStatus() = 0; +}; + +/** + * Register HTTP Client provider + */ +void registerHTTPClientProvider(HttpClientProvider* factory); + } // namespace mongo diff --git a/src/mongo/util/net/http_client_curl.cpp b/src/mongo/util/net/http_client_curl.cpp index 008161f6d0c..c1bb62be267 100644 --- a/src/mongo/util/net/http_client_curl.cpp +++ b/src/mongo/util/net/http_client_curl.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include "mongo/base/data_builder.h" @@ -41,17 +42,55 @@ #include "mongo/base/data_range_cursor.h" #include "mongo/base/init.h" #include "mongo/base/status.h" +#include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/executor/connection_pool.h" +#include "mongo/executor/connection_pool_stats.h" #include "mongo/platform/mutex.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/alarm.h" +#include "mongo/util/alarm_runner_background_thread.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/functional.h" +#include "mongo/util/net/hostandport.h" #include "mongo/util/net/http_client.h" +#include "mongo/util/processinfo.h" +#include "mongo/util/strong_weak_finish_line.h" +#include 
"mongo/util/system_clock_source.h" +#include "mongo/util/timer.h" + namespace mongo { namespace { +using namespace executor; + +/** + * Curl Protocol configuration supported by HttpClient + */ +enum class Protocols { + // Allow either http or https, unsafe + kHttpOrHttps, + + // Allow https only + kHttpsOnly, +}; + +// Connection pool talk in terms of Mongo's SSL configuration. +// These functions provide a way to map back and forth between them. +transport::ConnectSSLMode mapProtocolToSSLMode(Protocols protocol) { + return (protocol == Protocols::kHttpsOnly) ? transport::kEnableSSL : transport::kDisableSSL; +} + +Protocols mapSSLModeToProtocol(transport::ConnectSSLMode sslMode) { + return (sslMode == transport::kEnableSSL) ? Protocols::kHttpsOnly : Protocols::kHttpOrHttps; +} class CurlLibraryManager { public: @@ -65,9 +104,6 @@ public: CurlLibraryManager() = default; ~CurlLibraryManager() { - if (_share) { - curl_share_cleanup(_share); - } // Ordering matters: curl_global_cleanup() must happen last. 
if (_initialized) { curl_global_cleanup(); @@ -80,18 +116,9 @@ public: return status; } - status = _initializeShare(); - if (!status.isOK()) { - return status; - } - return Status::OK(); } - CURLSH* getShareHandle() const { - return _share; - } - private: Status _initializeGlobal() { if (_initialized) { @@ -113,73 +140,9 @@ private: return Status::OK(); } - Status _initializeShare() { - invariant(_initialized); - if (_share) { - return Status::OK(); - } - - _share = curl_share_init(); - curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); - curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT); - curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); - curl_share_setopt(_share, CURLSHOPT_USERDATA, this); - curl_share_setopt(_share, CURLSHOPT_LOCKFUNC, _lockShare); - curl_share_setopt(_share, CURLSHOPT_UNLOCKFUNC, _unlockShare); - - return Status::OK(); - } - - - static void _lockShare(CURL*, curl_lock_data lock_data, curl_lock_access, void* ctx) { - // curl_lock_access maps to shared and single access, i.e. 
a read and exclusive lock - // except the unlock method does not have curl_lock_access as a parameter so we map - // all lock requests to regular mutexes - switch (lock_data) { - case CURL_LOCK_DATA_SHARE: - reinterpret_cast(ctx)->_mutexShare.lock(); - break; - case CURL_LOCK_DATA_DNS: - reinterpret_cast(ctx)->_mutexDns.lock(); - break; - case CURL_LOCK_DATA_SSL_SESSION: - reinterpret_cast(ctx)->_mutexSSLSession.lock(); - break; - case CURL_LOCK_DATA_CONNECT: - reinterpret_cast(ctx)->_mutexConnect.lock(); - break; - default: - fassert(5185801, "Unsupported curl lock type"); - } - } - - static void _unlockShare(CURL*, curl_lock_data lock_data, void* ctx) { - switch (lock_data) { - case CURL_LOCK_DATA_SHARE: - reinterpret_cast(ctx)->_mutexShare.unlock(); - break; - case CURL_LOCK_DATA_DNS: - reinterpret_cast(ctx)->_mutexDns.unlock(); - break; - case CURL_LOCK_DATA_SSL_SESSION: - reinterpret_cast(ctx)->_mutexSSLSession.unlock(); - break; - case CURL_LOCK_DATA_CONNECT: - reinterpret_cast(ctx)->_mutexConnect.unlock(); - break; - default: - fassert(5185802, "Unsupported curl unlock type"); - } - } - private: bool _initialized = false; - CURLSH* _share = nullptr; - Mutex _mutexDns = MONGO_MAKE_LATCH("CurlLibraryManager::ShareDns"); - Mutex _mutexConnect = MONGO_MAKE_LATCH("CurlLibraryManager::ShareConnect"); - Mutex _mutexSSLSession = MONGO_MAKE_LATCH("CurlLibraryManager::ShareSSLSession"); - Mutex _mutexShare = MONGO_MAKE_LATCH("CurlLibraryManager::ShareLock"); } curlLibraryManager; /** @@ -226,7 +189,7 @@ struct CurlEasyCleanup { } } }; -using CurlHandle = std::unique_ptr; +using CurlEasyHandle = std::unique_ptr; struct CurlSlistFreeAll { void operator()(curl_slist* list) { @@ -237,99 +200,490 @@ struct CurlSlistFreeAll { }; using CurlSlist = std::unique_ptr; -class CurlHttpClient : public HttpClient { -public: - CurlHttpClient() { - // Initialize a base handle with common settings. - // Methods like requireHTTPS() will operate on this - // base handle. 
- _handle.reset(curl_easy_init()); - uassert(ErrorCodes::InternalError, "Curl initialization failed", _handle); - - curl_easy_setopt(_handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(kConnectionTimeout)); - curl_easy_setopt(_handle.get(), CURLOPT_FOLLOWLOCATION, 0); - curl_easy_setopt(_handle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); - curl_easy_setopt(_handle.get(), CURLOPT_NOSIGNAL, 1); - curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); + +long longSeconds(Seconds tm) { + return static_cast(durationCount(tm)); +} + +CurlEasyHandle createCurlEasyHandle(Protocols protocol) { + CurlEasyHandle handle(curl_easy_init()); + uassert(ErrorCodes::InternalError, "Curl initialization failed", handle); + + curl_easy_setopt(handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(kConnectionTimeout)); + curl_easy_setopt(handle.get(), CURLOPT_FOLLOWLOCATION, 0); + curl_easy_setopt(handle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); + curl_easy_setopt(handle.get(), CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); #ifdef CURLOPT_TCP_KEEPALIVE - curl_easy_setopt(_handle.get(), CURLOPT_TCP_KEEPALIVE, 1); + curl_easy_setopt(handle.get(), CURLOPT_TCP_KEEPALIVE, 1); #endif - curl_easy_setopt(_handle.get(), CURLOPT_TIMEOUT, longSeconds(kTotalRequestTimeout)); + curl_easy_setopt(handle.get(), CURLOPT_TIMEOUT, longSeconds(kTotalRequestTimeout)); #if LIBCURL_VERSION_NUM > 0x072200 - // Requires >= 7.34.0 - curl_easy_setopt(_handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); + // Requires >= 7.34.0 + curl_easy_setopt(handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); #endif - curl_easy_setopt(_handle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback); - // TODO: CURLOPT_EXPECT_100_TIMEOUT_MS? 
- // TODO: consider making this configurable - // curl_easy_setopt(_handle.get(), CURLOPT_VERBOSE, 1); - // curl_easy_setopt(_handle.get(), CURLOPT_DEBUGFUNCTION , ???); +#if LIBCURL_VERSION_NUM > 0x073400 + // Requires >= 7.52.0 + curl_easy_setopt(handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_3); +#endif + + curl_easy_setopt(handle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback); + + if (protocol == Protocols::kHttpOrHttps) { + curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP); + } else { + curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); } - ~CurlHttpClient() final = default; + // TODO: CURLOPT_EXPECT_100_TIMEOUT_MS? + // TODO: consider making this configurable, defaults to stderr + // curl_easy_setopt(handle.get(), CURLOPT_VERBOSE, 1); + // curl_easy_setopt(_handle.get(), CURLOPT_DEBUGFUNCTION , ???); - void allowInsecureHTTP(bool allow) final { - if (allow) { - curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP); - } else { - curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); + return handle; +} + +ConnectionPool::Options makePoolOptions(Seconds timeout) { + ConnectionPool::Options opts; + opts.refreshTimeout = timeout; + opts.refreshRequirement = Seconds(60); + opts.hostTimeout = Seconds(120); + return opts; +} + +/* + * This implements the timer interface for the ConnectionPool. + * Timers will be expired in order on a single background thread. 
+ */ +class CurlHandleTimer : public ConnectionPool::TimerInterface { +public: + explicit CurlHandleTimer(ClockSource* clockSource, std::shared_ptr scheduler) + : _clockSource(clockSource), _scheduler(std::move(scheduler)), _handle(nullptr) {} + + virtual ~CurlHandleTimer() { + if (_handle) { + _handle->cancel().ignore(); + } + } + + void setTimeout(Milliseconds timeout, TimeoutCallback cb) final { + auto res = _scheduler->alarmFromNow(timeout); + _handle = std::move(res.handle); + + std::move(res.future).getAsync([cb](Status status) { + if (status == ErrorCodes::CallbackCanceled) { + return; + } + + fassert(5413901, status); + cb(); + }); + } + + void cancelTimeout() final { + auto handle = std::move(_handle); + if (handle) { + handle->cancel().ignore(); + } + } + + Date_t now() final { + return _clockSource->now(); + } + +private: + ClockSource* const _clockSource; + std::shared_ptr _scheduler; + AlarmScheduler::SharedHandle _handle; +}; + +/** + * Type factory that manages the curl connection pool + */ +class CurlHandleTypeFactory : public executor::ConnectionPool::DependentTypeFactoryInterface { +public: + CurlHandleTypeFactory() + : _clockSource(SystemClockSource::get()), + _executor(std::make_shared(_makeThreadPoolOptions())), + _timerScheduler(std::make_shared(_clockSource)), + _timerRunner({_timerScheduler}) {} + + std::shared_ptr makeConnection(const HostAndPort&, + transport::ConnectSSLMode, + size_t generation) final; + + std::shared_ptr makeTimer() final { + _start(); + return std::make_shared(_clockSource, _timerScheduler); + } + + const std::shared_ptr& getExecutor() final { + return _executor; + } + + Date_t now() final { + return _clockSource->now(); + } + + void shutdown() final { + if (!_running) { + return; + } + _timerRunner.shutdown(); + + auto pool = checked_pointer_cast(_executor); + pool->shutdown(); + pool->join(); + } + +private: + void _start() { + if (_running) + return; + _timerRunner.start(); + + auto pool = 
checked_pointer_cast(_executor); + pool->startup(); + + _running = true; + } + + static inline ThreadPool::Options _makeThreadPoolOptions() { + ThreadPool::Options opts; + opts.poolName = "CurlConnPool"; + opts.maxThreads = ThreadPool::Options::kUnlimited; + opts.maxIdleThreadAge = Seconds{5}; + + return opts; + } + +private: + ClockSource* const _clockSource; + std::shared_ptr _executor; + std::shared_ptr _timerScheduler; + bool _running = false; + AlarmRunnerBackgroundThread _timerRunner; +}; + + +/** + * Curl handle that is managed by a connection pool + * + * The connection pool does not manage actual connections, just handles. Curl has automatic + * reconnect logic if it gets disconnected. Also, HTTP connections are cheaper than MongoDB connections. + */ +class PooledCurlHandle : public ConnectionPool::ConnectionInterface, + public std::enable_shared_from_this { +public: + PooledCurlHandle(std::shared_ptr executor, + ClockSource* clockSource, + const std::shared_ptr& alarmScheduler, + const HostAndPort& host, + Protocols protocol, + size_t generation) + : ConnectionInterface(generation), + _executor(std::move(executor)), + _alarmScheduler(alarmScheduler), + _timer(clockSource, alarmScheduler), + _target(host), + _protocol(protocol) {} + + + virtual ~PooledCurlHandle() = default; + + const HostAndPort& getHostAndPort() const final { + return _target; + } + + // This cannot block under any circumstances because the ConnectionPool is holding + // a mutex while calling isHealthy(). Since we don't have a good way of knowing whether + // the connection is healthy, just return true here. 
+ bool isHealthy() final { + return true; + } + + void setTimeout(Milliseconds timeout, TimeoutCallback cb) final { + _timer.setTimeout(timeout, cb); + } + + void cancelTimeout() final { + _timer.cancelTimeout(); + } + + Date_t now() final { + return _timer.now(); + } + + transport::ConnectSSLMode getSslMode() const final { + return mapProtocolToSSLMode(_protocol); + } + + CURL* get() { + return _handle.get(); + } + +private: + void setup(Milliseconds timeout, SetupCallback cb) final; + + void refresh(Milliseconds timeout, RefreshCallback cb) final; + +private: + std::shared_ptr _executor; + std::shared_ptr _alarmScheduler; + CurlHandleTimer _timer; + HostAndPort _target; + + Protocols _protocol; + CurlEasyHandle _handle; +}; + +void PooledCurlHandle::setup(Milliseconds timeout, SetupCallback cb) { + auto anchor = shared_from_this(); + _executor->schedule([this, anchor, cb = std::move(cb)](auto execStatus) { + if (!execStatus.isOK()) { + cb(this, execStatus); + return; + } + + _handle = createCurlEasyHandle(_protocol); + + cb(this, Status::OK()); + }); +} + +void PooledCurlHandle::refresh(Milliseconds timeout, RefreshCallback cb) { + auto anchor = shared_from_this(); + _executor->schedule([this, anchor, cb = std::move(cb)](auto execStatus) { + if (!execStatus.isOK()) { + cb(this, execStatus); + return; } + + // Tell the connection pool that it was a success. Curl reconnects seamlessly behind the + // scenes and there is no reliable way to test if the connection is still alive in a + // connection agnostic way. HTTP verbs like HEAD are not uniformly supported. + // + // The connection pool simply needs to prune handles on a timer for us. 
+ indicateSuccess(); + indicateUsed(); + + cb(this, Status::OK()); + }); +} + +std::shared_ptr +CurlHandleTypeFactory::makeConnection(const HostAndPort& host, + transport::ConnectSSLMode sslMode, + size_t generation) { + _start(); + + return std::make_shared( + _executor, _clockSource, _timerScheduler, host, mapSSLModeToProtocol(sslMode), generation); +} + +/** + * Handle that manages connection pool semantics and returns handle to connection pool in + * destructor. + * + * Caller must call indicateSuccess if they want the handle to be reused. + */ +class CurlHandle { +public: + CurlHandle(executor::ConnectionPool::ConnectionHandle handle, CURL* curlHandle) + : _poolHandle(std::move(handle)), _handle(curlHandle) {} + + ~CurlHandle() { + if (!_success) { + _poolHandle->indicateFailure( + Status(ErrorCodes::HostUnreachable, "unknown curl handle failure")); + } + } + + CURL* get() { + return _handle; + } + + void indicateSuccess() { + _poolHandle->indicateSuccess(); + + // Tell the connection pool that we used the connection otherwise the pool will believe + // the connection went idle since it is possible to checkout a connection and not actually + // use it. 
+ _poolHandle->indicateUsed(); + + _success = true; + } + +private: + executor::ConnectionPool::ConnectionHandle _poolHandle; + bool _success = false; + + // Owned by _poolHandle + CURL* _handle; +}; + +/** + * Factory that returns curl handles managed in connection pool + */ +class CurlPool { +public: + CurlPool() + : _typeFactory(std::make_shared()), + _pool(std::make_shared( + _typeFactory, "Curl", makePoolOptions(Seconds(60)))) {} + + CurlHandle get(HostAndPort server, Protocols protocol); + +private: + std::shared_ptr _typeFactory; + std::shared_ptr _pool; +}; + +CurlHandle CurlPool::get(HostAndPort server, Protocols protocol) { + + auto sslMode = mapProtocolToSSLMode(protocol); + + auto semi = _pool->get(server, sslMode, Seconds(60)); + + StatusWith swHandle = std::move(semi).getNoThrow(); + invariant(swHandle.isOK()); + + auto curlHandle = static_cast(swHandle.getValue().get())->get(); + + return CurlHandle(std::move(swHandle.getValue()), curlHandle); +} + +HostAndPort exactHostAndPortFromUrl(StringData url) { + // Treat the URL as a host and port + // URL: http(s)?://(host):(port)/... + // + constexpr StringData slashes = "//"_sd; + auto slashesIndex = url.find(slashes); + uassert(5413902, str::stream() << "//, URL: " << url, slashesIndex != std::string::npos); + + url = url.substr(slashesIndex + slashes.size()); + if (url.find('/') != std::string::npos) { + url = url.substr(0, url.find("/")); + } + + return HostAndPort(url); +} + +/** + * The connection pool requires the ability to spawn threads which is not allowed through + * options parsing. Callers should default to HttpConnectionPool::kUse unless they are calling + * into the HttpClient before thread spawning is allowed. 
+ */ +enum class HttpConnectionPool { + kUse, + kDoNotUse, +}; + +enum class HttpMethod { + kGET, + kPOST, + kPUT, +}; + +class CurlHttpClient final : public HttpClient { +public: + CurlHttpClient(HttpConnectionPool pool) : _pool(pool) {} + + void allowInsecureHTTP(bool allow) final { + _allowInsecure = allow; } void setHeaders(const std::vector& headers) final { // Can't set on base handle because cURL doesn't deep-dup this field // and we don't want it getting overwritten while another thread is using it. + _headers = headers; } void setTimeout(Seconds timeout) final { - curl_easy_setopt(_handle.get(), CURLOPT_TIMEOUT, longSeconds(timeout)); + _timeout = timeout; } void setConnectTimeout(Seconds timeout) final { - curl_easy_setopt(_handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(timeout)); + _connectTimeout = timeout; } - DataBuilder get(StringData url) const final { - // Make a local copy of the base handle for this request. - CurlHandle myHandle(curl_easy_duphandle(_handle.get())); - uassert(ErrorCodes::InternalError, "Curl initialization failed", myHandle); + DataBuilder post(StringData url, ConstDataRange data) const final { + return request(HttpMethod::kPOST, url, data); + } - return doRequest(myHandle.get(), url); + DataBuilder get(StringData url) const final { + return request(HttpMethod::kGET, url, {nullptr, 0}); } - DataBuilder post(StringData url, ConstDataRange cdr) const final { - // Make a local copy of the base handle for this request. - CurlHandle myHandle(curl_easy_duphandle(_handle.get())); - uassert(ErrorCodes::InternalError, "Curl initialization failed", myHandle); + DataBuilder request(HttpMethod method, + StringData url, + ConstDataRange cdr = {nullptr, 0}) const { + auto protocol = _allowInsecure ? 
Protocols::kHttpOrHttps : Protocols::kHttpsOnly; + if (_pool == HttpConnectionPool::kUse) { + static CurlPool factory; - curl_easy_setopt(myHandle.get(), CURLOPT_POST, 1); + auto server = exactHostAndPortFromUrl(url); - ConstDataRangeCursor cdrc(cdr); - curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback); - curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc); - curl_easy_setopt(myHandle.get(), CURLOPT_POSTFIELDSIZE, (long)cdrc.length()); + CurlHandle handle(factory.get(server, protocol)); + auto reply = request(handle.get(), method, url, cdr); + + // indicateFailure will be called if indicateSuccess is not called. + handle.indicateSuccess(); + return reply; - return doRequest(myHandle.get(), url); + } else { + // Make a request with a non-pooled handle. This is needed during server startup when + // thread spawning is not allowed which is required by the thread pool. + auto handle = createCurlEasyHandle(protocol); + return request(handle.get(), method, url, cdr); + } } private: - /** - * Helper for use with curl_easy_setopt which takes a vararg list, - * and expects a long, not the long long durationCount() returns. 
- */ - long longSeconds(Seconds tm) { - return static_cast(durationCount(tm)); - } + DataBuilder request(CURL* handle, HttpMethod method, StringData url, ConstDataRange cdr) const { + uassert(ErrorCodes::InternalError, "Curl initialization failed", handle); + + curl_easy_setopt(handle, CURLOPT_TIMEOUT, longSeconds(_timeout)); + + curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, longSeconds(_connectTimeout)); + + ConstDataRangeCursor cdrc(cdr); + switch (method) { + case HttpMethod::kGET: + uassert(ErrorCodes::BadValue, + "Request body not permitted with GET requests", + cdr.length() == 0); + // Per https://curl.se/libcurl/c/CURLOPT_POST.html + // We need to reset the type of request we want to make when reusing the request + // curl_easy_setopt(handle, CURLOPT_HTTPGET, 1); + break; + case HttpMethod::kPOST: + curl_easy_setopt(handle, CURLOPT_PUT, 0); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + curl_easy_setopt(handle, CURLOPT_READFUNCTION, ReadMemoryCallback); + curl_easy_setopt(handle, CURLOPT_READDATA, &cdrc); + curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE, (long)cdrc.length()); + break; + case HttpMethod::kPUT: + curl_easy_setopt(handle, CURLOPT_POST, 0); + curl_easy_setopt(handle, CURLOPT_PUT, 1); + + curl_easy_setopt(handle, CURLOPT_READFUNCTION, ReadMemoryCallback); + curl_easy_setopt(handle, CURLOPT_READDATA, &cdrc); + curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE, (long)cdrc.length()); + break; + default: + MONGO_UNREACHABLE; + } - DataBuilder doRequest(CURL* handle, StringData url) const { const auto urlString = url.toString(); curl_easy_setopt(handle, CURLOPT_URL, urlString.c_str()); - curl_easy_setopt(handle, CURLOPT_SHARE, curlLibraryManager.getShareHandle()); - DataBuilder dataBuilder(4096); + DataBuilder dataBuilder(4096), headerBuilder(4096); curl_easy_setopt(handle, CURLOPT_WRITEDATA, &dataBuilder); curl_slist* chunk = curl_slist_append(nullptr, "Connection: keep-alive"); @@ -352,51 +706,67 @@ private: << curl_easy_strerror(result), 
result == CURLE_OK); - uassert(ErrorCodes::OperationFailed, - str::stream() << "Unexpected http status code from server: " << statusCode, - statusCode == 200); return dataBuilder; } private: - CurlHandle _handle; std::vector _headers; + + HttpConnectionPool _pool; + + bool _allowInsecure{false}; + Seconds _timeout; + Seconds _connectTimeout; }; -} // namespace +class HttpClientProviderImpl : public HttpClientProvider { +public: + HttpClientProviderImpl() { + registerHTTPClientProvider(this); + } -// Transitional API used by blockstore to trigger libcurl init -// until it's been migrated to use the HTTPClient API. -Status curlLibraryManager_initialize() { - return curlLibraryManager.initialize(); -} + std::unique_ptr create() final { + uassertStatusOK(curlLibraryManager.initialize()); + return std::make_unique(HttpConnectionPool::kUse); + } -std::unique_ptr HttpClient::create() { - uassertStatusOK(curlLibraryManager.initialize()); - return std::make_unique(); -} + std::unique_ptr createWithoutConnectionPool() final { + uassertStatusOK(curlLibraryManager.initialize()); + return std::make_unique(HttpConnectionPool::kDoNotUse); + } -BSONObj HttpClient::getServerStatus() { + BSONObj getServerStatus() final { - BSONObjBuilder info; - info.append("type", "curl"); + BSONObjBuilder info; + info.append("type", "curl"); - { - BSONObjBuilder v(info.subobjStart("compiled")); - v.append("version", LIBCURL_VERSION); - v.append("version_num", LIBCURL_VERSION_NUM); - } + { + BSONObjBuilder v(info.subobjStart("compiled")); + v.append("version", LIBCURL_VERSION); + v.append("version_num", LIBCURL_VERSION_NUM); + } - { - auto* curl_info = curl_version_info(CURLVERSION_NOW); + { + auto* curl_info = curl_version_info(CURLVERSION_NOW); - BSONObjBuilder v(info.subobjStart("running")); - v.append("version", curl_info->version); - v.append("version_num", curl_info->version_num); + BSONObjBuilder v(info.subobjStart("running")); + v.append("version", curl_info->version); + 
v.append("version_num", static_cast(curl_info->version_num)); + } + + return info.obj(); } - return info.obj(); +} provider; + +} // namespace + +// Transitional API used by blockstore to trigger libcurl init +// until it's been migrated to use the HTTPClient API. +Status curlLibraryManager_initialize() { + return curlLibraryManager.initialize(); } + } // namespace mongo diff --git a/src/mongo/util/net/http_client_none.cpp b/src/mongo/util/net/http_client_none.cpp index 6a3d11e6b1a..690748807bf 100644 --- a/src/mongo/util/net/http_client_none.cpp +++ b/src/mongo/util/net/http_client_none.cpp @@ -32,12 +32,29 @@ namespace mongo { -std::unique_ptr HttpClient::create() { - return nullptr; -} +namespace { -BSONObj HttpClient::getServerStatus() { - return BSONObj(); -} +class HttpClientProviderImpl : public HttpClientProvider { +public: + HttpClientProviderImpl() { + registerHTTPClientProvider(this); + } + std::unique_ptr create() final { + return nullptr; + } + + std::unique_ptr createWithoutConnectionPool() final { + return nullptr; + } + + /** + * Content for ServerStatus http_client section. 
+ */ + BSONObj getServerStatus() final { + return BSONObj(); + } +} provider; + +} // namespace } // namespace mongo diff --git a/src/mongo/util/net/http_client_winhttp.cpp b/src/mongo/util/net/http_client_winhttp.cpp index f774e7387e4..f70567b9757 100644 --- a/src/mongo/util/net/http_client_winhttp.cpp +++ b/src/mongo/util/net/http_client_winhttp.cpp @@ -300,15 +300,26 @@ private: Seconds _timeout = kTotalRequestTimeout; }; -} // namespace +class HttpClientProviderImpl : public HttpClientProvider { +public: + HttpClientProviderImpl() { + registerHTTPClientProvider(this); + } -std::unique_ptr HttpClient::create() { - return std::make_unique(); -} + std::unique_ptr create() final { + return std::make_unique(); + } -BSONObj HttpClient::getServerStatus() { - return BSON("type" - << "winhttp"); -} + std::unique_ptr createWithoutConnectionPool() final { + return std::make_unique(); + } + + BSONObj getServerStatus() final { + return BSON("type" + << "winhttp"); + } +} provider; + +} // namespace } // namespace mongo diff --git a/src/mongo/util/options_parser/options_parser.cpp b/src/mongo/util/options_parser/options_parser.cpp index aa04f99d94f..f92358ef8de 100644 --- a/src/mongo/util/options_parser/options_parser.cpp +++ b/src/mongo/util/options_parser/options_parser.cpp @@ -540,7 +540,7 @@ private: std::string runYAMLRestExpansion(StringData url, Seconds timeout) { - auto client = HttpClient::create(); + auto client = HttpClient::createWithoutConnectionPool(); uassert( ErrorCodes::OperationFailed, "No HTTP Client available in this build of MongoDB", client); -- cgit v1.2.1