diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2018-04-09 15:37:57 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2018-04-13 12:18:39 -0400 |
commit | 821ac1964b7fc1beba93a71fb971584b3af4808d (patch) | |
tree | 7b4db79590920bc293ce2136a5c06f32802f9b76 /src/mongo/client/async_client.cpp | |
parent | ad3671a64bd8958370a4aeaf93fe00d2d1272e3a (diff) | |
download | mongo-821ac1964b7fc1beba93a71fb971584b3af4808d.tar.gz |
SERVER-33821 Implement NetworkInterfaceTL and AsyncDBClient
Diffstat (limited to 'src/mongo/client/async_client.cpp')
-rw-r--r-- | src/mongo/client/async_client.cpp | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp new file mode 100644 index 00000000000..c1499fe394d --- /dev/null +++ b/src/mongo/client/async_client.cpp @@ -0,0 +1,264 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/client/async_client.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/authenticate.h" +#include "mongo/config.h" +#include "mongo/db/auth/authorization_manager_global.h" +#include "mongo/db/auth/internal_user_auth.h" +#include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/wire_version.h" +#include "mongo/executor/egress_tag_closer_manager.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/legacy_request_builder.h" +#include "mongo/rpc/metadata/client_metadata.h" +#include "mongo/rpc/reply_interface.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/log.h" +#include "mongo/util/net/sock.h" +#include "mongo/util/net/ssl_manager.h" +#include "mongo/util/version.h" + +namespace mongo { + +Future<AsyncDBClient::Handle> AsyncDBClient::connect(const HostAndPort& peer, + transport::ConnectSSLMode sslMode, + ServiceContext* const context, + transport::ReactorHandle reactor) { + auto tl = context->getTransportLayer(); + return tl->asyncConnect(peer, sslMode, std::move(reactor)) + .then([peer, context](transport::SessionHandle session) { + return std::make_shared<AsyncDBClient>(peer, std::move(session), context); + }); +} + +BSONObj AsyncDBClient::_buildIsMasterRequest(const std::string& appName) { + BSONObjBuilder bob; + + bob.append("isMaster", 1); + bob.append("hangUpOnStepDown", false); + const auto versionString = VersionInfoInterface::instance().version(); + ClientMetadata::serialize(appName, versionString, &bob); + + if (getTestCommandsEnabled()) { + // Only include the host:port of this process in the isMaster command request if test + // commands are enabled. mongobridge uses this field to identify the process opening a + // connection to it. + StringBuilder sb; + sb << getHostNameCached() << ':' << serverGlobalParams.port; + bob.append("hostInfo", sb.str()); + } + + _compressorManager.clientBegin(&bob); + + if (WireSpec::instance().isInternalClient) { + WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob); + } + + return bob.obj(); +} + +void AsyncDBClient::_parseIsMasterResponse(BSONObj request, + const std::unique_ptr<rpc::ReplyInterface>& response) { + uassert(50786, + "Expected opQuery response to isMaster", + response->getProtocol() == rpc::Protocol::kOpQuery); + auto responseBody = response->getCommandReply(); + uassertStatusOK(getStatusFromCommandResult(responseBody)); + + auto protocolSet = uassertStatusOK(rpc::parseProtocolSetFromIsMasterReply(responseBody)); + auto validateStatus = + rpc::validateWireVersion(WireSpec::instance().outgoing, protocolSet.version); + if (!validateStatus.isOK()) { + warning() << "remote host has incompatible wire version: " << validateStatus; + uasserted(validateStatus.code(), + str::stream() << "remote host has incompatible wire version: " + << validateStatus.reason()); + } + + auto& egressTagManager = executor::EgressTagCloserManager::get(_svcCtx); + // Tag outgoing connection so it can be kept open on FCV upgrade if it is not to a + // server with a lower binary version. + if (protocolSet.version.maxWireVersion >= WireSpec::instance().outgoing.maxWireVersion) { + egressTagManager.mutateTags( + _peer, [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; }); + } + + auto clientProtocols = rpc::computeProtocolSet(WireSpec::instance().outgoing); + invariant(clientProtocols != rpc::supports::kNone); + // Set the operation protocol + _negotiatedProtocol = uassertStatusOK(rpc::negotiate(protocolSet.protocolSet, clientProtocols)); + + _compressorManager.clientFinish(responseBody); +} + +Future<void> AsyncDBClient::authenticate(const BSONObj& params) { + // This check is sufficient to see if auth is enabled on the system, + // and avoids creating dependencies on deeper, less accessible auth code. + if (!isInternalAuthSet()) { + return Future<void>::makeReady(); + } + + // We will only have a valid clientName if SSL is enabled. + std::string clientName; +#ifdef MONGO_CONFIG_SSL + if (getSSLManager()) { + clientName = getSSLManager()->getSSLConfiguration().clientSubjectName; + } +#endif + + Promise<void> retPromise; + auto ret = retPromise.getFuture(); + + auto authCompleteCb = [promise = retPromise.share()](auth::AuthResponse response) mutable { + if (response.isOK()) { + promise.emplaceValue(); + } else { + promise.setError(response.status); + } + }; + + auto doAuthCb = [this](executor::RemoteCommandRequest request, + auth::AuthCompletionHandler handler) { + + runCommandRequest(request).getAsync([handler = std::move(handler)]( + StatusWith<executor::RemoteCommandResponse> response) { + if (!response.isOK()) { + handler(executor::RemoteCommandResponse(response.getStatus())); + } else { + handler(std::move(response.getValue())); + } + }); + }; + + auth::authenticateClient( + params, remote(), clientName, std::move(doAuthCb), std::move(authCompleteCb)); + + return ret; +} + +Future<void> AsyncDBClient::initWireVersion(const std::string& appName, + executor::NetworkConnectionHook* const hook) { + auto requestObj = _buildIsMasterRequest(appName); + // We use a legacy request to create our ismaster request because we may + // have to communicate with servers that do not support other protocols. + auto requestMsg = + rpc::legacyRequestFromOpMsgRequest(OpMsgRequest::fromDBAndBody("admin", requestObj)); + auto clkSource = _svcCtx->getFastClockSource(); + auto start = clkSource->now(); + + return _call(requestMsg).then([this, requestObj, hook, clkSource, start](Message response) { + auto cmdReply = rpc::makeReply(&response); + if (hook) { + auto millis = duration_cast<Milliseconds>(clkSource->now() - start); + executor::RemoteCommandResponse cmdResp(*cmdReply, millis); + uassertStatusOK(hook->validateHost(_peer, std::move(cmdResp))); + } + _parseIsMasterResponse(requestObj, cmdReply); + }); +} + +Future<Message> AsyncDBClient::_call(Message request) { + auto swm = _compressorManager.compressMessage(request); + if (!swm.isOK()) { + return swm.getStatus(); + } + + request = std::move(swm.getValue()); + auto msgId = nextMessageId(); + request.header().setId(msgId); + request.header().setResponseToMsgId(0); + + return _session->asyncSinkMessage(request) + .then([this] { return _session->asyncSourceMessage(); }) + .then([this, msgId](Message response) -> StatusWith<Message> { + uassert(50787, + "ResponseId did not match sent message ID.", + response.header().getResponseToMsgId() == msgId); + + if (response.operation() == dbCompressed) { + return _compressorManager.decompressMessage(response); + } else { + return response; + } + }); +} + +Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) { + invariant(_negotiatedProtocol); + auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); + return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> { + return rpc::UniqueReply(response, rpc::makeReply(&response)); + }); +} + +Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( + executor::RemoteCommandRequest request) { + auto clkSource = _svcCtx->getPreciseClockSource(); + auto start = clkSource->now(); + auto opMsgRequest = OpMsgRequest::fromDBAndBody( + std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); + return runCommand(std::move(opMsgRequest)) + .then([start, clkSource, this](rpc::UniqueReply response) { + auto duration = duration_cast<Milliseconds>(clkSource->now() - start); + return executor::RemoteCommandResponse(*response, duration); + }) + .onError([start, clkSource](Status status) { + auto duration = duration_cast<Milliseconds>(clkSource->now() - start); + return executor::RemoteCommandResponse(status, duration); + }); +} + +void AsyncDBClient::cancel() { + _session->cancelAsyncOperations(); +} + +bool AsyncDBClient::isStillConnected() { + return _session->isConnected(); +} + +void AsyncDBClient::end() { + _session->end(); +} + +const HostAndPort& AsyncDBClient::remote() const { + return _peer; +} + +const HostAndPort& AsyncDBClient::local() const { + return _session->local(); +} + +} // namespace mongo |