/** * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO #include "mongo/platform/basic.h" #include "mongo/executor/connection_pool_tl.h" #include "mongo/db/auth/internal_user_auth.h" #include "mongo/util/log.h" namespace mongo { namespace executor { namespace connection_pool_tl { namespace { const auto kMaxTimerDuration = Milliseconds::max(); struct TimeoutHandler { AtomicBool done; Promise promise; explicit TimeoutHandler(Promise p) : promise(std::move(p)) {} }; } // namespace void TLTypeFactory::shutdown() { // Stop any attempt to schedule timers in the future _inShutdown.store(true); stdx::lock_guard lk(_mutex); log() << "Killing all outstanding egress activity."; for (auto collar : _collars) { collar->kill(); } } void TLTypeFactory::fasten(Type* type) { stdx::lock_guard lk(_mutex); _collars.insert(type); } void TLTypeFactory::release(Type* type) { stdx::lock_guard lk(_mutex); _collars.erase(type); type->_wasReleased = true; } TLTypeFactory::Type::Type(const std::shared_ptr& factory) : _factory{factory} {} TLTypeFactory::Type::~Type() { invariant(_wasReleased); } void TLTypeFactory::Type::release() { _factory->release(this); } bool TLTypeFactory::inShutdown() const { return _inShutdown.load(); } void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) { // We will not wait on a timeout if we are in shutdown. // The clients will be canceled as an inevitable consequence of pools shutting down. if (inShutdown()) { LOG(2) << "Skipping timeout due to impending shutdown."; return; } _timer->waitUntil(_reactor->now() + timeoutVal).getAsync([cb = std::move(cb)](Status status) { // If we get canceled, then we don't worry about the timeout anymore if (status == ErrorCodes::CallbackCanceled) { return; } fassert(50475, status); cb(); }); } void TLTimer::cancelTimeout() { _timer->cancel(); } void TLConnection::indicateSuccess() { _status = Status::OK(); } void TLConnection::indicateFailure(Status status) { _status = std::move(status); } const HostAndPort& TLConnection::getHostAndPort() const { return _peer; } bool TLConnection::isHealthy() { return _client->isStillConnected(); } AsyncDBClient* TLConnection::client() { return _client.get(); } void TLConnection::indicateUsed() { // It is illegal to attempt to use a connection after calling indicateFailure(). invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown); _lastUsed = _reactor->now(); } Date_t TLConnection::getLastUsed() const { return _lastUsed; } const Status& TLConnection::getStatus() const { return _status; } void TLConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) { auto anchor = shared_from_this(); _timer->setTimeout(timeout, [ cb = std::move(cb), anchor = std::move(anchor) ] { cb(); }); } void TLConnection::cancelTimeout() { _timer->cancelTimeout(); } void TLConnection::setup(Milliseconds timeout, SetupCallback cb) { auto anchor = shared_from_this(); auto pf = makePromiseFuture(); auto handler = std::make_shared(std::move(pf.promise)); std::move(pf.future).getAsync( [ this, cb = std::move(cb), anchor ](Status status) { cb(this, std::move(status)); }); log() << "Connecting to " << _peer; setTimeout(timeout, [this, handler, timeout] { if (handler->done.swap(true)) { return; } std::string reason = str::stream() << "Timed out connecting to " << _peer << " after " << timeout; handler->promise.setError( Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, std::move(reason))); if (_client) { _client->cancel(); } }); AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor, timeout) .onError([](StatusWith swc) -> StatusWith { return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason()); }) .then([this](AsyncDBClient::Handle client) { _client = std::move(client); return _client->initWireVersion("NetworkInterfaceTL", _onConnectHook); }) .then([this] { return _client->authenticate(getInternalUserAuthParams()); }) .then([this] { if (!_onConnectHook) { return Future::makeReady(); } auto connectHookRequest = uassertStatusOK(_onConnectHook->makeRequest(_peer)); if (!connectHookRequest) { return Future::makeReady(); } return _client->runCommandRequest(*connectHookRequest) .then([this](RemoteCommandResponse response) { return _onConnectHook->handleReply(_peer, std::move(response)); }); }) .getAsync([this, handler, anchor](Status status) { if (handler->done.swap(true)) { return; } cancelTimeout(); if (status.isOK()) { handler->promise.emplaceValue(); } else { log() << "Failed to connect to " << _peer << " - " << redact(status); handler->promise.setError(status); } }); LOG(2) << "Finished connection setup."; } void TLConnection::resetToUnknown() { _status = ConnectionPool::kConnectionStateUnknown; } void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) { auto anchor = shared_from_this(); auto pf = makePromiseFuture(); auto handler = std::make_shared(std::move(pf.promise)); std::move(pf.future).getAsync( [ this, cb = std::move(cb), anchor ](Status status) { cb(this, status); }); setTimeout(timeout, [this, handler] { if (handler->done.swap(true)) { return; } _status = {ErrorCodes::HostUnreachable, "Timed out refreshing host"}; _client->cancel(); handler->promise.setError(_status); }); _client ->runCommandRequest( {_peer, std::string("admin"), BSON("isMaster" << 1), BSONObj(), nullptr}) .then([](executor::RemoteCommandResponse response) { return Future::makeReady(response.status); }) .getAsync([this, handler, anchor](Status status) { if (handler->done.swap(true)) { return; } cancelTimeout(); _status = status; if (status.isOK()) { handler->promise.emplaceValue(); } else { handler->promise.setError(status); } }); } size_t TLConnection::getGeneration() const { return _generation; } void TLConnection::cancelAsync() { if (_client) _client->cancel(); } std::shared_ptr TLTypeFactory::makeConnection( const HostAndPort& hostAndPort, size_t generation) { auto conn = std::make_shared(shared_from_this(), _reactor, getGlobalServiceContext(), hostAndPort, generation, _onConnectHook.get()); fasten(conn.get()); return conn; } std::shared_ptr TLTypeFactory::makeTimer() { auto timer = std::make_shared(shared_from_this(), _reactor); fasten(timer.get()); return timer; } Date_t TLTypeFactory::now() { return _reactor->now(); } } // namespace connection_pool_tl } // namespace executor } // namespace