/**
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/base/data_cursor.h"
#include "mongo/base/init.h"
#include "mongo/base/status.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/bson/util/builder.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/initialize_operation_session_info.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_time_tracker.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/logical_time_metadata.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/parallel.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/op_msg.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/timer.h"
namespace mongo {
namespace {
const auto kOperationTime = "operationTime"_sd;
/**
 * Extract and process metadata from the command request body.
 *
 * Sets the operation's read preference from any $readPreference in 'cmdObj' (throws via
 * uassertStatusOK on a malformed read preference), then reads the gossiped logical time
 * metadata. If a signed cluster time was supplied and the client is not itself authorized to
 * advance the clock, the time's proof is validated before this node's logical clock is
 * advanced to the received time.
 *
 * Returns a non-OK Status if the logical time metadata cannot be parsed, proof validation
 * fails, or the clock cannot be advanced.
 */
Status processCommandMetadata(OperationContext* opCtx, const BSONObj& cmdObj) {
    ReadPreferenceSetting::get(opCtx) =
        uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(cmdObj));

    auto logicalClock = LogicalClock::get(opCtx);
    invariant(logicalClock);

    auto logicalTimeMetadata = rpc::LogicalTimeMetadata::readFromMetadata(cmdObj);
    if (!logicalTimeMetadata.isOK()) {
        return logicalTimeMetadata.getStatus();
    }

    auto logicalTimeValidator = LogicalTimeValidator::get(opCtx);
    const auto& signedTime = logicalTimeMetadata.getValue().getSignedTime();

    // No need to check the proof if no time is given.
    if (signedTime.getTime() == LogicalTime::kUninitialized) {
        return Status::OK();
    }

    // Clients authorized to advance the clock may send a time without a valid proof, so only
    // validate the proof for everyone else.
    if (!LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) {
        auto advanceClockStatus = logicalTimeValidator->validate(opCtx, signedTime);
        if (!advanceClockStatus.isOK()) {
            return advanceClockStatus;
        }
    }

    return logicalClock->advanceClusterTime(signedTime.getTime());
}
/**
 * Append required fields to command response.
 *
 * When this node gossips logical time, attaches to 'responseBuilder':
 *  - $clusterTime: the current cluster time, signed by the validator — or carrying a dummy
 *    (empty) signature when the client is authorized to advance the clock itself and therefore
 *    does not need a real proof;
 *  - operationTime: the max operation time observed by this operation, falling back to the
 *    current cluster time when none was recorded.
 */
void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* responseBuilder) {
    auto validator = LogicalTimeValidator::get(opCtx);
    if (validator->shouldGossipLogicalTime()) {
        // Add $clusterTime.
        auto now = LogicalClock::get(opCtx)->getClusterTime();
        if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) {
            // Dummy signature (empty proof, key id 0) — the recipient may advance the clock
            // without verification.
            SignedLogicalTime dummySignedTime(now, TimeProofService::TimeProof(), 0);
            rpc::LogicalTimeMetadata(dummySignedTime).writeToMetadata(responseBuilder);
        } else {
            auto currentTime = validator->signLogicalTime(opCtx, now);
            rpc::LogicalTimeMetadata(currentTime).writeToMetadata(responseBuilder);
        }

        // Add operationTime.
        auto operationTime = OperationTimeTracker::get(opCtx)->getMaxOperationTime();
        if (operationTime != LogicalTime::kUninitialized) {
            responseBuilder->append(kOperationTime, operationTime.asTimestamp());
        } else if (now != LogicalTime::kUninitialized) {
            // If we don't know the actual operation time, use the cluster time instead. This is
            // safe but not optimal because we can always return a later operation time than
            // actual.
            responseBuilder->append(kOperationTime, now.asTimestamp());
        }
    }
}
/**
 * Executes 'request' for command 'c' on this mongos, writing the response into 'result'.
 *
 * Performs the generic pre-execution steps: database-name validation, 'help' handling and
 * duplicate-top-level-field rejection, authorization, write concern extraction/validation,
 * read concern parsing, tracking metadata attachment, and request metadata processing
 * ($readPreference, $clusterTime). Logical time fields are always appended to the response,
 * even on error, via the scope guard below.
 *
 * Fix: restores the template arguments that were lost from 'StringMap<int>' and
 * 'StatusWith<WriteConcernOptions>' (the bare 'StringMap'/'StatusWith' forms do not compile).
 */
