/**
 * Copyright (C) 2015 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/client/remote_command_runner_impl.h" #include "mongo/base/status_with.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" #include "mongo/executor/downconvert_find_and_getmore_commands.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/protocol.h" #include "mongo/util/assert_util.h" namespace mongo { namespace { using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; /** * Calculates the timeout for a network operation expiring at "expDate", given * that it is now "nowDate". * * Returns 0ms to indicate no expiration date, a number of milliseconds until "expDate", or * ErrorCodes::ExceededTimeLimit if "expDate" is not later than "nowDate". */ StatusWith getTimeoutMillis(const Date_t expDate, const Date_t nowDate) { if (expDate == RemoteCommandRequest::kNoExpirationDate) { return Milliseconds(0); } if (expDate <= nowDate) { return {ErrorCodes::ExceededTimeLimit, str::stream() << "Went to run command, but it was too late. " "Expiration was set to " << dateToISOStringUTC(expDate)}; } return expDate - nowDate; } /** * Peeks at error in cursor. If an error has occurred, converts the {$err: "...", code: N} * cursor error to a Status. */ Status getStatusFromCursorResult(DBClientCursor& cursor) { BSONObj error; if (!cursor.peekError(&error)) { return Status::OK(); } BSONElement e = error.getField("code"); return Status(e.isNumber() ? 
ErrorCodes::fromInt(e.numberInt()) : ErrorCodes::UnknownError, getErrField(error).valuestrsafe()); } using RequestDownconverter = StatusWith(*)(const RemoteCommandRequest&); using ReplyUpconverter = StatusWith(*)(std::uint32_t requestId, StringData cursorNamespace, const Message& response); template StatusWith runDownconvertedCommand(DBClientConnection* conn, const RemoteCommandRequest& request) { auto swDownconvertedRequest = downconvertRequest(request); if (!swDownconvertedRequest.isOK()) { return swDownconvertedRequest.getStatus(); } Message requestMsg{std::move(swDownconvertedRequest.getValue())}; Message responseMsg; try { conn->call(requestMsg, responseMsg, true, nullptr); } catch (...) { return exceptionToStatus(); } auto messageId = requestMsg.header().getId(); return upconvertReply(messageId, DbMessage(requestMsg).getns(), responseMsg); } /** * Downconverts the specified find command to a find protocol operation and sends it to the * server on the specified connection. */ StatusWith runDownconvertedFindCommand(DBClientConnection* conn, const RemoteCommandRequest& request) { return runDownconvertedCommand(conn, request); } /** * Downconverts the specified getMore command to legacy getMore operation and sends it to the * server on the specified connection. 
*/ StatusWith runDownconvertedGetMoreCommand( DBClientConnection* conn, const RemoteCommandRequest& request) { return runDownconvertedCommand(conn, request); } } // namespace RemoteCommandRunnerImpl::RemoteCommandRunnerImpl( int messagingTags, std::unique_ptr hook) : _connPool(messagingTags, std::move(hook)) {} RemoteCommandRunnerImpl::~RemoteCommandRunnerImpl() { invariant(!_active); } void RemoteCommandRunnerImpl::startup() { _active = true; } void RemoteCommandRunnerImpl::shutdown() { if (!_active) { return; } _connPool.closeAllInUseConnections(); _active = false; } StatusWith RemoteCommandRunnerImpl::runCommand( const RemoteCommandRequest& request) { try { const Date_t requestStartDate = Date_t::now(); const auto timeoutMillis = getTimeoutMillis(request.expirationDate, requestStartDate); if (!timeoutMillis.isOK()) { return StatusWith(timeoutMillis.getStatus()); } ConnectionPool::ConnectionPtr conn( &_connPool, request.target, requestStartDate, timeoutMillis.getValue()); BSONObj output; BSONObj metadata; // If remote server does not support either find or getMore commands, down convert // to using DBClientInterface::query()/getMore(). // Perform down conversion based on wire protocol version. // 'commandName' will be an empty string if the command object is an empty BSON // document. StringData commandName = request.cmdObj.firstElement().fieldNameStringData(); const auto isFindCmd = commandName == LiteParsedQuery::kFindCommandName; const auto isGetMoreCmd = commandName == GetMoreRequest::kGetMoreCommandName; const auto isFindOrGetMoreCmd = isFindCmd || isGetMoreCmd; // We are using the wire version to check if we need to downconverting find/getMore // requests because coincidentally, the find/getMore command is only supported by // servers that also accept OP_COMMAND. 
bool supportsFindAndGetMoreCommands = rpc::supportsWireVersionForOpCommandInMongod( conn.get()->getMinWireVersion(), conn.get()->getMaxWireVersion()); if (!isFindOrGetMoreCmd || supportsFindAndGetMoreCommands) { rpc::UniqueReply commandResponse = conn.get()->runCommandWithMetadata(request.dbname, request.cmdObj.firstElementFieldName(), request.metadata, request.cmdObj); output = commandResponse->getCommandReply().getOwned(); metadata = commandResponse->getMetadata().getOwned(); } else if (isFindCmd) { return runDownconvertedFindCommand(conn.get(), request); } else if (isGetMoreCmd) { return runDownconvertedGetMoreCommand(conn.get(), request); } const Date_t requestFinishDate = Date_t::now(); conn.done(requestFinishDate); return StatusWith( RemoteCommandResponse(std::move(output), std::move(metadata), Milliseconds(requestFinishDate - requestStartDate))); } catch (const DBException& ex) { return StatusWith(ex.toStatus()); } catch (const std::exception& ex) { return StatusWith( ErrorCodes::UnknownError, str::stream() << "Sending command " << request.cmdObj << " on database " << request.dbname << " over network to " << request.target.toString() << " received exception " << ex.what()); } } } // namespace mongo