diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/client/SConscript | 20 | ||||
-rw-r--r-- | src/mongo/client/async_client.cpp | 264 | ||||
-rw-r--r-- | src/mongo/client/async_client.h | 90 | ||||
-rw-r--r-- | src/mongo/executor/SConscript | 19 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 104 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 2 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_tl.cpp | 228 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_tl.h | 121 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 3 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 370 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 119 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 1 | ||||
-rw-r--r-- | src/mongo/s/sharding_router_test_fixture.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/sharding_router_test_fixture.h | 1 | ||||
-rw-r--r-- | src/mongo/transport/mock_session.h | 6 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer.h | 20 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.cpp | 74 |
18 files changed, 1385 insertions, 78 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 420260ad62c..2e063b97746 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -209,6 +209,26 @@ env.CppIntegrationTest( ) env.Library( + target='async_client', + source=[ + 'async_client.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authcommon', + '$BUILD_DIR/mongo/db/wire_version', + '$BUILD_DIR/mongo/rpc/command_status', + '$BUILD_DIR/mongo/rpc/rpc', + '$BUILD_DIR/mongo/transport/transport_layer_common', + 'authentication', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/commands/test_commands_enabled', + '$BUILD_DIR/mongo/executor/egress_tag_closer_manager', + '$BUILD_DIR/mongo/transport/message_compressor', + ], +) + +env.Library( target='connection_pool', source=[ 'connection_pool.cpp', diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp new file mode 100644 index 00000000000..c1499fe394d --- /dev/null +++ b/src/mongo/client/async_client.cpp @@ -0,0 +1,264 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/client/async_client.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/authenticate.h" +#include "mongo/config.h" +#include "mongo/db/auth/authorization_manager_global.h" +#include "mongo/db/auth/internal_user_auth.h" +#include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/wire_version.h" +#include "mongo/executor/egress_tag_closer_manager.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/legacy_request_builder.h" +#include "mongo/rpc/metadata/client_metadata.h" +#include "mongo/rpc/reply_interface.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/log.h" +#include "mongo/util/net/sock.h" +#include "mongo/util/net/ssl_manager.h" +#include "mongo/util/version.h" + +namespace mongo { + +Future<AsyncDBClient::Handle> AsyncDBClient::connect(const HostAndPort& peer, + transport::ConnectSSLMode sslMode, + ServiceContext* const context, + transport::ReactorHandle reactor) { + auto tl = context->getTransportLayer(); + return tl->asyncConnect(peer, sslMode, std::move(reactor)) + .then([peer, context](transport::SessionHandle session) { + return std::make_shared<AsyncDBClient>(peer, std::move(session), context); + }); +} + +BSONObj AsyncDBClient::_buildIsMasterRequest(const std::string& appName) { + BSONObjBuilder bob; + + bob.append("isMaster", 1); + bob.append("hangUpOnStepDown", false); + const auto versionString = VersionInfoInterface::instance().version(); + ClientMetadata::serialize(appName, versionString, &bob); + + if (getTestCommandsEnabled()) { + // Only include the host:port of this process in the isMaster command request if test + // commands are enabled. mongobridge uses this field to identify the process opening a + // connection to it. + StringBuilder sb; + sb << getHostNameCached() << ':' << serverGlobalParams.port; + bob.append("hostInfo", sb.str()); + } + + _compressorManager.clientBegin(&bob); + + if (WireSpec::instance().isInternalClient) { + WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob); + } + + return bob.obj(); +} + +void AsyncDBClient::_parseIsMasterResponse(BSONObj request, + const std::unique_ptr<rpc::ReplyInterface>& response) { + uassert(50786, + "Expected opQuery response to isMaster", + response->getProtocol() == rpc::Protocol::kOpQuery); + auto responseBody = response->getCommandReply(); + uassertStatusOK(getStatusFromCommandResult(responseBody)); + + auto protocolSet = uassertStatusOK(rpc::parseProtocolSetFromIsMasterReply(responseBody)); + auto validateStatus = + rpc::validateWireVersion(WireSpec::instance().outgoing, protocolSet.version); + if (!validateStatus.isOK()) { + warning() << "remote host has incompatible wire version: " << validateStatus; + uasserted(validateStatus.code(), + str::stream() << "remote host has incompatible wire version: " + << validateStatus.reason()); + } + + auto& egressTagManager = executor::EgressTagCloserManager::get(_svcCtx); + // Tag outgoing connection so it can be kept open on FCV upgrade if it is not to a + // server with a lower binary version. + if (protocolSet.version.maxWireVersion >= WireSpec::instance().outgoing.maxWireVersion) { + egressTagManager.mutateTags( + _peer, [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; }); + } + + auto clientProtocols = rpc::computeProtocolSet(WireSpec::instance().outgoing); + invariant(clientProtocols != rpc::supports::kNone); + // Set the operation protocol + _negotiatedProtocol = uassertStatusOK(rpc::negotiate(protocolSet.protocolSet, clientProtocols)); + + _compressorManager.clientFinish(responseBody); +} + +Future<void> AsyncDBClient::authenticate(const BSONObj& params) { + // This check is sufficient to see if auth is enabled on the system, + // and avoids creating dependencies on deeper, less accessible auth code. + if (!isInternalAuthSet()) { + return Future<void>::makeReady(); + } + + // We will only have a valid clientName if SSL is enabled. + std::string clientName; +#ifdef MONGO_CONFIG_SSL + if (getSSLManager()) { + clientName = getSSLManager()->getSSLConfiguration().clientSubjectName; + } +#endif + + Promise<void> retPromise; + auto ret = retPromise.getFuture(); + + auto authCompleteCb = [promise = retPromise.share()](auth::AuthResponse response) mutable { + if (response.isOK()) { + promise.emplaceValue(); + } else { + promise.setError(response.status); + } + }; + + auto doAuthCb = [this](executor::RemoteCommandRequest request, + auth::AuthCompletionHandler handler) { + + runCommandRequest(request).getAsync([handler = std::move(handler)]( + StatusWith<executor::RemoteCommandResponse> response) { + if (!response.isOK()) { + handler(executor::RemoteCommandResponse(response.getStatus())); + } else { + handler(std::move(response.getValue())); + } + }); + }; + + auth::authenticateClient( + params, remote(), clientName, std::move(doAuthCb), std::move(authCompleteCb)); + + return ret; +} + +Future<void> AsyncDBClient::initWireVersion(const std::string& appName, + executor::NetworkConnectionHook* const hook) { + auto requestObj = _buildIsMasterRequest(appName); + // We use a legacy request to create our ismaster request because we may + // have to communicate with servers that do not support other protocols. + auto requestMsg = + rpc::legacyRequestFromOpMsgRequest(OpMsgRequest::fromDBAndBody("admin", requestObj)); + auto clkSource = _svcCtx->getFastClockSource(); + auto start = clkSource->now(); + + return _call(requestMsg).then([this, requestObj, hook, clkSource, start](Message response) { + auto cmdReply = rpc::makeReply(&response); + if (hook) { + auto millis = duration_cast<Milliseconds>(clkSource->now() - start); + executor::RemoteCommandResponse cmdResp(*cmdReply, millis); + uassertStatusOK(hook->validateHost(_peer, std::move(cmdResp))); + } + _parseIsMasterResponse(requestObj, cmdReply); + }); +} + +Future<Message> AsyncDBClient::_call(Message request) { + auto swm = _compressorManager.compressMessage(request); + if (!swm.isOK()) { + return swm.getStatus(); + } + + request = std::move(swm.getValue()); + auto msgId = nextMessageId(); + request.header().setId(msgId); + request.header().setResponseToMsgId(0); + + return _session->asyncSinkMessage(request) + .then([this] { return _session->asyncSourceMessage(); }) + .then([this, msgId](Message response) -> StatusWith<Message> { + uassert(50787, + "ResponseId did not match sent message ID.", + response.header().getResponseToMsgId() == msgId); + + if (response.operation() == dbCompressed) { + return _compressorManager.decompressMessage(response); + } else { + return response; + } + }); +} + +Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) { + invariant(_negotiatedProtocol); + auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); + return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> { + return rpc::UniqueReply(response, rpc::makeReply(&response)); + }); +} + +Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( + executor::RemoteCommandRequest request) { + auto clkSource = _svcCtx->getPreciseClockSource(); + auto start = clkSource->now(); + auto opMsgRequest = OpMsgRequest::fromDBAndBody( + std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); + return runCommand(std::move(opMsgRequest)) + .then([start, clkSource, this](rpc::UniqueReply response) { + auto duration = duration_cast<Milliseconds>(clkSource->now() - start); + return executor::RemoteCommandResponse(*response, duration); + }) + .onError([start, clkSource](Status status) { + auto duration = duration_cast<Milliseconds>(clkSource->now() - start); + return executor::RemoteCommandResponse(status, duration); + }); +} + +void AsyncDBClient::cancel() { + _session->cancelAsyncOperations(); +} + +bool AsyncDBClient::isStillConnected() { + return _session->isConnected(); +} + +void AsyncDBClient::end() { + _session->end(); +} + +const HostAndPort& AsyncDBClient::remote() const { + return _peer; +} + +const HostAndPort& AsyncDBClient::local() const { + return _session->local(); +} + +} // namespace mongo diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h new file mode 100644 index 00000000000..ee7239a1d03 --- /dev/null +++ b/src/mongo/client/async_client.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/db/service_context.h" +#include "mongo/executor/network_connection_hook.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/remote_command_response.h" +#include "mongo/rpc/protocol.h" +#include "mongo/rpc/unique_message.h" +#include "mongo/transport/message_compressor_manager.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/future.h" + +namespace mongo { + +class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> { +public: + explicit AsyncDBClient(const HostAndPort& peer, + transport::SessionHandle session, + ServiceContext* svcCtx) + : _peer(std::move(peer)), _session(std::move(session)), _svcCtx(svcCtx) {} + + using Handle = std::shared_ptr<AsyncDBClient>; + + static Future<Handle> connect(const HostAndPort& peer, + transport::ConnectSSLMode sslMode, + ServiceContext* const context, + transport::ReactorHandle reactor); + + Future<executor::RemoteCommandResponse> runCommandRequest( + executor::RemoteCommandRequest request); + Future<rpc::UniqueReply> runCommand(OpMsgRequest request); + + Future<void> authenticate(const BSONObj& params); + + Future<void> initWireVersion(const std::string& appName, + executor::NetworkConnectionHook* const hook); + + void cancel(); + + bool isStillConnected(); + + void end(); + + const HostAndPort& remote() const; + const HostAndPort& local() const; + +private: + Future<Message> _call(Message request); + BSONObj _buildIsMasterRequest(const std::string& appName); + void _parseIsMasterResponse(BSONObj request, + const std::unique_ptr<rpc::ReplyInterface>& response); + + const HostAndPort _peer; + transport::SessionHandle _session; + ServiceContext* const _svcCtx; + MessageCompressorManager _compressorManager; + boost::optional<rpc::Protocol> _negotiatedProtocol; +}; + +} // namespace mongo diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index a31aede6c85..6c13b1c460b 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -157,6 +157,24 @@ env.Library(target='egress_tag_closer_manager', ]) env.Library( + target='network_interface_tl', + source=[ + 'connection_pool_tl.cpp', + 'network_interface_tl.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/client/async_client', + '$BUILD_DIR/mongo/transport/transport_layer', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/auth/authcommon', + '$BUILD_DIR/mongo/transport/transport_layer_manager', + 'connection_pool_executor', + 'network_interface', + ] +) + +env.Library( target='network_interface_asio', source=[ 'connection_pool_asio.cpp', @@ -284,6 +302,7 @@ env.Library( LIBDEPS=[ 'network_interface', 'network_interface_asio', + 'network_interface_tl', ], LIBDEPS_PRIVATE=[ 'egress_tag_closer_manager', diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index c00fe1c739b..7092c460ed3 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -77,13 +77,13 @@ public: * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); }); */ template <typename Callback> - void runWithActiveClient(Callback&& cb) { - runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex), - std::forward<Callback>(cb)); + auto runWithActiveClient(Callback&& cb) { + return runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex), + std::forward<Callback>(cb)); } template <typename Callback> - void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { + auto runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { invariant(lk.owns_lock()); _activeClients++; @@ -96,7 +96,7 @@ public: { decltype(lk) localLk(std::move(lk)); - cb(std::move(localLk)); + return cb(std::move(localLk)); } } @@ -107,10 +107,9 @@ public: * Gets a connection from the specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ - void getConnection(const HostAndPort& hostAndPort, - Milliseconds timeout, - stdx::unique_lock<stdx::mutex> lk, - GetConnectionCallback cb); + Future<ConnectionHandle> getConnection(const HostAndPort& hostAndPort, + Milliseconds timeout, + stdx::unique_lock<stdx::mutex> lk); /** * Cascades a failure across existing connections and requests. Invoking @@ -185,7 +184,7 @@ private: using OwnedConnection = std::unique_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>; - using Request = std::pair<Date_t, GetConnectionCallback>; + using Request = std::pair<Date_t, SharedPromise<ConnectionHandle>>; struct RequestComparator { bool operator()(const Request& a, const Request& b) { return a.first > b.first; @@ -218,7 +217,7 @@ private: OwnershipPool _droppedProcessingPool; OwnershipPool _checkedOutPool; - std::priority_queue<Request, std::vector<Request>, RequestComparator> _requests; + std::vector<Request> _requests; std::unique_ptr<TimerInterface> _requestTimer; Date_t _requestTimerExpiration; @@ -285,6 +284,37 @@ ConnectionPool::~ConnectionPool() { if (hasGlobalServiceContext() && _manager) { _manager->remove(this); } + + std::vector<SpecificPool*> pools; + + // Ensure we decrement active clients for all pools that we inc on (because we intend to process + // failures) + const auto guard = MakeGuard([&] { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + for (const auto& pool : pools) { + pool->decActiveClients(lk); + } + }); + + // Grab all current pools that don't match tags (under the lock) + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + for (auto& pair : _pools) { + pools.push_back(pair.second.get()); + pair.second->incActiveClients(lk); + } + } + + // Reacquire the lock per pool and process failures. We'll dec active clients when we're all + // through in the guard + for (const auto& pool : pools) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + pool->processFailure( + Status(ErrorCodes::ShutdownInProgress, "Shuting down the connection pool"), + std::move(lk)); + } } void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) { @@ -353,6 +383,11 @@ void ConnectionPool::mutateTags( void ConnectionPool::get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb) { + return get(hostAndPort, timeout).getAsync(std::move(cb)); +} + +Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort, + Milliseconds timeout) { SpecificPool* pool; stdx::unique_lock<stdx::mutex> lk(_mutex); @@ -369,8 +404,8 @@ void ConnectionPool::get(const HostAndPort& hostAndPort, invariant(pool); - pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) { - pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb)); + return pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) { + return pool->getConnection(hostAndPort, timeout, std::move(lk)); }); } @@ -451,22 +486,26 @@ size_t ConnectionPool::SpecificPool::openConnections(const stdx::unique_lock<std return _checkedOutPool.size() + _readyPool.size() + _processingPool.size(); } -void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort, - Milliseconds timeout, - stdx::unique_lock<stdx::mutex> lk, - GetConnectionCallback cb) { +Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection( + const HostAndPort& hostAndPort, Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) { if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) { timeout = _parent->_options.refreshTimeout; } const auto expiration = _parent->_factory->now() + timeout; - _requests.push(make_pair(expiration, std::move(cb))); + Promise<ConnectionHandle> promise; + auto future = promise.getFuture(); + + _requests.push_back(make_pair(expiration, promise.share())); + std::push_heap(begin(_requests), end(_requests), RequestComparator{}); updateStateInLock(); spawnConnections(lk); fulfillRequests(lk); + + return future; } void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, @@ -634,9 +673,8 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status, // with the same failed status lk.unlock(); - while (requestsToFail.size()) { - requestsToFail.top().second(status); - requestsToFail.pop(); + for (auto& request : requestsToFail) { + request.second.setError(status); } } @@ -678,8 +716,9 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex } // Grab the request and callback - auto cb = std::move(_requests.top().second); - _requests.pop(); + auto promise = std::move(_requests.front().second); + std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); + _requests.pop_back(); auto connPtr = conn.get(); @@ -691,7 +730,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex // pass it to the user connPtr->resetToUnknown(); lk.unlock(); - cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent))); + promise.emplaceValue(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent))); lk.lock(); } } @@ -831,16 +870,16 @@ void ConnectionPool::SpecificPool::updateStateInLock() { // If we were already running and the timer is the same as it was // before, nothing to do - if (_state == State::kRunning && _requestTimerExpiration == _requests.top().first) + if (_state == State::kRunning && _requestTimerExpiration == _requests.front().first) return; _state = State::kRunning; _requestTimer->cancelTimeout(); - _requestTimerExpiration = _requests.top().first; + _requestTimerExpiration = _requests.front().first; - auto timeout = _requests.top().first - _parent->_factory->now(); + auto timeout = _requests.front().first - _parent->_factory->now(); // We set a timer for the most recent request, then invoke each timed // out request we couldn't service @@ -849,15 +888,16 @@ void ConnectionPool::SpecificPool::updateStateInLock() { auto now = _parent->_factory->now(); while (_requests.size()) { - auto& x = _requests.top(); + auto& x = _requests.front(); if (x.first <= now) { - auto cb = std::move(x.second); - _requests.pop(); + auto promise = std::move(x.second); + std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); + _requests.pop_back(); lk.unlock(); - cb(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "Couldn't get a connection within the time limit")); + promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, + "Couldn't get a connection within the time limit")); lk.lock(); } else { break; diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 970867aa8da..10a711261ef 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -38,6 +38,7 @@ #include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/transport/session.h" +#include "mongo/util/future.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -144,6 +145,7 @@ public: const stdx::function<transport::Session::TagMask(transport::Session::TagMask)>& mutateFunc) override; + Future<ConnectionHandle> get(const HostAndPort& hostAndPort, Milliseconds timeout); void get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb); void appendConnectionStats(ConnectionPoolStats* stats) const; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 12caf325206..09af8dab714 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -1241,9 +1241,8 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) { ASSERT(!conn1); // Get conn2 (which should have an extra second before the timeout) - boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn2; pool.get(HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2 = std::move(swConn); + ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); }); PoolImpl::setNow(now + Seconds(2)); @@ -1251,8 +1250,6 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) { ASSERT(conn1); ASSERT(!conn1->isOK()); ASSERT(conn1->getStatus().code() == ErrorCodes::NetworkInterfaceExceededTimeLimit); - - ASSERT(!conn2); } /** @@ -1294,9 +1291,8 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { ASSERT(!conn1); // Get conn2 (which should have an extra second before the timeout) - boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn2; pool.get(HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2 = std::move(swConn); + ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); }); PoolImpl::setNow(now + Seconds(5)); @@ -1304,8 +1300,6 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { ASSERT(conn1); ASSERT(!conn1->isOK()); ASSERT(conn1->getStatus().code() == ErrorCodes::NetworkInterfaceExceededTimeLimit); - - ASSERT(!conn2); } template <typename T> diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp new file mode 100644 index 00000000000..3823a9a3e0a --- /dev/null +++ b/src/mongo/executor/connection_pool_tl.cpp @@ -0,0 +1,228 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO + +#include "mongo/platform/basic.h" + +#include "mongo/executor/connection_pool_tl.h" + +#include "mongo/db/auth/internal_user_auth.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace executor { +namespace connection_pool_tl { +namespace { +const auto kMaxTimerDuration = Milliseconds::max(); + +struct TimeoutHandler { + AtomicBool done; + Promise<void> promise; +}; + +} // namespace + +void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) { + _timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) { + if (status == ErrorCodes::CallbackCanceled) { + return; + } + + fassert(50475, status); + + cb(); + }); +} + +void TLTimer::cancelTimeout() { + _timer->cancel(); +} + +void TLConnection::indicateSuccess() { + _status = Status::OK(); + _lastUsed = _reactor->now(); +} + +void TLConnection::indicateFailure(Status status) { + _status = std::move(status); +} + +const HostAndPort& TLConnection::getHostAndPort() const { + return _peer; +} + +bool TLConnection::isHealthy() { + return _client->isStillConnected(); +} + +AsyncDBClient* TLConnection::client() { + return _client.get(); +} + +void TLConnection::indicateUsed() { + // It is illegal to attempt to use a connection after calling indicateFailure(). + invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown); + _lastUsed = _reactor->now(); +} + +Date_t TLConnection::getLastUsed() const { + return _lastUsed; +} + +const Status& TLConnection::getStatus() const { + return _status; +} + +void TLConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) { + _timer.setTimeout(timeout, std::move(cb)); +} + +void TLConnection::cancelTimeout() { + _timer.cancelTimeout(); +} + +void TLConnection::setup(Milliseconds timeout, SetupCallback cb) { + + auto handler = std::make_shared<TimeoutHandler>(); + handler->promise.getFuture().getAsync( + [ this, cb = std::move(cb) ](Status status) { cb(this, std::move(status)); }); + + log() << "Connecting to " << _peer; + setTimeout(timeout, [this, handler, timeout] { + if (handler->done.swap(true)) { + return; + } + std::string reason = str::stream() << "Timed out connecting to " << _peer << " after " + << timeout; + handler->promise.setError( + Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, std::move(reason))); + }); + + AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor) + .onError( + [this](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> { + return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason()); + }) + .then([this](AsyncDBClient::Handle client) { + _client = std::move(client); + return _client->initWireVersion("NetworkInterfaceTL", _onConnectHook); + }) + .then([this] { return _client->authenticate(getInternalUserAuthParams()); }) + .then([this] { + if (!_onConnectHook) { + return Future<void>::makeReady(); + } + auto connectHookRequest = uassertStatusOK(_onConnectHook->makeRequest(_peer)); + if (!connectHookRequest) { + return Future<void>::makeReady(); + } + return _client->runCommandRequest(*connectHookRequest) + .then([this](RemoteCommandResponse response) { + return _onConnectHook->handleReply(_peer, std::move(response)); + }); + }) + .getAsync([this, handler](Status status) { + if (handler->done.swap(true)) { + return; + } + + cancelTimeout(); + + if (status.isOK()) { + handler->promise.emplaceValue(); + } else { + log() << "Failed to connect to " << _peer << " - " << redact(status); + handler->promise.setError(status); + } + }); +} + +void TLConnection::resetToUnknown() { + _status = ConnectionPool::kConnectionStateUnknown; +} + +void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) { + auto handler = std::make_shared<TimeoutHandler>(); + handler->promise.getFuture().getAsync( + [ this, cb = std::move(cb) ](Status status) { cb(this, status); }); + + setTimeout(timeout, [this, handler] { + if (handler->done.swap(true)) { + return; + } + + _status = {ErrorCodes::HostUnreachable, "Timed out refreshing host"}; + _client->cancel(); + + handler->promise.setError(_status); + }); + + _client + ->runCommandRequest( + {_peer, std::string("admin"), BSON("isMaster" << 1), BSONObj(), nullptr}) + .then([this](executor::RemoteCommandResponse response) { + return Future<void>::makeReady(response.status); + }) + .getAsync([this, handler](Status status) { + if (handler->done.swap(true)) { + return; + } + + cancelTimeout(); + + _status = status; + if (status.isOK()) { + handler->promise.emplaceValue(); + } else { + handler->promise.setError(status); + } + }); +} + +size_t TLConnection::getGeneration() const { + return _generation; +} + +std::unique_ptr<ConnectionPool::ConnectionInterface> TLTypeFactory::makeConnection( + const HostAndPort& hostAndPort, size_t generation) { + return std::make_unique<TLConnection>( + _reactor, getGlobalServiceContext(), hostAndPort, generation, _onConnectHook.get()); +} + +std::unique_ptr<ConnectionPool::TimerInterface> TLTypeFactory::makeTimer() { + return std::make_unique<TLTimer>(_reactor); +} + +Date_t TLTypeFactory::now() { + return _reactor->now(); +} + +} // namespace connection_pool_tl +} // namespace executor +} // namespace diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h new file mode 100644 index 00000000000..e761828fb64 --- /dev/null +++ b/src/mongo/executor/connection_pool_tl.h @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/client/async_client.h" +#include "mongo/executor/connection_pool.h" +#include "mongo/executor/network_connection_hook.h" +#include "mongo/executor/network_interface.h" + +namespace mongo { +namespace executor { +namespace connection_pool_tl { + +class TLTypeFactory final : public ConnectionPool::DependentTypeFactoryInterface { +public: + TLTypeFactory(transport::ReactorHandle reactor, + transport::TransportLayer* tl, + std::unique_ptr<NetworkConnectionHook> onConnectHook) + : _reactor(std::move(reactor)), _tl(tl), _onConnectHook(std::move(onConnectHook)) {} + + std::unique_ptr<ConnectionPool::ConnectionInterface> makeConnection( + const HostAndPort& hostAndPort, size_t generation) override; + std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override; + + Date_t now() override; + +private: + transport::ReactorHandle _reactor; + transport::TransportLayer* _tl; + std::unique_ptr<NetworkConnectionHook> _onConnectHook; +}; + +class TLTimer final : public ConnectionPool::TimerInterface { +public: + explicit TLTimer(const transport::ReactorHandle& reactor) + : _reactor(reactor), _timer(_reactor->makeTimer()) {} + + void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; + void cancelTimeout() override; + +private: + transport::ReactorHandle _reactor; + std::unique_ptr<transport::ReactorTimer> _timer; +}; + +class TLConnection final : public ConnectionPool::ConnectionInterface { +public: + TLConnection(transport::ReactorHandle reactor, + ServiceContext* serviceContext, + HostAndPort peer, + size_t generation, + NetworkConnectionHook* onConnectHook) + : _reactor(reactor), + _serviceContext(serviceContext), + _timer(_reactor), + _peer(std::move(peer)), + _generation(generation), + _onConnectHook(onConnectHook) {} + + void indicateSuccess() override; + void indicateFailure(Status status) override; + const HostAndPort& getHostAndPort() const; + bool isHealthy() override; + AsyncDBClient* client(); + +private: + void indicateUsed() override; + Date_t getLastUsed() const override; + const Status& getStatus() const override; + + void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; + void cancelTimeout() override; + void setup(Milliseconds timeout, SetupCallback cb) override; + void resetToUnknown() override; + void refresh(Milliseconds timeout, RefreshCallback cb) override; + + size_t getGeneration() const override; + +private: + transport::ReactorHandle _reactor; + ServiceContext* const _serviceContext; + TLTimer _timer; + HostAndPort _peer; + size_t _generation; + NetworkConnectionHook* const _onConnectHook; + AsyncDBClient::Handle _client; + Date_t _lastUsed; + Status _status = ConnectionPool::kConnectionStateUnknown; +}; + +} // namespace connection_pool_asio +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index a925fd81008..48428b820e3 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -141,7 +141,8 @@ public: * * Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started * and true otherwise. If it returns Status::OK(), then the action will be executed by - * NetworkInterface eventually; otherwise, it will not. + * NetworkInterface eventually if no error occurs while waiting for the alarm; otherwise, + * it will not. * * "action" should not do anything that requires a lot of computation, or that might block for a * long time, as it may execute in a network thread. diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp new file mode 100644 index 00000000000..eef2299e623 --- /dev/null +++ b/src/mongo/executor/network_interface_tl.cpp @@ -0,0 +1,370 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO + +#include "mongo/platform/basic.h" + +#include "mongo/executor/network_interface_tl.h" + +#include "mongo/db/server_options.h" +#include "mongo/executor/connection_pool_tl.h" +#include "mongo/transport/transport_layer_manager.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/log.h" +#include "mongo/util/net/sock.h" + +namespace mongo { +namespace executor { + +NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName, + ConnectionPool::Options connPoolOpts, + ServiceContext* svcCtx, + std::unique_ptr<NetworkConnectionHook> onConnectHook, + std::unique_ptr<rpc::EgressMetadataHook> metadataHook) + : _instanceName(std::move(instanceName)), + _svcCtx(svcCtx), + _tl(nullptr), + _ownedTransportLayer(nullptr), + _reactor(nullptr), + _connPoolOpts(std::move(connPoolOpts)), + _onConnectHook(std::move(onConnectHook)), + _metadataHook(std::move(metadataHook)), + _inShutdown(false) {} + +std::string NetworkInterfaceTL::getDiagnosticString() { + return "DEPRECATED: getDiagnosticString is deprecated in NetworkInterfaceTL"; +} + +void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const { + _pool->appendConnectionStats(stats); +} + +std::string NetworkInterfaceTL::getHostName() { + return getHostNameCached(); +} + +void NetworkInterfaceTL::startup() { + if (_svcCtx) { + _tl = _svcCtx->getTransportLayer(); + } + + if (!_tl) { + warning() << "No TransportLayer configured during NetworkInterface startup"; + _ownedTransportLayer = + transport::TransportLayerManager::makeAndStartDefaultEgressTransportLayer(); + _tl = _ownedTransportLayer.get(); + } + + _reactor = _tl->getReactor(transport::TransportLayer::kNewReactor); + auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>( + _reactor, _tl, std::move(_onConnectHook)); + _pool = std::make_unique<ConnectionPool>( + std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts); + _ioThread = stdx::thread([this] { + setThreadName(_instanceName); + LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up"; + _reactor->run(); + }); +} + +void NetworkInterfaceTL::shutdown() { + _inShutdown.store(true); + _reactor->stop(); + _ioThread.join(); + LOG(2) << "NetworkInterfaceTL shutdown successfully"; +} + +bool NetworkInterfaceTL::inShutdown() const { + return _inShutdown.load(); +} + +void NetworkInterfaceTL::waitForWork() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + MONGO_IDLE_THREAD_BLOCK; + _workReadyCond.wait(lk, [this] { return _isExecutorRunnable; }); +} + +void NetworkInterfaceTL::waitForWorkUntil(Date_t when) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + MONGO_IDLE_THREAD_BLOCK; + _workReadyCond.wait_until(lk, when.toSystemTimePoint(), [this] { return _isExecutorRunnable; }); +} + +void NetworkInterfaceTL::signalWorkAvailable() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_isExecutorRunnable) { + _isExecutorRunnable = true; + _workReadyCond.notify_one(); + } +} + +Date_t NetworkInterfaceTL::now() { + // TODO This check is because we set up NetworkInterfaces in MONGO_INITIALIZERS and then expect + // this method to work before the NI is started. + if (!_reactor) { + return Date_t::now(); + } + return _reactor->now(); +} + + +Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; + } + + LOG(3) << "startCommand: " << redact(request.toString()); + + if (_metadataHook) { + BSONObjBuilder newMetadata(std::move(request.metadata)); + + auto status = _metadataHook->writeRequestMetadata(request.opCtx, &newMetadata); + if (!status.isOK()) { + return status; + } + + request.metadata = newMetadata.obj(); + } + + auto state = std::make_shared<CommandState>(request, cbHandle); + state->mergedFuture = state->promise.getFuture(); + { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgress.insert({state->cbHandle, state}); + } + + state->start = now(); + if (state->request.timeout != state->request.kNoTimeout) { + state->deadline = state->start + state->request.timeout; + } + + _pool->get(request.target, request.timeout) + .tapError([state](Status error) { + LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": " + << error; + }) + .then([this, state](ConnectionPool::ConnectionHandle conn) mutable { + return _onAcquireConn(state, std::move(conn)); + }) + .onError([](Status error) -> StatusWith<RemoteCommandResponse> { + // The TransportLayer has, for historical reasons returned SocketException for + // network errors, but sharding assumes HostUnreachable on network errors. + if (error == ErrorCodes::SocketException) { + error = Status(ErrorCodes::HostUnreachable, error.reason()); + } + return error; + }) + .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) { + auto duration = now() - state->start; + if (!response.isOK()) { + onFinish(RemoteCommandResponse(response.getStatus(), duration)); + } else { + auto rs = std::move(response.getValue()); + LOG(2) << "Request " << state->request.id << " finished with response: " + << redact(rs.isOK() ? rs.data.toString() : rs.status.toString()); + onFinish(rs); + } + }); + return Status::OK(); +} + +// This is only called from within a then() callback on a future, so throwing is equivalent to +// returning a ready Future with a not-OK status. +Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( + std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) { + if (state->done.load()) { + conn->indicateSuccess(); + uasserted(ErrorCodes::CallbackCanceled, "Command was canceled"); + } + + state->conn = std::move(conn); + auto tlconn = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get()); + auto client = tlconn->client(); + + if (state->deadline != RemoteCommandRequest::kNoExpirationDate) { + auto nowVal = now(); + if (nowVal >= state->deadline) { + auto connDuration = nowVal - state->start; + uasserted(ErrorCodes::NetworkInterfaceExceededTimeLimit, + str::stream() << "Remote command timed out while waiting to get a " + "connection from the pool, took " + << connDuration + << ", timeout was set to " + << state->request.timeout); + } + + state->timer = _reactor->makeTimer(); + state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) { + if (status == ErrorCodes::CallbackCanceled) { + invariant(state->done.load()); + return; + } + + if (state->done.swap(true)) { + return; + } + + LOG(2) << "Request " << state->request.id << " timed out" + << ", deadline was " << state->deadline << ", op was " + << redact(state->request.toString()); + state->promise.setError( + Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out")); + + client->cancel(); + }); + } + + client->runCommandRequest(state->request) + .then([this, state](RemoteCommandResponse response) { + if (state->done.load()) { + uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled"); + } + + // TODO Investigate whether this is necessary here. + return _reactor->execute([ this, state, response = std::move(response) ]() mutable { + if (_metadataHook && response.status.isOK()) { + auto target = state->conn->getHostAndPort().toString(); + response.status = _metadataHook->readReplyMetadata( + nullptr, std::move(target), response.metadata); + } + + return std::move(response); + }); + }) + .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) { + _eraseInUseConn(state->cbHandle); + if (!swr.isOK()) { + state->conn->indicateFailure(swr.getStatus()); + } else if (!swr.getValue().isOK()) { + state->conn->indicateFailure(swr.getValue().status); + } else { + state->conn->indicateSuccess(); + } + + if (state->done.swap(true)) + return; + + if (state->timer) { + state->timer->cancel(); + } + + state->promise.setWith([&] { return std::move(swr); }); + }); + + return std::move(state->mergedFuture); +} + +void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbHandle) { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgress.erase(cbHandle); +} + +void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { + stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); + auto it = _inProgress.find(cbHandle); + if (it == _inProgress.end()) { + return; + } + auto state = it->second; + _inProgress.erase(it); + lk.unlock(); + + if (state->done.swap(true)) { + return; + } + + LOG(2) << "Canceling operation; original request was: " << redact(state->request.toString()); + state->promise.setError({ErrorCodes::CallbackCanceled, + str::stream() << "Command canceled; original request was: " + << redact(state->request.toString())}); + if (state->conn) { + auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get()); + client->client()->cancel(); + } +} + +Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; + } + + if (when <= now()) { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + return Status::OK(); + } + + std::shared_ptr<transport::ReactorTimer> alarmTimer = _reactor->makeTimer(); + std::weak_ptr<transport::ReactorTimer> weakTimer = alarmTimer; + { + // We do this so that the lifetime of the alarmTimers is the lifetime of the NITL. + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.insert(alarmTimer); + } + + alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) { + auto alarmTimer = weakTimer.lock(); + if (!alarmTimer) { + return; + } else { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.erase(alarmTimer); + } + + auto nowVal = now(); + if (nowVal < when) { + warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal; + const auto status = setAlarm(when, std::move(action)); + if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { + fassertFailedWithStatus(50785, status); + } + + return; + } + + if (status.isOK()) { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + } else if (status != ErrorCodes::CallbackCanceled) { + warning() << "setAlarm() received an error: " << status; + } + }); + return Status::OK(); +} + +bool NetworkInterfaceTL::onNetworkThread() { + return _reactor->onReactorThread(); +} + +void NetworkInterfaceTL::dropConnections(const HostAndPort& hostAndPort) { + _pool->dropConnections(hostAndPort); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h new file mode 100644 index 00000000000..8c4d4697d93 --- /dev/null +++ b/src/mongo/executor/network_interface_tl.h @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <deque> + +#include "mongo/client/async_client.h" +#include "mongo/db/service_context.h" +#include "mongo/executor/connection_pool.h" +#include "mongo/executor/network_interface.h" +#include "mongo/rpc/metadata/metadata_hook.h" +#include "mongo/stdx/thread.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/transport/transport_layer.h" + +namespace mongo { +namespace executor { + +class NetworkInterfaceTL : public NetworkInterface { +public: + NetworkInterfaceTL(std::string instanceName, + ConnectionPool::Options connPoolOpts, + ServiceContext* ctx, + std::unique_ptr<NetworkConnectionHook> onConnectHook, + std::unique_ptr<rpc::EgressMetadataHook> metadataHook); + std::string getDiagnosticString() override; + void appendConnectionStats(ConnectionPoolStats* stats) const override; + std::string getHostName() override; + void startup() override; + void shutdown() override; + bool inShutdown() const override; + void waitForWork() override; + void waitForWorkUntil(Date_t when) override; + void signalWorkAvailable() override; + Date_t now() override; + Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) override; + void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; + Status setAlarm(Date_t when, const stdx::function<void()>& action) override; + + bool onNetworkThread() override; + + void dropConnections(const HostAndPort& hostAndPort) override; + +private: + struct CommandState { + CommandState(RemoteCommandRequest request_, TaskExecutor::CallbackHandle cbHandle_) + : request(std::move(request_)), cbHandle(std::move(cbHandle_)) {} + + RemoteCommandRequest request; + TaskExecutor::CallbackHandle cbHandle; + Date_t deadline = RemoteCommandRequest::kNoExpirationDate; + Date_t start; + + ConnectionPool::ConnectionHandle conn; + std::unique_ptr<transport::ReactorTimer> timer; + + AtomicBool done; + Promise<RemoteCommandResponse> promise; + Future<RemoteCommandResponse> mergedFuture; + }; + + void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle); + Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state, + ConnectionPool::ConnectionHandle conn); + + std::string _instanceName; + ServiceContext* _svcCtx; + transport::TransportLayer* _tl; + // Will be created if ServiceContext is null, or if no TransportLayer was configured at startup + std::unique_ptr<transport::TransportLayer> _ownedTransportLayer; + transport::ReactorHandle _reactor; + + ConnectionPool::Options _connPoolOpts; + std::unique_ptr<NetworkConnectionHook> _onConnectHook; + std::unique_ptr<ConnectionPool> _pool; + + std::unique_ptr<rpc::EgressMetadataHook> _metadataHook; + AtomicBool _inShutdown; + stdx::thread _ioThread; + + stdx::mutex _inProgressMutex; + stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<CommandState>> _inProgress; + stdx::unordered_set<std::shared_ptr<transport::ReactorTimer>> _inProgressAlarms; + + stdx::mutex _mutex; + stdx::condition_variable _workReadyCond; + bool _isExecutorRunnable = false; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 8ef9e732233..8d7c9cf72b7 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -41,6 +41,7 @@ #include "mongo/platform/hash_namespace.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" +#include "mongo/util/future.h" #include "mongo/util/time_support.h" namespace mongo { diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp index 03d5fb6d01b..69cbd4b4ac1 100644 --- a/src/mongo/s/sharding_router_test_fixture.cpp +++ b/src/mongo/s/sharding_router_test_fixture.cpp @@ -106,15 +106,8 @@ void ShardingTestFixture::setUp() { service->setPreciseClockSource(stdx::make_unique<ClockSourceMock>()); service->setTickSource(stdx::make_unique<TickSourceMock>()); - { - auto tlMock = stdx::make_unique<transport::TransportLayerMock>(); - _transportLayer = tlMock.get(); - ASSERT_OK(_transportLayer->start()); - service->setTransportLayer(std::move(tlMock)); - } - CollatorFactoryInterface::set(service, stdx::make_unique<CollatorFactoryMock>()); - _transportSession = transport::MockSession::create(_transportLayer); + _transportSession = transport::MockSession::create(nullptr); _client = service->makeClient("ShardingTestFixture", _transportSession); _opCtx = _client->makeOperationContext(); @@ -544,7 +537,7 @@ void ShardingTestFixture::expectFindSendBSONObjVector(const HostAndPort& configH } void ShardingTestFixture::setRemote(const HostAndPort& remote) { - _transportSession = transport::MockSession::create(remote, HostAndPort{}, _transportLayer); + _transportSession = transport::MockSession::create(remote, HostAndPort{}, nullptr); } void ShardingTestFixture::checkReadConcern(const BSONObj& cmdObj, diff --git a/src/mongo/s/sharding_router_test_fixture.h b/src/mongo/s/sharding_router_test_fixture.h index 574f484faac..05c1fa37a88 100644 --- a/src/mongo/s/sharding_router_test_fixture.h +++ b/src/mongo/s/sharding_router_test_fixture.h @@ -188,7 +188,6 @@ public: private: ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; - transport::TransportLayerMock* _transportLayer; transport::SessionHandle _transportSession; RemoteCommandTargeterFactoryMock* _targeterFactory; diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index 19379babb20..60e6a4e33e2 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -66,13 +66,13 @@ public: } void end() override { - if (!_tl->owns(id())) + if (!_tl || !_tl->owns(id())) return; _tl->_sessions[id()].ended = true; } StatusWith<Message> sourceMessage() override { - if (_tl->inShutdown()) { + if (!_tl || _tl->inShutdown()) { return TransportLayer::ShutdownStatus; } else if (!_tl->owns(id())) { return TransportLayer::SessionUnknownStatus; @@ -88,7 +88,7 @@ public: } Status sinkMessage(Message message) override { - if (_tl->inShutdown()) { + if (!_tl || _tl->inShutdown()) { return TransportLayer::ShutdownStatus; } else if (!_tl->owns(id())) { return TransportLayer::SessionUnknownStatus; diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 1b8f502237f..492fbf82ea0 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -113,19 +113,24 @@ public: ReactorTimer(const ReactorTimer&) = delete; ReactorTimer& operator=(const ReactorTimer&) = delete; + /* + * The destructor calls cancel() to ensure outstanding Futures are filled. + */ virtual ~ReactorTimer() = default; /* - * Cancel any outstanding calls to waitFor/waitUntil. The future will have - * an ErrorCodes::CallbackCancelled status. + * Cancel any outstanding future from waitFor/waitUntil. The future will be filled with an + * ErrorCodes::CallbackCancelled status. + * + * If no future is outstanding, then this is a noop. */ virtual void cancel() = 0; /* - * Returns a future that will be filled with Status::OK after the timeout has - * ellapsed or has been cancelled. + * Returns a future that will be filled with Status::OK after the timeout has ellapsed. + * + * Calling this implicitly calls cancel(). */ - virtual Future<void> waitFor(Milliseconds timeout) = 0; virtual Future<void> waitUntil(Date_t timeout) = 0; }; @@ -154,8 +159,9 @@ public: Promise<FutureContinuationResult<Callback>> promise; auto future = promise.getFuture(); - schedule(kPost, - [ cb = std::forward<Callback>(cb), sp = promise.share() ] { sp.setWith(cb); }); + schedule(kPost, [ cb = std::forward<Callback>(cb), sp = promise.share() ]() mutable { + sp.setWith(cb); + }); return future; } diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 772a4eb189f..2c4e1e18fe4 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -64,7 +64,7 @@ namespace transport { class ASIOReactorTimer final : public ReactorTimer { public: explicit ASIOReactorTimer(asio::io_context& ctx) - : _timer(std::make_shared<asio::system_timer>(ctx)) {} + : _timerState(std::make_shared<TimerState>(ctx)) {} ~ASIOReactorTimer() { // The underlying timer won't get destroyed until the last promise from _asyncWait @@ -73,42 +73,82 @@ public: } void cancel() override { - _timer->cancel(); + auto promise = [&] { + stdx::lock_guard<stdx::mutex> lk(_timerState->mutex); + _timerState->generation++; + return std::move(_timerState->finalPromise); + }(); + + if (promise) { + // We're worried that setting the error on the promise without unwinding the stack + // can lead to a deadlock, so this gets scheduled on the io_context of the timer. + _timerState->timer.get_io_context().post([promise = promise->share()]() mutable { + promise.setError({ErrorCodes::CallbackCanceled, "Timer was canceled"}); + }); + } + _timerState->timer.cancel(); } Future<void> waitFor(Milliseconds timeout) override { - return _asyncWait([&] { _timer->expires_after(timeout.toSystemDuration()); }); + return _asyncWait([&] { _timerState->timer.expires_after(timeout.toSystemDuration()); }); } Future<void> waitUntil(Date_t expiration) override { - return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); }); + return _asyncWait([&] { _timerState->timer.expires_at(expiration.toSystemTimePoint()); }); } private: - template <typename Callback> - Future<void> _asyncWait(Callback&& cb) { + template <typename ArmTimerCb> + Future<void> _asyncWait(ArmTimerCb&& armTimer) { try { - cb(); - Promise<void> promise; - auto ret = promise.getFuture(); - _timer->async_wait( - [ promise = promise.share(), timer = _timer ](const std::error_code& ec) mutable { + cancel(); + + Future<void> ret; + uint64_t id; + std::tie(ret, id) = [&] { + stdx::lock_guard<stdx::mutex> lk(_timerState->mutex); + auto id = ++_timerState->generation; + invariant(!_timerState->finalPromise); + _timerState->finalPromise = std::make_unique<Promise<void>>(); + auto future = _timerState->finalPromise->getFuture(); + return std::make_pair(std::move(future), id); + }(); + + armTimer(); + _timerState->timer.async_wait( + [ id, state = _timerState ](const std::error_code& ec) mutable { + stdx::unique_lock<stdx::mutex> lk(state->mutex); + if (id != state->generation) { + return; + } + auto promise = std::move(state->finalPromise); + lk.unlock(); + if (ec) { - promise.setError(errorCodeToStatus(ec)); + promise->setError(errorCodeToStatus(ec)); } else { - promise.emplaceValue(); + promise->emplaceValue(); } }); + return ret; } catch (asio::system_error& ex) { return Future<void>::makeReady(errorCodeToStatus(ex.code())); } } - // Destroying an asio::system_timer that has outstanding callbacks from async_wait will cause - // a broken promise - so this is managed by a shared ptr, so that each callback can extend the - // lifetime of the timer to after its been called. - std::shared_ptr<asio::system_timer> _timer; + // The timer itself and its state are stored in this struct managed by a shared_ptr so we can + // extend the lifetime of the timer until all callbacks to timer.async_wait have run. + struct TimerState { + explicit TimerState(asio::io_context& ctx) : timer(ctx) {} + + asio::system_timer timer; + stdx::mutex mutex; + uint64_t generation = 0; + std::unique_ptr<Promise<void>> finalPromise; + }; + + std::shared_ptr<TimerState> _timerState; }; class TransportLayerASIO::ASIOReactor final : public Reactor { |