void execCommandClient(OperationContext* opCtx,
                       Command* c,
                       const OpMsgRequest& request,
                       BSONObjBuilder& result) {
    // Attach $clusterTime/operationTime to the response on every exit path.
    ON_BLOCK_EXIT([opCtx, &result] { appendRequiredFieldsToResponse(opCtx, &result); });

    const auto dbname = request.getDatabase();
    uassert(ErrorCodes::IllegalOperation,
            "Can't use 'local' database through mongos",
            dbname != NamespaceString::kLocalDb);
    uassert(ErrorCodes::InvalidNamespace,
            str::stream() << "Invalid database name: '" << dbname << "'",
            NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));

    // Handle the 'help' option and reject duplicate top-level fields.
    StringMap<int> topLevelFields;
    for (auto&& element : request.body) {
        StringData fieldName = element.fieldNameStringData();
        if (fieldName == "help" && element.type() == Bool && element.Bool()) {
            std::stringstream help;
            help << "help for: " << c->getName() << " ";
            help << c->help();
            result.append("help", help.str());
            CommandHelpers::appendCommandStatus(result, true, "");
            return;
        }

        uassert(ErrorCodes::FailedToParse,
                str::stream() << "Parsed command object contains duplicate top level key: "
                              << fieldName,
                topLevelFields[fieldName]++ == 0);
    }

    Status status = Command::checkAuthorization(c, opCtx, request);
    if (!status.isOK()) {
        CommandHelpers::appendCommandStatus(result, status);
        return;
    }

    c->incrementCommandsExecuted();

    if (c->shouldAffectCommandCounter()) {
        globalOpCounters.gotCommand();
    }

    // Extract the write concern up front so an invalid one fails before the command runs.
    StatusWith<WriteConcernOptions> wcResult =
        WriteConcernOptions::extractWCFromCommand(request.body);
    if (!wcResult.isOK()) {
        CommandHelpers::appendCommandStatus(result, wcResult.getStatus());
        return;
    }

    bool supportsWriteConcern = c->supportsWriteConcern(request.body);
    if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {
        // This command doesn't do writes so it should not be passed a writeConcern.
        // If we did not use the default writeConcern, one was provided when it shouldn't have
        // been by the user.
        CommandHelpers::appendCommandStatus(
            result, Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern"));
        return;
    }

    repl::ReadConcernArgs readConcernArgs;
    auto readConcernParseStatus = readConcernArgs.initialize(request.body);
    if (!readConcernParseStatus.isOK()) {
        CommandHelpers::appendCommandStatus(result, readConcernParseStatus);
        return;
    }

    if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
        CommandHelpers::appendCommandStatus(
            result,
            Status(ErrorCodes::InvalidOptions, "read concern snapshot is not supported on mongos"));
        return;
    }

    // attach tracking
    rpc::TrackingMetadata trackingMetadata;
    trackingMetadata.initWithOperName(c->getName());
    rpc::TrackingMetadata::get(opCtx) = trackingMetadata;

    auto metadataStatus = processCommandMetadata(opCtx, request.body);
    if (!metadataStatus.isOK()) {
        CommandHelpers::appendCommandStatus(result, metadataStatus);
        return;
    }

    bool ok = false;
    if (!supportsWriteConcern) {
        ok = c->publicRun(opCtx, request, result);
    } else {
        // Change the write concern while running the command; restore the old one on exit.
        const auto oldWC = opCtx->getWriteConcern();
        ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
        opCtx->setWriteConcern(wcResult.getValue());

        ok = c->publicRun(opCtx, request, result);
    }

    if (!ok) {
        c->incrementCommandsFailed();
    }

    CommandHelpers::appendCommandStatus(result, ok);
}
/**
 * Locates the command named in 'request' and runs it, retrying up to five times on
 * StaleConfigException (refreshing shard connection versions and, after repeated failures,
 * invalidating the routing info for the stale namespace between attempts).
 *
 * All command errors are written into 'builder'; only stale-config errors with an empty
 * namespace (and exceptions escaping the final retry) propagate out of this function.
 *
 * Fix: rethrow with 'throw;' instead of 'throw e;' — the latter throws a copy of the caught
 * object and would slice a more-derived exception type.
 */
