summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/SConscript20
-rw-r--r--src/mongo/client/async_client.cpp264
-rw-r--r--src/mongo/client/async_client.h90
-rw-r--r--src/mongo/executor/SConscript19
-rw-r--r--src/mongo/executor/connection_pool.cpp104
-rw-r--r--src/mongo/executor/connection_pool.h2
-rw-r--r--src/mongo/executor/connection_pool_test.cpp10
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp228
-rw-r--r--src/mongo/executor/connection_pool_tl.h121
-rw-r--r--src/mongo/executor/network_interface.h3
-rw-r--r--src/mongo/executor/network_interface_tl.cpp370
-rw-r--r--src/mongo/executor/network_interface_tl.h119
-rw-r--r--src/mongo/executor/task_executor.h1
-rw-r--r--src/mongo/s/sharding_router_test_fixture.cpp11
-rw-r--r--src/mongo/s/sharding_router_test_fixture.h1
-rw-r--r--src/mongo/transport/mock_session.h6
-rw-r--r--src/mongo/transport/transport_layer.h20
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp74
18 files changed, 1385 insertions, 78 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 420260ad62c..2e063b97746 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -209,6 +209,26 @@ env.CppIntegrationTest(
)
env.Library(
+ target='async_client',
+ source=[
+ 'async_client.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authcommon',
+ '$BUILD_DIR/mongo/db/wire_version',
+ '$BUILD_DIR/mongo/rpc/command_status',
+ '$BUILD_DIR/mongo/rpc/rpc',
+ '$BUILD_DIR/mongo/transport/transport_layer_common',
+ 'authentication',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
+ '$BUILD_DIR/mongo/executor/egress_tag_closer_manager',
+ '$BUILD_DIR/mongo/transport/message_compressor',
+ ],
+)
+
+env.Library(
target='connection_pool',
source=[
'connection_pool.cpp',
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
new file mode 100644
index 00000000000..c1499fe394d
--- /dev/null
+++ b/src/mongo/client/async_client.cpp
@@ -0,0 +1,264 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/async_client.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/authenticate.h"
+#include "mongo/config.h"
+#include "mongo/db/auth/authorization_manager_global.h"
+#include "mongo/db/auth/internal_user_auth.h"
+#include "mongo/db/commands/test_commands_enabled.h"
+#include "mongo/db/wire_version.h"
+#include "mongo/executor/egress_tag_closer_manager.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/legacy_request_builder.h"
+#include "mongo/rpc/metadata/client_metadata.h"
+#include "mongo/rpc/reply_interface.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/sock.h"
+#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/version.h"
+
+namespace mongo {
+
+Future<AsyncDBClient::Handle> AsyncDBClient::connect(const HostAndPort& peer,
+ transport::ConnectSSLMode sslMode,
+ ServiceContext* const context,
+ transport::ReactorHandle reactor) {
+ auto tl = context->getTransportLayer();
+ return tl->asyncConnect(peer, sslMode, std::move(reactor))
+ .then([peer, context](transport::SessionHandle session) {
+ return std::make_shared<AsyncDBClient>(peer, std::move(session), context);
+ });
+}
+
+BSONObj AsyncDBClient::_buildIsMasterRequest(const std::string& appName) {
+ BSONObjBuilder bob;
+
+ bob.append("isMaster", 1);
+ bob.append("hangUpOnStepDown", false);
+ const auto versionString = VersionInfoInterface::instance().version();
+ ClientMetadata::serialize(appName, versionString, &bob);
+
+ if (getTestCommandsEnabled()) {
+ // Only include the host:port of this process in the isMaster command request if test
+ // commands are enabled. mongobridge uses this field to identify the process opening a
+ // connection to it.
+ StringBuilder sb;
+ sb << getHostNameCached() << ':' << serverGlobalParams.port;
+ bob.append("hostInfo", sb.str());
+ }
+
+ _compressorManager.clientBegin(&bob);
+
+ if (WireSpec::instance().isInternalClient) {
+ WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob);
+ }
+
+ return bob.obj();
+}
+
+void AsyncDBClient::_parseIsMasterResponse(BSONObj request,
+ const std::unique_ptr<rpc::ReplyInterface>& response) {
+ uassert(50786,
+ "Expected opQuery response to isMaster",
+ response->getProtocol() == rpc::Protocol::kOpQuery);
+ auto responseBody = response->getCommandReply();
+ uassertStatusOK(getStatusFromCommandResult(responseBody));
+
+ auto protocolSet = uassertStatusOK(rpc::parseProtocolSetFromIsMasterReply(responseBody));
+ auto validateStatus =
+ rpc::validateWireVersion(WireSpec::instance().outgoing, protocolSet.version);
+ if (!validateStatus.isOK()) {
+ warning() << "remote host has incompatible wire version: " << validateStatus;
+ uasserted(validateStatus.code(),
+ str::stream() << "remote host has incompatible wire version: "
+ << validateStatus.reason());
+ }
+
+ auto& egressTagManager = executor::EgressTagCloserManager::get(_svcCtx);
+ // Tag outgoing connection so it can be kept open on FCV upgrade if it is not to a
+ // server with a lower binary version.
+ if (protocolSet.version.maxWireVersion >= WireSpec::instance().outgoing.maxWireVersion) {
+ egressTagManager.mutateTags(
+ _peer, [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; });
+ }
+
+ auto clientProtocols = rpc::computeProtocolSet(WireSpec::instance().outgoing);
+ invariant(clientProtocols != rpc::supports::kNone);
+ // Set the operation protocol
+ _negotiatedProtocol = uassertStatusOK(rpc::negotiate(protocolSet.protocolSet, clientProtocols));
+
+ _compressorManager.clientFinish(responseBody);
+}
+
+Future<void> AsyncDBClient::authenticate(const BSONObj& params) {
+ // This check is sufficient to see if auth is enabled on the system,
+ // and avoids creating dependencies on deeper, less accessible auth code.
+ if (!isInternalAuthSet()) {
+ return Future<void>::makeReady();
+ }
+
+ // We will only have a valid clientName if SSL is enabled.
+ std::string clientName;
+#ifdef MONGO_CONFIG_SSL
+ if (getSSLManager()) {
+ clientName = getSSLManager()->getSSLConfiguration().clientSubjectName;
+ }
+#endif
+
+ Promise<void> retPromise;
+ auto ret = retPromise.getFuture();
+
+ auto authCompleteCb = [promise = retPromise.share()](auth::AuthResponse response) mutable {
+ if (response.isOK()) {
+ promise.emplaceValue();
+ } else {
+ promise.setError(response.status);
+ }
+ };
+
+ auto doAuthCb = [this](executor::RemoteCommandRequest request,
+ auth::AuthCompletionHandler handler) {
+
+ runCommandRequest(request).getAsync([handler = std::move(handler)](
+ StatusWith<executor::RemoteCommandResponse> response) {
+ if (!response.isOK()) {
+ handler(executor::RemoteCommandResponse(response.getStatus()));
+ } else {
+ handler(std::move(response.getValue()));
+ }
+ });
+ };
+
+ auth::authenticateClient(
+ params, remote(), clientName, std::move(doAuthCb), std::move(authCompleteCb));
+
+ return ret;
+}
+
+Future<void> AsyncDBClient::initWireVersion(const std::string& appName,
+ executor::NetworkConnectionHook* const hook) {
+ auto requestObj = _buildIsMasterRequest(appName);
+ // We use a legacy request to create our ismaster request because we may
+ // have to communicate with servers that do not support other protocols.
+ auto requestMsg =
+ rpc::legacyRequestFromOpMsgRequest(OpMsgRequest::fromDBAndBody("admin", requestObj));
+ auto clkSource = _svcCtx->getFastClockSource();
+ auto start = clkSource->now();
+
+ return _call(requestMsg).then([this, requestObj, hook, clkSource, start](Message response) {
+ auto cmdReply = rpc::makeReply(&response);
+ if (hook) {
+ auto millis = duration_cast<Milliseconds>(clkSource->now() - start);
+ executor::RemoteCommandResponse cmdResp(*cmdReply, millis);
+ uassertStatusOK(hook->validateHost(_peer, std::move(cmdResp)));
+ }
+ _parseIsMasterResponse(requestObj, cmdReply);
+ });
+}
+
+Future<Message> AsyncDBClient::_call(Message request) {
+ auto swm = _compressorManager.compressMessage(request);
+ if (!swm.isOK()) {
+ return swm.getStatus();
+ }
+
+ request = std::move(swm.getValue());
+ auto msgId = nextMessageId();
+ request.header().setId(msgId);
+ request.header().setResponseToMsgId(0);
+
+ return _session->asyncSinkMessage(request)
+ .then([this] { return _session->asyncSourceMessage(); })
+ .then([this, msgId](Message response) -> StatusWith<Message> {
+ uassert(50787,
+ "ResponseId did not match sent message ID.",
+ response.header().getResponseToMsgId() == msgId);
+
+ if (response.operation() == dbCompressed) {
+ return _compressorManager.decompressMessage(response);
+ } else {
+ return response;
+ }
+ });
+}
+
+Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) {
+ invariant(_negotiatedProtocol);
+ auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
+ return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> {
+ return rpc::UniqueReply(response, rpc::makeReply(&response));
+ });
+}
+
+Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
+ executor::RemoteCommandRequest request) {
+ auto clkSource = _svcCtx->getPreciseClockSource();
+ auto start = clkSource->now();
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody(
+ std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
+ return runCommand(std::move(opMsgRequest))
+ .then([start, clkSource, this](rpc::UniqueReply response) {
+ auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
+ return executor::RemoteCommandResponse(*response, duration);
+ })
+ .onError([start, clkSource](Status status) {
+ auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
+ return executor::RemoteCommandResponse(status, duration);
+ });
+}
+
+void AsyncDBClient::cancel() {
+ _session->cancelAsyncOperations();
+}
+
+bool AsyncDBClient::isStillConnected() {
+ return _session->isConnected();
+}
+
+void AsyncDBClient::end() {
+ _session->end();
+}
+
+const HostAndPort& AsyncDBClient::remote() const {
+ return _peer;
+}
+
+const HostAndPort& AsyncDBClient::local() const {
+ return _session->local();
+}
+
+} // namespace mongo
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
new file mode 100644
index 00000000000..ee7239a1d03
--- /dev/null
+++ b/src/mongo/client/async_client.h
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/db/service_context.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/executor/remote_command_response.h"
+#include "mongo/rpc/protocol.h"
+#include "mongo/rpc/unique_message.h"
+#include "mongo/transport/message_compressor_manager.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> {
+public:
+ explicit AsyncDBClient(const HostAndPort& peer,
+ transport::SessionHandle session,
+ ServiceContext* svcCtx)
+ : _peer(std::move(peer)), _session(std::move(session)), _svcCtx(svcCtx) {}
+
+ using Handle = std::shared_ptr<AsyncDBClient>;
+
+ static Future<Handle> connect(const HostAndPort& peer,
+ transport::ConnectSSLMode sslMode,
+ ServiceContext* const context,
+ transport::ReactorHandle reactor);
+
+ Future<executor::RemoteCommandResponse> runCommandRequest(
+ executor::RemoteCommandRequest request);
+ Future<rpc::UniqueReply> runCommand(OpMsgRequest request);
+
+ Future<void> authenticate(const BSONObj& params);
+
+ Future<void> initWireVersion(const std::string& appName,
+ executor::NetworkConnectionHook* const hook);
+
+ void cancel();
+
+ bool isStillConnected();
+
+ void end();
+
+ const HostAndPort& remote() const;
+ const HostAndPort& local() const;
+
+private:
+ Future<Message> _call(Message request);
+ BSONObj _buildIsMasterRequest(const std::string& appName);
+ void _parseIsMasterResponse(BSONObj request,
+ const std::unique_ptr<rpc::ReplyInterface>& response);
+
+ const HostAndPort _peer;
+ transport::SessionHandle _session;
+ ServiceContext* const _svcCtx;
+ MessageCompressorManager _compressorManager;
+ boost::optional<rpc::Protocol> _negotiatedProtocol;
+};
+
+} // namespace mongo
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index a31aede6c85..6c13b1c460b 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -157,6 +157,24 @@ env.Library(target='egress_tag_closer_manager',
])
env.Library(
+ target='network_interface_tl',
+ source=[
+ 'connection_pool_tl.cpp',
+ 'network_interface_tl.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/client/async_client',
+ '$BUILD_DIR/mongo/transport/transport_layer',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/auth/authcommon',
+ '$BUILD_DIR/mongo/transport/transport_layer_manager',
+ 'connection_pool_executor',
+ 'network_interface',
+ ]
+)
+
+env.Library(
target='network_interface_asio',
source=[
'connection_pool_asio.cpp',
@@ -284,6 +302,7 @@ env.Library(
LIBDEPS=[
'network_interface',
'network_interface_asio',
+ 'network_interface_tl',
],
LIBDEPS_PRIVATE=[
'egress_tag_closer_manager',
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index c00fe1c739b..7092c460ed3 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -77,13 +77,13 @@ public:
* pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
*/
template <typename Callback>
- void runWithActiveClient(Callback&& cb) {
- runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
- std::forward<Callback>(cb));
+ auto runWithActiveClient(Callback&& cb) {
+ return runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
+ std::forward<Callback>(cb));
}
template <typename Callback>
- void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+ auto runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
invariant(lk.owns_lock());
_activeClients++;
@@ -96,7 +96,7 @@ public:
{
decltype(lk) localLk(std::move(lk));
- cb(std::move(localLk));
+ return cb(std::move(localLk));
}
}
@@ -107,10 +107,9 @@ public:
* Gets a connection from the specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
*/
- void getConnection(const HostAndPort& hostAndPort,
- Milliseconds timeout,
- stdx::unique_lock<stdx::mutex> lk,
- GetConnectionCallback cb);
+ Future<ConnectionHandle> getConnection(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ stdx::unique_lock<stdx::mutex> lk);
/**
* Cascades a failure across existing connections and requests. Invoking
@@ -185,7 +184,7 @@ private:
using OwnedConnection = std::unique_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>;
- using Request = std::pair<Date_t, GetConnectionCallback>;
+ using Request = std::pair<Date_t, SharedPromise<ConnectionHandle>>;
struct RequestComparator {
bool operator()(const Request& a, const Request& b) {
return a.first > b.first;
@@ -218,7 +217,7 @@ private:
OwnershipPool _droppedProcessingPool;
OwnershipPool _checkedOutPool;
- std::priority_queue<Request, std::vector<Request>, RequestComparator> _requests;
+ std::vector<Request> _requests;
std::unique_ptr<TimerInterface> _requestTimer;
Date_t _requestTimerExpiration;
@@ -285,6 +284,37 @@ ConnectionPool::~ConnectionPool() {
if (hasGlobalServiceContext() && _manager) {
_manager->remove(this);
}
+
+ std::vector<SpecificPool*> pools;
+
+ // Ensure we decrement active clients for all pools that we inc on (because we intend to process
+ // failures)
+ const auto guard = MakeGuard([&] {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ for (const auto& pool : pools) {
+ pool->decActiveClients(lk);
+ }
+ });
+
+ // Grab all current pools that don't match tags (under the lock)
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ for (auto& pair : _pools) {
+ pools.push_back(pair.second.get());
+ pair.second->incActiveClients(lk);
+ }
+ }
+
+ // Reacquire the lock per pool and process failures. We'll dec active clients when we're all
+ // through in the guard
+ for (const auto& pool : pools) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ pool->processFailure(
+ Status(ErrorCodes::ShutdownInProgress, "Shuting down the connection pool"),
+ std::move(lk));
+ }
}
void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
@@ -353,6 +383,11 @@ void ConnectionPool::mutateTags(
void ConnectionPool::get(const HostAndPort& hostAndPort,
Milliseconds timeout,
GetConnectionCallback cb) {
+ return get(hostAndPort, timeout).getAsync(std::move(cb));
+}
+
+Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
+ Milliseconds timeout) {
SpecificPool* pool;
stdx::unique_lock<stdx::mutex> lk(_mutex);
@@ -369,8 +404,8 @@ void ConnectionPool::get(const HostAndPort& hostAndPort,
invariant(pool);
- pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
- pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
+ return pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
+ return pool->getConnection(hostAndPort, timeout, std::move(lk));
});
}
@@ -451,22 +486,26 @@ size_t ConnectionPool::SpecificPool::openConnections(const stdx::unique_lock<std
return _checkedOutPool.size() + _readyPool.size() + _processingPool.size();
}
-void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort,
- Milliseconds timeout,
- stdx::unique_lock<stdx::mutex> lk,
- GetConnectionCallback cb) {
+Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection(
+ const HostAndPort& hostAndPort, Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
timeout = _parent->_options.refreshTimeout;
}
const auto expiration = _parent->_factory->now() + timeout;
- _requests.push(make_pair(expiration, std::move(cb)));
+ Promise<ConnectionHandle> promise;
+ auto future = promise.getFuture();
+
+ _requests.push_back(make_pair(expiration, promise.share()));
+ std::push_heap(begin(_requests), end(_requests), RequestComparator{});
updateStateInLock();
spawnConnections(lk);
fulfillRequests(lk);
+
+ return future;
}
void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
@@ -634,9 +673,8 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
// with the same failed status
lk.unlock();
- while (requestsToFail.size()) {
- requestsToFail.top().second(status);
- requestsToFail.pop();
+ for (auto& request : requestsToFail) {
+ request.second.setError(status);
}
}
@@ -678,8 +716,9 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
}
// Grab the request and callback
- auto cb = std::move(_requests.top().second);
- _requests.pop();
+ auto promise = std::move(_requests.front().second);
+ std::pop_heap(begin(_requests), end(_requests), RequestComparator{});
+ _requests.pop_back();
auto connPtr = conn.get();
@@ -691,7 +730,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
// pass it to the user
connPtr->resetToUnknown();
lk.unlock();
- cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
+ promise.emplaceValue(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
lk.lock();
}
}
@@ -831,16 +870,16 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
// If we were already running and the timer is the same as it was
// before, nothing to do
- if (_state == State::kRunning && _requestTimerExpiration == _requests.top().first)
+ if (_state == State::kRunning && _requestTimerExpiration == _requests.front().first)
return;
_state = State::kRunning;
_requestTimer->cancelTimeout();
- _requestTimerExpiration = _requests.top().first;
+ _requestTimerExpiration = _requests.front().first;
- auto timeout = _requests.top().first - _parent->_factory->now();
+ auto timeout = _requests.front().first - _parent->_factory->now();
// We set a timer for the most recent request, then invoke each timed
// out request we couldn't service
@@ -849,15 +888,16 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
auto now = _parent->_factory->now();
while (_requests.size()) {
- auto& x = _requests.top();
+ auto& x = _requests.front();
if (x.first <= now) {
- auto cb = std::move(x.second);
- _requests.pop();
+ auto promise = std::move(x.second);
+ std::pop_heap(begin(_requests), end(_requests), RequestComparator{});
+ _requests.pop_back();
lk.unlock();
- cb(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "Couldn't get a connection within the time limit"));
+ promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ "Couldn't get a connection within the time limit"));
lk.lock();
} else {
break;
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 970867aa8da..10a711261ef 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/session.h"
+#include "mongo/util/future.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -144,6 +145,7 @@ public:
const stdx::function<transport::Session::TagMask(transport::Session::TagMask)>&
mutateFunc) override;
+ Future<ConnectionHandle> get(const HostAndPort& hostAndPort, Milliseconds timeout);
void get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb);
void appendConnectionStats(ConnectionPoolStats* stats) const;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 12caf325206..09af8dab714 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -1241,9 +1241,8 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) {
ASSERT(!conn1);
// Get conn2 (which should have an extra second before the timeout)
- boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn2;
pool.get(HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2 = std::move(swConn);
+ ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress);
});
PoolImpl::setNow(now + Seconds(2));
@@ -1251,8 +1250,6 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) {
ASSERT(conn1);
ASSERT(!conn1->isOK());
ASSERT(conn1->getStatus().code() == ErrorCodes::NetworkInterfaceExceededTimeLimit);
-
- ASSERT(!conn2);
}
/**
@@ -1294,9 +1291,8 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
ASSERT(!conn1);
// Get conn2 (which should have an extra second before the timeout)
- boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn2;
pool.get(HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2 = std::move(swConn);
+ ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress);
});
PoolImpl::setNow(now + Seconds(5));
@@ -1304,8 +1300,6 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
ASSERT(conn1);
ASSERT(!conn1->isOK());
ASSERT(conn1->getStatus().code() == ErrorCodes::NetworkInterfaceExceededTimeLimit);
-
- ASSERT(!conn2);
}
template <typename T>
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
new file mode 100644
index 00000000000..3823a9a3e0a
--- /dev/null
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -0,0 +1,228 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_tl.h"
+
+#include "mongo/db/auth/internal_user_auth.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_tl {
+namespace {
+const auto kMaxTimerDuration = Milliseconds::max();
+
+struct TimeoutHandler {
+ AtomicBool done;
+ Promise<void> promise;
+};
+
+} // namespace
+
+void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) {
+ _timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) {
+ if (status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+
+ fassert(50475, status);
+
+ cb();
+ });
+}
+
+void TLTimer::cancelTimeout() {
+ _timer->cancel();
+}
+
+void TLConnection::indicateSuccess() {
+ _status = Status::OK();
+ _lastUsed = _reactor->now();
+}
+
+void TLConnection::indicateFailure(Status status) {
+ _status = std::move(status);
+}
+
+const HostAndPort& TLConnection::getHostAndPort() const {
+ return _peer;
+}
+
+bool TLConnection::isHealthy() {
+ return _client->isStillConnected();
+}
+
+AsyncDBClient* TLConnection::client() {
+ return _client.get();
+}
+
+void TLConnection::indicateUsed() {
+ // It is illegal to attempt to use a connection after calling indicateFailure().
+ invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown);
+ _lastUsed = _reactor->now();
+}
+
+Date_t TLConnection::getLastUsed() const {
+ return _lastUsed;
+}
+
+const Status& TLConnection::getStatus() const {
+ return _status;
+}
+
+void TLConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+ _timer.setTimeout(timeout, std::move(cb));
+}
+
+void TLConnection::cancelTimeout() {
+ _timer.cancelTimeout();
+}
+
+void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
+
+ auto handler = std::make_shared<TimeoutHandler>();
+ handler->promise.getFuture().getAsync(
+ [ this, cb = std::move(cb) ](Status status) { cb(this, std::move(status)); });
+
+ log() << "Connecting to " << _peer;
+ setTimeout(timeout, [this, handler, timeout] {
+ if (handler->done.swap(true)) {
+ return;
+ }
+ std::string reason = str::stream() << "Timed out connecting to " << _peer << " after "
+ << timeout;
+ handler->promise.setError(
+ Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, std::move(reason)));
+ });
+
+ AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor)
+ .onError(
+ [this](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
+ return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
+ })
+ .then([this](AsyncDBClient::Handle client) {
+ _client = std::move(client);
+ return _client->initWireVersion("NetworkInterfaceTL", _onConnectHook);
+ })
+ .then([this] { return _client->authenticate(getInternalUserAuthParams()); })
+ .then([this] {
+ if (!_onConnectHook) {
+ return Future<void>::makeReady();
+ }
+ auto connectHookRequest = uassertStatusOK(_onConnectHook->makeRequest(_peer));
+ if (!connectHookRequest) {
+ return Future<void>::makeReady();
+ }
+ return _client->runCommandRequest(*connectHookRequest)
+ .then([this](RemoteCommandResponse response) {
+ return _onConnectHook->handleReply(_peer, std::move(response));
+ });
+ })
+ .getAsync([this, handler](Status status) {
+ if (handler->done.swap(true)) {
+ return;
+ }
+
+ cancelTimeout();
+
+ if (status.isOK()) {
+ handler->promise.emplaceValue();
+ } else {
+ log() << "Failed to connect to " << _peer << " - " << redact(status);
+ handler->promise.setError(status);
+ }
+ });
+}
+
+void TLConnection::resetToUnknown() {
+ _status = ConnectionPool::kConnectionStateUnknown;
+}
+
+void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
+ auto handler = std::make_shared<TimeoutHandler>();
+ handler->promise.getFuture().getAsync(
+ [ this, cb = std::move(cb) ](Status status) { cb(this, status); });
+
+ setTimeout(timeout, [this, handler] {
+ if (handler->done.swap(true)) {
+ return;
+ }
+
+ _status = {ErrorCodes::HostUnreachable, "Timed out refreshing host"};
+ _client->cancel();
+
+ handler->promise.setError(_status);
+ });
+
+ _client
+ ->runCommandRequest(
+ {_peer, std::string("admin"), BSON("isMaster" << 1), BSONObj(), nullptr})
+ .then([this](executor::RemoteCommandResponse response) {
+ return Future<void>::makeReady(response.status);
+ })
+ .getAsync([this, handler](Status status) {
+ if (handler->done.swap(true)) {
+ return;
+ }
+
+ cancelTimeout();
+
+ _status = status;
+ if (status.isOK()) {
+ handler->promise.emplaceValue();
+ } else {
+ handler->promise.setError(status);
+ }
+ });
+}
+
+size_t TLConnection::getGeneration() const {
+ return _generation;
+}
+
+std::unique_ptr<ConnectionPool::ConnectionInterface> TLTypeFactory::makeConnection(
+ const HostAndPort& hostAndPort, size_t generation) {
+ return std::make_unique<TLConnection>(
+ _reactor, getGlobalServiceContext(), hostAndPort, generation, _onConnectHook.get());
+}
+
+std::unique_ptr<ConnectionPool::TimerInterface> TLTypeFactory::makeTimer() {
+ return std::make_unique<TLTimer>(_reactor);
+}
+
+Date_t TLTypeFactory::now() {
+ return _reactor->now();
+}
+
+} // namespace connection_pool_tl
+} // namespace executor
+} // namespace
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
new file mode 100644
index 00000000000..e761828fb64
--- /dev/null
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/client/async_client.h"
+#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/executor/network_interface.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_tl {
+
+class TLTypeFactory final : public ConnectionPool::DependentTypeFactoryInterface {
+public:
+ TLTypeFactory(transport::ReactorHandle reactor,
+ transport::TransportLayer* tl,
+ std::unique_ptr<NetworkConnectionHook> onConnectHook)
+ : _reactor(std::move(reactor)), _tl(tl), _onConnectHook(std::move(onConnectHook)) {}
+
+ std::unique_ptr<ConnectionPool::ConnectionInterface> makeConnection(
+ const HostAndPort& hostAndPort, size_t generation) override;
+ std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+
+ Date_t now() override;
+
+private:
+ transport::ReactorHandle _reactor;
+ transport::TransportLayer* _tl;
+ std::unique_ptr<NetworkConnectionHook> _onConnectHook;
+};
+
+class TLTimer final : public ConnectionPool::TimerInterface {
+public:
+ explicit TLTimer(const transport::ReactorHandle& reactor)
+ : _reactor(reactor), _timer(_reactor->makeTimer()) {}
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+ void cancelTimeout() override;
+
+private:
+ transport::ReactorHandle _reactor;
+ std::unique_ptr<transport::ReactorTimer> _timer;
+};
+
+class TLConnection final : public ConnectionPool::ConnectionInterface {
+public:
+ TLConnection(transport::ReactorHandle reactor,
+ ServiceContext* serviceContext,
+ HostAndPort peer,
+ size_t generation,
+ NetworkConnectionHook* onConnectHook)
+ : _reactor(reactor),
+ _serviceContext(serviceContext),
+ _timer(_reactor),
+ _peer(std::move(peer)),
+ _generation(generation),
+ _onConnectHook(onConnectHook) {}
+
+ void indicateSuccess() override;
+ void indicateFailure(Status status) override;
+ const HostAndPort& getHostAndPort() const;
+ bool isHealthy() override;
+ AsyncDBClient* client();
+
+private:
+ void indicateUsed() override;
+ Date_t getLastUsed() const override;
+ const Status& getStatus() const override;
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+ void cancelTimeout() override;
+ void setup(Milliseconds timeout, SetupCallback cb) override;
+ void resetToUnknown() override;
+ void refresh(Milliseconds timeout, RefreshCallback cb) override;
+
+ size_t getGeneration() const override;
+
+private:
+ transport::ReactorHandle _reactor;
+ ServiceContext* const _serviceContext;
+ TLTimer _timer;
+ HostAndPort _peer;
+ size_t _generation;
+ NetworkConnectionHook* const _onConnectHook;
+ AsyncDBClient::Handle _client;
+ Date_t _lastUsed;
+ Status _status = ConnectionPool::kConnectionStateUnknown;
+};
+
+} // namespace connection_pool_asio
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index a925fd81008..48428b820e3 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -141,7 +141,8 @@ public:
*
* Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started
* and true otherwise. If it returns Status::OK(), then the action will be executed by
- * NetworkInterface eventually; otherwise, it will not.
+ * NetworkInterface eventually if no error occurs while waiting for the alarm; otherwise,
+ * it will not.
*
* "action" should not do anything that requires a lot of computation, or that might block for a
* long time, as it may execute in a network thread.
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
new file mode 100644
index 00000000000..eef2299e623
--- /dev/null
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -0,0 +1,370 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/network_interface_tl.h"
+
+#include "mongo/db/server_options.h"
+#include "mongo/executor/connection_pool_tl.h"
+#include "mongo/transport/transport_layer_manager.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/sock.h"
+
+namespace mongo {
+namespace executor {
+
+NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
+ ConnectionPool::Options connPoolOpts,
+ ServiceContext* svcCtx,
+ std::unique_ptr<NetworkConnectionHook> onConnectHook,
+ std::unique_ptr<rpc::EgressMetadataHook> metadataHook)
+ : _instanceName(std::move(instanceName)),
+ _svcCtx(svcCtx),
+ _tl(nullptr),
+ _ownedTransportLayer(nullptr),
+ _reactor(nullptr),
+ _connPoolOpts(std::move(connPoolOpts)),
+ _onConnectHook(std::move(onConnectHook)),
+ _metadataHook(std::move(metadataHook)),
+ _inShutdown(false) {}
+
+std::string NetworkInterfaceTL::getDiagnosticString() {
+ return "DEPRECATED: getDiagnosticString is deprecated in NetworkInterfaceTL";
+}
+
+void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const {
+ _pool->appendConnectionStats(stats);
+}
+
+std::string NetworkInterfaceTL::getHostName() {
+ return getHostNameCached();
+}
+
+void NetworkInterfaceTL::startup() {
+ if (_svcCtx) {
+ _tl = _svcCtx->getTransportLayer();
+ }
+
+ if (!_tl) {
+ warning() << "No TransportLayer configured during NetworkInterface startup";
+ _ownedTransportLayer =
+ transport::TransportLayerManager::makeAndStartDefaultEgressTransportLayer();
+ _tl = _ownedTransportLayer.get();
+ }
+
+ _reactor = _tl->getReactor(transport::TransportLayer::kNewReactor);
+ auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
+ _reactor, _tl, std::move(_onConnectHook));
+ _pool = std::make_unique<ConnectionPool>(
+ std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+ _ioThread = stdx::thread([this] {
+ setThreadName(_instanceName);
+ LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
+ _reactor->run();
+ });
+}
+
+void NetworkInterfaceTL::shutdown() {
+ _inShutdown.store(true);
+ _reactor->stop();
+ _ioThread.join();
+ LOG(2) << "NetworkInterfaceTL shutdown successfully";
+}
+
+bool NetworkInterfaceTL::inShutdown() const {
+ return _inShutdown.load();
+}
+
+void NetworkInterfaceTL::waitForWork() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ MONGO_IDLE_THREAD_BLOCK;
+ _workReadyCond.wait(lk, [this] { return _isExecutorRunnable; });
+}
+
+void NetworkInterfaceTL::waitForWorkUntil(Date_t when) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ MONGO_IDLE_THREAD_BLOCK;
+ _workReadyCond.wait_until(lk, when.toSystemTimePoint(), [this] { return _isExecutorRunnable; });
+}
+
+void NetworkInterfaceTL::signalWorkAvailable() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (!_isExecutorRunnable) {
+ _isExecutorRunnable = true;
+ _workReadyCond.notify_one();
+ }
+}
+
+Date_t NetworkInterfaceTL::now() {
+ // TODO This check is because we set up NetworkInterfaces in MONGO_INITIALIZERS and then expect
+ // this method to work before the NI is started.
+ if (!_reactor) {
+ return Date_t::now();
+ }
+ return _reactor->now();
+}
+
+
+Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ }
+
+ LOG(3) << "startCommand: " << redact(request.toString());
+
+ if (_metadataHook) {
+ BSONObjBuilder newMetadata(std::move(request.metadata));
+
+ auto status = _metadataHook->writeRequestMetadata(request.opCtx, &newMetadata);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ request.metadata = newMetadata.obj();
+ }
+
+ auto state = std::make_shared<CommandState>(request, cbHandle);
+ state->mergedFuture = state->promise.getFuture();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgress.insert({state->cbHandle, state});
+ }
+
+ state->start = now();
+ if (state->request.timeout != state->request.kNoTimeout) {
+ state->deadline = state->start + state->request.timeout;
+ }
+
+ _pool->get(request.target, request.timeout)
+ .tapError([state](Status error) {
+ LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": "
+ << error;
+ })
+ .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
+ return _onAcquireConn(state, std::move(conn));
+ })
+ .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+ // The TransportLayer has, for historical reasons returned SocketException for
+ // network errors, but sharding assumes HostUnreachable on network errors.
+ if (error == ErrorCodes::SocketException) {
+ error = Status(ErrorCodes::HostUnreachable, error.reason());
+ }
+ return error;
+ })
+ .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+ auto duration = now() - state->start;
+ if (!response.isOK()) {
+ onFinish(RemoteCommandResponse(response.getStatus(), duration));
+ } else {
+ auto rs = std::move(response.getValue());
+ LOG(2) << "Request " << state->request.id << " finished with response: "
+ << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+ onFinish(rs);
+ }
+ });
+ return Status::OK();
+}
+
+// This is only called from within a then() callback on a future, so throwing is equivalent to
+// returning a ready Future with a not-OK status.
+Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
+ std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+ if (state->done.load()) {
+ conn->indicateSuccess();
+ uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
+ }
+
+ state->conn = std::move(conn);
+ auto tlconn = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
+ auto client = tlconn->client();
+
+ if (state->deadline != RemoteCommandRequest::kNoExpirationDate) {
+ auto nowVal = now();
+ if (nowVal >= state->deadline) {
+ auto connDuration = nowVal - state->start;
+ uasserted(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ str::stream() << "Remote command timed out while waiting to get a "
+ "connection from the pool, took "
+ << connDuration
+ << ", timeout was set to "
+ << state->request.timeout);
+ }
+
+ state->timer = _reactor->makeTimer();
+ state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
+ if (status == ErrorCodes::CallbackCanceled) {
+ invariant(state->done.load());
+ return;
+ }
+
+ if (state->done.swap(true)) {
+ return;
+ }
+
+ LOG(2) << "Request " << state->request.id << " timed out"
+ << ", deadline was " << state->deadline << ", op was "
+ << redact(state->request.toString());
+ state->promise.setError(
+ Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+
+ client->cancel();
+ });
+ }
+
+ client->runCommandRequest(state->request)
+ .then([this, state](RemoteCommandResponse response) {
+ if (state->done.load()) {
+ uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
+ }
+
+ // TODO Investigate whether this is necessary here.
+ return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
+ if (_metadataHook && response.status.isOK()) {
+ auto target = state->conn->getHostAndPort().toString();
+ response.status = _metadataHook->readReplyMetadata(
+ nullptr, std::move(target), response.metadata);
+ }
+
+ return std::move(response);
+ });
+ })
+ .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+ _eraseInUseConn(state->cbHandle);
+ if (!swr.isOK()) {
+ state->conn->indicateFailure(swr.getStatus());
+ } else if (!swr.getValue().isOK()) {
+ state->conn->indicateFailure(swr.getValue().status);
+ } else {
+ state->conn->indicateSuccess();
+ }
+
+ if (state->done.swap(true))
+ return;
+
+ if (state->timer) {
+ state->timer->cancel();
+ }
+
+ state->promise.setWith([&] { return std::move(swr); });
+ });
+
+ return std::move(state->mergedFuture);
+}
+
+void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbHandle) {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgress.erase(cbHandle);
+}
+
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+ stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
+ auto it = _inProgress.find(cbHandle);
+ if (it == _inProgress.end()) {
+ return;
+ }
+ auto state = it->second;
+ _inProgress.erase(it);
+ lk.unlock();
+
+ if (state->done.swap(true)) {
+ return;
+ }
+
+ LOG(2) << "Canceling operation; original request was: " << redact(state->request.toString());
+ state->promise.setError({ErrorCodes::CallbackCanceled,
+ str::stream() << "Command canceled; original request was: "
+ << redact(state->request.toString())});
+ if (state->conn) {
+ auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
+ client->client()->cancel();
+ }
+}
+
+Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ }
+
+ if (when <= now()) {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ return Status::OK();
+ }
+
+ std::shared_ptr<transport::ReactorTimer> alarmTimer = _reactor->makeTimer();
+ std::weak_ptr<transport::ReactorTimer> weakTimer = alarmTimer;
+ {
+ // We do this so that the lifetime of the alarmTimers is the lifetime of the NITL.
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.insert(alarmTimer);
+ }
+
+ alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
+ }
+
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action));
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
+ }
+
+ return;
+ }
+
+ if (status.isOK()) {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
+ return Status::OK();
+}
+
+bool NetworkInterfaceTL::onNetworkThread() {
+ return _reactor->onReactorThread();
+}
+
+void NetworkInterfaceTL::dropConnections(const HostAndPort& hostAndPort) {
+ _pool->dropConnections(hostAndPort);
+}
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
new file mode 100644
index 00000000000..8c4d4697d93
--- /dev/null
+++ b/src/mongo/executor/network_interface_tl.h
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+
+#include "mongo/client/async_client.h"
+#include "mongo/db/service_context.h"
+#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/rpc/metadata/metadata_hook.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/transport/transport_layer.h"
+
+namespace mongo {
+namespace executor {
+
+class NetworkInterfaceTL : public NetworkInterface {
+public:
+ NetworkInterfaceTL(std::string instanceName,
+ ConnectionPool::Options connPoolOpts,
+ ServiceContext* ctx,
+ std::unique_ptr<NetworkConnectionHook> onConnectHook,
+ std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
+ std::string getDiagnosticString() override;
+ void appendConnectionStats(ConnectionPoolStats* stats) const override;
+ std::string getHostName() override;
+ void startup() override;
+ void shutdown() override;
+ bool inShutdown() const override;
+ void waitForWork() override;
+ void waitForWorkUntil(Date_t when) override;
+ void signalWorkAvailable() override;
+ Date_t now() override;
+ Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) override;
+ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
+ Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
+
+ bool onNetworkThread() override;
+
+ void dropConnections(const HostAndPort& hostAndPort) override;
+
+private:
+ struct CommandState {
+ CommandState(RemoteCommandRequest request_, TaskExecutor::CallbackHandle cbHandle_)
+ : request(std::move(request_)), cbHandle(std::move(cbHandle_)) {}
+
+ RemoteCommandRequest request;
+ TaskExecutor::CallbackHandle cbHandle;
+ Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
+ Date_t start;
+
+ ConnectionPool::ConnectionHandle conn;
+ std::unique_ptr<transport::ReactorTimer> timer;
+
+ AtomicBool done;
+ Promise<RemoteCommandResponse> promise;
+ Future<RemoteCommandResponse> mergedFuture;
+ };
+
+ void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
+ Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
+ ConnectionPool::ConnectionHandle conn);
+
+ std::string _instanceName;
+ ServiceContext* _svcCtx;
+ transport::TransportLayer* _tl;
+ // Will be created if ServiceContext is null, or if no TransportLayer was configured at startup
+ std::unique_ptr<transport::TransportLayer> _ownedTransportLayer;
+ transport::ReactorHandle _reactor;
+
+ ConnectionPool::Options _connPoolOpts;
+ std::unique_ptr<NetworkConnectionHook> _onConnectHook;
+ std::unique_ptr<ConnectionPool> _pool;
+
+ std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;
+ AtomicBool _inShutdown;
+ stdx::thread _ioThread;
+
+ stdx::mutex _inProgressMutex;
+ stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<CommandState>> _inProgress;
+ stdx::unordered_set<std::shared_ptr<transport::ReactorTimer>> _inProgressAlarms;
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _workReadyCond;
+ bool _isExecutorRunnable = false;
+};
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 8ef9e732233..8d7c9cf72b7 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -41,6 +41,7 @@
#include "mongo/platform/hash_namespace.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
+#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
namespace mongo {
diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp
index 03d5fb6d01b..69cbd4b4ac1 100644
--- a/src/mongo/s/sharding_router_test_fixture.cpp
+++ b/src/mongo/s/sharding_router_test_fixture.cpp
@@ -106,15 +106,8 @@ void ShardingTestFixture::setUp() {
service->setPreciseClockSource(stdx::make_unique<ClockSourceMock>());
service->setTickSource(stdx::make_unique<TickSourceMock>());
- {
- auto tlMock = stdx::make_unique<transport::TransportLayerMock>();
- _transportLayer = tlMock.get();
- ASSERT_OK(_transportLayer->start());
- service->setTransportLayer(std::move(tlMock));
- }
-
CollatorFactoryInterface::set(service, stdx::make_unique<CollatorFactoryMock>());
- _transportSession = transport::MockSession::create(_transportLayer);
+ _transportSession = transport::MockSession::create(nullptr);
_client = service->makeClient("ShardingTestFixture", _transportSession);
_opCtx = _client->makeOperationContext();
@@ -544,7 +537,7 @@ void ShardingTestFixture::expectFindSendBSONObjVector(const HostAndPort& configH
}
void ShardingTestFixture::setRemote(const HostAndPort& remote) {
- _transportSession = transport::MockSession::create(remote, HostAndPort{}, _transportLayer);
+ _transportSession = transport::MockSession::create(remote, HostAndPort{}, nullptr);
}
void ShardingTestFixture::checkReadConcern(const BSONObj& cmdObj,
diff --git a/src/mongo/s/sharding_router_test_fixture.h b/src/mongo/s/sharding_router_test_fixture.h
index 574f484faac..05c1fa37a88 100644
--- a/src/mongo/s/sharding_router_test_fixture.h
+++ b/src/mongo/s/sharding_router_test_fixture.h
@@ -188,7 +188,6 @@ public:
private:
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
- transport::TransportLayerMock* _transportLayer;
transport::SessionHandle _transportSession;
RemoteCommandTargeterFactoryMock* _targeterFactory;
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index 19379babb20..60e6a4e33e2 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -66,13 +66,13 @@ public:
}
void end() override {
- if (!_tl->owns(id()))
+ if (!_tl || !_tl->owns(id()))
return;
_tl->_sessions[id()].ended = true;
}
StatusWith<Message> sourceMessage() override {
- if (_tl->inShutdown()) {
+ if (!_tl || _tl->inShutdown()) {
return TransportLayer::ShutdownStatus;
} else if (!_tl->owns(id())) {
return TransportLayer::SessionUnknownStatus;
@@ -88,7 +88,7 @@ public:
}
Status sinkMessage(Message message) override {
- if (_tl->inShutdown()) {
+ if (!_tl || _tl->inShutdown()) {
return TransportLayer::ShutdownStatus;
} else if (!_tl->owns(id())) {
return TransportLayer::SessionUnknownStatus;
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 1b8f502237f..492fbf82ea0 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -113,19 +113,24 @@ public:
ReactorTimer(const ReactorTimer&) = delete;
ReactorTimer& operator=(const ReactorTimer&) = delete;
+ /*
+ * The destructor calls cancel() to ensure outstanding Futures are filled.
+ */
virtual ~ReactorTimer() = default;
/*
- * Cancel any outstanding calls to waitFor/waitUntil. The future will have
- * an ErrorCodes::CallbackCancelled status.
+ * Cancel any outstanding future from waitFor/waitUntil. The future will be filled with an
+ * ErrorCodes::CallbackCancelled status.
+ *
+ * If no future is outstanding, then this is a noop.
*/
virtual void cancel() = 0;
/*
- * Returns a future that will be filled with Status::OK after the timeout has
- * ellapsed or has been cancelled.
+ * Returns a future that will be filled with Status::OK after the timeout has ellapsed.
+ *
+ * Calling this implicitly calls cancel().
*/
-
virtual Future<void> waitFor(Milliseconds timeout) = 0;
virtual Future<void> waitUntil(Date_t timeout) = 0;
};
@@ -154,8 +159,9 @@ public:
Promise<FutureContinuationResult<Callback>> promise;
auto future = promise.getFuture();
- schedule(kPost,
- [ cb = std::forward<Callback>(cb), sp = promise.share() ] { sp.setWith(cb); });
+ schedule(kPost, [ cb = std::forward<Callback>(cb), sp = promise.share() ]() mutable {
+ sp.setWith(cb);
+ });
return future;
}
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 772a4eb189f..2c4e1e18fe4 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -64,7 +64,7 @@ namespace transport {
class ASIOReactorTimer final : public ReactorTimer {
public:
explicit ASIOReactorTimer(asio::io_context& ctx)
- : _timer(std::make_shared<asio::system_timer>(ctx)) {}
+ : _timerState(std::make_shared<TimerState>(ctx)) {}
~ASIOReactorTimer() {
// The underlying timer won't get destroyed until the last promise from _asyncWait
@@ -73,42 +73,82 @@ public:
}
void cancel() override {
- _timer->cancel();
+ auto promise = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_timerState->mutex);
+ _timerState->generation++;
+ return std::move(_timerState->finalPromise);
+ }();
+
+ if (promise) {
+ // We're worried that setting the error on the promise without unwinding the stack
+ // can lead to a deadlock, so this gets scheduled on the io_context of the timer.
+ _timerState->timer.get_io_context().post([promise = promise->share()]() mutable {
+ promise.setError({ErrorCodes::CallbackCanceled, "Timer was canceled"});
+ });
+ }
+ _timerState->timer.cancel();
}
Future<void> waitFor(Milliseconds timeout) override {
- return _asyncWait([&] { _timer->expires_after(timeout.toSystemDuration()); });
+ return _asyncWait([&] { _timerState->timer.expires_after(timeout.toSystemDuration()); });
}
Future<void> waitUntil(Date_t expiration) override {
- return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); });
+ return _asyncWait([&] { _timerState->timer.expires_at(expiration.toSystemTimePoint()); });
}
private:
- template <typename Callback>
- Future<void> _asyncWait(Callback&& cb) {
+ template <typename ArmTimerCb>
+ Future<void> _asyncWait(ArmTimerCb&& armTimer) {
try {
- cb();
- Promise<void> promise;
- auto ret = promise.getFuture();
- _timer->async_wait(
- [ promise = promise.share(), timer = _timer ](const std::error_code& ec) mutable {
+ cancel();
+
+ Future<void> ret;
+ uint64_t id;
+ std::tie(ret, id) = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_timerState->mutex);
+ auto id = ++_timerState->generation;
+ invariant(!_timerState->finalPromise);
+ _timerState->finalPromise = std::make_unique<Promise<void>>();
+ auto future = _timerState->finalPromise->getFuture();
+ return std::make_pair(std::move(future), id);
+ }();
+
+ armTimer();
+ _timerState->timer.async_wait(
+ [ id, state = _timerState ](const std::error_code& ec) mutable {
+ stdx::unique_lock<stdx::mutex> lk(state->mutex);
+ if (id != state->generation) {
+ return;
+ }
+ auto promise = std::move(state->finalPromise);
+ lk.unlock();
+
if (ec) {
- promise.setError(errorCodeToStatus(ec));
+ promise->setError(errorCodeToStatus(ec));
} else {
- promise.emplaceValue();
+ promise->emplaceValue();
}
});
+
return ret;
} catch (asio::system_error& ex) {
return Future<void>::makeReady(errorCodeToStatus(ex.code()));
}
}
- // Destroying an asio::system_timer that has outstanding callbacks from async_wait will cause
- // a broken promise - so this is managed by a shared ptr, so that each callback can extend the
- // lifetime of the timer to after its been called.
- std::shared_ptr<asio::system_timer> _timer;
+ // The timer itself and its state are stored in this struct managed by a shared_ptr so we can
+ // extend the lifetime of the timer until all callbacks to timer.async_wait have run.
+ struct TimerState {
+ explicit TimerState(asio::io_context& ctx) : timer(ctx) {}
+
+ asio::system_timer timer;
+ stdx::mutex mutex;
+ uint64_t generation = 0;
+ std::unique_ptr<Promise<void>> finalPromise;
+ };
+
+ std::shared_ptr<TimerState> _timerState;
};
class TransportLayerASIO::ASIOReactor final : public Reactor {