/**
 *    Copyright (C) 2010 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/commands/strategy.h" #include "mongo/base/data_cursor.h" #include "mongo/base/owned_pointer_vector.h" #include "mongo/base/status.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/bson/util/builder.h" #include "mongo/db/audit.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" #include "mongo/db/stats/counters.h" #include "mongo/db/views/resolved_view.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_explain.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_find.h" #include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batch_upconvert.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/timer.h" namespace mongo { using std::unique_ptr; using std::shared_ptr; using std::set; using std::string; using std::stringstream; namespace { void runAgainstRegistered(OperationContext* opCtx, const char* ns, BSONObj& jsobj, BSONObjBuilder& anObjBuilder, int queryOptions) { // It should be impossible for this uassert to fail since there should be no way to get // into this function with any other collection name. 
uassert(16618, "Illegal attempt to run a command against a namespace other than $cmd.", nsToCollectionSubstring(ns) == "$cmd"); BSONElement e = jsobj.firstElement(); std::string commandName = e.fieldName(); Command* c = e.type() ? Command::findCommand(commandName) : NULL; if (!c) { Command::appendCommandStatus( anObjBuilder, false, str::stream() << "no such cmd: " << commandName); anObjBuilder.append("code", ErrorCodes::CommandNotFound); Command::unknownCommands.increment(); return; } Command::execCommandClient(opCtx, c, queryOptions, ns, jsobj, anObjBuilder); } } // namespace void Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { globalOpCounters.gotQuery(); const QueryMessage q(*dbm); Client* const client = opCtx->getClient(); AuthorizationSession* const authSession = AuthorizationSession::get(client); Status status = authSession->checkAuthForFind(nss, false); audit::logQueryAuthzCheck(client, nss, q.query, status.code()); uassertStatusOK(status); LOG(3) << "query: " << q.ns << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn << " options: " << q.queryOptions; if (q.queryOptions & QueryOption_Exhaust) { uasserted(18526, str::stream() << "The 'exhaust' query option is invalid for mongos queries: " << nss.ns() << " " << q.query.toString()); } // Determine the default read preference mode based on the value of the slaveOk flag. const ReadPreferenceSetting readPreference = [&]() { BSONElement rpElem; auto readPrefExtractStatus = bsonExtractTypedField( q.query, QueryRequest::kWrappedReadPrefField, mongo::Object, &rpElem); if (readPrefExtractStatus == ErrorCodes::NoSuchKey) { return ReadPreferenceSetting(q.queryOptions & QueryOption_SlaveOk ? 
ReadPreference::SecondaryPreferred : ReadPreference::PrimaryOnly); } uassertStatusOK(readPrefExtractStatus); return uassertStatusOK(ReadPreferenceSetting::fromBSON(rpElem.Obj())); }(); auto canonicalQuery = uassertStatusOK(CanonicalQuery::canonicalize(opCtx, q, ExtensionsCallbackNoop())); // If the $explain flag was set, we must run the operation on the shards as an explain command // rather than a find command. const QueryRequest& queryRequest = canonicalQuery->getQueryRequest(); if (queryRequest.isExplain()) { const BSONObj findCommand = queryRequest.asFindCommand(); // We default to allPlansExecution verbosity. const auto verbosity = ExplainCommon::EXEC_ALL_PLANS; const bool secondaryOk = (readPreference.pref != ReadPreference::PrimaryOnly); const rpc::ServerSelectionMetadata metadata(secondaryOk, readPreference); BSONObjBuilder explainBuilder; uassertStatusOK(Strategy::explainFind( opCtx, findCommand, queryRequest, verbosity, metadata, &explainBuilder)); BSONObj explainObj = explainBuilder.done(); replyToQuery(0, // query result flags client->session(), dbm->msg(), static_cast(explainObj.objdata()), explainObj.objsize(), 1, // numResults 0, // startingFrom CursorId(0)); return; } // Do the work to generate the first batch of results. This blocks waiting to get responses from // the shard(s). std::vector batch; // 0 means the cursor is exhausted. Otherwise we assume that a cursor with the returned id can // be retrieved via the ClusterCursorManager. auto cursorId = ClusterFind::runQuery(opCtx, *canonicalQuery, readPreference, &batch, nullptr /*Argument is for views which OP_QUERY doesn't support*/); if (!cursorId.isOK() && cursorId.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { uasserted(40247, "OP_QUERY not supported on views"); } uassertStatusOK(cursorId.getStatus()); // Fill out the response buffer. 
int numResults = 0; OpQueryReplyBuilder reply; for (auto&& obj : batch) { obj.appendSelfToBufBuilder(reply.bufBuilderForResults()); numResults++; } reply.send(client->session(), 0, // query result flags dbm->msg(), numResults, 0, // startingFrom cursorId.getValue()); } void Strategy::clientCommandOp(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { const QueryMessage q(*dbm); Client* const client = opCtx->getClient(); LOG(3) << "command: " << q.ns << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn << " options: " << q.queryOptions; if (q.queryOptions & QueryOption_Exhaust) { uasserted(18527, str::stream() << "The 'exhaust' query option is invalid for mongos commands: " << nss.ns() << " " << q.query.toString()); } uassert(16978, str::stream() << "Bad numberToReturn (" << q.ntoreturn << ") for $cmd type ns - can only be 1 or -1", q.ntoreturn == 1 || q.ntoreturn == -1); // Handle the $cmd.sys pseudo-commands if (nss.isSpecialCommand()) { const auto upgradeToRealCommand = [&](StringData commandName) { BSONObjBuilder cmdBob; cmdBob.append(commandName, 1); cmdBob.appendElements(q.query); // fields are validated by Commands auto interposedCmd = cmdBob.done(); // Rewrite upgraded pseudoCommands to run on the 'admin' database. 
const NamespaceString interposedNss("admin", "$cmd"); BSONObjBuilder reply; runAgainstRegistered( opCtx, interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions); replyToQuery(0, client->session(), dbm->msg(), reply.done()); }; if (nss.coll() == "$cmd.sys.inprog") { upgradeToRealCommand("currentOp"); return; } else if (nss.coll() == "$cmd.sys.killop") { upgradeToRealCommand("killOp"); return; } else if (nss.coll() == "$cmd.sys.unlock") { replyToQuery(0, client->session(), dbm->msg(), BSON("err" << "can't do unlock through mongos")); return; } // No pseudo-command found, fall through to execute as a regular query } BSONObj cmdObj = q.query; { BSONElement e = cmdObj.firstElement(); if (e.type() == Object && (e.fieldName()[0] == '$' ? str::equals("query", e.fieldName() + 1) : str::equals("query", e.fieldName()))) { // Extract the embedded query object. if (cmdObj.hasField(Query::ReadPrefField.name())) { // The command has a read preference setting. We don't want to lose this information // so we copy this to a new field called $queryOptions.$readPreference BSONObjBuilder finalCmdObjBuilder; finalCmdObjBuilder.appendElements(e.embeddedObject()); BSONObjBuilder queryOptionsBuilder(finalCmdObjBuilder.subobjStart("$queryOptions")); queryOptionsBuilder.append(cmdObj[Query::ReadPrefField.name()]); queryOptionsBuilder.done(); cmdObj = finalCmdObjBuilder.obj(); } else { cmdObj = e.embeddedObject(); } } } // Handle command option maxTimeMS. 
uassert(ErrorCodes::InvalidOptions, "no such command option $maxTimeMs; use maxTimeMS instead", cmdObj[QueryRequest::queryOptionMaxTimeMS].eoo()); const int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdObj[QueryRequest::cmdOptionMaxTimeMS])); if (maxTimeMS > 0) { opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}); } int loops = 5; while (true) { try { OpQueryReplyBuilder reply; { BSONObjBuilder builder(reply.bufBuilderForResults()); runAgainstRegistered(opCtx, q.ns, cmdObj, builder, q.queryOptions); } reply.sendCommandReply(client->session(), dbm->msg()); return; } catch (const StaleConfigException& e) { if (loops <= 0) throw e; loops--; log() << "Retrying command " << redact(q.query) << causedBy(e); // For legacy reasons, ns may not actually be set in the exception const std::string staleNS(e.getns().empty() ? std::string(q.ns) : e.getns()); ShardConnection::checkMyConnectionVersions(opCtx, staleNS); if (loops < 4) { const NamespaceString staleNSS(staleNS); if (staleNSS.isValid()) { Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNSS); } } } catch (const DBException& e) { OpQueryReplyBuilder reply; { BSONObjBuilder builder(reply.bufBuilderForResults()); Command::appendCommandStatus(builder, e.toStatus()); } reply.sendCommandReply(client->session(), dbm->msg()); return; } } } void Strategy::commandOp(OperationContext* opCtx, const string& db, const BSONObj& command, int options, const string& versionedNS, const BSONObj& targetingQuery, const BSONObj& targetingCollation, std::vector* results) { QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options); ParallelSortClusteredCursor cursor( qSpec, CommandInfo(versionedNS, targetingQuery, targetingCollation)); // Initialize the cursor cursor.init(opCtx); set shardIds; cursor.getQueryShardIds(shardIds); for (const ShardId& shardId : shardIds) { CommandResult result; result.shardTargetId = shardId; result.target = fassertStatusOK( 34417, 
ConnectionString::parse(cursor.getShardCursor(shardId)->originalHost())); result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned(); results->push_back(result); } } void Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { const int ntoreturn = dbm->pullInt(); uassert( 34424, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); const long long cursorId = dbm->pullInt64(); globalOpCounters.gotGetMore(); Client* const client = opCtx->getClient(); // TODO: Handle stale config exceptions here from coll being dropped or sharded during op for // now has same semantics as legacy request. auto statusGetDb = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db()); if (statusGetDb == ErrorCodes::NamespaceNotFound) { replyToQuery(ResultFlag_CursorNotFound, client->session(), dbm->msg(), 0, 0, 0); return; } uassertStatusOK(statusGetDb); boost::optional batchSize; if (ntoreturn) { batchSize = ntoreturn; } GetMoreRequest getMoreRequest(nss, cursorId, batchSize, boost::none, boost::none, boost::none); auto cursorResponse = ClusterFind::runGetMore(opCtx, getMoreRequest); if (cursorResponse == ErrorCodes::CursorNotFound) { replyToQuery(ResultFlag_CursorNotFound, client->session(), dbm->msg(), 0, 0, 0); return; } uassertStatusOK(cursorResponse.getStatus()); // Build the response document. BufBuilder buffer(FindCommon::kInitReplyBufferSize); int numResults = 0; for (const auto& obj : cursorResponse.getValue().getBatch()) { buffer.appendBuf((void*)obj.objdata(), obj.objsize()); ++numResults; } replyToQuery(0, client->session(), dbm->msg(), buffer.buf(), buffer.len(), numResults, cursorResponse.getValue().getNumReturnedSoFar().value_or(0), cursorResponse.getValue().getCursorId()); } void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) { const int numCursors = dbm->pullInt(); massert(34425, str::stream() << "Invalid killCursors message. 
numCursors: " << numCursors << ", message size: " << dbm->msg().dataSize() << ".", dbm->msg().dataSize() == 8 + (8 * numCursors)); uassert(28794, str::stream() << "numCursors must be between 1 and 29999. numCursors: " << numCursors << ".", numCursors >= 1 && numCursors < 30000); globalOpCounters.gotOp(dbKillCursors, false); ConstDataCursor cursors(dbm->getArray(numCursors)); Client* const client = opCtx->getClient(); AuthorizationSession* const authSession = AuthorizationSession::get(client); ClusterCursorManager* const manager = Grid::get(opCtx)->getCursorManager(); for (int i = 0; i < numCursors; ++i) { const CursorId cursorId = cursors.readAndAdvance>(); boost::optional nss = manager->getNamespaceForCursorId(cursorId); if (!nss) { LOG(3) << "Can't find cursor to kill. Cursor id: " << cursorId << "."; continue; } Status authorizationStatus = authSession->checkAuthForKillCursors(*nss, cursorId); audit::logKillCursorsAuthzCheck(client, *nss, cursorId, authorizationStatus.isOK() ? ErrorCodes::OK : ErrorCodes::Unauthorized); if (!authorizationStatus.isOK()) { LOG(3) << "Not authorized to kill cursor. Namespace: '" << *nss << "', cursor id: " << cursorId << "."; continue; } Status killCursorStatus = manager->killCursor(*nss, cursorId); if (!killCursorStatus.isOK()) { LOG(3) << "Can't find cursor to kill. Namespace: '" << *nss << "', cursor id: " << cursorId << "."; continue; } LOG(3) << "Killed cursor. 
Namespace: '" << *nss << "', cursor id: " << cursorId << "."; } } void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) { OwnedPointerVector commandRequestsOwned; std::vector& commandRequests = commandRequestsOwned.mutableVector(); msgToBatchRequests(dbm->msg(), &commandRequests); auto& clientLastError = LastError::get(opCtx->getClient()); for (auto it = commandRequests.begin(); it != commandRequests.end(); ++it) { // Multiple commands registered to last error as multiple requests if (it != commandRequests.begin()) { clientLastError.startRequest(); } BatchedCommandRequest* const commandRequest = *it; BatchedCommandResponse commandResponse; { // Disable the last error object for the duration of the write cmd LastError::Disabled disableLastError(&clientLastError); // Adjust namespace for command const NamespaceString& fullNS(commandRequest->getNS()); const std::string cmdNS = fullNS.getCommandNS(); BSONObj commandBSON = commandRequest->toBSON(); BSONObjBuilder builder; runAgainstRegistered(opCtx, cmdNS.c_str(), commandBSON, builder, 0); bool parsed = commandResponse.parseBSON(builder.done(), nullptr); (void)parsed; // for compile dassert(parsed && commandResponse.isValid(nullptr)); } // Populate the lastError object based on the write response clientLastError.reset(); const bool hadError = batchErrorToLastError(*commandRequest, commandResponse, &clientLastError); // Check if this is an ordered batch and we had an error which should stop processing if (commandRequest->getOrdered() && hadError) { break; } } } Status Strategy::explainFind(OperationContext* opCtx, const BSONObj& findCommand, const QueryRequest& qr, ExplainCommon::Verbosity verbosity, const rpc::ServerSelectionMetadata& serverSelectionMetadata, BSONObjBuilder* out) { BSONObjBuilder explainCmdBob; int options = 0; ClusterExplain::wrapAsExplain( findCommand, verbosity, serverSelectionMetadata, &explainCmdBob, &options); // We will time how long it takes to run the commands on the shards. 
Timer timer; std::vector shardResults; Strategy::commandOp(opCtx, qr.nss().db().toString(), explainCmdBob.obj(), options, qr.nss().toString(), qr.getFilter(), qr.getCollation(), &shardResults); long long millisElapsed = timer.millis(); if (shardResults.size() == 1 && ResolvedView::isResolvedViewErrorResponse(shardResults[0].result)) { out->append("resolvedView", shardResults[0].result.getObjectField("resolvedView")); return getStatusFromCommandResult(shardResults[0].result); } const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, findCommand); return ClusterExplain::buildExplainResult( opCtx, shardResults, mongosStageName, millisElapsed, out); } } // namespace mongo