void runCommand(OperationContext* opCtx, const OpMsgRequest& request, BSONObjBuilder&& builder) {
    // Handle command option maxTimeMS first thing while processing the command so that the
    // subsequent code has the deadline available
    uassert(ErrorCodes::InvalidOptions,
            "no such command option $maxTimeMs; use maxTimeMS instead",
            request.body[QueryRequest::queryOptionMaxTimeMS].eoo());

    const int maxTimeMS = uassertStatusOK(
        QueryRequest::parseMaxTimeMS(request.body[QueryRequest::cmdOptionMaxTimeMS]));
    if (maxTimeMS > 0) {
        opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS});
    }

    auto const commandName = request.getCommandName();
    auto const command = CommandHelpers::findCommand(commandName);
    if (!command) {
        // Unknown command: still gossip logical time on the error response.
        ON_BLOCK_EXIT([opCtx, &builder] { appendRequiredFieldsToResponse(opCtx, &builder); });
        CommandHelpers::appendCommandStatus(
            builder,
            {ErrorCodes::CommandNotFound, str::stream() << "no such cmd: " << commandName});
        globalCommandRegistry()->incrementUnknownCommands();
        return;
    }

    initializeOperationSessionInfo(opCtx, request.body, command->requiresAuth(), true, true);

    int loops = 5;

    while (true) {
        builder.resetToEmpty();
        try {
            execCommandClient(opCtx, command, request, builder);
            return;
        } catch (const StaleConfigException& e) {
            if (e->getns().empty()) {
                // This should be impossible but older versions tried incorrectly to handle it
                // here.
                log() << "Received a stale config error with an empty namespace while executing "
                      << redact(request.body) << " : " << redact(e);
                throw;
            }

            // Out of retries: rethrow the original exception (no copy, no slicing).
            if (loops <= 0)
                throw;

            loops--;
            log() << "Retrying command " << redact(request.body) << causedBy(e);

            ShardConnection::checkMyConnectionVersions(opCtx, e->getns());
            if (loops < 4) {
                // After the first retry also failed, invalidate the cached routing info.
                const NamespaceString staleNSS(e->getns());
                if (staleNSS.isValid()) {
                    Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNSS);
                }
            }

            continue;
        } catch (const DBException& e) {
            ON_BLOCK_EXIT([opCtx, &builder] { appendRequiredFieldsToResponse(opCtx, &builder); });

            builder.resetToEmpty();
            command->incrementCommandsFailed();
            CommandHelpers::appendCommandStatus(builder, e.toStatus());
            LastError::get(opCtx->getClient()).setLastError(e.code(), e.reason());
            return;
        }

        MONGO_UNREACHABLE;
    }
}
} // namespace
/**
 * Handles a legacy OP_QUERY: checks find authorization, canonicalizes the query, and either
 * runs it as an explain on the shards (when $explain was set) or executes it through
 * ClusterFind::runQuery, building the first batch into an OP_REPLY.
 *
 * Fix: restores the template arguments that were lost from
 * 'boost::intrusive_ptr<ExpressionContext>', 'std::vector<BSONObj>', and
 * 'ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>' (the bare forms do not
 * compile).
 */
DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
    globalOpCounters.gotQuery();

    const QueryMessage q(*dbm);

    Client* const client = opCtx->getClient();
    AuthorizationSession* const authSession = AuthorizationSession::get(client);

    Status status = authSession->checkAuthForFind(nss, false);
    audit::logQueryAuthzCheck(client, nss, q.query, status.code());
    uassertStatusOK(status);

    LOG(3) << "query: " << q.ns << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn
           << " options: " << q.queryOptions;

    if (q.queryOptions & QueryOption_Exhaust) {
        uasserted(18526,
                  str::stream() << "The 'exhaust' query option is invalid for mongos queries: "
                                << nss.ns()
                                << " "
                                << q.query.toString());
    }

    // Determine the default read preference mode based on the value of the slaveOk flag.
    const auto defaultReadPref = q.queryOptions & QueryOption_SlaveOk
        ? ReadPreference::SecondaryPreferred
        : ReadPreference::PrimaryOnly;
    ReadPreferenceSetting::get(opCtx) =
        uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(q.query, defaultReadPref));

    const boost::intrusive_ptr<ExpressionContext> expCtx;
    auto canonicalQuery = uassertStatusOK(
        CanonicalQuery::canonicalize(opCtx,
                                     q,
                                     expCtx,
                                     ExtensionsCallbackNoop(),
                                     MatchExpressionParser::kAllowAllSpecialFeatures));

    // If the $explain flag was set, we must run the operation on the shards as an explain command
    // rather than a find command.
    const QueryRequest& queryRequest = canonicalQuery->getQueryRequest();
    if (queryRequest.isExplain()) {
        const BSONObj findCommand = queryRequest.asFindCommand();

        // We default to allPlansExecution verbosity.
        const auto verbosity = ExplainOptions::Verbosity::kExecAllPlans;

        BSONObjBuilder explainBuilder;
        Strategy::explainFind(opCtx,
                              findCommand,
                              queryRequest,
                              verbosity,
                              ReadPreferenceSetting::get(opCtx),
                              &explainBuilder);

        BSONObj explainObj = explainBuilder.done();
        return replyToQuery(explainObj);
    }

    // Do the work to generate the first batch of results. This blocks waiting to get responses
    // from the shard(s).
    std::vector<BSONObj> batch;

    // 0 means the cursor is exhausted. Otherwise we assume that a cursor with the returned id can
    // be retrieved via the ClusterCursorManager.
    CursorId cursorId;
    try {
        cursorId = ClusterFind::runQuery(
            opCtx, *canonicalQuery, ReadPreferenceSetting::get(opCtx), &batch);
    } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>&) {
        // Views cannot be queried over the legacy OP_QUERY protocol.
        uasserted(40247, "OP_QUERY not supported on views");
    }

    // Fill out the response buffer.
    int numResults = 0;
    OpQueryReplyBuilder reply;
    for (auto&& obj : batch) {
        obj.appendSelfToBufBuilder(reply.bufBuilderForResults());
        numResults++;
    }

    return DbResponse{reply.toQueryReply(0,  // query result flags
                                         numResults,
                                         0,  // startingFrom
                                         cursorId)};
}
/**
 * Entry point for a command message received over the wire. Parses the message into an
 * OpMsgRequest, executes it via runCommand(), and converts the result — or any parse/execute
 * error — into a DbResponse. Connection-fatal parse errors propagate out as exceptions.
 */
DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
    auto reply = rpc::makeReplyBuilder(rpc::protocolForMessage(m));

    // The immediately-invoked lambda lets both the parse and execute error paths fall through
    // to the shared reply-finalization code below.
    [&] {
        OpMsgRequest request;
        std::string db;
        try {  // Parse.
            request = rpc::opMsgRequestFromAnyProtocol(m);
            db = request.getDatabase().toString();
        } catch (const DBException& ex) {
            // If this error needs to fail the connection, propagate it out.
            if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
                throw;

            LOG(1) << "Exception thrown while parsing command " << causedBy(redact(ex));

            // Answer with the parse error instead of executing anything.
            reply->reset();
            auto bob = reply->getInPlaceReplyBuilder(0);
            CommandHelpers::appendCommandStatus(bob, ex.toStatus());
            appendRequiredFieldsToResponse(opCtx, &bob);

            return;  // From lambda. Don't try executing if parsing failed.
        }

        try {  // Execute.
            LOG(3) << "Command begin db: " << db << " msg id: " << m.header().getId();
            runCommand(opCtx, request, reply->getInPlaceReplyBuilder(0));
            LOG(3) << "Command end db: " << db << " msg id: " << m.header().getId();
        } catch (const DBException& ex) {
            LOG(1) << "Exception thrown while processing command on " << db
                   << " msg id: " << m.header().getId() << causedBy(redact(ex));

            // Discard any partial reply and answer with the error status.
            reply->reset();
            auto bob = reply->getInPlaceReplyBuilder(0);
            CommandHelpers::appendCommandStatus(bob, ex.toStatus());
            appendRequiredFieldsToResponse(opCtx, &bob);
        }
    }();

    if (OpMsg::isFlagSet(m, OpMsg::kMoreToCome)) {
        return {};  // Don't reply.
    }

    reply->setMetadata(BSONObj());  // mongos doesn't use metadata but the API requires this call.
    return DbResponse{reply->done()};
}
void Strategy::commandOp(OperationContext* opCtx,
const std::string& db,
const BSONObj& command,
const std::string& versionedNS,
const BSONObj& targetingQuery,
const BSONObj& targetingCollation,
std::vector* results) {
QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, 0);
ParallelSortClusteredCursor cursor(
qSpec, CommandInfo(versionedNS, targetingQuery, targetingCollation));
// Initialize the cursor
cursor.init(opCtx);
std::set shardIds;
cursor.getQueryShardIds(shardIds);
for (const ShardId& shardId : shardIds) {
CommandResult result;
result.shardTargetId = shardId;
result.target = fassertStatusOK(
34417, ConnectionString::parse(cursor.getShardCursor(shardId)->originalHost()));
result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned();
results->push_back(result);
}
}
/**
 * Handles a legacy OP_GET_MORE: resolves the cluster cursor and returns its next batch as an
 * OP_REPLY, or a CursorNotFound reply when the database or cursor no longer exists.
 *
 * Fix: restores the template argument that was lost from 'boost::optional<long long>' (the
 * bare 'boost::optional' form does not compile).
 */
DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
    const int ntoreturn = dbm->pullInt();
    uassert(
        34424, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0);
    const long long cursorId = dbm->pullInt64();

    globalOpCounters.gotGetMore();

    // TODO: Handle stale config exceptions here from coll being dropped or sharded during op for
    // now has same semantics as legacy request.

    auto statusGetDb = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db());
    if (statusGetDb == ErrorCodes::NamespaceNotFound) {
        return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0);
    }
    uassertStatusOK(statusGetDb);

    // An ntoreturn of 0 means "no batch size limit".
    boost::optional<long long> batchSize;
    if (ntoreturn) {
        batchSize = ntoreturn;
    }

    GetMoreRequest getMoreRequest(nss, cursorId, batchSize, boost::none, boost::none, boost::none);

    auto cursorResponse = ClusterFind::runGetMore(opCtx, getMoreRequest);
    if (cursorResponse == ErrorCodes::CursorNotFound) {
        return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0);
    }
    uassertStatusOK(cursorResponse.getStatus());

    // Build the response document.
    BufBuilder buffer(FindCommon::kInitReplyBufferSize);

    int numResults = 0;
    for (const auto& obj : cursorResponse.getValue().getBatch()) {
        buffer.appendBuf((void*)obj.objdata(), obj.objsize());
        ++numResults;
    }

    return replyToQuery(0,
                        buffer.buf(),
                        buffer.len(),
                        numResults,
                        cursorResponse.getValue().getNumReturnedSoFar().value_or(0),
                        cursorResponse.getValue().getCursorId());
}
/**
 * Handles a legacy OP_KILL_CURSORS: validates the message size and cursor count, then, for
 * each cursor id, resolves its namespace, checks kill authorization, and asks the
 * ClusterCursorManager to kill it. Per-cursor failures are logged and skipped, not fatal.
 *
 * Fix: restores the template arguments that were lost from
 * 'readAndAdvance<LittleEndian<int64_t>>()' and 'boost::optional<NamespaceString>' (the
 * mangled forms do not compile).
 */
