/** * Copyright (C) 2008-2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" #include "mongo/db/commands.h" #include "mongo/db/commands/copydb.h" #include "mongo/db/commands/rename_collection.h" #include "mongo/db/lasterror.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_explain.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/run_on_all_shards_cmd.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" #include "mongo/s/version_manager.h" #include "mongo/scripting/engine.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" namespace mongo { using std::unique_ptr; using std::shared_ptr; using std::list; using std::make_pair; using std::map; using std::multimap; using std::set; using std::string; using std::stringstream; using std::vector; namespace { bool cursorCommandPassthrough(OperationContext* txn, shared_ptr conf, const BSONObj& cmdObj, int options, BSONObjBuilder* out) { const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); if (!shard) { return Command::appendCommandStatus( *out, {ErrorCodes::ShardNotFound, "failed to find a valid shard"}); } ScopedDbConnection conn(shard->getConnString()); auto cursor = conn->query(str::stream() << conf->name() << ".$cmd", cmdObj, -1, // nToReturn 0, // nToSkip NULL, // fieldsToReturn options); if (!cursor || !cursor->more()) { return Command::appendCommandStatus( *out, {ErrorCodes::OperationFailed, "failed to read command response from shard"}); } BSONObj response = cursor->nextSafe().getOwned(); conn.done(); Status status = Command::getStatusFromCommandResult(response); if (ErrorCodes::SendStaleConfig == status || ErrorCodes::RecvStaleConfig == status) { throw RecvStaleConfigException("command failed because of stale config", response); } if (!status.isOK()) { return Command::appendCommandStatus(*out, status); } StatusWith transformedResponse = storePossibleCursor(HostAndPort(cursor->originalHost()), response, grid.shardRegistry()->getExecutorPool()->getArbitraryExecutor(), grid.getCursorManager()); if (!transformedResponse.isOK()) { return Command::appendCommandStatus(*out, transformedResponse.getStatus()); } out->appendElements(transformedResponse.getValue()); return true; } class PublicGridCommand : public Command { public: PublicGridCommand(const char* n, const char* oldname = NULL) : Command(n, false, oldname) {} virtual bool slaveOk() const { return true; } virtual bool adminOnly() const { return false; } // Override if passthrough should also send query options // Safer as off by default, can slowly enable as we add more tests virtual bool passOptions() const { return false; } // all grid commands are designed not to lock virtual bool isWriteCommandForConfigServer() const { return false; } protected: bool passthrough(OperationContext* txn, shared_ptr conf, const BSONObj& cmdObj, BSONObjBuilder& result) { return _passthrough(txn, conf->name(), conf, cmdObj, 0, result); } bool adminPassthrough(OperationContext* txn, shared_ptr conf, const BSONObj& cmdObj, BSONObjBuilder& result) { return _passthrough(txn, "admin", conf, cmdObj, 0, result); } bool passthrough(OperationContext* txn, shared_ptr conf, const BSONObj& cmdObj, int options, BSONObjBuilder& result) { return _passthrough(txn, conf->name(), conf, cmdObj, options, result); } private: bool _passthrough(OperationContext* txn, const string& db, shared_ptr conf, const BSONObj& cmdObj, int options, BSONObjBuilder& result) { const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); ShardConnection conn(shard->getConnString(), ""); BSONObj res; bool ok = conn->runCommand(db, cmdObj, res, passOptions() ? options : 0); conn.done(); result.appendElements(res); return ok; } }; class AllShardsCollectionCommand : public RunOnAllShardsCommand { public: AllShardsCollectionCommand(const char* n, const char* oldname = NULL, bool useShardConn = false, bool implicitCreateDb = false) : RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {} virtual void getShardIds(OperationContext* txn, const string& dbName, BSONObj& cmdObj, vector& shardIds) { const string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe(); auto status = grid.catalogCache()->getDatabase(txn, dbName); uassertStatusOK(status.getStatus()); shared_ptr conf = status.getValue(); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { shardIds.push_back(conf->getShardId(txn, fullns)); } else { grid.shardRegistry()->getAllShardIds(&shardIds); } } }; class NotAllowedOnShardedCollectionCmd : public PublicGridCommand { public: NotAllowedOnShardedCollectionCmd(const char* n) : PublicGridCommand(n) {} virtual bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isSharded(fullns)) { return passthrough(txn, conf, cmdObj, options, result); } return appendCommandStatus( result, Status(ErrorCodes::IllegalOperation, str::stream() << "can't do command: " << name << " on sharded collection")); } }; // MongoS commands implementation class DropIndexesCmd : public AllShardsCollectionCommand { public: DropIndexesCmd() : AllShardsCollectionCommand("dropIndexes", "deleteIndexes") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::dropIndex); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } } dropIndexesCmd; class CreateIndexesCmd : public AllShardsCollectionCommand { public: CreateIndexesCmd() : AllShardsCollectionCommand("createIndexes", NULL, /* oldName */ true /* use ShardConnection */, true /* implicit create db */) { // createIndexes command should use ShardConnection so the getLastError would // be able to properly enforce the write concern (via the saveGLEStats callback). } /** * the createIndexes command doesn't require the 'ns' field to be populated * so we make sure its here as its needed for the system.indexes insert */ BSONObj fixSpec(const NamespaceString& ns, const BSONObj& original) const { if (original["ns"].type() == String) return original; BSONObjBuilder bb; bb.appendElements(original); bb.append("ns", ns.toString()); return bb.obj(); } /** * @return equivalent of gle */ BSONObj createIndexLegacy(const string& server, const NamespaceString& nss, const BSONObj& spec) const { try { ScopedDbConnection conn(server); conn->insert(nss.getSystemIndexesCollection(), spec); BSONObj gle = conn->getLastErrorDetailed(nss.db().toString()); conn.done(); return gle; } catch (DBException& e) { BSONObjBuilder b; b.append("errmsg", e.toString()); b.append("code", e.getCode()); return b.obj(); } } virtual BSONObj specialErrorHandler(const string& server, const string& dbName, const BSONObj& cmdObj, const BSONObj& originalResult) const { string errmsg = originalResult["errmsg"]; if (errmsg.find("no such cmd") == string::npos) { // cannot use codes as 2.4 didn't have a code for this return originalResult; } // we need to down convert NamespaceString nss(dbName, cmdObj["createIndexes"].String()); if (cmdObj["indexes"].type() != Array) return originalResult; BSONObjBuilder newResult; newResult.append("note", "downgraded"); newResult.append("sentTo", server); BSONArrayBuilder individualResults; bool ok = true; BSONObjIterator indexIterator(cmdObj["indexes"].Obj()); while (indexIterator.more()) { BSONObj spec = indexIterator.next().Obj(); spec = fixSpec(nss, spec); BSONObj gle = createIndexLegacy(server, nss, spec); individualResults.append(BSON("spec" << spec << "gle" << gle)); BSONElement e = gle["errmsg"]; if (e.type() == String && e.String().size() > 0) { ok = false; newResult.appendAs(e, "errmsg"); break; } e = gle["err"]; if (e.type() == String && e.String().size() > 0) { ok = false; newResult.appendAs(e, "errmsg"); break; } } newResult.append("eachIndex", individualResults.arr()); newResult.append("ok", ok ? 1 : 0); return newResult.obj(); } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::createIndex); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } } createIndexesCmd; class ReIndexCmd : public AllShardsCollectionCommand { public: ReIndexCmd() : AllShardsCollectionCommand("reIndex") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::reIndex); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } } reIndexCmd; class CollectionModCmd : public AllShardsCollectionCommand { public: CollectionModCmd() : AllShardsCollectionCommand("collMod") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::collMod); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } } collectionModCmd; class ValidateCmd : public AllShardsCollectionCommand { public: ValidateCmd() : AllShardsCollectionCommand("validate") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::validate); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } virtual void aggregateResults(const vector& results, BSONObjBuilder& output) { for (vector::const_iterator it(results.begin()), end(results.end()); it != end; it++) { const BSONObj& result = std::get<1>(*it); const BSONElement valid = result["valid"]; if (!valid.eoo()) { if (!valid.trueValue()) { output.appendBool("valid", false); return; } } else { // Support pre-1.9.0 output with everything in a big string const char* s = result["result"].valuestrsafe(); if (strstr(s, "exception") || strstr(s, "corrupt")) { output.appendBool("valid", false); return; } } } output.appendBool("valid", true); } } validateCmd; class CreateCmd : public PublicGridCommand { public: CreateCmd() : PublicGridCommand("create") {} virtual Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { AuthorizationSession* authzSession = AuthorizationSession::get(client); if (cmdObj["capped"].trueValue()) { if (!authzSession->isAuthorizedForActionsOnResource( parseResourcePattern(dbname, cmdObj), ActionType::convertToCapped)) { return Status(ErrorCodes::Unauthorized, "unauthorized"); } } // ActionType::createCollection or ActionType::insert are both acceptable if (authzSession->isAuthorizedForActionsOnResource(parseResourcePattern(dbname, cmdObj), ActionType::createCollection) || authzSession->isAuthorizedForActionsOnResource(parseResourcePattern(dbname, cmdObj), ActionType::insert)) { return Status::OK(); } return Status(ErrorCodes::Unauthorized, "unauthorized"); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { auto status = grid.implicitCreateDb(txn, dbName); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } shared_ptr conf = status.getValue(); return passthrough(txn, conf, cmdObj, result); } } createCmd; class DropCmd : public PublicGridCommand { public: DropCmd() : PublicGridCommand("drop") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::dropCollection); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) { auto status = grid.catalogCache()->getDatabase(txn, dbName); if (!status.isOK()) { if (status == ErrorCodes::NamespaceNotFound) { return true; } return appendCommandStatus(result, status.getStatus()); } const string fullns = dbName + "." + cmdObj.firstElement().valuestrsafe(); log() << "DROP: " << fullns; const auto& db = status.getValue(); if (!db->isShardingEnabled() || !db->isSharded(fullns)) { log() << "\tdrop going to do passthrough"; return passthrough(txn, db, cmdObj, result); } uassertStatusOK(grid.catalogManager(txn)->dropCollection(txn, NamespaceString(fullns))); // Force a full reload next time the just dropped namespace is accessed db->invalidateNs(fullns); return true; } } dropCmd; class RenameCollectionCmd : public PublicGridCommand { public: RenameCollectionCmd() : PublicGridCommand("renameCollection") {} virtual Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { return rename_collection::checkAuthForRenameCollectionCommand(client, dbname, cmdObj); } virtual bool adminOnly() const { return true; } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { const string fullnsFrom = cmdObj.firstElement().valuestrsafe(); const string dbNameFrom = nsToDatabase(fullnsFrom); auto confFrom = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbNameFrom)); const string fullnsTo = cmdObj["to"].valuestrsafe(); const string dbNameTo = nsToDatabase(fullnsTo); auto confTo = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbNameTo)); uassert(13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom)); uassert(13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo)); const ShardId& shardTo = confTo->getShardId(txn, fullnsTo); const ShardId& shardFrom = confFrom->getShardId(txn, fullnsFrom); uassert(13137, "Source and destination collections must be on same shard", shardFrom == shardTo); return adminPassthrough(txn, confFrom, cmdObj, result); } } renameCollectionCmd; class CopyDBCmd : public PublicGridCommand { public: CopyDBCmd() : PublicGridCommand("copydb") {} virtual bool adminOnly() const { return true; } virtual Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { return copydb::checkAuthForCopydbCommand(client, dbname, cmdObj); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { const string todb = cmdObj.getStringField("todb"); uassert(ErrorCodes::EmptyFieldName, "missing todb argument", !todb.empty()); uassert(ErrorCodes::InvalidNamespace, "invalid todb argument", NamespaceString::validDBName(todb)); auto confTo = uassertStatusOK(grid.implicitCreateDb(txn, todb)); uassert(ErrorCodes::IllegalOperation, "cannot copy to a sharded database", !confTo->isShardingEnabled()); const string fromhost = cmdObj.getStringField("fromhost"); if (!fromhost.empty()) { return adminPassthrough(txn, confTo, cmdObj, result); } else { const string fromdb = cmdObj.getStringField("fromdb"); uassert(13399, "need a fromdb argument", !fromdb.empty()); shared_ptr confFrom = uassertStatusOK(grid.catalogCache()->getDatabase(txn, fromdb)); uassert(13400, "don't know where source DB is", confFrom); uassert(13401, "cant copy from sharded DB", !confFrom->isShardingEnabled()); BSONObjBuilder b; BSONForEach(e, cmdObj) { if (strcmp(e.fieldName(), "fromhost") != 0) { b.append(e); } } { const auto& shard = grid.shardRegistry()->getShard(txn, confFrom->getPrimaryId()); b.append("fromhost", shard->getConnString().toString()); } BSONObj fixed = b.obj(); return adminPassthrough(txn, confTo, fixed, result); } } } clusterCopyDBCmd; class CollectionStats : public PublicGridCommand { public: CollectionStats() : PublicGridCommand("collStats", "collstats") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::collStats); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { result.appendBool("sharded", false); result.append("primary", conf->getPrimaryId()); return passthrough(txn, conf, cmdObj, result); } result.appendBool("sharded", true); ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(12594, "how could chunk manager be null!", cm); BSONObjBuilder shardStats; map counts; map indexSizes; /* long long count=0; long long size=0; long long storageSize=0; */ int nindexes = 0; bool warnedAboutIndexes = false; set shardIds; cm->getAllShardIds(&shardIds); for (const ShardId& shardId : shardIds) { const auto shard = grid.shardRegistry()->getShard(txn, shardId); if (!shard) { continue; } BSONObj res; { ScopedDbConnection conn(shard->getConnString()); if (!conn->runCommand(dbName, cmdObj, res)) { if (!res["code"].eoo()) { result.append(res["code"]); } errmsg = "failed on shard: " + res.toString(); return false; } conn.done(); } BSONObjIterator j(res); while (j.more()) { BSONElement e = j.next(); if (str::equals(e.fieldName(), "ns") || str::equals(e.fieldName(), "ok") || str::equals(e.fieldName(), "avgObjSize") || str::equals(e.fieldName(), "lastExtentSize") || str::equals(e.fieldName(), "paddingFactor")) { continue; } else if (str::equals(e.fieldName(), "count") || str::equals(e.fieldName(), "size") || str::equals(e.fieldName(), "storageSize") || str::equals(e.fieldName(), "numExtents") || str::equals(e.fieldName(), "totalIndexSize")) { counts[e.fieldName()] += e.numberLong(); } else if (str::equals(e.fieldName(), "indexSizes")) { BSONObjIterator k(e.Obj()); while (k.more()) { BSONElement temp = k.next(); indexSizes[temp.fieldName()] += temp.numberLong(); } } // no longer used since 2.2 else if (str::equals(e.fieldName(), "flags")) { if (!result.hasField(e.fieldName())) result.append(e); } // flags broken out in 2.4+ else if (str::equals(e.fieldName(), "systemFlags")) { if (!result.hasField(e.fieldName())) result.append(e); } else if (str::equals(e.fieldName(), "userFlags")) { if (!result.hasField(e.fieldName())) result.append(e); } else if (str::equals(e.fieldName(), "capped")) { if (!result.hasField(e.fieldName())) result.append(e); } else if (str::equals(e.fieldName(), "paddingFactorNote")) { if (!result.hasField(e.fieldName())) result.append(e); } else if (str::equals(e.fieldName(), "indexDetails")) { // skip this field in the rollup } else if (str::equals(e.fieldName(), "wiredTiger")) { // skip this field in the rollup } else if (str::equals(e.fieldName(), "nindexes")) { int myIndexes = e.numberInt(); if (nindexes == 0) { nindexes = myIndexes; } else if (nindexes == myIndexes) { // no-op } else { // hopefully this means we're building an index if (myIndexes > nindexes) nindexes = myIndexes; if (!warnedAboutIndexes) { result.append("warning", "indexes don't all match - ok if ensureIndex is running"); warnedAboutIndexes = true; } } } else { warning() << "mongos collstats doesn't know about: " << e.fieldName(); } } shardStats.append(shardId, res); } result.append("ns", fullns); for (map::iterator i = counts.begin(); i != counts.end(); ++i) result.appendNumber(i->first, i->second); { BSONObjBuilder ib(result.subobjStart("indexSizes")); for (map::iterator i = indexSizes.begin(); i != indexSizes.end(); ++i) ib.appendNumber(i->first, i->second); ib.done(); } if (counts["count"] > 0) result.append("avgObjSize", (double)counts["size"] / (double)counts["count"]); else result.append("avgObjSize", 0.0); result.append("nindexes", nindexes); result.append("nchunks", cm->numChunks()); result.append("shards", shardStats.obj()); return true; } } collectionStatsCmd; class DataSizeCmd : public PublicGridCommand { public: DataSizeCmd() : PublicGridCommand("dataSize", "datasize") {} virtual string parseNs(const string& dbname, const BSONObj& cmdObj) const override { return parseNsFullyQualified(dbname, cmdObj); } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); const string nsDBName = nsToDatabase(fullns); auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsDBName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { return passthrough(txn, conf, cmdObj, result); } ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(13407, "how could chunk manager be null!", cm); BSONObj min = cmdObj.getObjectField("min"); BSONObj max = cmdObj.getObjectField("max"); BSONObj keyPattern = cmdObj.getObjectField("keyPattern"); uassert(13408, "keyPattern must equal shard key", cm->getShardKeyPattern().toBSON() == keyPattern); uassert(13405, str::stream() << "min value " << min << " does not have shard key", cm->getShardKeyPattern().isShardKey(min)); uassert(13406, str::stream() << "max value " << max << " does not have shard key", cm->getShardKeyPattern().isShardKey(max)); min = cm->getShardKeyPattern().normalizeShardKey(min); max = cm->getShardKeyPattern().normalizeShardKey(max); // yes these are doubles... double size = 0; double numObjects = 0; int millis = 0; set shardIds; cm->getShardIdsForRange(shardIds, min, max); for (const ShardId& shardId : shardIds) { const auto shard = grid.shardRegistry()->getShard(txn, shardId); if (!shard) { continue; } ScopedDbConnection conn(shard->getConnString()); BSONObj res; bool ok = conn->runCommand(conf->name(), cmdObj, res); conn.done(); if (!ok) { result.appendElements(res); return false; } size += res["size"].number(); numObjects += res["numObjects"].number(); millis += res["millis"].numberInt(); } result.append("size", size); result.append("numObjects", numObjects); result.append("millis", millis); return true; } } DataSizeCmd; class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd { public: ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::convertToCapped); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } } convertToCappedCmd; class GroupCmd : public NotAllowedOnShardedCollectionCmd { public: GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } virtual bool passOptions() const { return true; } virtual std::string parseNs(const std::string& dbName, const BSONObj& cmdObj) const { return dbName + "." + cmdObj.firstElement().embeddedObjectUserCheck()["ns"].valuestrsafe(); } Status explain(OperationContext* txn, const std::string& dbname, const BSONObj& cmdObj, ExplainCommon::Verbosity verbosity, const rpc::ServerSelectionMetadata& serverSelectionMetadata, BSONObjBuilder* out) const { // We will time how long it takes to run the commands on the shards. Timer timer; BSONObj command; int options = 0; { BSONObjBuilder explainCmdBob; ClusterExplain::wrapAsExplain( cmdObj, verbosity, serverSelectionMetadata, &explainCmdBob, &options); command = explainCmdBob.obj(); } const NamespaceString nss(parseNs(dbname, cmdObj)); // Note that this implementation will not handle targeting retries and fails when the // sharding metadata is too stale auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { return Status(status.getStatus().code(), str::stream() << "Passthrough command failed: " << command.toString() << " on ns " << nss.ns() << ". Caused by " << causedBy(status.getStatus())); } shared_ptr conf = status.getValue(); if (conf->isSharded(nss.ns())) { return Status(ErrorCodes::IllegalOperation, str::stream() << "Passthrough command failed: " << command.toString() << " on ns " << nss.ns() << ". Cannot run on sharded namespace."); } const auto primaryShard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); BSONObj shardResult; try { ShardConnection conn(primaryShard->getConnString(), ""); // TODO: this can throw a stale config when mongos is not up-to-date -- fix. if (!conn->runCommand(nss.db().toString(), command, shardResult, options)) { conn.done(); return Status(ErrorCodes::OperationFailed, str::stream() << "Passthrough command failed: " << command << " on ns " << nss.ns() << "; result: " << shardResult); } conn.done(); } catch (const DBException& ex) { return ex.toStatus(); } // Fill out the command result. Strategy::CommandResult cmdResult; cmdResult.shardTargetId = conf->getPrimaryId(); cmdResult.result = shardResult; cmdResult.target = primaryShard->getConnString(); return ClusterExplain::buildExplainResult( txn, {cmdResult}, ClusterExplain::kSingleShard, timer.millis(), out); } } groupCmd; class SplitVectorCmd : public NotAllowedOnShardedCollectionCmd { public: SplitVectorCmd() : NotAllowedOnShardedCollectionCmd("splitVector") {} virtual bool passOptions() const { return true; } virtual Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::splitVector)) { return Status(ErrorCodes::Unauthorized, "Unauthorized"); } return Status::OK(); } virtual bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) { string x = parseNs(dbName, cmdObj); if (!str::startsWith(x, dbName)) { errmsg = str::stream() << "doing a splitVector across dbs isn't supported via mongos"; return false; } return NotAllowedOnShardedCollectionCmd::run(txn, dbName, cmdObj, options, errmsg, result); } virtual std::string parseNs(const string& dbname, const BSONObj& cmdObj) const { return parseNsFullyQualified(dbname, cmdObj); } } splitVectorCmd; class DistinctCmd : public PublicGridCommand { public: DistinctCmd() : PublicGridCommand("distinct") {} virtual void help(stringstream& help) const { help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }"; } virtual bool passOptions() const { return true; } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); auto status = grid.catalogCache()->getDatabase(txn, dbName); if (!status.isOK()) { return appendEmptyResultSet(result, status.getStatus(), fullns); } shared_ptr conf = status.getValue(); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { return passthrough(txn, conf, cmdObj, options, result); } ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(10420, "how could chunk manager be null!", cm); BSONObj query = getQuery(cmdObj); set shardIds; cm->getShardIdsForQuery(txn, query, &shardIds); set all; int size = 32; for (const ShardId& shardId : shardIds) { const auto shard = grid.shardRegistry()->getShard(txn, shardId); if (!shard) { continue; } ShardConnection conn(shard->getConnString(), fullns); BSONObj res; bool ok = conn->runCommand(conf->name(), cmdObj, res, options); conn.done(); if (!ok) { result.appendElements(res); return false; } BSONObjIterator it(res["values"].embeddedObject()); while (it.more()) { BSONElement nxt = it.next(); BSONObjBuilder temp(32); temp.appendAs(nxt, ""); all.insert(temp.obj()); } } BSONObjBuilder b(size); int n = 0; for (set::iterator i = all.begin(); i != all.end(); i++) { b.appendAs(i->firstElement(), b.numStr(n++)); } result.appendArray("values", b.obj()); return true; } Status explain(OperationContext* txn, const std::string& dbname, const BSONObj& cmdObj, ExplainCommon::Verbosity verbosity, const rpc::ServerSelectionMetadata& serverSelectionMetadata, BSONObjBuilder* out) const { const string fullns = parseNs(dbname, cmdObj); // Extract the targeting query. BSONObj targetingQuery; if (BSONElement queryElt = cmdObj["query"]) { if (queryElt.type() == BSONType::Object) { targetingQuery = queryElt.embeddedObject(); } else if (queryElt.type() != BSONType::jstNULL) { return Status(ErrorCodes::TypeMismatch, str::stream() << "\"query\" had the wrong type. Expected " << typeName(BSONType::Object) << " or " << typeName(BSONType::jstNULL) << ", found " << typeName(queryElt.type())); } } BSONObjBuilder explainCmdBob; int options = 0; ClusterExplain::wrapAsExplain( cmdObj, verbosity, serverSelectionMetadata, &explainCmdBob, &options); // We will time how long it takes to run the commands on the shards. Timer timer; vector shardResults; Strategy::commandOp( txn, dbname, explainCmdBob.obj(), options, fullns, targetingQuery, &shardResults); long long millisElapsed = timer.millis(); const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj); return ClusterExplain::buildExplainResult( txn, shardResults, mongosStageName, millisElapsed, out); } } disinctCmd; class FileMD5Cmd : public PublicGridCommand { public: FileMD5Cmd() : PublicGridCommand("filemd5") {} virtual void help(stringstream& help) const { help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }"; } virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { std::string collectionName = cmdObj.getStringField("root"); if (collectionName.empty()) collectionName = "fs"; collectionName += ".chunks"; return NamespaceString(dbname, collectionName).ns(); } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), ActionType::find)); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { return passthrough(txn, conf, cmdObj, result); } ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(13091, "how could chunk manager be null!", cm); if (cm->getShardKeyPattern().toBSON() == BSON("files_id" << 1)) { BSONObj finder = BSON("files_id" << cmdObj.firstElement()); vector results; Strategy::commandOp(txn, dbName, cmdObj, 0, fullns, finder, &results); verify(results.size() == 1); // querying on shard key so should only talk to one shard BSONObj res = results.begin()->result; result.appendElements(res); return res["ok"].trueValue(); } else if (cm->getShardKeyPattern().toBSON() == BSON("files_id" << 1 << "n" << 1)) { int n = 0; BSONObj lastResult; while (true) { // Theory of operation: Starting with n=0, send filemd5 command to shard // with that chunk (gridfs chunk not sharding chunk). That shard will then // compute a partial md5 state (passed in the "md5state" field) for all // contiguous chunks that it has. When it runs out or hits a discontinuity // (eg [1,2,7]) it returns what it has done so far. This is repeated as // long as we keep getting more chunks. The end condition is when we go to // look for chunk n and it doesn't exist. This means that the file's last // chunk is n-1, so we return the computed md5 results. BSONObjBuilder bb; bb.appendElements(cmdObj); bb.appendBool("partialOk", true); bb.append("startAt", n); if (!lastResult.isEmpty()) { bb.append(lastResult["md5state"]); } BSONObj shardCmd = bb.obj(); BSONObj finder = BSON("files_id" << cmdObj.firstElement() << "n" << n); vector results; try { Strategy::commandOp(txn, dbName, shardCmd, 0, fullns, finder, &results); } catch (DBException& e) { // This is handled below and logged Strategy::CommandResult errResult; errResult.shardTargetId = ""; errResult.result = BSON("errmsg" << e.what() << "ok" << 0); results.push_back(errResult); } verify(results.size() == 1); // querying on shard key so should only talk to one shard BSONObj res = results.begin()->result; bool ok = res["ok"].trueValue(); if (!ok) { // Add extra info to make debugging easier result.append("failedAt", n); result.append("sentCommand", shardCmd); BSONForEach(e, res) { if (!str::equals(e.fieldName(), "errmsg")) result.append(e); } log() << "Sharded filemd5 failed: " << result.asTempObj(); errmsg = string("sharded filemd5 failed because: ") + res["errmsg"].valuestrsafe(); return false; } uassert(16246, "Shard " + conf->name() + " is too old to support GridFS sharded by {files_id:1, n:1}", res.hasField("md5state")); lastResult = res; int nNext = res["numChunks"].numberInt(); if (n == nNext) { // no new data means we've reached the end of the file result.appendElements(res); return true; } verify(nNext > n); n = nNext; } verify(0); } // We could support arbitrary shard keys by sending commands to all shards but I don't think // we should errmsg = "GridFS fs.chunks collection must be sharded on either {files_id:1} or {files_id:1, " "n:1}"; return false; } } fileMD5Cmd; class Geo2dFindNearCmd : public PublicGridCommand { public: Geo2dFindNearCmd() : PublicGridCommand("geoNear") {} void help(stringstream& h) const { h << "http://dochub.mongodb.org/core/geo#GeospatialIndexing-geoNearCommand"; } virtual bool passOptions() const { return true; } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { return passthrough(txn, conf, cmdObj, options, result); } ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(13500, "how could chunk manager be null!", cm); BSONObj query = getQuery(cmdObj); set shardIds; cm->getShardIdsForQuery(txn, query, &shardIds); // We support both "num" and "limit" options to control limit int limit = 100; const char* limitName = cmdObj["num"].isNumber() ? "num" : "limit"; if (cmdObj[limitName].isNumber()) limit = cmdObj[limitName].numberInt(); list> futures; BSONArrayBuilder shardArray; for (const ShardId& shardId : shardIds) { const auto shard = grid.shardRegistry()->getShard(txn, shardId); if (!shard) { continue; } futures.push_back( Future::spawnCommand(shard->getConnString().toString(), dbName, cmdObj, options)); shardArray.append(shardId); } multimap results; // TODO: maybe use merge-sort instead string nearStr; double time = 0; double btreelocs = 0; double nscanned = 0; double objectsLoaded = 0; for (list>::iterator i = futures.begin(); i != futures.end(); i++) { shared_ptr res = *i; if (!res->join(txn)) { errmsg = res->result()["errmsg"].String(); if (res->result().hasField("code")) { result.append(res->result()["code"]); } return false; } if (res->result().hasField("near")) { nearStr = res->result()["near"].String(); } time += res->result()["stats"]["time"].Number(); if (!res->result()["stats"]["btreelocs"].eoo()) { btreelocs += res->result()["stats"]["btreelocs"].Number(); } nscanned += res->result()["stats"]["nscanned"].Number(); if (!res->result()["stats"]["objectsLoaded"].eoo()) { objectsLoaded += res->result()["stats"]["objectsLoaded"].Number(); } BSONForEach(obj, res->result()["results"].embeddedObject()) { results.insert(make_pair(obj["dis"].Number(), obj.embeddedObject().getOwned())); } // TODO: maybe shrink results if size() > limit } result.append("ns", fullns); result.append("near", nearStr); int outCount = 0; double totalDistance = 0; double maxDistance = 0; { BSONArrayBuilder sub(result.subarrayStart("results")); for (multimap::const_iterator it(results.begin()), end(results.end()); it != end && outCount < limit; ++it, ++outCount) { totalDistance += it->first; maxDistance = it->first; // guaranteed to be highest so far sub.append(it->second); } sub.done(); } { BSONObjBuilder sub(result.subobjStart("stats")); sub.append("time", time); sub.append("btreelocs", btreelocs); sub.append("nscanned", nscanned); sub.append("objectsLoaded", objectsLoaded); sub.append("avgDistance", (outCount == 0) ? 0 : (totalDistance / outCount)); sub.append("maxDistance", maxDistance); sub.append("shards", shardArray.arr()); sub.done(); } return true; } } geo2dFindNearCmd; class ApplyOpsCmd : public PublicGridCommand { public: ApplyOpsCmd() : PublicGridCommand("applyOps") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { // applyOps can do pretty much anything, so require all privileges. RoleGraph::generateUniversalPrivileges(out); } virtual bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { errmsg = "applyOps not allowed through mongos"; return false; } } applyOpsCmd; class CompactCmd : public PublicGridCommand { public: CompactCmd() : PublicGridCommand("compact") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { ActionSet actions; actions.addAction(ActionType::compact); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } virtual bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { errmsg = "compact not allowed through mongos"; return false; } } compactCmd; class EvalCmd : public PublicGridCommand { public: EvalCmd() : PublicGridCommand("eval", "$eval") {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) { // $eval can do pretty much anything, so require all privileges. RoleGraph::generateUniversalPrivileges(out); } virtual bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { RARELY { warning() << "the eval command is deprecated" << startupWarningsLog; } // $eval isn't allowed to access sharded collections, but we need to leave the // shard to detect that. auto status = grid.catalogCache()->getDatabase(txn, dbName); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } shared_ptr conf = status.getValue(); return passthrough(txn, conf, cmdObj, result); } } evalCmd; class CmdListCollections final : public PublicGridCommand { public: CmdListCollections() : PublicGridCommand("listCollections") {} Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) final { AuthorizationSession* authzSession = AuthorizationSession::get(client); // Check for the listCollections ActionType on the database // or find on system.namespaces for pre 3.0 systems. if (authzSession->isAuthorizedForActionsOnResource(ResourcePattern::forDatabaseName(dbname), ActionType::listCollections) || authzSession->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(dbname, "system.namespaces")), ActionType::find)) { return Status::OK(); } return Status(ErrorCodes::Unauthorized, str::stream() << "Not authorized to create users on db: " << dbname); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) final { auto conf = grid.catalogCache()->getDatabase(txn, dbName); if (!conf.isOK()) { return appendEmptyResultSet(result, conf.getStatus(), dbName + ".$cmd.listCollections"); } return cursorCommandPassthrough(txn, conf.getValue(), cmdObj, options, &result); } } cmdListCollections; class CmdListIndexes final : public PublicGridCommand { public: CmdListIndexes() : PublicGridCommand("listIndexes") {} virtual Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { AuthorizationSession* authzSession = AuthorizationSession::get(client); // Check for the listIndexes ActionType on the database, or find on system.indexes for pre // 3.0 systems. NamespaceString ns(parseNs(dbname, cmdObj)); if (authzSession->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns), ActionType::listIndexes) || authzSession->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(dbname, "system.indexes")), ActionType::find)) { return Status::OK(); } return Status(ErrorCodes::Unauthorized, str::stream() << "Not authorized to list indexes on collection: " << ns.coll()); } bool run(OperationContext* txn, const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) final { auto conf = grid.catalogCache()->getDatabase(txn, dbName); if (!conf.isOK()) { return appendCommandStatus(result, conf.getStatus()); } return cursorCommandPassthrough(txn, conf.getValue(), cmdObj, options, &result); } } cmdListIndexes; class AvailableQueryOptions : public Command { public: AvailableQueryOptions() : Command("availableQueryOptions", false, "availablequeryoptions") {} virtual bool slaveOk() const { return true; } virtual bool isWriteCommandForConfigServer() const { return false; } virtual Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { return Status::OK(); } virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { result << "options" << QueryOption_AllSupportedForSharding; return true; } } availableQueryOptionsCmd; } // namespace } // namespace mongo