diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-31 17:20:07 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-04-06 13:29:56 -0400 |
commit | c8e0e1668b25fde6e5980a6431fc7db24411c109 (patch) | |
tree | f138e8a1fa2b99cab64461fd450ef8dba4b066dd | |
parent | 366b39a143ec17a65e8ccc08f1883e4c92dd669e (diff) | |
download | mongo-c8e0e1668b25fde6e5980a6431fc7db24411c109.tar.gz |
SERVER-28165 make RunOnAllShardsCommand use ARS instead of Future::spawnCommand
-rw-r--r-- | jstests/sharding/stats.js | 17 | ||||
-rw-r--r-- | src/mongo/db/catalog/coll_mod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/create_indexes.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/commands/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_current_op.cpp | 43 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_db_stats_cmd.cpp | 45 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_repair_database_cmd.cpp | 34 | ||||
-rw-r--r-- | src/mongo/s/commands/commands_public.cpp | 199 | ||||
-rw-r--r-- | src/mongo/s/commands/run_on_all_shards_cmd.cpp | 202 | ||||
-rw-r--r-- | src/mongo/s/commands/run_on_all_shards_cmd.h | 101 | ||||
-rw-r--r-- | src/mongo/s/commands/scatter_gather_from_shards.cpp | 166 | ||||
-rw-r--r-- | src/mongo/s/commands/scatter_gather_from_shards.h | 60 |
12 files changed, 431 insertions, 443 deletions
diff --git a/jstests/sharding/stats.js b/jstests/sharding/stats.js index 9dc521cc298..628abb2d895 100644 --- a/jstests/sharding/stats.js +++ b/jstests/sharding/stats.js @@ -4,9 +4,6 @@ s.adminCommand({enablesharding: "test"}); - a = s._connections[0].getDB("test"); - b = s._connections[1].getDB("test"); - db = s.getDB("test"); s.ensurePrimaryShard('test', 'shard0001'); @@ -43,6 +40,9 @@ bulk.insert({_id: i}); assert.writeOK(bulk.execute()); + a = s.shard0.getDB("test"); + b = s.shard1.getDB("test"); + x = assert.commandWorked(db.foo.stats()); assert.eq(N, x.count, "coll total count expected"); assert.eq(db.foo.count(), x.count, "coll total count match"); @@ -66,15 +66,12 @@ x = assert.commandWorked(db.stats()); - // dbstats uses Future::CommandResult so raw output uses connection strings not shard names - shards = Object.keySet(x.raw); - assert.eq(N + (a_extras + b_extras), x.objects, "db total count expected"); assert.eq(2, numKeys(x.raw), "db shard num"); - assert.eq((N / 2) + a_extras, x.raw[shards[0]].objects, "db count on shard0000 expected"); - assert.eq((N / 2) + b_extras, x.raw[shards[1]].objects, "db count on shard0001 expected"); - assert.eq(a.stats().objects, x.raw[shards[0]].objects, "db count on shard0000 match"); - assert.eq(b.stats().objects, x.raw[shards[1]].objects, "db count on shard0001 match"); + assert.eq((N / 2) + a_extras, x.raw[s.shard0.name].objects, "db count on shard0000 expected"); + assert.eq((N / 2) + b_extras, x.raw[s.shard1.name].objects, "db count on shard0001 expected"); + assert.eq(a.stats().objects, x.raw[s.shard0.name].objects, "db count on shard0000 match"); + assert.eq(b.stats().objects, x.raw[s.shard1.name].objects, "db count on shard0001 match"); /* Test db.stat() and db.collection.stat() scaling */ diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp index 661cb5b1ab2..93abe72214c 100644 --- a/src/mongo/db/catalog/coll_mod.cpp +++ b/src/mongo/db/catalog/coll_mod.cpp @@ -76,6 +76,8 @@ StatusWith<CollModRequest> parseCollModRequest(OperationContext* opCtx, // no-op ignore top-level fields prefixed with $. They are for the command processor } else if (QueryRequest::cmdOptionMaxTimeMS == e.fieldNameStringData()) { // no-op + } else if (str::equals("shardVersion", e.fieldName())) { + // no-op } else if (str::equals("index", e.fieldName()) && !isView) { BSONObj indexObj = e.Obj(); StringData indexName; diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 2e9bd0ceaed..189ec8e78f1 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -68,6 +68,7 @@ const StringData kIndexesFieldName = "indexes"_sd; const StringData kCommandName = "createIndexes"_sd; const StringData kWriteConcern = "writeConcern"_sd; const StringData kMaxTimeMS = "maxTimeMS"_sd; +const StringData kShardVersion = "shardVersion"_sd; /** * Parses the index specifications from 'cmdObj', validates them, and returns equivalent index @@ -125,7 +126,7 @@ StatusWith<std::vector<BSONObj>> parseAndValidateIndexSpecs( hasIndexesField = true; } else if (kCommandName == cmdElemFieldName || kWriteConcern == cmdElemFieldName || - kMaxTimeMS == cmdElemFieldName) { + kMaxTimeMS == cmdElemFieldName || kShardVersion == cmdElemFieldName) { continue; } else { return {ErrorCodes::BadValue, diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index b78738edf65..b7af6271a4e 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -76,7 +76,7 @@ env.Library( 'cluster_write.cpp', 'cluster_write_cmd.cpp', 'commands_public.cpp', - 'run_on_all_shards_cmd.cpp', + 'scatter_gather_from_shards.cpp', 'sharded_command_processing.cpp', 'strategy.cpp', ], diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp index c9a307c4c39..ecf1c1fb30b 100644 --- a/src/mongo/s/commands/cluster_current_op.cpp +++ b/src/mongo/s/commands/cluster_current_op.cpp @@ -38,8 +38,10 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" -#include "mongo/s/commands/run_on_all_shards_cmd.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/scatter_gather_from_shards.h" #include "mongo/s/commands/strategy.h" +#include "mongo/s/grid.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -51,12 +53,15 @@ const char kOpIdFieldName[] = "opid"; const char kClientFieldName[] = "client"; // awkward underscores used to make this visually distinct from kClientFieldName const char kClient_S_FieldName[] = "client_s"; - const char kCommandName[] = "currentOp"; -class ClusterCurrentOpCommand : public RunOnAllShardsCommand { +class ClusterCurrentOpCommand : public Command { public: - ClusterCurrentOpCommand() : RunOnAllShardsCommand(kCommandName) {} + ClusterCurrentOpCommand() : Command(kCommandName) {} + + bool slaveOk() const override { + return true; + } bool adminOnly() const final { return true; @@ -75,7 +80,33 @@ public: return false; } - void aggregateResults(const std::vector<ShardAndReply>& results, BSONObjBuilder& output) final { + using ShardAndReply = std::tuple<ShardId, BSONObj>; + + bool run(OperationContext* opCtx, + const std::string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& output) override { + + // Target all shards. + std::vector<AsyncRequestsSender::Request> requests; + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + + auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); + if (!swResults.isOK()) { + return appendCommandStatus(output, swResults.getStatus()); + } + + aggregateResults(std::move(swResults.getValue()), output); + return true; + } + + void aggregateResults(const std::vector<ShardAndReply>& results, BSONObjBuilder& output) { // Each shard responds with a document containing an array of subdocuments. // Each subdocument represents an operation running on that shard. // We merge the responses into a single document containg an array @@ -99,7 +130,7 @@ public: BSONArrayBuilder aggregatedOpsBab(output.subarrayStart(kInprogFieldName)); for (auto&& shardResponse : results) { - StringData shardName; + ShardId shardName; BSONObj shardResponseObj; std::tie(shardName, shardResponseObj) = shardResponse; diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp index 1546fe4482e..16151cf0f1c 100644 --- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp @@ -30,16 +30,27 @@ #include <vector> -#include "mongo/s/commands/run_on_all_shards_cmd.h" +#include "mongo/s/commands/scatter_gather_from_shards.h" + +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" namespace mongo { namespace { using std::vector; -class DBStatsCmd : public RunOnAllShardsCommand { +class DBStatsCmd : public Command { public: - DBStatsCmd() : RunOnAllShardsCommand("dbStats", "dbstats") {} + DBStatsCmd() : Command("dbStats", false, "dbstats") {} + + bool slaveOk() const override { + return true; + } + bool adminOnly() const override { + return false; + } + virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { @@ -52,7 +63,33 @@ public: return false; } - virtual void aggregateResults(const vector<ShardAndReply>& results, BSONObjBuilder& output) { + using ShardAndReply = std::tuple<ShardId, BSONObj>; + + bool run(OperationContext* opCtx, + const std::string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& output) override { + + // Target all shards. + std::vector<AsyncRequestsSender::Request> requests; + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + + auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); + if (!swResults.isOK()) { + return appendCommandStatus(output, swResults.getStatus()); + } + + aggregateResults(std::move(swResults.getValue()), output); + return true; + } + + void aggregateResults(const vector<ShardAndReply>& results, BSONObjBuilder& output) { long long objects = 0; long long unscaledDataSize = 0; long long dataSize = 0; diff --git a/src/mongo/s/commands/cluster_repair_database_cmd.cpp b/src/mongo/s/commands/cluster_repair_database_cmd.cpp index 9affce5c598..707d6149846 100644 --- a/src/mongo/s/commands/cluster_repair_database_cmd.cpp +++ b/src/mongo/s/commands/cluster_repair_database_cmd.cpp @@ -28,14 +28,24 @@ #include "mongo/platform/basic.h" -#include "mongo/s/commands/run_on_all_shards_cmd.h" +#include "mongo/s/commands/scatter_gather_from_shards.h" + +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" namespace mongo { namespace { -class ClusterRepairDatabaseCmd : public RunOnAllShardsCommand { +class ClusterRepairDatabaseCmd : public Command { public: - ClusterRepairDatabaseCmd() : RunOnAllShardsCommand("repairDatabase") {} + ClusterRepairDatabaseCmd() : Command("repairDatabase") {} + + bool slaveOk() const override { + return true; + } + bool adminOnly() const override { + return false; + } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, @@ -49,6 +59,24 @@ public: return false; } + bool run(OperationContext* opCtx, + const std::string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& output) override { + // Target all shards. + std::vector<AsyncRequestsSender::Request> requests; + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + + auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); + return appendCommandStatus(output, swResults.getStatus()); + } + } clusterRepairDatabaseCmd; } // namespace diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index c7c3b6f4273..371782d2e4d 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -61,7 +61,7 @@ #include "mongo/s/commands/cluster_aggregate.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/cluster_explain.h" -#include "mongo/s/commands/run_on_all_shards_cmd.h" +#include "mongo/s/commands/scatter_gather_from_shards.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" #include "mongo/s/query/store_possible_cursor.h" @@ -85,6 +85,8 @@ using std::vector; namespace { +const int kMaxNumStaleVersionRetries = 3; + bool cursorCommandPassthrough(OperationContext* opCtx, StringData dbName, const ShardId& shardId, @@ -211,30 +213,90 @@ protected: } }; -class AllShardsCollectionCommand : public RunOnAllShardsCommand { +class AllShardsCollectionCommand : public Command { protected: - AllShardsCollectionCommand(const char* n, + AllShardsCollectionCommand(const char* name, const char* oldname = NULL, - bool useShardConn = false, bool implicitCreateDb = false) - : RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {} + : Command(name, false, oldname), _implicitCreateDb(implicitCreateDb) {} + + bool slaveOk() const override { + return true; + } + bool adminOnly() const override { + return false; + } + + BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version) { + BSONObjBuilder cmdWithVersionBob; + cmdWithVersionBob.appendElements(cmdObj); + version.appendForCommands(&cmdWithVersionBob); + return cmdWithVersionBob.obj(); + } - void getShardIds(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - vector<ShardId>& shardIds) override { + bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& output) override { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); + LOG(1) << "AllShardsCollectionCommand: " << nss << " cmd:" << redact(cmdObj); - const auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (routingInfo.cm()) { - // If it's a sharded collection, send it to all shards - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - } else { - // Otherwise just send it to the primary shard for the database - shardIds.push_back(routingInfo.primaryId()); + if (_implicitCreateDb) { + uassertStatusOK(createShardDatabase(opCtx, dbName)); + } + + Status status = Status::OK(); + int numAttempts = 0; + while (numAttempts < kMaxNumStaleVersionRetries) { + status = Status::OK(); + output.resetToEmpty(); + + // Target only shards that own data for the collection, and append the shardVersion for + // the collection to the command. + std::vector<AsyncRequestsSender::Request> requests; + auto routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (routingInfo.cm()) { + vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (const ShardId& shardId : shardIds) { + requests.emplace_back( + std::move(shardId), + appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId))); + } + } else { + if (routingInfo.primary()->isConfig()) { + // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo.primaryId(), cmdObj); + } else { + requests.emplace_back(routingInfo.primaryId(), + appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())); + } + } + + auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); + if (ErrorCodes::isStaleShardingError(swResults.getStatus().code())) { + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); + ++numAttempts; + continue; + } + status = swResults.getStatus(); + break; + } + + // We don't uassertStatusOK(), because that causes 'output' to be cleared, but we want to + // report the raw results even if there was an error. + if (!status.isOK()) { + return appendCommandStatus(output, status); } + return true; } + +private: + // Whether the requested database should be created implicitly + const bool _implicitCreateDb; }; class NotAllowedOnShardedCollectionCmd : public PublicGridCommand { @@ -281,100 +343,7 @@ public: CreateIndexesCmd() : AllShardsCollectionCommand("createIndexes", NULL, /* oldName */ - true /* use ShardConnection */, - true /* implicit create db */) { - // createIndexes command should use ShardConnection so the getLastError would - // be able to properly enforce the write concern (via the saveGLEStats callback). - } - - /** - * the createIndexes command doesn't require the 'ns' field to be populated - * so we make sure its here as its needed for the system.indexes insert - */ - BSONObj fixSpec(const NamespaceString& ns, const BSONObj& original) const { - if (original["ns"].type() == String) - return original; - BSONObjBuilder bb; - bb.appendElements(original); - bb.append("ns", ns.toString()); - return bb.obj(); - } - - /** - * @return equivalent of gle - */ - BSONObj createIndexLegacy(const string& server, - const NamespaceString& nss, - const BSONObj& spec) const { - try { - ScopedDbConnection conn(server); - conn->insert(nss.getSystemIndexesCollection(), spec); - BSONObj gle = conn->getLastErrorDetailed(nss.db().toString()); - conn.done(); - return gle; - } catch (DBException& e) { - BSONObjBuilder b; - b.append("errmsg", e.toString()); - b.append("code", e.getCode()); - b.append("codeName", ErrorCodes::errorString(ErrorCodes::fromInt(e.getCode()))); - return b.obj(); - } - } - - virtual BSONObj specialErrorHandler(const string& server, - const string& dbName, - const BSONObj& cmdObj, - const BSONObj& originalResult) const { - string errmsg = originalResult["errmsg"]; - if (errmsg.find("no such cmd") == string::npos) { - // cannot use codes as 2.4 didn't have a code for this - return originalResult; - } - - // we need to down convert - - NamespaceString nss(dbName, cmdObj["createIndexes"].String()); - - if (cmdObj["indexes"].type() != Array) - return originalResult; - - BSONObjBuilder newResult; - newResult.append("note", "downgraded"); - newResult.append("sentTo", server); - - BSONArrayBuilder individualResults; - - bool ok = true; - - BSONObjIterator indexIterator(cmdObj["indexes"].Obj()); - while (indexIterator.more()) { - BSONObj spec = indexIterator.next().Obj(); - spec = fixSpec(nss, spec); - - BSONObj gle = createIndexLegacy(server, nss, spec); - - individualResults.append(BSON("spec" << spec << "gle" << gle)); - - BSONElement e = gle["errmsg"]; - if (e.type() == String && e.String().size() > 0) { - ok = false; - newResult.appendAs(e, "errmsg"); - break; - } - - e = gle["err"]; - if (e.type() == String && e.String().size() > 0) { - ok = false; - newResult.appendAs(e, "errmsg"); - break; - } - } - - newResult.append("eachIndex", individualResults.arr()); - - newResult.append("ok", ok ? 1 : 0); - return newResult.obj(); - } + true /* implicit create db */) {} virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, @@ -1477,11 +1446,11 @@ public: verify(0); } - // We could support arbitrary shard keys by sending commands to all shards but I don't think - // we should + // We could support arbitrary shard keys by sending commands to all shards but I don't + // think we should errmsg = - "GridFS fs.chunks collection must be sharded on either {files_id:1} or {files_id:1, " - "n:1}"; + "GridFS fs.chunks collection must be sharded on either {files_id:1} or " + "{files_id:1, n:1}"; return false; } } fileMD5Cmd; diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp deleted file mode 100644 index 881b7d654ab..00000000000 --- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand - -#include "mongo/platform/basic.h" - -#include "mongo/s/commands/run_on_all_shards_cmd.h" - -#include <list> -#include <set> - -#include "mongo/db/jsobj.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_common.h" -#include "mongo/s/commands/sharded_command_processing.h" -#include "mongo/s/grid.h" -#include "mongo/util/log.h" - -namespace mongo { - -RunOnAllShardsCommand::RunOnAllShardsCommand(const char* name, - const char* oldName, - bool useShardConn, - bool implicitCreateDb) - : Command(name, false, oldName), - _useShardConn(useShardConn), - _implicitCreateDb(implicitCreateDb) {} - -void RunOnAllShardsCommand::aggregateResults(const std::vector<ShardAndReply>& results, - BSONObjBuilder& output) {} - -BSONObj RunOnAllShardsCommand::specialErrorHandler(const std::string& server, - const std::string& db, - const BSONObj& cmdObj, - const BSONObj& originalResult) const { - return originalResult; -} - -void RunOnAllShardsCommand::getShardIds(OperationContext* opCtx, - const std::string& db, - BSONObj& cmdObj, - std::vector<ShardId>& shardIds) { - grid.shardRegistry()->getAllShardIds(&shardIds); -} - -bool RunOnAllShardsCommand::run(OperationContext* opCtx, - const std::string& dbName, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& output) { - LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << redact(cmdObj); - - if (_implicitCreateDb) { - uassertStatusOK(createShardDatabase(opCtx, dbName)); - } - - std::vector<ShardId> shardIds; - getShardIds(opCtx, dbName, cmdObj, shardIds); - - std::list<std::shared_ptr<Future::CommandResult>> futures; - for (const ShardId& shardId : shardIds) { - const auto shardStatus = grid.shardRegistry()->getShard(opCtx, shardId); - if (!shardStatus.isOK()) { - continue; - } - - futures.push_back(Future::spawnCommand(shardStatus.getValue()->getConnString().toString(), - dbName, - cmdObj, - 0, - NULL, - _useShardConn)); - } - - std::vector<ShardAndReply> results; - BSONObjBuilder subobj(output.subobjStart("raw")); - BSONObjBuilder errors; - int commonErrCode = -1; - - std::list<std::shared_ptr<Future::CommandResult>>::iterator futuresit; - std::vector<ShardId>::const_iterator shardIdsIt; - - BSONElement wcErrorElem; - ShardId wcErrorShardId; - bool hasWCError = false; - - // We iterate over the set of shard ids and their corresponding futures in parallel. - // TODO: replace with zip iterator if we ever decide to use one from Boost or elsewhere - for (futuresit = futures.begin(), shardIdsIt = shardIds.cbegin(); - futuresit != futures.end() && shardIdsIt != shardIds.end(); - ++futuresit, ++shardIdsIt) { - std::shared_ptr<Future::CommandResult> res = *futuresit; - - if (res->join(opCtx)) { - // success :) - BSONObj result = res->result(); - results.emplace_back(shardIdsIt->toString(), result); - subobj.append(res->getServer(), result); - - if (!hasWCError) { - if ((wcErrorElem = result["writeConcernError"])) { - wcErrorShardId = *shardIdsIt; - hasWCError = true; - } - } - continue; - } - - BSONObj result = res->result(); - - if (!hasWCError) { - if ((wcErrorElem = result["writeConcernError"])) { - wcErrorShardId = *shardIdsIt; - hasWCError = true; - } - } - - if (result["errmsg"].type() || result["code"].numberInt() != 0) { - result = specialErrorHandler(res->getServer(), dbName, cmdObj, result); - - BSONElement errmsgObj = result["errmsg"]; - if (errmsgObj.eoo() || errmsgObj.String().empty()) { - // it was fixed! - results.emplace_back(shardIdsIt->toString(), result); - subobj.append(res->getServer(), result); - continue; - } - } - - // Handle "errmsg". - if (!result["errmsg"].eoo()) { - errors.appendAs(result["errmsg"], res->getServer()); - } else { - // Can happen if message is empty, for some reason - errors.append(res->getServer(), - str::stream() << "result without error message returned : " << result); - } - - // Handle "code". - int errCode = result["code"].numberInt(); - if (commonErrCode == -1) { - commonErrCode = errCode; - } else if (commonErrCode != errCode) { - commonErrCode = 0; - } - results.emplace_back(shardIdsIt->toString(), result); - subobj.append(res->getServer(), result); - } - - subobj.done(); - - if (hasWCError) { - appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, output); - } - - BSONObj errobj = errors.done(); - - if (!errobj.isEmpty()) { - errmsg = errobj.toString(); - - // If every error has a code, and the code for all errors is the same, then add - // a top-level field "code" with this value to the output object. - if (commonErrCode > 0) { - output.append("code", commonErrCode); - } - - return false; - } - - aggregateResults(results, output); - return true; -} -} diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.h b/src/mongo/s/commands/run_on_all_shards_cmd.h deleted file mode 100644 index a01c133b987..00000000000 --- a/src/mongo/s/commands/run_on_all_shards_cmd.h +++ /dev/null @@ -1,101 +0,0 @@ -/** -* Copyright (C) 2015 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects -* for all of the code used other than as permitted herein. If you modify -* file(s) with this exception, you may extend this exception to your -* version of the file(s), but you are not obligated to do so. If you do not -* wish to do so, delete this exception statement from your version. If you -* delete this exception statement from all source files in the program, -* then also delete it in the license file. -*/ - -#include <string> -#include <tuple> -#include <vector> - -#include "mongo/base/string_data.h" -#include "mongo/db/commands.h" -#include "mongo/s/client/shard.h" - -namespace mongo { - -class BSONObj; -class BSONObjBuilder; -class OperationContext; - -/** - * Logic for commands that simply map out to all shards then fold the results into - * a single response. - * - * All shards are contacted in parallel. - * - * When extending, don't override run() - but rather aggregateResults(). If you need - * to implement some kind of fall back logic for multiversion clusters, - * override specialErrorHandler(). - */ -class RunOnAllShardsCommand : public Command { -public: - RunOnAllShardsCommand(const char* name, - const char* oldName = NULL, - bool useShardConn = false, - bool implicitCreateDb = false); - - bool slaveOk() const override { - return true; - } - bool adminOnly() const override { - return false; - } - - // The StringData contains the shard ident. - // This can be used to create an instance of Shard - using ShardAndReply = std::tuple<StringData, BSONObj>; - - virtual void aggregateResults(const std::vector<ShardAndReply>& results, - BSONObjBuilder& output); - - // The default implementation is the identity function. - virtual BSONObj specialErrorHandler(const std::string& server, - const std::string& db, - const BSONObj& cmdObj, - const BSONObj& originalResult) const; - - // The default implementation uses all shards. - virtual void getShardIds(OperationContext* opCtx, - const std::string& db, - BSONObj& cmdObj, - std::vector<ShardId>& shardIds); - - bool run(OperationContext* opCtx, - const std::string& db, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& output) final; - -private: - // Use ShardConnection as opposed to ScopedDbConnection - const bool _useShardConn; - - // Whether the requested database should be created implicitly - const bool _implicitCreateDb; -}; - -} // namespace mongo diff --git a/src/mongo/s/commands/scatter_gather_from_shards.cpp b/src/mongo/s/commands/scatter_gather_from_shards.cpp new file mode 100644 index 00000000000..ac331ceaf5d --- /dev/null +++ b/src/mongo/s/commands/scatter_gather_from_shards.cpp @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/s/commands/scatter_gather_from_shards.h" + +#include <list> +#include <set> + +#include "mongo/db/jsobj.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/sharded_command_processing.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { + +using ShardAndReply = std::tuple<ShardId, BSONObj>; + +StatusWith<std::vector<ShardAndReply>> gatherResults( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + int options, + const std::vector<AsyncRequestsSender::Request>& requests, + BSONObjBuilder* output) { + // Extract the readPreference from the command. + + rpc::ServerSelectionMetadata ssm; + BSONObjBuilder unusedCmdBob; + BSONObjBuilder upconvertedMetadataBob; + uassertStatusOK(rpc::ServerSelectionMetadata::upconvert( + cmdObj, options, &unusedCmdBob, &upconvertedMetadataBob)); + auto upconvertedMetadata = upconvertedMetadataBob.obj(); + auto ssmElem = upconvertedMetadata.getField(rpc::ServerSelectionMetadata::fieldName()); + if (!ssmElem.eoo()) { + ssm = uassertStatusOK(rpc::ServerSelectionMetadata::readFromMetadata(ssmElem)); + } + auto readPref = ssm.getReadPreference(); + + // Send the requests. + + AsyncRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + dbName, + requests, + readPref ? *readPref : ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet())); + + // Get the responses. + + std::vector<ShardAndReply> results; // Stores results by ShardId + BSONObjBuilder subobj(output->subobjStart("raw")); // Stores results by ConnectionString + BSONObjBuilder errors; // Stores errors by ConnectionString + int commonErrCode = -1; // Stores the overall error code + + BSONElement wcErrorElem; + ShardId wcErrorShardId; + bool hasWCError = false; + + while (!ars.done()) { + auto response = ars.next(); + const auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, response.shardId); + if (!swShard.isOK()) { + output->resetToEmpty(); + return swShard.getStatus(); + } + const auto shard = std::move(swShard.getValue()); + + auto status = response.swResponse.getStatus(); + if (status.isOK()) { + // We successfully received a response. + + status = getStatusFromCommandResult(response.swResponse.getValue().data); + if (ErrorCodes::isStaleShardingError(status.code())) { + // Do not report any raw results if we fail to establish a shardVersion. + output->resetToEmpty(); + return status; + } + + auto result = std::move(response.swResponse.getValue().data); + if (!hasWCError) { + if ((wcErrorElem = result["writeConcernError"])) { + wcErrorShardId = response.shardId; + hasWCError = true; + } + } + + if (status.isOK()) { + // The command status was OK. + subobj.append(shard->getConnString().toString(), result); + results.emplace_back(std::move(response.shardId), std::move(result)); + continue; + } + } + + // Either we failed to get a response, or the command had a non-OK status. + + // Convert the error status back into the format of a command result. + BSONObjBuilder resultBob; + Command::appendCommandStatus(resultBob, status); + auto result = resultBob.obj(); + + // Update the data structures that store the results. + errors.append(shard->getConnString().toString(), status.reason()); + if (commonErrCode == -1) { + commonErrCode = status.code(); + } else if (commonErrCode != status.code()) { + commonErrCode = 0; + } + subobj.append(shard->getConnString().toString(), result); + results.emplace_back(std::move(response.shardId), std::move(result)); + } + + subobj.done(); + + if (hasWCError) { + appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output); + } + + BSONObj errobj = errors.done(); + if (!errobj.isEmpty()) { + // If code for all errors is the same, then report the common error code. + if (commonErrCode > 0) { + return {ErrorCodes::fromInt(commonErrCode), errobj.toString()}; + } + return {ErrorCodes::OperationFailed, errobj.toString()}; + } + + return results; +} + +} // namespace mongo diff --git a/src/mongo/s/commands/scatter_gather_from_shards.h b/src/mongo/s/commands/scatter_gather_from_shards.h new file mode 100644 index 00000000000..ebca85f01a3 --- /dev/null +++ b/src/mongo/s/commands/scatter_gather_from_shards.h @@ -0,0 +1,60 @@ +/** +* Copyright (C) 2015 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects +* for all of the code used other than as permitted herein. If you modify +* file(s) with this exception, you may extend this exception to your +* version of the file(s), but you are not obligated to do so. If you do not +* wish to do so, delete this exception statement from your version. If you +* delete this exception statement from all source files in the program, +* then also delete it in the license file. +*/ + +#include <string> +#include <tuple> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/commands.h" +#include "mongo/s/async_requests_sender.h" +#include "mongo/s/client/shard.h" + +namespace mongo { + +class BSONObj; +class BSONObjBuilder; +class OperationContext; + +using ShardAndReply = std::tuple<ShardId, BSONObj>; + +/** + * Logic for commands that simply map out to all shards then fold the results into + * a single response. + * + * All shards are contacted in parallel. + */ +StatusWith<std::vector<ShardAndReply>> gatherResults( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + int options, + const std::vector<AsyncRequestsSender::Request>& requests, + BSONObjBuilder* output); + +} // namespace mongo |