summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2017-03-31 17:20:07 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2017-04-06 13:29:56 -0400
commitc8e0e1668b25fde6e5980a6431fc7db24411c109 (patch)
treef138e8a1fa2b99cab64461fd450ef8dba4b066dd
parent366b39a143ec17a65e8ccc08f1883e4c92dd669e (diff)
downloadmongo-c8e0e1668b25fde6e5980a6431fc7db24411c109.tar.gz
SERVER-28165 make RunOnAllShardsCommand use ARS instead of Future::spawnCommand
-rw-r--r--jstests/sharding/stats.js17
-rw-r--r--src/mongo/db/catalog/coll_mod.cpp2
-rw-r--r--src/mongo/db/commands/create_indexes.cpp3
-rw-r--r--src/mongo/s/commands/SConscript2
-rw-r--r--src/mongo/s/commands/cluster_current_op.cpp43
-rw-r--r--src/mongo/s/commands/cluster_db_stats_cmd.cpp45
-rw-r--r--src/mongo/s/commands/cluster_repair_database_cmd.cpp34
-rw-r--r--src/mongo/s/commands/commands_public.cpp199
-rw-r--r--src/mongo/s/commands/run_on_all_shards_cmd.cpp202
-rw-r--r--src/mongo/s/commands/run_on_all_shards_cmd.h101
-rw-r--r--src/mongo/s/commands/scatter_gather_from_shards.cpp166
-rw-r--r--src/mongo/s/commands/scatter_gather_from_shards.h60
12 files changed, 431 insertions, 443 deletions
diff --git a/jstests/sharding/stats.js b/jstests/sharding/stats.js
index 9dc521cc298..628abb2d895 100644
--- a/jstests/sharding/stats.js
+++ b/jstests/sharding/stats.js
@@ -4,9 +4,6 @@
s.adminCommand({enablesharding: "test"});
- a = s._connections[0].getDB("test");
- b = s._connections[1].getDB("test");
-
db = s.getDB("test");
s.ensurePrimaryShard('test', 'shard0001');
@@ -43,6 +40,9 @@
bulk.insert({_id: i});
assert.writeOK(bulk.execute());
+ a = s.shard0.getDB("test");
+ b = s.shard1.getDB("test");
+
x = assert.commandWorked(db.foo.stats());
assert.eq(N, x.count, "coll total count expected");
assert.eq(db.foo.count(), x.count, "coll total count match");
@@ -66,15 +66,12 @@
x = assert.commandWorked(db.stats());
- // dbstats uses Future::CommandResult so raw output uses connection strings not shard names
- shards = Object.keySet(x.raw);
-
assert.eq(N + (a_extras + b_extras), x.objects, "db total count expected");
assert.eq(2, numKeys(x.raw), "db shard num");
- assert.eq((N / 2) + a_extras, x.raw[shards[0]].objects, "db count on shard0000 expected");
- assert.eq((N / 2) + b_extras, x.raw[shards[1]].objects, "db count on shard0001 expected");
- assert.eq(a.stats().objects, x.raw[shards[0]].objects, "db count on shard0000 match");
- assert.eq(b.stats().objects, x.raw[shards[1]].objects, "db count on shard0001 match");
+ assert.eq((N / 2) + a_extras, x.raw[s.shard0.name].objects, "db count on shard0000 expected");
+ assert.eq((N / 2) + b_extras, x.raw[s.shard1.name].objects, "db count on shard0001 expected");
+ assert.eq(a.stats().objects, x.raw[s.shard0.name].objects, "db count on shard0000 match");
+ assert.eq(b.stats().objects, x.raw[s.shard1.name].objects, "db count on shard0001 match");
/* Test db.stat() and db.collection.stat() scaling */
diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp
index 661cb5b1ab2..93abe72214c 100644
--- a/src/mongo/db/catalog/coll_mod.cpp
+++ b/src/mongo/db/catalog/coll_mod.cpp
@@ -76,6 +76,8 @@ StatusWith<CollModRequest> parseCollModRequest(OperationContext* opCtx,
// no-op ignore top-level fields prefixed with $. They are for the command processor
} else if (QueryRequest::cmdOptionMaxTimeMS == e.fieldNameStringData()) {
// no-op
+ } else if (str::equals("shardVersion", e.fieldName())) {
+ // no-op
} else if (str::equals("index", e.fieldName()) && !isView) {
BSONObj indexObj = e.Obj();
StringData indexName;
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index 2e9bd0ceaed..189ec8e78f1 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -68,6 +68,7 @@ const StringData kIndexesFieldName = "indexes"_sd;
const StringData kCommandName = "createIndexes"_sd;
const StringData kWriteConcern = "writeConcern"_sd;
const StringData kMaxTimeMS = "maxTimeMS"_sd;
+const StringData kShardVersion = "shardVersion"_sd;
/**
* Parses the index specifications from 'cmdObj', validates them, and returns equivalent index
@@ -125,7 +126,7 @@ StatusWith<std::vector<BSONObj>> parseAndValidateIndexSpecs(
hasIndexesField = true;
} else if (kCommandName == cmdElemFieldName || kWriteConcern == cmdElemFieldName ||
- kMaxTimeMS == cmdElemFieldName) {
+ kMaxTimeMS == cmdElemFieldName || kShardVersion == cmdElemFieldName) {
continue;
} else {
return {ErrorCodes::BadValue,
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index b78738edf65..b7af6271a4e 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -76,7 +76,7 @@ env.Library(
'cluster_write.cpp',
'cluster_write_cmd.cpp',
'commands_public.cpp',
- 'run_on_all_shards_cmd.cpp',
+ 'scatter_gather_from_shards.cpp',
'sharded_command_processing.cpp',
'strategy.cpp',
],
diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp
index c9a307c4c39..ecf1c1fb30b 100644
--- a/src/mongo/s/commands/cluster_current_op.cpp
+++ b/src/mongo/s/commands/cluster_current_op.cpp
@@ -38,8 +38,10 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
-#include "mongo/s/commands/run_on_all_shards_cmd.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/scatter_gather_from_shards.h"
#include "mongo/s/commands/strategy.h"
+#include "mongo/s/grid.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -51,12 +53,15 @@ const char kOpIdFieldName[] = "opid";
const char kClientFieldName[] = "client";
// awkward underscores used to make this visually distinct from kClientFieldName
const char kClient_S_FieldName[] = "client_s";
-
const char kCommandName[] = "currentOp";
-class ClusterCurrentOpCommand : public RunOnAllShardsCommand {
+class ClusterCurrentOpCommand : public Command {
public:
- ClusterCurrentOpCommand() : RunOnAllShardsCommand(kCommandName) {}
+ ClusterCurrentOpCommand() : Command(kCommandName) {}
+
+ bool slaveOk() const override {
+ return true;
+ }
bool adminOnly() const final {
return true;
@@ -75,7 +80,33 @@ public:
return false;
}
- void aggregateResults(const std::vector<ShardAndReply>& results, BSONObjBuilder& output) final {
+ using ShardAndReply = std::tuple<ShardId, BSONObj>;
+
+ bool run(OperationContext* opCtx,
+ const std::string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& output) override {
+
+ // Target all shards.
+ std::vector<AsyncRequestsSender::Request> requests;
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+
+ auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output);
+ if (!swResults.isOK()) {
+ return appendCommandStatus(output, swResults.getStatus());
+ }
+
+ aggregateResults(std::move(swResults.getValue()), output);
+ return true;
+ }
+
+ void aggregateResults(const std::vector<ShardAndReply>& results, BSONObjBuilder& output) {
// Each shard responds with a document containing an array of subdocuments.
// Each subdocument represents an operation running on that shard.
// We merge the responses into a single document containg an array
@@ -99,7 +130,7 @@ public:
BSONArrayBuilder aggregatedOpsBab(output.subarrayStart(kInprogFieldName));
for (auto&& shardResponse : results) {
- StringData shardName;
+ ShardId shardName;
BSONObj shardResponseObj;
std::tie(shardName, shardResponseObj) = shardResponse;
diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
index 1546fe4482e..16151cf0f1c 100644
--- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp
+++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
@@ -30,16 +30,27 @@
#include <vector>
-#include "mongo/s/commands/run_on_all_shards_cmd.h"
+#include "mongo/s/commands/scatter_gather_from_shards.h"
+
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
namespace mongo {
namespace {
using std::vector;
-class DBStatsCmd : public RunOnAllShardsCommand {
+class DBStatsCmd : public Command {
public:
- DBStatsCmd() : RunOnAllShardsCommand("dbStats", "dbstats") {}
+ DBStatsCmd() : Command("dbStats", false, "dbstats") {}
+
+ bool slaveOk() const override {
+ return true;
+ }
+ bool adminOnly() const override {
+ return false;
+ }
+
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
@@ -52,7 +63,33 @@ public:
return false;
}
- virtual void aggregateResults(const vector<ShardAndReply>& results, BSONObjBuilder& output) {
+ using ShardAndReply = std::tuple<ShardId, BSONObj>;
+
+ bool run(OperationContext* opCtx,
+ const std::string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& output) override {
+
+ // Target all shards.
+ std::vector<AsyncRequestsSender::Request> requests;
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+
+ auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output);
+ if (!swResults.isOK()) {
+ return appendCommandStatus(output, swResults.getStatus());
+ }
+
+ aggregateResults(std::move(swResults.getValue()), output);
+ return true;
+ }
+
+ void aggregateResults(const vector<ShardAndReply>& results, BSONObjBuilder& output) {
long long objects = 0;
long long unscaledDataSize = 0;
long long dataSize = 0;
diff --git a/src/mongo/s/commands/cluster_repair_database_cmd.cpp b/src/mongo/s/commands/cluster_repair_database_cmd.cpp
index 9affce5c598..707d6149846 100644
--- a/src/mongo/s/commands/cluster_repair_database_cmd.cpp
+++ b/src/mongo/s/commands/cluster_repair_database_cmd.cpp
@@ -28,14 +28,24 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/commands/run_on_all_shards_cmd.h"
+#include "mongo/s/commands/scatter_gather_from_shards.h"
+
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
namespace mongo {
namespace {
-class ClusterRepairDatabaseCmd : public RunOnAllShardsCommand {
+class ClusterRepairDatabaseCmd : public Command {
public:
- ClusterRepairDatabaseCmd() : RunOnAllShardsCommand("repairDatabase") {}
+ ClusterRepairDatabaseCmd() : Command("repairDatabase") {}
+
+ bool slaveOk() const override {
+ return true;
+ }
+ bool adminOnly() const override {
+ return false;
+ }
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
@@ -49,6 +59,24 @@ public:
return false;
}
+ bool run(OperationContext* opCtx,
+ const std::string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& output) override {
+ // Target all shards.
+ std::vector<AsyncRequestsSender::Request> requests;
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+
+ auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output);
+ return appendCommandStatus(output, swResults.getStatus());
+ }
+
} clusterRepairDatabaseCmd;
} // namespace
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index c7c3b6f4273..371782d2e4d 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -61,7 +61,7 @@
#include "mongo/s/commands/cluster_aggregate.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/cluster_explain.h"
-#include "mongo/s/commands/run_on_all_shards_cmd.h"
+#include "mongo/s/commands/scatter_gather_from_shards.h"
#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/store_possible_cursor.h"
@@ -85,6 +85,8 @@ using std::vector;
namespace {
+const int kMaxNumStaleVersionRetries = 3;
+
bool cursorCommandPassthrough(OperationContext* opCtx,
StringData dbName,
const ShardId& shardId,
@@ -211,30 +213,90 @@ protected:
}
};
-class AllShardsCollectionCommand : public RunOnAllShardsCommand {
+class AllShardsCollectionCommand : public Command {
protected:
- AllShardsCollectionCommand(const char* n,
+ AllShardsCollectionCommand(const char* name,
const char* oldname = NULL,
- bool useShardConn = false,
bool implicitCreateDb = false)
- : RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {}
+ : Command(name, false, oldname), _implicitCreateDb(implicitCreateDb) {}
+
+ bool slaveOk() const override {
+ return true;
+ }
+ bool adminOnly() const override {
+ return false;
+ }
+
+ BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version) {
+ BSONObjBuilder cmdWithVersionBob;
+ cmdWithVersionBob.appendElements(cmdObj);
+ version.appendForCommands(&cmdWithVersionBob);
+ return cmdWithVersionBob.obj();
+ }
- void getShardIds(OperationContext* opCtx,
- const string& dbName,
- BSONObj& cmdObj,
- vector<ShardId>& shardIds) override {
+ bool run(OperationContext* opCtx,
+ const string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& output) override {
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
+ LOG(1) << "AllShardsCollectionCommand: " << nss << " cmd:" << redact(cmdObj);
- const auto routingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- if (routingInfo.cm()) {
- // If it's a sharded collection, send it to all shards
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
- } else {
- // Otherwise just send it to the primary shard for the database
- shardIds.push_back(routingInfo.primaryId());
+ if (_implicitCreateDb) {
+ uassertStatusOK(createShardDatabase(opCtx, dbName));
+ }
+
+ Status status = Status::OK();
+ int numAttempts = 0;
+ while (numAttempts < kMaxNumStaleVersionRetries) {
+ status = Status::OK();
+ output.resetToEmpty();
+
+ // Target only shards that own data for the collection, and append the shardVersion for
+ // the collection to the command.
+ std::vector<AsyncRequestsSender::Request> requests;
+ auto routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (routingInfo.cm()) {
+ vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ for (const ShardId& shardId : shardIds) {
+ requests.emplace_back(
+ std::move(shardId),
+ appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId)));
+ }
+ } else {
+ if (routingInfo.primary()->isConfig()) {
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo.primaryId(), cmdObj);
+ } else {
+ requests.emplace_back(routingInfo.primaryId(),
+ appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()));
+ }
+ }
+
+ auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output);
+ if (ErrorCodes::isStaleShardingError(swResults.getStatus().code())) {
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
+ ++numAttempts;
+ continue;
+ }
+ status = swResults.getStatus();
+ break;
+ }
+
+ // We don't uassertStatusOK(), because that causes 'output' to be cleared, but we want to
+ // report the raw results even if there was an error.
+ if (!status.isOK()) {
+ return appendCommandStatus(output, status);
}
+ return true;
}
+
+private:
+ // Whether the requested database should be created implicitly
+ const bool _implicitCreateDb;
};
class NotAllowedOnShardedCollectionCmd : public PublicGridCommand {
@@ -281,100 +343,7 @@ public:
CreateIndexesCmd()
: AllShardsCollectionCommand("createIndexes",
NULL, /* oldName */
- true /* use ShardConnection */,
- true /* implicit create db */) {
- // createIndexes command should use ShardConnection so the getLastError would
- // be able to properly enforce the write concern (via the saveGLEStats callback).
- }
-
- /**
- * the createIndexes command doesn't require the 'ns' field to be populated
- * so we make sure its here as its needed for the system.indexes insert
- */
- BSONObj fixSpec(const NamespaceString& ns, const BSONObj& original) const {
- if (original["ns"].type() == String)
- return original;
- BSONObjBuilder bb;
- bb.appendElements(original);
- bb.append("ns", ns.toString());
- return bb.obj();
- }
-
- /**
- * @return equivalent of gle
- */
- BSONObj createIndexLegacy(const string& server,
- const NamespaceString& nss,
- const BSONObj& spec) const {
- try {
- ScopedDbConnection conn(server);
- conn->insert(nss.getSystemIndexesCollection(), spec);
- BSONObj gle = conn->getLastErrorDetailed(nss.db().toString());
- conn.done();
- return gle;
- } catch (DBException& e) {
- BSONObjBuilder b;
- b.append("errmsg", e.toString());
- b.append("code", e.getCode());
- b.append("codeName", ErrorCodes::errorString(ErrorCodes::fromInt(e.getCode())));
- return b.obj();
- }
- }
-
- virtual BSONObj specialErrorHandler(const string& server,
- const string& dbName,
- const BSONObj& cmdObj,
- const BSONObj& originalResult) const {
- string errmsg = originalResult["errmsg"];
- if (errmsg.find("no such cmd") == string::npos) {
- // cannot use codes as 2.4 didn't have a code for this
- return originalResult;
- }
-
- // we need to down convert
-
- NamespaceString nss(dbName, cmdObj["createIndexes"].String());
-
- if (cmdObj["indexes"].type() != Array)
- return originalResult;
-
- BSONObjBuilder newResult;
- newResult.append("note", "downgraded");
- newResult.append("sentTo", server);
-
- BSONArrayBuilder individualResults;
-
- bool ok = true;
-
- BSONObjIterator indexIterator(cmdObj["indexes"].Obj());
- while (indexIterator.more()) {
- BSONObj spec = indexIterator.next().Obj();
- spec = fixSpec(nss, spec);
-
- BSONObj gle = createIndexLegacy(server, nss, spec);
-
- individualResults.append(BSON("spec" << spec << "gle" << gle));
-
- BSONElement e = gle["errmsg"];
- if (e.type() == String && e.String().size() > 0) {
- ok = false;
- newResult.appendAs(e, "errmsg");
- break;
- }
-
- e = gle["err"];
- if (e.type() == String && e.String().size() > 0) {
- ok = false;
- newResult.appendAs(e, "errmsg");
- break;
- }
- }
-
- newResult.append("eachIndex", individualResults.arr());
-
- newResult.append("ok", ok ? 1 : 0);
- return newResult.obj();
- }
+ true /* implicit create db */) {}
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
@@ -1477,11 +1446,11 @@ public:
verify(0);
}
- // We could support arbitrary shard keys by sending commands to all shards but I don't think
- // we should
+ // We could support arbitrary shard keys by sending commands to all shards but I don't
+ // think we should
errmsg =
- "GridFS fs.chunks collection must be sharded on either {files_id:1} or {files_id:1, "
- "n:1}";
+ "GridFS fs.chunks collection must be sharded on either {files_id:1} or "
+ "{files_id:1, n:1}";
return false;
}
} fileMD5Cmd;
diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
deleted file mode 100644
index 881b7d654ab..00000000000
--- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/commands/run_on_all_shards_cmd.h"
-
-#include <list>
-#include <set>
-
-#include "mongo/db/jsobj.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/commands/cluster_commands_common.h"
-#include "mongo/s/commands/sharded_command_processing.h"
-#include "mongo/s/grid.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-RunOnAllShardsCommand::RunOnAllShardsCommand(const char* name,
- const char* oldName,
- bool useShardConn,
- bool implicitCreateDb)
- : Command(name, false, oldName),
- _useShardConn(useShardConn),
- _implicitCreateDb(implicitCreateDb) {}
-
-void RunOnAllShardsCommand::aggregateResults(const std::vector<ShardAndReply>& results,
- BSONObjBuilder& output) {}
-
-BSONObj RunOnAllShardsCommand::specialErrorHandler(const std::string& server,
- const std::string& db,
- const BSONObj& cmdObj,
- const BSONObj& originalResult) const {
- return originalResult;
-}
-
-void RunOnAllShardsCommand::getShardIds(OperationContext* opCtx,
- const std::string& db,
- BSONObj& cmdObj,
- std::vector<ShardId>& shardIds) {
- grid.shardRegistry()->getAllShardIds(&shardIds);
-}
-
-bool RunOnAllShardsCommand::run(OperationContext* opCtx,
- const std::string& dbName,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& output) {
- LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << redact(cmdObj);
-
- if (_implicitCreateDb) {
- uassertStatusOK(createShardDatabase(opCtx, dbName));
- }
-
- std::vector<ShardId> shardIds;
- getShardIds(opCtx, dbName, cmdObj, shardIds);
-
- std::list<std::shared_ptr<Future::CommandResult>> futures;
- for (const ShardId& shardId : shardIds) {
- const auto shardStatus = grid.shardRegistry()->getShard(opCtx, shardId);
- if (!shardStatus.isOK()) {
- continue;
- }
-
- futures.push_back(Future::spawnCommand(shardStatus.getValue()->getConnString().toString(),
- dbName,
- cmdObj,
- 0,
- NULL,
- _useShardConn));
- }
-
- std::vector<ShardAndReply> results;
- BSONObjBuilder subobj(output.subobjStart("raw"));
- BSONObjBuilder errors;
- int commonErrCode = -1;
-
- std::list<std::shared_ptr<Future::CommandResult>>::iterator futuresit;
- std::vector<ShardId>::const_iterator shardIdsIt;
-
- BSONElement wcErrorElem;
- ShardId wcErrorShardId;
- bool hasWCError = false;
-
- // We iterate over the set of shard ids and their corresponding futures in parallel.
- // TODO: replace with zip iterator if we ever decide to use one from Boost or elsewhere
- for (futuresit = futures.begin(), shardIdsIt = shardIds.cbegin();
- futuresit != futures.end() && shardIdsIt != shardIds.end();
- ++futuresit, ++shardIdsIt) {
- std::shared_ptr<Future::CommandResult> res = *futuresit;
-
- if (res->join(opCtx)) {
- // success :)
- BSONObj result = res->result();
- results.emplace_back(shardIdsIt->toString(), result);
- subobj.append(res->getServer(), result);
-
- if (!hasWCError) {
- if ((wcErrorElem = result["writeConcernError"])) {
- wcErrorShardId = *shardIdsIt;
- hasWCError = true;
- }
- }
- continue;
- }
-
- BSONObj result = res->result();
-
- if (!hasWCError) {
- if ((wcErrorElem = result["writeConcernError"])) {
- wcErrorShardId = *shardIdsIt;
- hasWCError = true;
- }
- }
-
- if (result["errmsg"].type() || result["code"].numberInt() != 0) {
- result = specialErrorHandler(res->getServer(), dbName, cmdObj, result);
-
- BSONElement errmsgObj = result["errmsg"];
- if (errmsgObj.eoo() || errmsgObj.String().empty()) {
- // it was fixed!
- results.emplace_back(shardIdsIt->toString(), result);
- subobj.append(res->getServer(), result);
- continue;
- }
- }
-
- // Handle "errmsg".
- if (!result["errmsg"].eoo()) {
- errors.appendAs(result["errmsg"], res->getServer());
- } else {
- // Can happen if message is empty, for some reason
- errors.append(res->getServer(),
- str::stream() << "result without error message returned : " << result);
- }
-
- // Handle "code".
- int errCode = result["code"].numberInt();
- if (commonErrCode == -1) {
- commonErrCode = errCode;
- } else if (commonErrCode != errCode) {
- commonErrCode = 0;
- }
- results.emplace_back(shardIdsIt->toString(), result);
- subobj.append(res->getServer(), result);
- }
-
- subobj.done();
-
- if (hasWCError) {
- appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, output);
- }
-
- BSONObj errobj = errors.done();
-
- if (!errobj.isEmpty()) {
- errmsg = errobj.toString();
-
- // If every error has a code, and the code for all errors is the same, then add
- // a top-level field "code" with this value to the output object.
- if (commonErrCode > 0) {
- output.append("code", commonErrCode);
- }
-
- return false;
- }
-
- aggregateResults(results, output);
- return true;
-}
-}
diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.h b/src/mongo/s/commands/run_on_all_shards_cmd.h
deleted file mode 100644
index a01c133b987..00000000000
--- a/src/mongo/s/commands/run_on_all_shards_cmd.h
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
-* Copyright (C) 2015 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects
-* for all of the code used other than as permitted herein. If you modify
-* file(s) with this exception, you may extend this exception to your
-* version of the file(s), but you are not obligated to do so. If you do not
-* wish to do so, delete this exception statement from your version. If you
-* delete this exception statement from all source files in the program,
-* then also delete it in the license file.
-*/
-
-#include <string>
-#include <tuple>
-#include <vector>
-
-#include "mongo/base/string_data.h"
-#include "mongo/db/commands.h"
-#include "mongo/s/client/shard.h"
-
-namespace mongo {
-
-class BSONObj;
-class BSONObjBuilder;
-class OperationContext;
-
-/**
- * Logic for commands that simply map out to all shards then fold the results into
- * a single response.
- *
- * All shards are contacted in parallel.
- *
- * When extending, don't override run() - but rather aggregateResults(). If you need
- * to implement some kind of fall back logic for multiversion clusters,
- * override specialErrorHandler().
- */
-class RunOnAllShardsCommand : public Command {
-public:
- RunOnAllShardsCommand(const char* name,
- const char* oldName = NULL,
- bool useShardConn = false,
- bool implicitCreateDb = false);
-
- bool slaveOk() const override {
- return true;
- }
- bool adminOnly() const override {
- return false;
- }
-
- // The StringData contains the shard ident.
- // This can be used to create an instance of Shard
- using ShardAndReply = std::tuple<StringData, BSONObj>;
-
- virtual void aggregateResults(const std::vector<ShardAndReply>& results,
- BSONObjBuilder& output);
-
- // The default implementation is the identity function.
- virtual BSONObj specialErrorHandler(const std::string& server,
- const std::string& db,
- const BSONObj& cmdObj,
- const BSONObj& originalResult) const;
-
- // The default implementation uses all shards.
- virtual void getShardIds(OperationContext* opCtx,
- const std::string& db,
- BSONObj& cmdObj,
- std::vector<ShardId>& shardIds);
-
- bool run(OperationContext* opCtx,
- const std::string& db,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& output) final;
-
-private:
- // Use ShardConnection as opposed to ScopedDbConnection
- const bool _useShardConn;
-
- // Whether the requested database should be created implicitly
- const bool _implicitCreateDb;
-};
-
-} // namespace mongo
diff --git a/src/mongo/s/commands/scatter_gather_from_shards.cpp b/src/mongo/s/commands/scatter_gather_from_shards.cpp
new file mode 100644
index 00000000000..ac331ceaf5d
--- /dev/null
+++ b/src/mongo/s/commands/scatter_gather_from_shards.cpp
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/scatter_gather_from_shards.h"
+
+#include <list>
+#include <set>
+
+#include "mongo/db/jsobj.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/sharded_command_processing.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+using ShardAndReply = std::tuple<ShardId, BSONObj>;
+
+StatusWith<std::vector<ShardAndReply>> gatherResults(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ int options,
+ const std::vector<AsyncRequestsSender::Request>& requests,
+ BSONObjBuilder* output) {
+ // Extract the readPreference from the command.
+
+ rpc::ServerSelectionMetadata ssm;
+ BSONObjBuilder unusedCmdBob;
+ BSONObjBuilder upconvertedMetadataBob;
+ uassertStatusOK(rpc::ServerSelectionMetadata::upconvert(
+ cmdObj, options, &unusedCmdBob, &upconvertedMetadataBob));
+ auto upconvertedMetadata = upconvertedMetadataBob.obj();
+ auto ssmElem = upconvertedMetadata.getField(rpc::ServerSelectionMetadata::fieldName());
+ if (!ssmElem.eoo()) {
+ ssm = uassertStatusOK(rpc::ServerSelectionMetadata::readFromMetadata(ssmElem));
+ }
+ auto readPref = ssm.getReadPreference();
+
+ // Send the requests.
+
+ AsyncRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ dbName,
+ requests,
+ readPref ? *readPref : ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet()));
+
+ // Get the responses.
+
+ std::vector<ShardAndReply> results; // Stores results by ShardId
+ BSONObjBuilder subobj(output->subobjStart("raw")); // Stores results by ConnectionString
+ BSONObjBuilder errors; // Stores errors by ConnectionString
+ int commonErrCode = -1; // Stores the overall error code
+
+ BSONElement wcErrorElem;
+ ShardId wcErrorShardId;
+ bool hasWCError = false;
+
+ while (!ars.done()) {
+ auto response = ars.next();
+ const auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, response.shardId);
+ if (!swShard.isOK()) {
+ output->resetToEmpty();
+ return swShard.getStatus();
+ }
+ const auto shard = std::move(swShard.getValue());
+
+ auto status = response.swResponse.getStatus();
+ if (status.isOK()) {
+ // We successfully received a response.
+
+ status = getStatusFromCommandResult(response.swResponse.getValue().data);
+ if (ErrorCodes::isStaleShardingError(status.code())) {
+ // Do not report any raw results if we fail to establish a shardVersion.
+ output->resetToEmpty();
+ return status;
+ }
+
+ auto result = std::move(response.swResponse.getValue().data);
+ if (!hasWCError) {
+ if ((wcErrorElem = result["writeConcernError"])) {
+ wcErrorShardId = response.shardId;
+ hasWCError = true;
+ }
+ }
+
+ if (status.isOK()) {
+ // The command status was OK.
+ subobj.append(shard->getConnString().toString(), result);
+ results.emplace_back(std::move(response.shardId), std::move(result));
+ continue;
+ }
+ }
+
+ // Either we failed to get a response, or the command had a non-OK status.
+
+ // Convert the error status back into the format of a command result.
+ BSONObjBuilder resultBob;
+ Command::appendCommandStatus(resultBob, status);
+ auto result = resultBob.obj();
+
+ // Update the data structures that store the results.
+ errors.append(shard->getConnString().toString(), status.reason());
+ if (commonErrCode == -1) {
+ commonErrCode = status.code();
+ } else if (commonErrCode != status.code()) {
+ commonErrCode = 0;
+ }
+ subobj.append(shard->getConnString().toString(), result);
+ results.emplace_back(std::move(response.shardId), std::move(result));
+ }
+
+ subobj.done();
+
+ if (hasWCError) {
+ appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output);
+ }
+
+ BSONObj errobj = errors.done();
+ if (!errobj.isEmpty()) {
+ // If code for all errors is the same, then report the common error code.
+ if (commonErrCode > 0) {
+ return {ErrorCodes::fromInt(commonErrCode), errobj.toString()};
+ }
+ return {ErrorCodes::OperationFailed, errobj.toString()};
+ }
+
+ return results;
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/commands/scatter_gather_from_shards.h b/src/mongo/s/commands/scatter_gather_from_shards.h
new file mode 100644
index 00000000000..ebca85f01a3
--- /dev/null
+++ b/src/mongo/s/commands/scatter_gather_from_shards.h
@@ -0,0 +1,60 @@
+/**
+* Copyright (C) 2015 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects
+* for all of the code used other than as permitted herein. If you modify
+* file(s) with this exception, you may extend this exception to your
+* version of the file(s), but you are not obligated to do so. If you do not
+* wish to do so, delete this exception statement from your version. If you
+* delete this exception statement from all source files in the program,
+* then also delete it in the license file.
+*/
+
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "mongo/base/string_data.h"
+#include "mongo/db/commands.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/client/shard.h"
+
+namespace mongo {
+
+class BSONObj;
+class BSONObjBuilder;
+class OperationContext;
+
+using ShardAndReply = std::tuple<ShardId, BSONObj>;
+
+/**
+ * Logic for commands that simply map out to all shards then fold the results into
+ * a single response.
+ *
+ * All shards are contacted in parallel.
+ */
+StatusWith<std::vector<ShardAndReply>> gatherResults(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ int options,
+ const std::vector<AsyncRequestsSender::Request>& requests,
+ BSONObjBuilder* output);
+
+} // namespace mongo