/** * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/client/async_client.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/authenticate.h" #include "mongo/config.h" #include "mongo/db/auth/internal_user_auth.h" #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/server_options.h" #include "mongo/db/wire_version.h" #include "mongo/executor/egress_tag_closer_manager.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/legacy_request_builder.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/reply_interface.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_utils.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/version.h" namespace mongo { Future AsyncDBClient::connect(const HostAndPort& peer, transport::ConnectSSLMode sslMode, ServiceContext* const context, transport::ReactorHandle reactor, Milliseconds timeout) { auto tl = context->getTransportLayer(); return tl->asyncConnect(peer, sslMode, std::move(reactor), timeout) .then([peer, context](transport::SessionHandle session) { return std::make_shared(peer, std::move(session), context); }); } BSONObj AsyncDBClient::_buildIsMasterRequest(const std::string& appName) { BSONObjBuilder bob; bob.append("isMaster", 1); bob.append("hangUpOnStepDown", false); const auto versionString = VersionInfoInterface::instance().version(); ClientMetadata::serialize(appName, versionString, &bob); if (getTestCommandsEnabled()) { // Only include the host:port of this process in the isMaster command request if test // commands are enabled. mongobridge uses this field to identify the process opening a // connection to it. StringBuilder sb; sb << getHostNameCached() << ':' << serverGlobalParams.port; bob.append("hostInfo", sb.str()); } _compressorManager.clientBegin(&bob); if (WireSpec::instance().isInternalClient) { WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob); } return bob.obj(); } void AsyncDBClient::_parseIsMasterResponse(BSONObj request, const std::unique_ptr& response) { uassert(50786, "Expected opQuery response to isMaster", response->getProtocol() == rpc::Protocol::kOpQuery); auto responseBody = response->getCommandReply(); uassertStatusOK(getStatusFromCommandResult(responseBody)); auto protocolSet = uassertStatusOK(rpc::parseProtocolSetFromIsMasterReply(responseBody)); auto validateStatus = rpc::validateWireVersion(WireSpec::instance().outgoing, protocolSet.version); if (!validateStatus.isOK()) { warning() << "remote host has incompatible wire version: " << validateStatus; uasserted(validateStatus.code(), str::stream() << "remote host has incompatible wire version: " << validateStatus.reason()); } auto& egressTagManager = executor::EgressTagCloserManager::get(_svcCtx); // Tag outgoing connection so it can be kept open on FCV upgrade if it is not to a // server with a lower binary version. if (protocolSet.version.maxWireVersion >= WireSpec::instance().outgoing.maxWireVersion) { egressTagManager.mutateTags( _peer, [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; }); } auto clientProtocols = rpc::computeProtocolSet(WireSpec::instance().outgoing); invariant(clientProtocols != rpc::supports::kNone); // Set the operation protocol _negotiatedProtocol = uassertStatusOK(rpc::negotiate(protocolSet.protocolSet, clientProtocols)); _compressorManager.clientFinish(responseBody); } Future AsyncDBClient::authenticate(const BSONObj& params) { // This check is sufficient to see if auth is enabled on the system, // and avoids creating dependencies on deeper, less accessible auth code. if (!isInternalAuthSet()) { return Future::makeReady(); } // We will only have a valid clientName if SSL is enabled. std::string clientName; #ifdef MONGO_CONFIG_SSL if (getSSLManager()) { clientName = getSSLManager()->getSSLConfiguration().clientSubjectName.toString(); } #endif auto pf = makePromiseFuture(); auto authCompleteCb = [promise = pf.promise.share()](auth::AuthResponse response) mutable { if (response.isOK()) { promise.emplaceValue(); } else { promise.setError(response.status); } }; auto doAuthCb = [this](executor::RemoteCommandRequest request, auth::AuthCompletionHandler handler) { runCommandRequest(request).getAsync([handler = std::move(handler)]( StatusWith response) { if (!response.isOK()) { handler(executor::RemoteCommandResponse(response.getStatus())); } else { handler(std::move(response.getValue())); } }); }; auth::authenticateClient( params, remote(), clientName, std::move(doAuthCb), std::move(authCompleteCb)); return std::move(pf.future); } Future AsyncDBClient::initWireVersion(const std::string& appName, executor::NetworkConnectionHook* const hook) { auto requestObj = _buildIsMasterRequest(appName); // We use a legacy request to create our ismaster request because we may // have to communicate with servers that do not support other protocols. auto requestMsg = rpc::legacyRequestFromOpMsgRequest(OpMsgRequest::fromDBAndBody("admin", requestObj)); auto clkSource = _svcCtx->getFastClockSource(); auto start = clkSource->now(); return _call(requestMsg).then([this, requestObj, hook, clkSource, start](Message response) { auto cmdReply = rpc::makeReply(&response); _parseIsMasterResponse(requestObj, cmdReply); if (hook) { auto millis = duration_cast(clkSource->now() - start); executor::RemoteCommandResponse cmdResp(*cmdReply, millis); uassertStatusOK(hook->validateHost(_peer, requestObj, std::move(cmdResp))); } }); } Future AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) { auto swm = _compressorManager.compressMessage(request); if (!swm.isOK()) { return swm.getStatus(); } request = std::move(swm.getValue()); auto msgId = nextMessageId(); request.header().setId(msgId); request.header().setResponseToMsgId(0); return _session->asyncSinkMessage(request, baton) .then([this, baton] { return _session->asyncSourceMessage(baton); }) .then([this, msgId](Message response) -> StatusWith { uassert(50787, "ResponseId did not match sent message ID.", response.header().getResponseToMsgId() == msgId); if (response.operation() == dbCompressed) { return _compressorManager.decompressMessage(response); } else { return response; } }); } Future AsyncDBClient::runCommand(OpMsgRequest request, const transport::BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); return _call(std::move(requestMsg), baton) .then([this](Message response) -> Future { return rpc::UniqueReply(response, rpc::makeReply(&response)); }); } Future AsyncDBClient::runCommandRequest( executor::RemoteCommandRequest request, const transport::BatonHandle& baton) { auto clkSource = _svcCtx->getPreciseClockSource(); auto start = clkSource->now(); auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); return runCommand(std::move(opMsgRequest), baton) .then([start, clkSource, this](rpc::UniqueReply response) { auto duration = duration_cast(clkSource->now() - start); return executor::RemoteCommandResponse(*response, duration); }) .onError([start, clkSource](Status status) { auto duration = duration_cast(clkSource->now() - start); return executor::RemoteCommandResponse(status, duration); }); } void AsyncDBClient::cancel(const transport::BatonHandle& baton) { _session->cancelAsyncOperations(baton); } bool AsyncDBClient::isStillConnected() { return _session->isConnected(); } void AsyncDBClient::end() { _session->end(); } const HostAndPort& AsyncDBClient::remote() const { return _peer; } const HostAndPort& AsyncDBClient::local() const { return _session->local(); } } // namespace mongo