/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/client/shard_remote.h"
#include
#include
#include "mongo/client/fetcher.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
namespace mongo {
using std::string;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using executor::TaskExecutor;
using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs;
namespace {
const Status kInternalErrorStatus{ErrorCodes::InternalError,
"Invalid to check for write concern error if command failed"};
const Milliseconds kConfigCommandTimeout = Seconds{30};
const BSONObj kNoMetadata(rpc::makeEmptyMetadata());
// Include kReplSetMetadataFieldName in a request to get the shard's ReplSetMetadata in the
// response.
const BSONObj kReplMetadata(BSON(rpc::kReplSetMetadataFieldName << 1));
// Allow the command to be executed on a secondary (see ServerSelectionMetadata).
const BSONObj kSecondaryOkMetadata{rpc::ServerSelectionMetadata(true, boost::none).toBSON()};
// Helper for requesting ReplSetMetadata in the response as well as allowing the command to be
// executed on a secondary.
const BSONObj kReplSecondaryOkMetadata{[] {
BSONObjBuilder o;
o.appendElements(kSecondaryOkMetadata);
o.appendElements(kReplMetadata);
return o.obj();
}()};
/**
* Returns a new BSONObj describing the same command and arguments as 'cmdObj', but with a maxTimeMS
* set on it that is the minimum of the maxTimeMS in 'cmdObj' (if present), 'maxTimeMicros', and
* 30 seconds.
*/
BSONObj appendMaxTimeToCmdObj(OperationContext* txn, const BSONObj& cmdObj) {
Milliseconds maxTime = kConfigCommandTimeout;
bool hasTxnMaxTime = txn->hasDeadline();
bool hasUserMaxTime = !cmdObj[QueryRequest::cmdOptionMaxTimeMS].eoo();
if (hasTxnMaxTime) {
maxTime = std::min(maxTime, duration_cast(txn->getRemainingMaxTimeMicros()));
if (maxTime <= Milliseconds::zero()) {
// If there is less than 1ms remaining before the maxTime timeout expires, set the max
// time to 1ms, since setting maxTimeMs to 1ms in a command means "no max time".
maxTime = Milliseconds{1};
}
}
if (hasUserMaxTime) {
Milliseconds userMaxTime(cmdObj[QueryRequest::cmdOptionMaxTimeMS].numberLong());
if (userMaxTime <= maxTime) {
return cmdObj;
}
}
BSONObjBuilder updatedCmdBuilder;
if (hasUserMaxTime) { // Need to remove user provided maxTimeMS.
BSONObjIterator cmdObjIter(cmdObj);
const char* maxTimeFieldName = QueryRequest::cmdOptionMaxTimeMS;
while (cmdObjIter.more()) {
BSONElement e = cmdObjIter.next();
if (str::equals(e.fieldName(), maxTimeFieldName)) {
continue;
}
updatedCmdBuilder.append(e);
}
} else {
updatedCmdBuilder.appendElements(cmdObj);
}
updatedCmdBuilder.append(QueryRequest::cmdOptionMaxTimeMS,
durationCount(maxTime));
return updatedCmdBuilder.obj();
}
} // unnamed namespace
ShardRemote::ShardRemote(const ShardId& id,
const ConnectionString& originalConnString,
std::unique_ptr targeter)
: Shard(id), _originalConnString(originalConnString), _targeter(targeter.release()) {}
ShardRemote::~ShardRemote() = default;
bool ShardRemote::isRetriableError(ErrorCodes::Error code, RetryPolicy options) {
if (options == RetryPolicy::kNoRetry) {
return false;
}
const auto& retriableErrors = options == RetryPolicy::kIdempotent
? RemoteCommandRetryScheduler::kAllRetriableErrors
: RemoteCommandRetryScheduler::kNotMasterErrors;
return std::find(retriableErrors.begin(), retriableErrors.end(), code) != retriableErrors.end();
}
const ConnectionString ShardRemote::getConnString() const {
return _targeter->connectionString();
}
void ShardRemote::updateReplSetMonitor(const HostAndPort& remoteHost,
const Status& remoteCommandStatus) {
if (remoteCommandStatus.isOK())
return;
if (ErrorCodes::isNotMasterError(remoteCommandStatus.code()) ||
(remoteCommandStatus == ErrorCodes::InterruptedDueToReplStateChange)) {
_targeter->markHostNotMaster(remoteHost);
} else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) {
_targeter->markHostUnreachable(remoteHost);
} else if (remoteCommandStatus == ErrorCodes::NotMasterOrSecondary) {
_targeter->markHostUnreachable(remoteHost);
} else if (remoteCommandStatus == ErrorCodes::ExceededTimeLimit) {
_targeter->markHostUnreachable(remoteHost);
}
}
std::string ShardRemote::toString() const {
return getId().toString() + ":" + _originalConnString.toString();
}
const BSONObj& ShardRemote::_getMetadataForCommand(const ReadPreferenceSetting& readPref) {
if (isConfig()) {
if (readPref.pref == ReadPreference::PrimaryOnly) {
return kReplMetadata;
} else {
return kReplSecondaryOkMetadata;
}
} else {
if (readPref.pref == ReadPreference::PrimaryOnly) {
return kNoMetadata;
} else {
return kSecondaryOkMetadata;
}
}
}
StatusWith ShardRemote::_runCommand(OperationContext* txn,
const ReadPreferenceSetting& readPref,
const string& dbName,
const BSONObj& cmdObj) {
const BSONObj cmdWithMaxTimeMS = (isConfig() ? appendMaxTimeToCmdObj(txn, cmdObj) : cmdObj);
const auto host =
_targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
if (!host.isOK()) {
return host.getStatus();
}
RemoteCommandRequest request(host.getValue(),
dbName,
cmdWithMaxTimeMS,
_getMetadataForCommand(readPref),
isConfig() ? kConfigCommandTimeout
: executor::RemoteCommandRequest::kNoTimeout);
StatusWith swResponse =
Status(ErrorCodes::InternalError, "Internal error running command");
TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
auto callStatus = executor->scheduleRemoteCommand(
request,
[&swResponse](const RemoteCommandCallbackArgs& args) { swResponse = args.response; });
if (!callStatus.isOK()) {
return callStatus.getStatus();
}
// Block until the command is carried out
executor->wait(callStatus.getValue());
updateReplSetMonitor(host.getValue(), swResponse.getStatus());
if (!swResponse.isOK()) {
if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
LOG(0) << "Operation timed out with status " << swResponse.getStatus();
}
return swResponse.getStatus();
}
BSONObj responseObj = swResponse.getValue().data.getOwned();
BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
Status commandStatus = getStatusFromCommandResult(responseObj);
Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
updateReplSetMonitor(host.getValue(), commandStatus);
return CommandResponse(std::move(responseObj),
std::move(responseMetadata),
std::move(commandStatus),
std::move(writeConcernStatus));
}
StatusWith ShardRemote::_exhaustiveFindOnConfig(
OperationContext* txn,
const ReadPreferenceSetting& readPref,
const NamespaceString& nss,
const BSONObj& query,
const BSONObj& sort,
boost::optional limit) {
// Do not allow exhaustive finds to be run against regular shards.
invariant(getId() == "config");
const auto host =
_targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
if (!host.isOK()) {
return host.getStatus();
}
QueryResponse response;
// If for some reason the callback never gets invoked, we will return this status in response.
Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
auto fetcherCallback =
[this, &status, &response](const Fetcher::QueryResponseStatus& dataStatus,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
// Throw out any accumulated results on error
if (!dataStatus.isOK()) {
status = dataStatus.getStatus();
response.docs.clear();
return;
}
auto& data = dataStatus.getValue();
if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
auto replParseStatus =
rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
if (!replParseStatus.isOK()) {
status = replParseStatus.getStatus();
response.docs.clear();
return;
}
response.opTime = replParseStatus.getValue().getLastOpVisible();
// We return the config opTime that was returned for this particular request, but as
// a safeguard we ensure our global configOpTime is at least as large as it.
invariant(grid.configOpTime() >= response.opTime);
}
for (const BSONObj& doc : data.documents) {
response.docs.push_back(doc.getOwned());
}
status = Status::OK();
if (!getMoreBob) {
return;
}
getMoreBob->append("getMore", data.cursorId);
getMoreBob->append("collection", data.nss.coll());
};
BSONObj readConcernObj;
{
const repl::ReadConcernArgs readConcern{grid.configOpTime(),
repl::ReadConcernLevel::kMajorityReadConcern};
BSONObjBuilder bob;
readConcern.appendInfo(&bob);
readConcernObj =
bob.done().getObjectField(repl::ReadConcernArgs::kReadConcernFieldName).getOwned();
}
auto qr = stdx::make_unique(nss);
qr->setFilter(query);
qr->setSort(sort);
qr->setReadConcern(readConcernObj);
qr->setLimit(limit);
BSONObjBuilder findCmdBuilder;
qr->asFindCommand(&findCmdBuilder);
Microseconds maxTime = std::min(duration_cast(kConfigCommandTimeout),
txn->getRemainingMaxTimeMicros());
if (maxTime < Milliseconds{1}) {
// If there is less than 1ms remaining before the maxTime timeout expires, set the max time
// to 1ms, since setting maxTimeMs to 1ms in a find command means "no max time".
maxTime = Milliseconds{1};
}
findCmdBuilder.append(QueryRequest::cmdOptionMaxTimeMS, durationCount(maxTime));
Fetcher fetcher(Grid::get(txn)->getExecutorPool()->getFixedExecutor(),
host.getValue(),
nss.db().toString(),
findCmdBuilder.done(),
fetcherCallback,
_getMetadataForCommand(readPref),
duration_cast(maxTime));
Status scheduleStatus = fetcher.schedule();
if (!scheduleStatus.isOK()) {
return scheduleStatus;
}
fetcher.wait();
updateReplSetMonitor(host.getValue(), status);
if (!status.isOK()) {
if (status.compareCode(ErrorCodes::ExceededTimeLimit)) {
LOG(0) << "Operation timed out with status " << status;
}
return status;
}
return response;
}
} // namespace mongo