/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/client/remote_command_runner_impl.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/cursor_responses.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/base/status_with.h"
#include "mongo/util/assert_util.h"
namespace mongo {
namespace {
/**
 * Calculates the timeout for a network operation expiring at "expDate", given
 * that it is now "nowDate".
 *
 * Returns 0ms to indicate no expiration date, a number of milliseconds until "expDate", or
 * ErrorCodes::ExceededTimeLimit if "expDate" is not later than "nowDate".
 */
StatusWith<Milliseconds> getTimeoutMillis(const Date_t expDate, const Date_t nowDate) {
    // The sentinel date means the request never expires; that case is reported as 0ms.
    if (expDate == RemoteCommandRequest::kNoExpirationDate) {
        return Milliseconds(0);
    }

    // The deadline has already been reached (or passed), so there is no time left to run.
    if (expDate <= nowDate) {
        return {ErrorCodes::ExceededTimeLimit,
                str::stream() << "Went to run command, but it was too late. "
                                 "Expiration was set to " << dateToISOStringUTC(expDate)};
    }

    // Strictly positive here because expDate > nowDate.
    return expDate - nowDate;
}
/**
 * Builds a fresh command reply document whose only content is whatever
 * Command::appendCommandStatus() writes for "status".
 */
BSONObj getCommandResultFromStatus(const Status& status) {
    BSONObjBuilder replyBuilder;
    Command::appendCommandStatus(replyBuilder, status);
    return replyBuilder.obj();
}
/**
 * Inspects "cursor" for a pending error document without consuming it.
 *
 * Returns Status::OK() when peekError() reports no error. Otherwise converts the
 * {$err: "...", code: N} error document into a Status: the code comes from a numeric "code"
 * field (ErrorCodes::UnknownError if that field is absent or not a number) and the reason
 * comes from the error field located by getErrField().
 */
Status getStatusFromCursorResult(DBClientCursor& cursor) {
    BSONObj errorDoc;
    const bool hasError = cursor.peekError(&errorDoc);
    if (!hasError) {
        return Status::OK();
    }

    // Fall back to UnknownError unless the server supplied a numeric code.
    ErrorCodes::Error code = ErrorCodes::UnknownError;
    const BSONElement codeElement = errorDoc.getField("code");
    if (codeElement.isNumber()) {
        code = ErrorCodes::fromInt(codeElement.numberInt());
    }

    return Status(code, getErrField(errorDoc).valuestrsafe());
}
/**
 * Downconverts the specified find command to a find protocol operation and sends it to the
 * server on the specified connection.
 *
 * "dbname" and the first element of "cmdObj" (the collection name) form the target namespace.
 * On success, "*output" receives a find-command-shaped reply (cursor id, namespace and the
 * first batch) with an OK status appended, and Status::OK() is returned.
 * On failure (unparseable command, no cursor returned, or an error reported by the cursor),
 * "*output" receives a reply document built from the error and that same error is returned.
 */
Status runDownconvertedFindCommand(DBClientConnection* conn,
                                   const std::string& dbname,
                                   const BSONObj& cmdObj,
                                   BSONObj* output) {
    const NamespaceString nss(dbname, cmdObj.firstElement().String());
    const std::string& ns = nss.ns();

    std::unique_ptr<LiteParsedQuery> lpq;
    {
        LiteParsedQuery* lpqRaw = nullptr;
        // It is a little heavy handed to use LiteParsedQuery to convert the command object to
        // query() arguments but we get validation and consistent behavior with the find
        // command implementation on the remote server.
        Status status = LiteParsedQuery::make(ns, cmdObj, false, &lpqRaw);
        if (!status.isOK()) {
            *output = getCommandResultFromStatus(status);
            return status;
        }
        lpq.reset(lpqRaw);
    }

    // Translate each optional find-command field into its Query modifier.
    Query query(lpq->getFilter());
    if (!lpq->getSort().isEmpty()) { query.sort(lpq->getSort()); }
    if (!lpq->getHint().isEmpty()) { query.hint(lpq->getHint()); }
    if (!lpq->getMin().isEmpty()) { query.minKey(lpq->getMin()); }
    // The upper bound must go through maxKey(); routing it through minKey() would overwrite
    // the lower bound and leave the query without its upper bound.
    if (!lpq->getMax().isEmpty()) { query.maxKey(lpq->getMax()); }
    if (lpq->isExplain()) { query.explain(); }
    if (lpq->isSnapshot()) { query.snapshot(); }

    // The limit is negated before being passed as nToReturn. In the legacy OP_QUERY protocol
    // a negative numberToReturn asks for at most that many documents and then closes the
    // cursor.
    int nToReturn = lpq->getLimit().value_or(0) * -1;
    int nToSkip = lpq->getSkip();
    const BSONObj* fieldsToReturn = &lpq->getProj();
    int queryOptions = lpq->getOptions();
    int batchSize = lpq->getBatchSize().value_or(0);

    auto cursor =
        conn->query(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);

    // The client library documents a null cursor on connection failure; report that as an
    // error instead of dereferencing a null pointer.
    if (!cursor) {
        const Status status(ErrorCodes::HostUnreachable,
                            str::stream() << "cursor initialization failed for find on " << ns
                                          << " due to connection problems");
        *output = getCommandResultFromStatus(status);
        return status;
    }

    cursor->decouple();

    Status status = getStatusFromCursorResult(*cursor);
    if (!status.isOK()) {
        *output = getCommandResultFromStatus(status);
        return status;
    }

    // Only drain what the server already sent; fetching further batches is left to the
    // caller via getMore.
    BSONArrayBuilder batch;
    while (cursor->moreInCurrentBatch()) {
        batch.append(cursor->next());
    }

    BSONObjBuilder result;
    appendCursorResponseObject(cursor->getCursorId(), ns, batch.arr(), &result);
    Command::appendCommandStatus(result, Status::OK());
    *output = result.obj();
    return Status::OK();
}
/**
 * Downconverts the specified getMore command to legacy getMore operation and sends it to the
 * server on the specified connection.
 *
 * On success, "*output" receives a getMore-command-shaped reply (cursor id, namespace and the
 * next batch) with an OK status appended, and Status::OK() is returned.
 * On failure (unparseable command, no cursor returned, or an error reported by the cursor),
 * "*output" receives a reply document built from the error and that same error is returned.
 */
Status runDownconvertedGetMoreCommand(DBClientConnection* conn,
                                      const std::string& dbname,
                                      const BSONObj& cmdObj,
                                      BSONObj* output) {
    StatusWith<GetMoreRequest> parseResult = GetMoreRequest::parseFromBSON(dbname, cmdObj);
    if (!parseResult.isOK()) {
        const Status& status = parseResult.getStatus();
        *output = getCommandResultFromStatus(status);
        return status;
    }

    const GetMoreRequest& req = parseResult.getValue();
    const std::string& ns = req.nss.ns();

    // A missing batch size is passed as 0.
    auto cursor = conn->getMore(ns, req.cursorid, req.batchSize.value_or(0));

    // The client library documents a null cursor on connection failure; report that as an
    // error instead of dereferencing a null pointer.
    if (!cursor) {
        const Status status(ErrorCodes::HostUnreachable,
                            str::stream() << "cursor initialization failed for getMore on "
                                          << ns << " due to connection problems");
        *output = getCommandResultFromStatus(status);
        return status;
    }

    cursor->decouple();

    Status status = getStatusFromCursorResult(*cursor);
    if (!status.isOK()) {
        *output = getCommandResultFromStatus(status);
        return status;
    }

    // Only drain the batch that this getMore produced.
    BSONArrayBuilder batch;
    while (cursor->moreInCurrentBatch()) {
        batch.append(cursor->next());
    }

    BSONObjBuilder result;
    appendGetMoreResponseObject(cursor->getCursorId(), ns, batch.arr(), &result);
    Command::appendCommandStatus(result, Status::OK());
    *output = result.obj();
    return Status::OK();
}
} //namespace
// Creates a runner whose connection pool is constructed with "messagingPortTags". The runner
// starts in the not-shut-down state.
RemoteCommandRunnerImpl::RemoteCommandRunnerImpl(int messagingPortTags)
    : _connPool(messagingPortTags), _shutDown(false) {}
// shutdown() must have been called before destruction; the invariant checks the flag that
// shutdown() sets.
RemoteCommandRunnerImpl::~RemoteCommandRunnerImpl() {
    invariant(_shutDown);
}
// Marks the runner as shut down and closes all in-use pooled connections. Only the first call
// has any effect; later calls are no-ops.
void RemoteCommandRunnerImpl::shutdown() {
    if (!_shutDown) {
        _shutDown = true;
        _connPool.closeAllInUseConnections();
    }
}
/**
 * Runs "request.cmdObj" against database "request.dbname" on "request.target" using a
 * connection from the pool, and returns the reply document together with the elapsed time.
 *
 * Returns a non-OK status when:
 *  - the request's expiration date has already passed (ErrorCodes::ExceededTimeLimit);
 *  - a DBException is thrown while connecting or running the command (its status is returned);
 *  - any other std::exception is thrown (ErrorCodes::UnknownError).
 *
 * A command that the server answers with a failure reply is still returned as an OK
 * StatusWith: the failure is carried inside the reply document.
 */
StatusWith<RemoteCommandResponse> RemoteCommandRunnerImpl::runCommand(
    const RemoteCommandRequest& request) {
    try {
        BSONObj output;

        const Date_t requestStartDate = Date_t::now();
        const auto timeoutMillis = getTimeoutMillis(request.expirationDate, requestStartDate);
        if (!timeoutMillis.isOK()) {
            return StatusWith<RemoteCommandResponse>(timeoutMillis.getStatus());
        }

        ConnectionPool::ConnectionPtr conn(&_connPool,
                                           request.target,
                                           requestStartDate,
                                           timeoutMillis.getValue());

        const bool ok = conn.get()->runCommand(request.dbname, request.cmdObj, output);

        // If remote server does not support either find or getMore commands, down convert
        // to using DBClientInterface::query()/getMore().
        // TODO: Perform down conversion based on wire protocol version.
        //       Refer to the down conversion implementation in the shell.
        if (!ok &&
            getStatusFromCommandResult(output).code() == ErrorCodes::CommandNotFound) {
            // 'commandName' will be an empty string if the command object is an empty BSON
            // document.
            const StringData commandName =
                request.cmdObj.firstElement().fieldNameStringData();

            // The helpers' returned Status is intentionally ignored: on failure they also
            // write the error into 'output', which is what gets returned to the caller.
            if (commandName == "find") {
                runDownconvertedFindCommand(
                    conn.get(), request.dbname, request.cmdObj, &output);
            }
            else if (commandName == "getMore") {
                runDownconvertedGetMoreCommand(
                    conn.get(), request.dbname, request.cmdObj, &output);
            }
        }

        // done() is only reached when nothing above threw.
        const Date_t requestFinishDate = Date_t::now();
        conn.done(requestFinishDate);

        return StatusWith<RemoteCommandResponse>(
            RemoteCommandResponse(output,
                                  Milliseconds(requestFinishDate - requestStartDate)));
    }
    catch (const DBException& ex) {
        return StatusWith<RemoteCommandResponse>(ex.toStatus());
    }
    catch (const std::exception& ex) {
        return StatusWith<RemoteCommandResponse>(
            ErrorCodes::UnknownError,
            str::stream() << "Sending command " << request.cmdObj << " on database "
                          << request.dbname << " over network to "
                          << request.target.toString() << " received exception "
                          << ex.what());
    }
}
} // namespace mongo