summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2021-02-22 18:35:46 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-03 15:39:33 +0000
commit1e3803204cad03f72438df0613bfd87506755695 (patch)
tree12f0bb532df1cdb8717ca3cf5c33fd759e0f62e9
parentf21c4ba0fc9a5d03d063d197c66b93870a244e4d (diff)
downloadmongo-1e3803204cad03f72438df0613bfd87506755695.tar.gz
SERVER-54139 Use a connection pool for curl handles
(cherry picked from commit 137bd47d9138fe9bbb0e1fc81dfbbce2cfd35a96)
-rw-r--r--src/mongo/SConscript3
-rw-r--r--src/mongo/util/net/SConscript22
-rw-r--r--src/mongo/util/net/http_client.cpp61
-rw-r--r--src/mongo/util/net/http_client.h39
-rw-r--r--src/mongo/util/net/http_client_curl.cpp692
-rw-r--r--src/mongo/util/net/http_client_none.cpp29
-rw-r--r--src/mongo/util/net/http_client_winhttp.cpp27
-rw-r--r--src/mongo/util/options_parser/options_parser.cpp2
8 files changed, 697 insertions, 178 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 6c78164c965..69fae0e0156 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -436,6 +436,7 @@ mongod = env.Program(
'util/elapsed_tracker',
'util/fail_point',
'util/latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [],
+ 'util/net/http_client_impl',
'util/net/network',
'util/ntservice',
'util/options_parser/options_parser_init',
@@ -541,6 +542,7 @@ mongos = env.Program(
'util/clock_sources',
'util/fail_point',
'util/latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [],
+ 'util/net/http_client_impl',
'util/net/ssl_options_server' if get_option('ssl') == 'on' else '',
'util/ntservice',
'util/version_impl',
@@ -650,6 +652,7 @@ if not has_option('noshell') and usemozjs:
'shell/shell_options_register',
'transport/message_compressor_options_client',
"$BUILD_DIR/mongo/client/connection_string",
+ '$BUILD_DIR/mongo/util/net/http_client_impl',
'$BUILD_DIR/mongo/util/net/ssl_options_client' if get_option('ssl') == 'on' else '',
],
INSTALL_ALIAS=[
diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript
index ed24016b30d..24412313a97 100644
--- a/src/mongo/util/net/SConscript
+++ b/src/mongo/util/net/SConscript
@@ -170,9 +170,20 @@ else:
],
)
+env.Library(
+ target='http_client',
+ source=[
+ 'http_client.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+
if http_client == "off":
env.Library(
- target='http_client',
+ target='http_client_impl',
source=[
'http_client_none.cpp',
],
@@ -182,12 +193,19 @@ if http_client == "off":
)
else:
env.Library(
- target='http_client',
+ target='http_client_impl',
source=[
'http_client_winhttp.cpp' if env.TargetOSIs('windows') else 'http_client_curl.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ 'http_client',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/executor/connection_pool_executor',
+ '$BUILD_DIR/mongo/util/alarm',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ 'network',
],
SYSLIBDEPS=[
'winhttp' if env.TargetOSIs('windows') else 'curl',
diff --git a/src/mongo/util/net/http_client.cpp b/src/mongo/util/net/http_client.cpp
new file mode 100644
index 00000000000..903385e3b4e
--- /dev/null
+++ b/src/mongo/util/net/http_client.cpp
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/util/net/http_client.h"
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+namespace {
+HttpClientProvider* _factory{nullptr};
+}
+
+HttpClientProvider::~HttpClientProvider() {}
+
+void registerHTTPClientProvider(HttpClientProvider* factory) {
+ invariant(_factory == nullptr);
+ _factory = factory;
+}
+
+std::unique_ptr<HttpClient> HttpClient::create() {
+ invariant(_factory != nullptr);
+ return _factory->create();
+}
+
+std::unique_ptr<HttpClient> HttpClient::createWithoutConnectionPool() {
+ invariant(_factory != nullptr);
+ return _factory->createWithoutConnectionPool();
+}
+
+BSONObj HttpClient::getServerStatus() {
+ invariant(_factory != nullptr);
+ return _factory->getServerStatus();
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/net/http_client.h b/src/mongo/util/net/http_client.h
index 9f5767b51fd..1301e5e133f 100644
--- a/src/mongo/util/net/http_client.h
+++ b/src/mongo/util/net/http_client.h
@@ -90,9 +90,48 @@ public:
static std::unique_ptr<HttpClient> create();
/**
+ * Factory method provided by client implementation.
+ *
+ * The connection pool requires the ability to spawn threads which is not allowed through
+ * options parsing. Callers should default to create() unless they are calling into the
+ * HttpClient before thread spawning is allowed.
+ */
+ static std::unique_ptr<HttpClient> createWithoutConnectionPool();
+
+ /**
* Content for ServerStatus http_client section.
*/
static BSONObj getServerStatus();
};
+/**
+ * HttpClientProvider is the factory behind the HttpClient
+ *
+ * This exists as a level-of-indirection to break link graph cycles.
+ */
+class HttpClientProvider {
+public:
+ virtual ~HttpClientProvider();
+
+ /**
+ * Factory method provided by client implementation.
+ */
+ virtual std::unique_ptr<HttpClient> create() = 0;
+
+ /**
+ * Factory method provided by client implementation.
+ */
+ virtual std::unique_ptr<HttpClient> createWithoutConnectionPool() = 0;
+
+ /**
+ * Content for ServerStatus http_client section.
+ */
+ virtual BSONObj getServerStatus() = 0;
+};
+
+/**
+ * Register HTTP Client provider
+ */
+void registerHTTPClientProvider(HttpClientProvider* factory);
+
} // namespace mongo
diff --git a/src/mongo/util/net/http_client_curl.cpp b/src/mongo/util/net/http_client_curl.cpp
index 008161f6d0c..c1bb62be267 100644
--- a/src/mongo/util/net/http_client_curl.cpp
+++ b/src/mongo/util/net/http_client_curl.cpp
@@ -34,6 +34,7 @@
#include <cstddef>
#include <curl/curl.h>
#include <curl/easy.h>
+#include <memory>
#include <string>
#include "mongo/base/data_builder.h"
@@ -41,17 +42,55 @@
#include "mongo/base/data_range_cursor.h"
#include "mongo/base/init.h"
#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/commands/server_status.h"
+#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/platform/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/alarm.h"
+#include "mongo/util/alarm_runner_background_thread.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/functional.h"
+#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/http_client.h"
+#include "mongo/util/processinfo.h"
+#include "mongo/util/strong_weak_finish_line.h"
+#include "mongo/util/system_clock_source.h"
+#include "mongo/util/timer.h"
+
namespace mongo {
namespace {
+using namespace executor;
+
+/**
+ * Curl Protocol configuration supported by HttpClient
+ */
+enum class Protocols {
+ // Allow either http or https, unsafe
+ kHttpOrHttps,
+
+ // Allow https only
+ kHttpsOnly,
+};
+
+// Connection pool talk in terms of Mongo's SSL configuration.
+// These functions provide a way to map back and forth between them.
+transport::ConnectSSLMode mapProtocolToSSLMode(Protocols protocol) {
+ return (protocol == Protocols::kHttpsOnly) ? transport::kEnableSSL : transport::kDisableSSL;
+}
+
+Protocols mapSSLModeToProtocol(transport::ConnectSSLMode sslMode) {
+ return (sslMode == transport::kEnableSSL) ? Protocols::kHttpsOnly : Protocols::kHttpOrHttps;
+}
class CurlLibraryManager {
public:
@@ -65,9 +104,6 @@ public:
CurlLibraryManager() = default;
~CurlLibraryManager() {
- if (_share) {
- curl_share_cleanup(_share);
- }
// Ordering matters: curl_global_cleanup() must happen last.
if (_initialized) {
curl_global_cleanup();
@@ -80,18 +116,9 @@ public:
return status;
}
- status = _initializeShare();
- if (!status.isOK()) {
- return status;
- }
-
return Status::OK();
}
- CURLSH* getShareHandle() const {
- return _share;
- }
-
private:
Status _initializeGlobal() {
if (_initialized) {
@@ -113,73 +140,9 @@ private:
return Status::OK();
}
- Status _initializeShare() {
- invariant(_initialized);
- if (_share) {
- return Status::OK();
- }
-
- _share = curl_share_init();
- curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
- curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT);
- curl_share_setopt(_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION);
- curl_share_setopt(_share, CURLSHOPT_USERDATA, this);
- curl_share_setopt(_share, CURLSHOPT_LOCKFUNC, _lockShare);
- curl_share_setopt(_share, CURLSHOPT_UNLOCKFUNC, _unlockShare);
-
- return Status::OK();
- }
-
-
- static void _lockShare(CURL*, curl_lock_data lock_data, curl_lock_access, void* ctx) {
- // curl_lock_access maps to shared and single access, i.e. a read and exclusive lock
- // except the unlock method does not have curl_lock_access as a parameter so we map
- // all lock requests to regular mutexes
- switch (lock_data) {
- case CURL_LOCK_DATA_SHARE:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexShare.lock();
- break;
- case CURL_LOCK_DATA_DNS:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexDns.lock();
- break;
- case CURL_LOCK_DATA_SSL_SESSION:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexSSLSession.lock();
- break;
- case CURL_LOCK_DATA_CONNECT:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexConnect.lock();
- break;
- default:
- fassert(5185801, "Unsupported curl lock type");
- }
- }
-
- static void _unlockShare(CURL*, curl_lock_data lock_data, void* ctx) {
- switch (lock_data) {
- case CURL_LOCK_DATA_SHARE:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexShare.unlock();
- break;
- case CURL_LOCK_DATA_DNS:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexDns.unlock();
- break;
- case CURL_LOCK_DATA_SSL_SESSION:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexSSLSession.unlock();
- break;
- case CURL_LOCK_DATA_CONNECT:
- reinterpret_cast<CurlLibraryManager*>(ctx)->_mutexConnect.unlock();
- break;
- default:
- fassert(5185802, "Unsupported curl unlock type");
- }
- }
-
private:
bool _initialized = false;
- CURLSH* _share = nullptr;
- Mutex _mutexDns = MONGO_MAKE_LATCH("CurlLibraryManager::ShareDns");
- Mutex _mutexConnect = MONGO_MAKE_LATCH("CurlLibraryManager::ShareConnect");
- Mutex _mutexSSLSession = MONGO_MAKE_LATCH("CurlLibraryManager::ShareSSLSession");
- Mutex _mutexShare = MONGO_MAKE_LATCH("CurlLibraryManager::ShareLock");
} curlLibraryManager;
/**
@@ -226,7 +189,7 @@ struct CurlEasyCleanup {
}
}
};
-using CurlHandle = std::unique_ptr<CURL, CurlEasyCleanup>;
+using CurlEasyHandle = std::unique_ptr<CURL, CurlEasyCleanup>;
struct CurlSlistFreeAll {
void operator()(curl_slist* list) {
@@ -237,99 +200,490 @@ struct CurlSlistFreeAll {
};
using CurlSlist = std::unique_ptr<curl_slist, CurlSlistFreeAll>;
-class CurlHttpClient : public HttpClient {
-public:
- CurlHttpClient() {
- // Initialize a base handle with common settings.
- // Methods like requireHTTPS() will operate on this
- // base handle.
- _handle.reset(curl_easy_init());
- uassert(ErrorCodes::InternalError, "Curl initialization failed", _handle);
-
- curl_easy_setopt(_handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(kConnectionTimeout));
- curl_easy_setopt(_handle.get(), CURLOPT_FOLLOWLOCATION, 0);
- curl_easy_setopt(_handle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
- curl_easy_setopt(_handle.get(), CURLOPT_NOSIGNAL, 1);
- curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
+
+long longSeconds(Seconds tm) {
+ return static_cast<long>(durationCount<Seconds>(tm));
+}
+
+CurlEasyHandle createCurlEasyHandle(Protocols protocol) {
+ CurlEasyHandle handle(curl_easy_init());
+ uassert(ErrorCodes::InternalError, "Curl initialization failed", handle);
+
+ curl_easy_setopt(handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(kConnectionTimeout));
+ curl_easy_setopt(handle.get(), CURLOPT_FOLLOWLOCATION, 0);
+ curl_easy_setopt(handle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
+ curl_easy_setopt(handle.get(), CURLOPT_NOSIGNAL, 1);
+ curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
#ifdef CURLOPT_TCP_KEEPALIVE
- curl_easy_setopt(_handle.get(), CURLOPT_TCP_KEEPALIVE, 1);
+ curl_easy_setopt(handle.get(), CURLOPT_TCP_KEEPALIVE, 1);
#endif
- curl_easy_setopt(_handle.get(), CURLOPT_TIMEOUT, longSeconds(kTotalRequestTimeout));
+ curl_easy_setopt(handle.get(), CURLOPT_TIMEOUT, longSeconds(kTotalRequestTimeout));
#if LIBCURL_VERSION_NUM > 0x072200
- // Requires >= 7.34.0
- curl_easy_setopt(_handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
+ // Requires >= 7.34.0
+ curl_easy_setopt(handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
#endif
- curl_easy_setopt(_handle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
- // TODO: CURLOPT_EXPECT_100_TIMEOUT_MS?
- // TODO: consider making this configurable
- // curl_easy_setopt(_handle.get(), CURLOPT_VERBOSE, 1);
- // curl_easy_setopt(_handle.get(), CURLOPT_DEBUGFUNCTION , ???);
+#if LIBCURL_VERSION_NUM > 0x073400
+ // Requires >= 7.52.0
+ curl_easy_setopt(handle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_3);
+#endif
+
+ curl_easy_setopt(handle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+
+ if (protocol == Protocols::kHttpOrHttps) {
+ curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
+ } else {
+ curl_easy_setopt(handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
}
- ~CurlHttpClient() final = default;
+ // TODO: CURLOPT_EXPECT_100_TIMEOUT_MS?
+ // TODO: consider making this configurable, defaults to stderr
+ // curl_easy_setopt(handle.get(), CURLOPT_VERBOSE, 1);
+ // curl_easy_setopt(_handle.get(), CURLOPT_DEBUGFUNCTION , ???);
- void allowInsecureHTTP(bool allow) final {
- if (allow) {
- curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
- } else {
- curl_easy_setopt(_handle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
+ return handle;
+}
+
+ConnectionPool::Options makePoolOptions(Seconds timeout) {
+ ConnectionPool::Options opts;
+ opts.refreshTimeout = timeout;
+ opts.refreshRequirement = Seconds(60);
+ opts.hostTimeout = Seconds(120);
+ return opts;
+}
+
+/*
+ * This implements the timer interface for the ConnectionPool.
+ * Timers will be expired in order on a single background thread.
+ */
+class CurlHandleTimer : public ConnectionPool::TimerInterface {
+public:
+ explicit CurlHandleTimer(ClockSource* clockSource, std::shared_ptr<AlarmScheduler> scheduler)
+ : _clockSource(clockSource), _scheduler(std::move(scheduler)), _handle(nullptr) {}
+
+ virtual ~CurlHandleTimer() {
+ if (_handle) {
+ _handle->cancel().ignore();
+ }
+ }
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) final {
+ auto res = _scheduler->alarmFromNow(timeout);
+ _handle = std::move(res.handle);
+
+ std::move(res.future).getAsync([cb](Status status) {
+ if (status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+
+ fassert(5413901, status);
+ cb();
+ });
+ }
+
+ void cancelTimeout() final {
+ auto handle = std::move(_handle);
+ if (handle) {
+ handle->cancel().ignore();
+ }
+ }
+
+ Date_t now() final {
+ return _clockSource->now();
+ }
+
+private:
+ ClockSource* const _clockSource;
+ std::shared_ptr<AlarmScheduler> _scheduler;
+ AlarmScheduler::SharedHandle _handle;
+};
+
+/**
+ * Type factory that manages the curl connection pool
+ */
+class CurlHandleTypeFactory : public executor::ConnectionPool::DependentTypeFactoryInterface {
+public:
+ CurlHandleTypeFactory()
+ : _clockSource(SystemClockSource::get()),
+ _executor(std::make_shared<ThreadPool>(_makeThreadPoolOptions())),
+ _timerScheduler(std::make_shared<AlarmSchedulerPrecise>(_clockSource)),
+ _timerRunner({_timerScheduler}) {}
+
+ std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection(const HostAndPort&,
+ transport::ConnectSSLMode,
+ size_t generation) final;
+
+ std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() final {
+ _start();
+ return std::make_shared<CurlHandleTimer>(_clockSource, _timerScheduler);
+ }
+
+ const std::shared_ptr<OutOfLineExecutor>& getExecutor() final {
+ return _executor;
+ }
+
+ Date_t now() final {
+ return _clockSource->now();
+ }
+
+ void shutdown() final {
+ if (!_running) {
+ return;
+ }
+ _timerRunner.shutdown();
+
+ auto pool = checked_pointer_cast<ThreadPool>(_executor);
+ pool->shutdown();
+ pool->join();
+ }
+
+private:
+ void _start() {
+ if (_running)
+ return;
+ _timerRunner.start();
+
+ auto pool = checked_pointer_cast<ThreadPool>(_executor);
+ pool->startup();
+
+ _running = true;
+ }
+
+ static inline ThreadPool::Options _makeThreadPoolOptions() {
+ ThreadPool::Options opts;
+ opts.poolName = "CurlConnPool";
+ opts.maxThreads = ThreadPool::Options::kUnlimited;
+ opts.maxIdleThreadAge = Seconds{5};
+
+ return opts;
+ }
+
+private:
+ ClockSource* const _clockSource;
+ std::shared_ptr<OutOfLineExecutor> _executor;
+ std::shared_ptr<AlarmScheduler> _timerScheduler;
+ bool _running = false;
+ AlarmRunnerBackgroundThread _timerRunner;
+};
+
+
+/**
+ * Curl handle that is managed by a connection pool
+ *
+ * The connection pool does not manage actual connections, just handles. Curl has automatica
+ * reconnect logic if it gets disconnected. Also, HTTP connections are cheaper then MongoDB.
+ */
+class PooledCurlHandle : public ConnectionPool::ConnectionInterface,
+ public std::enable_shared_from_this<PooledCurlHandle> {
+public:
+ PooledCurlHandle(std::shared_ptr<OutOfLineExecutor> executor,
+ ClockSource* clockSource,
+ const std::shared_ptr<AlarmScheduler>& alarmScheduler,
+ const HostAndPort& host,
+ Protocols protocol,
+ size_t generation)
+ : ConnectionInterface(generation),
+ _executor(std::move(executor)),
+ _alarmScheduler(alarmScheduler),
+ _timer(clockSource, alarmScheduler),
+ _target(host),
+ _protocol(protocol) {}
+
+
+ virtual ~PooledCurlHandle() = default;
+
+ const HostAndPort& getHostAndPort() const final {
+ return _target;
+ }
+
+ // This cannot block under any circumstances because the ConnectionPool is holding
+ // a mutex while calling isHealthy(). Since we don't have a good way of knowing whether
+ // the connection is healthy, just return true here.
+ bool isHealthy() final {
+ return true;
+ }
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) final {
+ _timer.setTimeout(timeout, cb);
+ }
+
+ void cancelTimeout() final {
+ _timer.cancelTimeout();
+ }
+
+ Date_t now() final {
+ return _timer.now();
+ }
+
+ transport::ConnectSSLMode getSslMode() const final {
+ return mapProtocolToSSLMode(_protocol);
+ }
+
+ CURL* get() {
+ return _handle.get();
+ }
+
+private:
+ void setup(Milliseconds timeout, SetupCallback cb) final;
+
+ void refresh(Milliseconds timeout, RefreshCallback cb) final;
+
+private:
+ std::shared_ptr<OutOfLineExecutor> _executor;
+ std::shared_ptr<AlarmScheduler> _alarmScheduler;
+ CurlHandleTimer _timer;
+ HostAndPort _target;
+
+ Protocols _protocol;
+ CurlEasyHandle _handle;
+};
+
+void PooledCurlHandle::setup(Milliseconds timeout, SetupCallback cb) {
+ auto anchor = shared_from_this();
+ _executor->schedule([this, anchor, cb = std::move(cb)](auto execStatus) {
+ if (!execStatus.isOK()) {
+ cb(this, execStatus);
+ return;
+ }
+
+ _handle = createCurlEasyHandle(_protocol);
+
+ cb(this, Status::OK());
+ });
+}
+
+void PooledCurlHandle::refresh(Milliseconds timeout, RefreshCallback cb) {
+ auto anchor = shared_from_this();
+ _executor->schedule([this, anchor, cb = std::move(cb)](auto execStatus) {
+ if (!execStatus.isOK()) {
+ cb(this, execStatus);
+ return;
}
+
+ // Tell the connection pool that it was a success. Curl reconnects seamlessly behind the
+ // scenes and there is no reliable way to test if the connection is still alive in a
+ // connection agnostic way. HTTP verbs like HEAD are not uniformly supported.
+ //
+ // The connection pool simply needs to prune handles on a timer for us.
+ indicateSuccess();
+ indicateUsed();
+
+ cb(this, Status::OK());
+ });
+}
+
+std::shared_ptr<executor::ConnectionPool::ConnectionInterface>
+CurlHandleTypeFactory::makeConnection(const HostAndPort& host,
+ transport::ConnectSSLMode sslMode,
+ size_t generation) {
+ _start();
+
+ return std::make_shared<PooledCurlHandle>(
+ _executor, _clockSource, _timerScheduler, host, mapSSLModeToProtocol(sslMode), generation);
+}
+
+/**
+ * Handle that manages connection pool semantics and returns handle to connection pool in
+ * destructor.
+ *
+ * Caller must call indiciateSuccess if they want the handle to be reused.
+ */
+class CurlHandle {
+public:
+ CurlHandle(executor::ConnectionPool::ConnectionHandle handle, CURL* curlHandle)
+ : _poolHandle(std::move(handle)), _handle(curlHandle) {}
+
+ ~CurlHandle() {
+ if (!_success) {
+ _poolHandle->indicateFailure(
+ Status(ErrorCodes::HostUnreachable, "unknown curl handle failure"));
+ }
+ }
+
+ CURL* get() {
+ return _handle;
+ }
+
+ void indicateSuccess() {
+ _poolHandle->indicateSuccess();
+
+ // Tell the connection pool that we used the connection otherwise the pool will be believe
+ // the connection went idle since it is possible to checkout a connection and not actually
+ // use it.
+ _poolHandle->indicateUsed();
+
+ _success = true;
+ }
+
+private:
+ executor::ConnectionPool::ConnectionHandle _poolHandle;
+ bool _success = false;
+
+ // Owned by _poolHandle
+ CURL* _handle;
+};
+
+/**
+ * Factory that returns curl handles managed in connection pool
+ */
+class CurlPool {
+public:
+ CurlPool()
+ : _typeFactory(std::make_shared<CurlHandleTypeFactory>()),
+ _pool(std::make_shared<executor::ConnectionPool>(
+ _typeFactory, "Curl", makePoolOptions(Seconds(60)))) {}
+
+ CurlHandle get(HostAndPort server, Protocols protocol);
+
+private:
+ std::shared_ptr<CurlHandleTypeFactory> _typeFactory;
+ std::shared_ptr<executor::ConnectionPool> _pool;
+};
+
+CurlHandle CurlPool::get(HostAndPort server, Protocols protocol) {
+
+ auto sslMode = mapProtocolToSSLMode(protocol);
+
+ auto semi = _pool->get(server, sslMode, Seconds(60));
+
+ StatusWith<executor::ConnectionPool::ConnectionHandle> swHandle = std::move(semi).getNoThrow();
+ invariant(swHandle.isOK());
+
+ auto curlHandle = static_cast<PooledCurlHandle*>(swHandle.getValue().get())->get();
+
+ return CurlHandle(std::move(swHandle.getValue()), curlHandle);
+}
+
+HostAndPort exactHostAndPortFromUrl(StringData url) {
+ // Treat the URL as a host and port
+ // URL: http(s)?://(host):(port)/...
+ //
+ constexpr StringData slashes = "//"_sd;
+ auto slashesIndex = url.find(slashes);
+ uassert(5413902, str::stream() << "//, URL: " << url, slashesIndex != std::string::npos);
+
+ url = url.substr(slashesIndex + slashes.size());
+ if (url.find('/') != std::string::npos) {
+ url = url.substr(0, url.find("/"));
+ }
+
+ return HostAndPort(url);
+}
+
+/**
+ * The connection pool requires the ability to spawn threads which is not allowed through
+ * options parsing. Callers should default to HttpConnectionPool::kUse unless they are calling
+ * into the HttpClient before thread spawning is allowed.
+ */
+enum class HttpConnectionPool {
+ kUse,
+ kDoNotUse,
+};
+
+enum class HttpMethod {
+ kGET,
+ kPOST,
+ kPUT,
+};
+
+class CurlHttpClient final : public HttpClient {
+public:
+ CurlHttpClient(HttpConnectionPool pool) : _pool(pool) {}
+
+ void allowInsecureHTTP(bool allow) final {
+ _allowInsecure = allow;
}
void setHeaders(const std::vector<std::string>& headers) final {
// Can't set on base handle because cURL doesn't deep-dup this field
// and we don't want it getting overwritten while another thread is using it.
+
_headers = headers;
}
void setTimeout(Seconds timeout) final {
- curl_easy_setopt(_handle.get(), CURLOPT_TIMEOUT, longSeconds(timeout));
+ _timeout = timeout;
}
void setConnectTimeout(Seconds timeout) final {
- curl_easy_setopt(_handle.get(), CURLOPT_CONNECTTIMEOUT, longSeconds(timeout));
+ _connectTimeout = timeout;
}
- DataBuilder get(StringData url) const final {
- // Make a local copy of the base handle for this request.
- CurlHandle myHandle(curl_easy_duphandle(_handle.get()));
- uassert(ErrorCodes::InternalError, "Curl initialization failed", myHandle);
+ DataBuilder post(StringData url, ConstDataRange data) const final {
+ return request(HttpMethod::kPOST, url, data);
+ }
- return doRequest(myHandle.get(), url);
+ DataBuilder get(StringData url) const final {
+ return request(HttpMethod::kGET, url, {nullptr, 0});
}
- DataBuilder post(StringData url, ConstDataRange cdr) const final {
- // Make a local copy of the base handle for this request.
- CurlHandle myHandle(curl_easy_duphandle(_handle.get()));
- uassert(ErrorCodes::InternalError, "Curl initialization failed", myHandle);
+ DataBuilder request(HttpMethod method,
+ StringData url,
+ ConstDataRange cdr = {nullptr, 0}) const {
+ auto protocol = _allowInsecure ? Protocols::kHttpOrHttps : Protocols::kHttpsOnly;
+ if (_pool == HttpConnectionPool::kUse) {
+ static CurlPool factory;
- curl_easy_setopt(myHandle.get(), CURLOPT_POST, 1);
+ auto server = exactHostAndPortFromUrl(url);
- ConstDataRangeCursor cdrc(cdr);
- curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback);
- curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc);
- curl_easy_setopt(myHandle.get(), CURLOPT_POSTFIELDSIZE, (long)cdrc.length());
+ CurlHandle handle(factory.get(server, protocol));
+ auto reply = request(handle.get(), method, url, cdr);
+
+ // indidicateFailure will be called if indicateSuccess is not called.
+ handle.indicateSuccess();
+ return reply;
- return doRequest(myHandle.get(), url);
+ } else {
+ // Make a request with a non-pooled handle. This is needed during server startup when
+ // thread spawning is not allowed which is required by the thread pool.
+ auto handle = createCurlEasyHandle(protocol);
+ return request(handle.get(), method, url, cdr);
+ }
}
private:
- /**
- * Helper for use with curl_easy_setopt which takes a vararg list,
- * and expects a long, not the long long durationCount() returns.
- */
- long longSeconds(Seconds tm) {
- return static_cast<long>(durationCount<Seconds>(tm));
- }
+ DataBuilder request(CURL* handle, HttpMethod method, StringData url, ConstDataRange cdr) const {
+ uassert(ErrorCodes::InternalError, "Curl initialization failed", handle);
+
+ curl_easy_setopt(handle, CURLOPT_TIMEOUT, longSeconds(_timeout));
+
+ curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, longSeconds(_connectTimeout));
+
+ ConstDataRangeCursor cdrc(cdr);
+ switch (method) {
+ case HttpMethod::kGET:
+ uassert(ErrorCodes::BadValue,
+ "Request body not permitted with GET requests",
+ cdr.length() == 0);
+ // Per https://curl.se/libcurl/c/CURLOPT_POST.html
+ // We need to reset the type of request we want to make when reusing the request
+ // curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
+ break;
+ case HttpMethod::kPOST:
+ curl_easy_setopt(handle, CURLOPT_PUT, 0);
+ curl_easy_setopt(handle, CURLOPT_POST, 1);
+
+ curl_easy_setopt(handle, CURLOPT_READFUNCTION, ReadMemoryCallback);
+ curl_easy_setopt(handle, CURLOPT_READDATA, &cdrc);
+ curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE, (long)cdrc.length());
+ break;
+ case HttpMethod::kPUT:
+ curl_easy_setopt(handle, CURLOPT_POST, 0);
+ curl_easy_setopt(handle, CURLOPT_PUT, 1);
+
+ curl_easy_setopt(handle, CURLOPT_READFUNCTION, ReadMemoryCallback);
+ curl_easy_setopt(handle, CURLOPT_READDATA, &cdrc);
+ curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE, (long)cdrc.length());
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
- DataBuilder doRequest(CURL* handle, StringData url) const {
const auto urlString = url.toString();
curl_easy_setopt(handle, CURLOPT_URL, urlString.c_str());
- curl_easy_setopt(handle, CURLOPT_SHARE, curlLibraryManager.getShareHandle());
- DataBuilder dataBuilder(4096);
+ DataBuilder dataBuilder(4096), headerBuilder(4096);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &dataBuilder);
curl_slist* chunk = curl_slist_append(nullptr, "Connection: keep-alive");
@@ -352,51 +706,67 @@ private:
<< curl_easy_strerror(result),
result == CURLE_OK);
- uassert(ErrorCodes::OperationFailed,
- str::stream() << "Unexpected http status code from server: " << statusCode,
- statusCode == 200);
return dataBuilder;
}
private:
- CurlHandle _handle;
std::vector<std::string> _headers;
+
+ HttpConnectionPool _pool;
+
+ bool _allowInsecure{false};
+ Seconds _timeout;
+ Seconds _connectTimeout;
};
-} // namespace
+class HttpClientProviderImpl : public HttpClientProvider {
+public:
+ HttpClientProviderImpl() {
+ registerHTTPClientProvider(this);
+ }
-// Transitional API used by blockstore to trigger libcurl init
-// until it's been migrated to use the HTTPClient API.
-Status curlLibraryManager_initialize() {
- return curlLibraryManager.initialize();
-}
+ std::unique_ptr<HttpClient> create() final {
+ uassertStatusOK(curlLibraryManager.initialize());
+ return std::make_unique<CurlHttpClient>(HttpConnectionPool::kUse);
+ }
-std::unique_ptr<HttpClient> HttpClient::create() {
- uassertStatusOK(curlLibraryManager.initialize());
- return std::make_unique<CurlHttpClient>();
-}
+ std::unique_ptr<HttpClient> createWithoutConnectionPool() final {
+ uassertStatusOK(curlLibraryManager.initialize());
+ return std::make_unique<CurlHttpClient>(HttpConnectionPool::kDoNotUse);
+ }
-BSONObj HttpClient::getServerStatus() {
+ BSONObj getServerStatus() final {
- BSONObjBuilder info;
- info.append("type", "curl");
+ BSONObjBuilder info;
+ info.append("type", "curl");
- {
- BSONObjBuilder v(info.subobjStart("compiled"));
- v.append("version", LIBCURL_VERSION);
- v.append("version_num", LIBCURL_VERSION_NUM);
- }
+ {
+ BSONObjBuilder v(info.subobjStart("compiled"));
+ v.append("version", LIBCURL_VERSION);
+ v.append("version_num", LIBCURL_VERSION_NUM);
+ }
- {
- auto* curl_info = curl_version_info(CURLVERSION_NOW);
+ {
+ auto* curl_info = curl_version_info(CURLVERSION_NOW);
- BSONObjBuilder v(info.subobjStart("running"));
- v.append("version", curl_info->version);
- v.append("version_num", curl_info->version_num);
+ BSONObjBuilder v(info.subobjStart("running"));
+ v.append("version", curl_info->version);
+ v.append("version_num", static_cast<int>(curl_info->version_num));
+ }
+
+ return info.obj();
}
- return info.obj();
+} provider;
+
+} // namespace
+
+// Transitional API used by blockstore to trigger libcurl init
+// until it's been migrated to use the HTTPClient API.
+Status curlLibraryManager_initialize() {
+ return curlLibraryManager.initialize();
}
+
} // namespace mongo
diff --git a/src/mongo/util/net/http_client_none.cpp b/src/mongo/util/net/http_client_none.cpp
index 6a3d11e6b1a..690748807bf 100644
--- a/src/mongo/util/net/http_client_none.cpp
+++ b/src/mongo/util/net/http_client_none.cpp
@@ -32,12 +32,29 @@
namespace mongo {
-std::unique_ptr<HttpClient> HttpClient::create() {
- return nullptr;
-}
+namespace {
-BSONObj HttpClient::getServerStatus() {
- return BSONObj();
-}
+class HttpClientProviderImpl : public HttpClientProvider {
+public:
+ HttpClientProviderImpl() {
+ registerHTTPClientProvider(this);
+ }
+ std::unique_ptr<HttpClient> create() final {
+ return nullptr;
+ }
+
+ std::unique_ptr<HttpClient> createWithoutConnectionPool() final {
+ return nullptr;
+ }
+
+ /**
+ * Content for ServerStatus http_client section.
+ */
+ BSONObj getServerStatus() final {
+ return BSONObj();
+ }
+} provider;
+
+} // namespace
} // namespace mongo
diff --git a/src/mongo/util/net/http_client_winhttp.cpp b/src/mongo/util/net/http_client_winhttp.cpp
index f774e7387e4..f70567b9757 100644
--- a/src/mongo/util/net/http_client_winhttp.cpp
+++ b/src/mongo/util/net/http_client_winhttp.cpp
@@ -300,15 +300,26 @@ private:
Seconds _timeout = kTotalRequestTimeout;
};
-} // namespace
+class HttpClientProviderImpl : public HttpClientProvider {
+public:
+ HttpClientProviderImpl() {
+ registerHTTPClientProvider(this);
+ }
-std::unique_ptr<HttpClient> HttpClient::create() {
- return std::make_unique<WinHttpClient>();
-}
+ std::unique_ptr<HttpClient> create() final {
+ return std::make_unique<WinHttpClient>();
+ }
-BSONObj HttpClient::getServerStatus() {
- return BSON("type"
- << "winhttp");
-}
+ std::unique_ptr<HttpClient> createWithoutConnectionPool() final {
+ return std::make_unique<WinHttpClient>();
+ }
+
+ BSONObj getServerStatus() final {
+ return BSON("type"
+ << "winhttp");
+ }
+} provider;
+
+} // namespace
} // namespace mongo
diff --git a/src/mongo/util/options_parser/options_parser.cpp b/src/mongo/util/options_parser/options_parser.cpp
index aa04f99d94f..f92358ef8de 100644
--- a/src/mongo/util/options_parser/options_parser.cpp
+++ b/src/mongo/util/options_parser/options_parser.cpp
@@ -540,7 +540,7 @@ private:
std::string runYAMLRestExpansion(StringData url, Seconds timeout) {
- auto client = HttpClient::create();
+ auto client = HttpClient::createWithoutConnectionPool();
uassert(
ErrorCodes::OperationFailed, "No HTTP Client available in this build of MongoDB", client);