|  |  |  |
|---|---|---|
| author | Jonathan Reams <jbreams@mongodb.com> | 2018-04-09 15:37:57 -0400 |
| committer | Jonathan Reams <jbreams@mongodb.com> | 2018-04-13 12:18:39 -0400 |
| commit | 821ac1964b7fc1beba93a71fb971584b3af4808d (patch) | |
| tree | 7b4db79590920bc293ce2136a5c06f32802f9b76 /src/mongo/executor/network_interface_tl.cpp | |
| parent | ad3671a64bd8958370a4aeaf93fe00d2d1272e3a (diff) | |
| download | mongo-821ac1964b7fc1beba93a71fb971584b3af4808d.tar.gz | |
SERVER-33821 Implement NetworkInterfaceTL and AsyncDBClient
Diffstat (limited to 'src/mongo/executor/network_interface_tl.cpp')
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 370 |
1 file changed, 370 insertions, 0 deletions
```diff
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
new file mode 100644
index 00000000000..eef2299e623
--- /dev/null
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -0,0 +1,370 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/network_interface_tl.h"
+
+#include "mongo/db/server_options.h"
+#include "mongo/executor/connection_pool_tl.h"
+#include "mongo/transport/transport_layer_manager.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/sock.h"
+
+namespace mongo {
+namespace executor {
+
+NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
+                                       ConnectionPool::Options connPoolOpts,
+                                       ServiceContext* svcCtx,
+                                       std::unique_ptr<NetworkConnectionHook> onConnectHook,
+                                       std::unique_ptr<rpc::EgressMetadataHook> metadataHook)
+    : _instanceName(std::move(instanceName)),
+      _svcCtx(svcCtx),
+      _tl(nullptr),
+      _ownedTransportLayer(nullptr),
+      _reactor(nullptr),
+      _connPoolOpts(std::move(connPoolOpts)),
+      _onConnectHook(std::move(onConnectHook)),
+      _metadataHook(std::move(metadataHook)),
+      _inShutdown(false) {}
+
+std::string NetworkInterfaceTL::getDiagnosticString() {
+    return "DEPRECATED: getDiagnosticString is deprecated in NetworkInterfaceTL";
+}
+
+void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const {
+    _pool->appendConnectionStats(stats);
+}
+
+std::string NetworkInterfaceTL::getHostName() {
+    return getHostNameCached();
+}
+
+void NetworkInterfaceTL::startup() {
+    if (_svcCtx) {
+        _tl = _svcCtx->getTransportLayer();
+    }
+
+    if (!_tl) {
+        warning() << "No TransportLayer configured during NetworkInterface startup";
+        _ownedTransportLayer =
+            transport::TransportLayerManager::makeAndStartDefaultEgressTransportLayer();
+        _tl = _ownedTransportLayer.get();
+    }
+
+    _reactor = _tl->getReactor(transport::TransportLayer::kNewReactor);
+    auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
+        _reactor, _tl, std::move(_onConnectHook));
+    _pool = std::make_unique<ConnectionPool>(
+        std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+    _ioThread = stdx::thread([this] {
+        setThreadName(_instanceName);
+        LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
+        _reactor->run();
+    });
+}
+
+void NetworkInterfaceTL::shutdown() {
+    _inShutdown.store(true);
+    _reactor->stop();
+    _ioThread.join();
+    LOG(2) << "NetworkInterfaceTL shutdown successfully";
+}
+
+bool NetworkInterfaceTL::inShutdown() const {
+    return _inShutdown.load();
+}
+
+void NetworkInterfaceTL::waitForWork() {
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+    MONGO_IDLE_THREAD_BLOCK;
+    _workReadyCond.wait(lk, [this] { return _isExecutorRunnable; });
+}
+
+void NetworkInterfaceTL::waitForWorkUntil(Date_t when) {
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+    MONGO_IDLE_THREAD_BLOCK;
+    _workReadyCond.wait_until(lk, when.toSystemTimePoint(), [this] { return _isExecutorRunnable; });
+}
+
+void NetworkInterfaceTL::signalWorkAvailable() {
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+    if (!_isExecutorRunnable) {
+        _isExecutorRunnable = true;
+        _workReadyCond.notify_one();
+    }
+}
+
+Date_t NetworkInterfaceTL::now() {
+    // TODO This check is because we set up NetworkInterfaces in MONGO_INITIALIZERS and then expect
+    // this method to work before the NI is started.
+    if (!_reactor) {
+        return Date_t::now();
+    }
+    return _reactor->now();
+}
+
+
+Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                                        RemoteCommandRequest& request,
+                                        const RemoteCommandCompletionFn& onFinish) {
+    if (inShutdown()) {
+        return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+    }
+
+    LOG(3) << "startCommand: " << redact(request.toString());
+
+    if (_metadataHook) {
+        BSONObjBuilder newMetadata(std::move(request.metadata));
+
+        auto status = _metadataHook->writeRequestMetadata(request.opCtx, &newMetadata);
+        if (!status.isOK()) {
+            return status;
+        }
+
+        request.metadata = newMetadata.obj();
+    }
+
+    auto state = std::make_shared<CommandState>(request, cbHandle);
+    state->mergedFuture = state->promise.getFuture();
+    {
+        stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+        _inProgress.insert({state->cbHandle, state});
+    }
+
+    state->start = now();
+    if (state->request.timeout != state->request.kNoTimeout) {
+        state->deadline = state->start + state->request.timeout;
+    }
+
+    _pool->get(request.target, request.timeout)
+        .tapError([state](Status error) {
+            LOG(2) << "Failed to get connection from pool for request " << state->request.id
+                   << ": " << error;
+        })
+        .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
+            return _onAcquireConn(state, std::move(conn));
+        })
+        .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+            // The TransportLayer has, for historical reasons returned SocketException for
+            // network errors, but sharding assumes HostUnreachable on network errors.
+            if (error == ErrorCodes::SocketException) {
+                error = Status(ErrorCodes::HostUnreachable, error.reason());
+            }
+            return error;
+        })
+        .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+            auto duration = now() - state->start;
+            if (!response.isOK()) {
+                onFinish(RemoteCommandResponse(response.getStatus(), duration));
+            } else {
+                auto rs = std::move(response.getValue());
+                LOG(2) << "Request " << state->request.id << " finished with response: "
+                       << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+                onFinish(rs);
+            }
+        });
+    return Status::OK();
+}
+
+// This is only called from within a then() callback on a future, so throwing is equivalent to
+// returning a ready Future with a not-OK status.
+Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
+    std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+    if (state->done.load()) {
+        conn->indicateSuccess();
+        uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
+    }
+
+    state->conn = std::move(conn);
+    auto tlconn = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
+    auto client = tlconn->client();
+
+    if (state->deadline != RemoteCommandRequest::kNoExpirationDate) {
+        auto nowVal = now();
+        if (nowVal >= state->deadline) {
+            auto connDuration = nowVal - state->start;
+            uasserted(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+                      str::stream() << "Remote command timed out while waiting to get a "
+                                       "connection from the pool, took "
+                                    << connDuration
+                                    << ", timeout was set to "
+                                    << state->request.timeout);
+        }
+
+        state->timer = _reactor->makeTimer();
+        state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
+            if (status == ErrorCodes::CallbackCanceled) {
+                invariant(state->done.load());
+                return;
+            }
+
+            if (state->done.swap(true)) {
+                return;
+            }
+
+            LOG(2) << "Request " << state->request.id << " timed out"
+                   << ", deadline was " << state->deadline << ", op was "
+                   << redact(state->request.toString());
+            state->promise.setError(
+                Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+
+            client->cancel();
+        });
+    }
+
+    client->runCommandRequest(state->request)
+        .then([this, state](RemoteCommandResponse response) {
+            if (state->done.load()) {
+                uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
+            }
+
+            // TODO Investigate whether this is necessary here.
+            return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
+                if (_metadataHook && response.status.isOK()) {
+                    auto target = state->conn->getHostAndPort().toString();
+                    response.status = _metadataHook->readReplyMetadata(
+                        nullptr, std::move(target), response.metadata);
+                }
+
+                return std::move(response);
+            });
+        })
+        .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+            _eraseInUseConn(state->cbHandle);
+            if (!swr.isOK()) {
+                state->conn->indicateFailure(swr.getStatus());
+            } else if (!swr.getValue().isOK()) {
+                state->conn->indicateFailure(swr.getValue().status);
+            } else {
+                state->conn->indicateSuccess();
+            }
+
+            if (state->done.swap(true))
+                return;
+
+            if (state->timer) {
+                state->timer->cancel();
+            }
+
+            state->promise.setWith([&] { return std::move(swr); });
+        });
+
+    return std::move(state->mergedFuture);
+}
+
+void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbHandle) {
+    stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+    _inProgress.erase(cbHandle);
+}
+
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+    stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
+    auto it = _inProgress.find(cbHandle);
+    if (it == _inProgress.end()) {
+        return;
+    }
+    auto state = it->second;
+    _inProgress.erase(it);
+    lk.unlock();
+
+    if (state->done.swap(true)) {
+        return;
+    }
+
+    LOG(2) << "Canceling operation; original request was: " << redact(state->request.toString());
+    state->promise.setError({ErrorCodes::CallbackCanceled,
+                             str::stream() << "Command canceled; original request was: "
+                                           << redact(state->request.toString())});
+    if (state->conn) {
+        auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
+        client->client()->cancel();
+    }
+}
+
+Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+    if (inShutdown()) {
+        return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+    }
+
+    if (when <= now()) {
+        _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        return Status::OK();
+    }
+
+    std::shared_ptr<transport::ReactorTimer> alarmTimer = _reactor->makeTimer();
+    std::weak_ptr<transport::ReactorTimer> weakTimer = alarmTimer;
+    {
+        // We do this so that the lifetime of the alarmTimers is the lifetime of the NITL.
+        stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+        _inProgressAlarms.insert(alarmTimer);
+    }
+
+    alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
+        auto alarmTimer = weakTimer.lock();
+        if (!alarmTimer) {
+            return;
+        } else {
+            stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+            _inProgressAlarms.erase(alarmTimer);
+        }
+
+        auto nowVal = now();
+        if (nowVal < when) {
+            warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
+            const auto status = setAlarm(when, std::move(action));
+            if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+                fassertFailedWithStatus(50785, status);
+            }
+
+            return;
+        }
+
+        if (status.isOK()) {
+            _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        } else if (status != ErrorCodes::CallbackCanceled) {
+            warning() << "setAlarm() received an error: " << status;
+        }
+    });
+    return Status::OK();
+}
+
+bool NetworkInterfaceTL::onNetworkThread() {
+    return _reactor->onReactorThread();
+}
+
+void NetworkInterfaceTL::dropConnections(const HostAndPort& hostAndPort) {
+    _pool->dropConnections(hostAndPort);
+}
+
+}  // namespace executor
+}  // namespace mongo
```
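
For context (not part of the commit): the sketch below shows roughly how the new interface could be driven directly, using only the constructor, `startup()`/`shutdown()`, and `startCommand()` introduced in the diff above. In the real codebase a `ThreadPoolTaskExecutor` owns the `NetworkInterface` and supplies the `CallbackHandle`; the default-constructed handle and the particular `RemoteCommandRequest` overload used here are assumptions made purely for illustration.

```cpp
// Illustrative sketch only -- not from this commit. Everything outside the
// NetworkInterfaceTL calls themselves (the request overload, the empty
// CallbackHandle, the timeout value) is an assumption for the example.
#include "mongo/executor/network_interface_tl.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/util/assert_util.h"

namespace mongo {
namespace executor {

void runExampleIsMaster(ServiceContext* svcCtx) {
    NetworkInterfaceTL net("ExampleNIT",
                           ConnectionPool::Options{},
                           svcCtx,
                           nullptr,   // no NetworkConnectionHook
                           nullptr);  // no EgressMetadataHook
    net.startup();  // spins up the reactor thread that runs all I/O

    RemoteCommandRequest request(HostAndPort("localhost", 27017),
                                 "admin",
                                 BSON("isMaster" << 1),
                                 nullptr /* opCtx */);
    request.timeout = Milliseconds(10000);

    TaskExecutor::CallbackHandle cbHandle;  // normally issued by the owning TaskExecutor
    uassertStatusOK(net.startCommand(cbHandle, request, [](const RemoteCommandResponse& rs) {
        // Invoked asynchronously on the reactor thread with either the command's
        // reply or an error (e.g. NetworkInterfaceExceededTimeLimit on timeout).
    }));

    // A real caller would wait for onFinish (or call cancelCommand) before tearing down.
    net.shutdown();
}

}  // namespace executor
}  // namespace mongo
```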