summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands/cluster_pipeline_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_pipeline_cmd.cpp')
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp597
1 files changed, 285 insertions, 312 deletions
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 60f7512d172..1d1fe933410 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -53,372 +53,345 @@
namespace mongo {
- using boost::intrusive_ptr;
- using std::unique_ptr;
- using std::shared_ptr;
- using std::string;
- using std::vector;
+using boost::intrusive_ptr;
+using std::unique_ptr;
+using std::shared_ptr;
+using std::string;
+using std::vector;
namespace {
- /**
- * Implements the aggregation (pipeline command for sharding).
- */
- class PipelineCommand : public Command {
- public:
- PipelineCommand() : Command(Pipeline::commandName, false) { }
+/**
+ * Implements the aggregation (pipeline command for sharding).
+ */
+class PipelineCommand : public Command {
+public:
+ PipelineCommand() : Command(Pipeline::commandName, false) {}
- virtual bool slaveOk() const {
- return true;
- }
+ virtual bool slaveOk() const {
+ return true;
+ }
- virtual bool adminOnly() const {
- return false;
- }
+ virtual bool adminOnly() const {
+ return false;
+ }
- virtual bool isWriteCommandForConfigServer() const {
- return false;
- }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
- virtual void help(std::stringstream& help) const {
- help << "Runs the sharded aggregation command";
- }
+ virtual void help(std::stringstream& help) const {
+ help << "Runs the sharded aggregation command";
+ }
- // virtuals from Command
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+ // virtuals from Command
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out);
+ }
- Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out);
+ virtual bool run(OperationContext* txn,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) {
+ const string fullns = parseNs(dbname, cmdObj);
+
+ auto status = grid.catalogCache()->getDatabase(dbname);
+ if (!status.isOK()) {
+ return appendEmptyResultSet(result, status.getStatus(), fullns);
}
- virtual bool run(OperationContext* txn,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result) {
-
- const string fullns = parseNs(dbname, cmdObj);
+ shared_ptr<DBConfig> conf = status.getValue();
- auto status = grid.catalogCache()->getDatabase(dbname);
- if (!status.isOK()) {
- return appendEmptyResultSet(result, status.getStatus(), fullns);
- }
-
- shared_ptr<DBConfig> conf = status.getValue();
-
- // If the system isn't running sharded, or the target collection isn't sharded, pass
- // this on to a mongod.
- if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
- return aggPassthrough(conf, cmdObj, result, options);
- }
+ // If the system isn't running sharded, or the target collection isn't sharded, pass
+ // this on to a mongod.
+ if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
+ return aggPassthrough(conf, cmdObj, result, options);
+ }
- intrusive_ptr<ExpressionContext> mergeCtx =
- new ExpressionContext(txn, NamespaceString(fullns));
- mergeCtx->inRouter = true;
- // explicitly *not* setting mergeCtx->tempDir
+ intrusive_ptr<ExpressionContext> mergeCtx =
+ new ExpressionContext(txn, NamespaceString(fullns));
+ mergeCtx->inRouter = true;
+ // explicitly *not* setting mergeCtx->tempDir
- // Parse the pipeline specification
- intrusive_ptr<Pipeline> pipeline(Pipeline::parseCommand(errmsg, cmdObj, mergeCtx));
- if (!pipeline.get()) {
- // There was some parsing error
- return false;
- }
+ // Parse the pipeline specification
+ intrusive_ptr<Pipeline> pipeline(Pipeline::parseCommand(errmsg, cmdObj, mergeCtx));
+ if (!pipeline.get()) {
+ // There was some parsing error
+ return false;
+ }
- // If the first $match stage is an exact match on the shard key, we only have to send it
- // to one shard, so send the command to that shard.
- BSONObj firstMatchQuery = pipeline->getInitialQuery();
- ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns);
- BSONObj shardKeyMatches = uassertStatusOK(
- chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery));
-
- // Don't need to split pipeline if the first $match is an exact match on shard key, but
- // we can't send the entire pipeline to one shard if there is a $out stage, since that
- // shard may not be the primary shard for the database.
- bool needSplit = shardKeyMatches.isEmpty() || pipeline->hasOutStage();
-
- // Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true,
- // 'pipeline' will become the merger side.
- intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline->splitForSharded()
- : pipeline);
-
- // Create the command for the shards. The 'fromRouter' field means produce output to
- // be merged.
- MutableDocument commandBuilder(shardPipeline->serialize());
- if (needSplit) {
- commandBuilder.setField("fromRouter", Value(true));
- commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
- }
- else {
- commandBuilder.setField("cursor", Value(cmdObj["cursor"]));
- }
+ // If the first $match stage is an exact match on the shard key, we only have to send it
+ // to one shard, so send the command to that shard.
+ BSONObj firstMatchQuery = pipeline->getInitialQuery();
+ ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns);
+ BSONObj shardKeyMatches = uassertStatusOK(
+ chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery));
+
+ // Don't need to split pipeline if the first $match is an exact match on shard key, but
+ // we can't send the entire pipeline to one shard if there is a $out stage, since that
+ // shard may not be the primary shard for the database.
+ bool needSplit = shardKeyMatches.isEmpty() || pipeline->hasOutStage();
+
+ // Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true,
+ // 'pipeline' will become the merger side.
+ intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline->splitForSharded() : pipeline);
+
+ // Create the command for the shards. The 'fromRouter' field means produce output to
+ // be merged.
+ MutableDocument commandBuilder(shardPipeline->serialize());
+ if (needSplit) {
+ commandBuilder.setField("fromRouter", Value(true));
+ commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+ } else {
+ commandBuilder.setField("cursor", Value(cmdObj["cursor"]));
+ }
- if (cmdObj.hasField("$queryOptions")) {
- commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
- }
+ if (cmdObj.hasField("$queryOptions")) {
+ commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
+ }
- if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
- commandBuilder.setField(LiteParsedQuery::cmdOptionMaxTimeMS,
- Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]));
- }
+ if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
+ commandBuilder.setField(LiteParsedQuery::cmdOptionMaxTimeMS,
+ Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]));
+ }
- BSONObj shardedCommand = commandBuilder.freeze().toBson();
- BSONObj shardQuery = shardPipeline->getInitialQuery();
-
- // Run the command on the shards
- // TODO need to make sure cursors are killed if a retry is needed
- vector<Strategy::CommandResult> shardResults;
- Strategy::commandOp(dbname,
- shardedCommand,
- options,
- fullns,
- shardQuery,
- &shardResults);
-
- if (pipeline->isExplain()) {
- // This must be checked before we start modifying result.
- uassertAllShardsSupportExplain(shardResults);
-
- if (needSplit) {
- result << "splitPipeline"
- << DOC("shardsPart" << shardPipeline->writeExplainOps()
- << "mergerPart" << pipeline->writeExplainOps());
- }
- else {
- result << "splitPipeline" << BSONNULL;
- }
+ BSONObj shardedCommand = commandBuilder.freeze().toBson();
+ BSONObj shardQuery = shardPipeline->getInitialQuery();
- BSONObjBuilder shardExplains(result.subobjStart("shards"));
- for (size_t i = 0; i < shardResults.size(); i++) {
- shardExplains.append(shardResults[i].shardTargetId,
- BSON("host" << shardResults[i].target.toString() <<
- "stages" << shardResults[i].result["stages"]));
- }
+ // Run the command on the shards
+ // TODO need to make sure cursors are killed if a retry is needed
+ vector<Strategy::CommandResult> shardResults;
+ Strategy::commandOp(dbname, shardedCommand, options, fullns, shardQuery, &shardResults);
- return true;
- }
+ if (pipeline->isExplain()) {
+ // This must be checked before we start modifying result.
+ uassertAllShardsSupportExplain(shardResults);
- if (!needSplit) {
- invariant(shardResults.size() == 1);
- const auto& reply = shardResults[0].result;
- storePossibleCursor(shardResults[0].target.toString(), reply);
- result.appendElements(reply);
- return reply["ok"].trueValue();
+ if (needSplit) {
+ result << "splitPipeline"
+ << DOC("shardsPart" << shardPipeline->writeExplainOps() << "mergerPart"
+ << pipeline->writeExplainOps());
+ } else {
+ result << "splitPipeline" << BSONNULL;
}
- DocumentSourceMergeCursors::CursorIds cursorIds = parseCursors(shardResults, fullns);
- pipeline->addInitialSource(DocumentSourceMergeCursors::create(cursorIds, mergeCtx));
-
- MutableDocument mergeCmd(pipeline->serialize());
- mergeCmd["cursor"] = Value(cmdObj["cursor"]);
-
- if (cmdObj.hasField("$queryOptions")) {
- mergeCmd["$queryOptions"] = Value(cmdObj["$queryOptions"]);
+ BSONObjBuilder shardExplains(result.subobjStart("shards"));
+ for (size_t i = 0; i < shardResults.size(); i++) {
+ shardExplains.append(shardResults[i].shardTargetId,
+ BSON("host" << shardResults[i].target.toString() << "stages"
+ << shardResults[i].result["stages"]));
}
- if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
- mergeCmd[LiteParsedQuery::cmdOptionMaxTimeMS]
- = Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
- }
+ return true;
+ }
- string outputNsOrEmpty;
- if (DocumentSourceOut* out = dynamic_cast<DocumentSourceOut*>(pipeline->output())) {
- outputNsOrEmpty = out->getOutputNs().ns();
- }
+ if (!needSplit) {
+ invariant(shardResults.size() == 1);
+ const auto& reply = shardResults[0].result;
+ storePossibleCursor(shardResults[0].target.toString(), reply);
+ result.appendElements(reply);
+ return reply["ok"].trueValue();
+ }
- // Run merging command on primary shard of database. Need to use ShardConnection so
- // that the merging mongod is sent the config servers on connection init.
- const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
- ShardConnection conn(shard->getConnString(), outputNsOrEmpty);
- BSONObj mergedResults = aggRunCommand(conn.get(),
- dbname,
- mergeCmd.freeze().toBson(),
- options);
- conn.done();
+ DocumentSourceMergeCursors::CursorIds cursorIds = parseCursors(shardResults, fullns);
+ pipeline->addInitialSource(DocumentSourceMergeCursors::create(cursorIds, mergeCtx));
- // Copy output from merging (primary) shard to the output object from our command.
- // Also, propagates errmsg and code if ok == false.
- result.appendElements(mergedResults);
+ MutableDocument mergeCmd(pipeline->serialize());
+ mergeCmd["cursor"] = Value(cmdObj["cursor"]);
- return mergedResults["ok"].trueValue();
+ if (cmdObj.hasField("$queryOptions")) {
+ mergeCmd["$queryOptions"] = Value(cmdObj["$queryOptions"]);
}
- private:
- DocumentSourceMergeCursors::CursorIds parseCursors(
- const vector<Strategy::CommandResult>& shardResults,
- const string& fullns);
-
- void killAllCursors(const vector<Strategy::CommandResult>& shardResults);
- void uassertAllShardsSupportExplain(const vector<Strategy::CommandResult>& shardResults);
-
- // These are temporary hacks because the runCommand method doesn't report the exact
- // host the command was run on which is necessary for cursor support. The exact host
- // could be different from conn->getServerAddress() for connections that map to
- // multiple servers such as for replica sets. These also take care of registering
- // returned cursors with mongos's cursorCache.
- BSONObj aggRunCommand(DBClientBase* conn,
- const string& db,
- BSONObj cmd,
- int queryOptions);
-
- bool aggPassthrough(DBConfigPtr conf,
- BSONObj cmd,
- BSONObjBuilder& result,
- int queryOptions);
- } clusterPipelineCmd;
-
- DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors(
- const vector<Strategy::CommandResult>& shardResults,
- const string& fullns) {
- try {
- DocumentSourceMergeCursors::CursorIds cursors;
-
- for (size_t i = 0; i < shardResults.size(); i++) {
- BSONObj result = shardResults[i].result;
-
- if (!result["ok"].trueValue()) {
- // If the failure of the sharded command can be accounted to a single error,
- // throw a UserException with that error code; otherwise, throw with a
- // location uassert code.
- int errCode = getUniqueCodeFromCommandResults(shardResults);
- if (errCode == 0) {
- errCode = 17022;
- }
-
- invariant(errCode == result["code"].numberInt() || errCode == 17022);
- uasserted(errCode, str::stream()
- << "sharded pipeline failed on shard "
- << shardResults[i].shardTargetId << ": "
- << result.toString());
- }
+ if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
+ mergeCmd[LiteParsedQuery::cmdOptionMaxTimeMS] =
+ Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
+ }
- BSONObj cursor = result["cursor"].Obj();
+ string outputNsOrEmpty;
+ if (DocumentSourceOut* out = dynamic_cast<DocumentSourceOut*>(pipeline->output())) {
+ outputNsOrEmpty = out->getOutputNs().ns();
+ }
- massert(17023,
- str::stream() << "shard " << shardResults[i].shardTargetId
- << " returned non-empty first batch",
- cursor["firstBatch"].Obj().isEmpty());
+ // Run merging command on primary shard of database. Need to use ShardConnection so
+ // that the merging mongod is sent the config servers on connection init.
+ const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+ ShardConnection conn(shard->getConnString(), outputNsOrEmpty);
+ BSONObj mergedResults =
+ aggRunCommand(conn.get(), dbname, mergeCmd.freeze().toBson(), options);
+ conn.done();
- massert(17024,
- str::stream() << "shard " << shardResults[i].shardTargetId
- << " returned cursorId 0",
- cursor["id"].Long() != 0);
+ // Copy output from merging (primary) shard to the output object from our command.
+ // Also, propagates errmsg and code if ok == false.
+ result.appendElements(mergedResults);
- massert(17025,
- str::stream() << "shard " << shardResults[i].shardTargetId
- << " returned different ns: " << cursor["ns"],
- cursor["ns"].String() == fullns);
+ return mergedResults["ok"].trueValue();
+ }
- cursors.push_back(std::make_pair(shardResults[i].target, cursor["id"].Long()));
- }
+private:
+ DocumentSourceMergeCursors::CursorIds parseCursors(
+ const vector<Strategy::CommandResult>& shardResults, const string& fullns);
- return cursors;
- }
- catch (...) {
- // Need to clean up any cursors we successfully created on the shards
- killAllCursors(shardResults);
- throw;
- }
- }
+ void killAllCursors(const vector<Strategy::CommandResult>& shardResults);
+ void uassertAllShardsSupportExplain(const vector<Strategy::CommandResult>& shardResults);
- void PipelineCommand::uassertAllShardsSupportExplain(
- const vector<Strategy::CommandResult>& shardResults) {
+ // These are temporary hacks because the runCommand method doesn't report the exact
+ // host the command was run on which is necessary for cursor support. The exact host
+ // could be different from conn->getServerAddress() for connections that map to
+ // multiple servers such as for replica sets. These also take care of registering
+ // returned cursors with mongos's cursorCache.
+ BSONObj aggRunCommand(DBClientBase* conn, const string& db, BSONObj cmd, int queryOptions);
- for (size_t i = 0; i < shardResults.size(); i++) {
- uassert(17403,
- str::stream() << "Shard " << shardResults[i].target.toString()
- << " failed: " << shardResults[i].result,
- shardResults[i].result["ok"].trueValue());
-
- uassert(17404,
- str::stream() << "Shard " << shardResults[i].target.toString()
- << " does not support $explain",
- shardResults[i].result.hasField("stages"));
- }
- }
+ bool aggPassthrough(DBConfigPtr conf, BSONObj cmd, BSONObjBuilder& result, int queryOptions);
+} clusterPipelineCmd;
- void PipelineCommand::killAllCursors(const vector<Strategy::CommandResult>& shardResults) {
- // This function must ignore and log all errors. Callers expect a best-effort attempt at
- // cleanup without exceptions. If any cursors aren't cleaned up here, they will be cleaned
- // up automatically on the shard after 10 minutes anyway.
+DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors(
+ const vector<Strategy::CommandResult>& shardResults, const string& fullns) {
+ try {
+ DocumentSourceMergeCursors::CursorIds cursors;
for (size_t i = 0; i < shardResults.size(); i++) {
- try {
- BSONObj result = shardResults[i].result;
- if (!result["ok"].trueValue()) {
- continue;
- }
-
- const long long cursor = result["cursor"]["id"].Long();
- if (!cursor) {
- continue;
+ BSONObj result = shardResults[i].result;
+
+ if (!result["ok"].trueValue()) {
+ // If the failure of the sharded command can be accounted to a single error,
+ // throw a UserException with that error code; otherwise, throw with a
+ // location uassert code.
+ int errCode = getUniqueCodeFromCommandResults(shardResults);
+ if (errCode == 0) {
+ errCode = 17022;
}
- ScopedDbConnection conn(shardResults[i].target);
- conn->killCursor(cursor);
- conn.done();
- }
- catch (const DBException& e) {
- log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target
- << " due to DBException: " << e.toString();
- }
- catch (const std::exception& e) {
- log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target
- << " due to std::exception: " << e.what();
+ invariant(errCode == result["code"].numberInt() || errCode == 17022);
+ uasserted(errCode,
+ str::stream() << "sharded pipeline failed on shard "
+ << shardResults[i].shardTargetId << ": "
+ << result.toString());
}
- catch (...) {
- log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target
- << " due to non-exception";
- }
- }
- }
- BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn,
- const string& db,
- BSONObj cmd,
- int queryOptions) {
+ BSONObj cursor = result["cursor"].Obj();
- // Temporary hack. See comment on declaration for details.
+ massert(17023,
+ str::stream() << "shard " << shardResults[i].shardTargetId
+ << " returned non-empty first batch",
+ cursor["firstBatch"].Obj().isEmpty());
- massert(17016,
- "should only be running an aggregate command here",
- str::equals(cmd.firstElementFieldName(), "aggregate"));
+ massert(17024,
+ str::stream() << "shard " << shardResults[i].shardTargetId
+ << " returned cursorId 0",
+ cursor["id"].Long() != 0);
- auto cursor = conn->query(db + ".$cmd",
- cmd,
- -1, // nToReturn
- 0, // nToSkip
- NULL, // fieldsToReturn
- queryOptions);
- massert(17014,
- str::stream() << "aggregate command didn't return results on host: "
- << conn->toString(),
- cursor && cursor->more());
+ massert(17025,
+ str::stream() << "shard " << shardResults[i].shardTargetId
+ << " returned different ns: " << cursor["ns"],
+ cursor["ns"].String() == fullns);
- BSONObj result = cursor->nextSafe().getOwned();
-
- if (ErrorCodes::SendStaleConfig == getStatusFromCommandResult(result)) {
- throw RecvStaleConfigException("command failed because of stale config", result);
+ cursors.push_back(std::make_pair(shardResults[i].target, cursor["id"].Long()));
}
- uassertStatusOK(storePossibleCursor(cursor->originalHost(), result));
- return result;
+ return cursors;
+ } catch (...) {
+ // Need to clean up any cursors we successfully created on the shards
+ killAllCursors(shardResults);
+ throw;
+ }
+}
+
+void PipelineCommand::uassertAllShardsSupportExplain(
+ const vector<Strategy::CommandResult>& shardResults) {
+ for (size_t i = 0; i < shardResults.size(); i++) {
+ uassert(17403,
+ str::stream() << "Shard " << shardResults[i].target.toString()
+ << " failed: " << shardResults[i].result,
+ shardResults[i].result["ok"].trueValue());
+
+ uassert(17404,
+ str::stream() << "Shard " << shardResults[i].target.toString()
+ << " does not support $explain",
+ shardResults[i].result.hasField("stages"));
}
+}
- bool PipelineCommand::aggPassthrough(DBConfigPtr conf,
- BSONObj cmd,
- BSONObjBuilder& out,
- int queryOptions) {
+void PipelineCommand::killAllCursors(const vector<Strategy::CommandResult>& shardResults) {
+ // This function must ignore and log all errors. Callers expect a best-effort attempt at
+ // cleanup without exceptions. If any cursors aren't cleaned up here, they will be cleaned
+ // up automatically on the shard after 10 minutes anyway.
- // Temporary hack. See comment on declaration for details.
- const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
- ShardConnection conn(shard->getConnString(), "");
- BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions);
- conn.done();
- out.appendElements(result);
- return result["ok"].trueValue();
+ for (size_t i = 0; i < shardResults.size(); i++) {
+ try {
+ BSONObj result = shardResults[i].result;
+ if (!result["ok"].trueValue()) {
+ continue;
+ }
+
+ const long long cursor = result["cursor"]["id"].Long();
+ if (!cursor) {
+ continue;
+ }
+
+ ScopedDbConnection conn(shardResults[i].target);
+ conn->killCursor(cursor);
+ conn.done();
+ } catch (const DBException& e) {
+ log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target
+ << " due to DBException: " << e.toString();
+ } catch (const std::exception& e) {
+ log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target
+ << " due to std::exception: " << e.what();
+ } catch (...) {
+ log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target
+ << " due to non-exception";
+ }
+ }
+}
+
+BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn,
+ const string& db,
+ BSONObj cmd,
+ int queryOptions) {
+ // Temporary hack. See comment on declaration for details.
+
+ massert(17016,
+ "should only be running an aggregate command here",
+ str::equals(cmd.firstElementFieldName(), "aggregate"));
+
+ auto cursor = conn->query(db + ".$cmd",
+ cmd,
+ -1, // nToReturn
+ 0, // nToSkip
+ NULL, // fieldsToReturn
+ queryOptions);
+ massert(
+ 17014,
+ str::stream() << "aggregate command didn't return results on host: " << conn->toString(),
+ cursor && cursor->more());
+
+ BSONObj result = cursor->nextSafe().getOwned();
+
+ if (ErrorCodes::SendStaleConfig == getStatusFromCommandResult(result)) {
+ throw RecvStaleConfigException("command failed because of stale config", result);
}
-} // namespace
-} // namespace mongo
+ uassertStatusOK(storePossibleCursor(cursor->originalHost(), result));
+ return result;
+}
+
+bool PipelineCommand::aggPassthrough(DBConfigPtr conf,
+ BSONObj cmd,
+ BSONObjBuilder& out,
+ int queryOptions) {
+ // Temporary hack. See comment on declaration for details.
+ const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+ ShardConnection conn(shard->getConnString(), "");
+ BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions);
+ conn.done();
+ out.appendElements(result);
+ return result["ok"].trueValue();
+}
+
+} // namespace
+} // namespace mongo