void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) {
    const int numCursors = dbm->pullInt();
    massert(34425,
            str::stream() << "Invalid killCursors message. numCursors: " << numCursors
                          << ", message size: "
                          << dbm->msg().dataSize()
                          << ".",
            dbm->msg().dataSize() == 8 + (8 * numCursors));
    uassert(28794,
            str::stream() << "numCursors must be between 1 and 29999. numCursors: " << numCursors
                          << ".",
            numCursors >= 1 && numCursors < 30000);

    globalOpCounters.gotOp(dbKillCursors, false);

    ConstDataCursor cursors(dbm->getArray(numCursors));

    Client* const client = opCtx->getClient();
    ClusterCursorManager* const manager = Grid::get(opCtx)->getCursorManager();

    for (int i = 0; i < numCursors; ++i) {
        // Cursor ids are encoded little-endian in the message body.
        const CursorId cursorId = cursors.readAndAdvance<LittleEndian<int64_t>>();

        boost::optional<NamespaceString> nss = manager->getNamespaceForCursorId(cursorId);
        if (!nss) {
            LOG(3) << "Can't find cursor to kill. Cursor id: " << cursorId << ".";
            continue;
        }

        auto authzSession = AuthorizationSession::get(client);
        auto authChecker = [&authzSession, &nss](UserNameIterator userNames) -> Status {
            return authzSession->checkAuthForKillCursors(*nss, userNames);
        };

        auto authzStatus = manager->checkAuthForKillCursors(opCtx, *nss, cursorId, authChecker);
        audit::logKillCursorsAuthzCheck(client, *nss, cursorId, authzStatus.code());
        if (!authzStatus.isOK()) {
            LOG(3) << "Not authorized to kill cursor. Namespace: '" << *nss
                   << "', cursor id: " << cursorId << ".";
            continue;
        }

        Status killCursorStatus = manager->killCursor(opCtx, *nss, cursorId);
        if (!killCursorStatus.isOK()) {
            LOG(3) << "Can't find cursor to kill. Namespace: '" << *nss
                   << "', cursor id: " << cursorId << ".";
            continue;
        }

        LOG(3) << "Killed cursor. Namespace: '" << *nss << "', cursor id: " << cursorId << ".";
    }
}
/**
 * Translates a legacy OP_INSERT/OP_UPDATE/OP_DELETE message into the equivalent write-command
 * request and executes it through the regular command path.
 */
void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
    const auto& msg = dbm->msg();

    // Convert the legacy write message into an OpMsgRequest for the matching write command.
    const auto request = [&msg] {
        switch (msg.operation()) {
            case dbInsert:
                return InsertOp::parseLegacy(msg).serialize({});
            case dbUpdate:
                return UpdateOp::parseLegacy(msg).serialize({});
            case dbDelete:
                return DeleteOp::parseLegacy(msg).serialize({});
            default:
                MONGO_UNREACHABLE;
        }
    }();

    runCommand(opCtx, request, BSONObjBuilder());
}
/**
 * Runs the given find command as an explain on every shard targeted by the query's filter and
 * collation, then merges the shard responses into a single explain result in 'out'.
 * Throws (via uassertStatusOK) if the merged explain result cannot be built.
 */
void Strategy::explainFind(OperationContext* opCtx,
                           const BSONObj& findCommand,
                           const QueryRequest& qr,
                           ExplainOptions::Verbosity verbosity,
                           const ReadPreferenceSetting& readPref,
                           BSONObjBuilder* out) {
    const auto explainCmd = ClusterExplain::wrapAsExplain(findCommand, verbosity);

    // Time how long the shards take to run the explain.
    Timer shardExecutionTimer;

    auto shardExplainResponses =
        scatterGatherVersionedTargetByRoutingTable(opCtx,
                                                   qr.nss().db().toString(),
                                                   qr.nss(),
                                                   explainCmd,
                                                   readPref,
                                                   Shard::RetryPolicy::kIdempotent,
                                                   qr.getFilter(),
                                                   qr.getCollation());

    const long long millisElapsed = shardExecutionTimer.millis();

    const char* mongosStageName =
        ClusterExplain::getStageNameForReadOp(shardExplainResponses.size(), findCommand);

    uassertStatusOK(ClusterExplain::buildExplainResult(
        opCtx,
        ClusterExplain::downconvert(opCtx, shardExplainResponses),
        mongosStageName,
        millisElapsed,
        out));
}
} // namespace mongo