diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_pipeline_cmd.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_pipeline_cmd.cpp | 597 |
1 files changed, 285 insertions, 312 deletions
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 60f7512d172..1d1fe933410 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -53,372 +53,345 @@ namespace mongo { - using boost::intrusive_ptr; - using std::unique_ptr; - using std::shared_ptr; - using std::string; - using std::vector; +using boost::intrusive_ptr; +using std::unique_ptr; +using std::shared_ptr; +using std::string; +using std::vector; namespace { - /** - * Implements the aggregation (pipeline command for sharding). - */ - class PipelineCommand : public Command { - public: - PipelineCommand() : Command(Pipeline::commandName, false) { } +/** + * Implements the aggregation (pipeline command for sharding). + */ +class PipelineCommand : public Command { +public: + PipelineCommand() : Command(Pipeline::commandName, false) {} - virtual bool slaveOk() const { - return true; - } + virtual bool slaveOk() const { + return true; + } - virtual bool adminOnly() const { - return false; - } + virtual bool adminOnly() const { + return false; + } - virtual bool isWriteCommandForConfigServer() const { - return false; - } + virtual bool isWriteCommandForConfigServer() const { + return false; + } - virtual void help(std::stringstream& help) const { - help << "Runs the sharded aggregation command"; - } + virtual void help(std::stringstream& help) const { + help << "Runs the sharded aggregation command"; + } - // virtuals from Command - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + // virtuals from Command + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out); + } - Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out); + virtual bool run(OperationContext* txn, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) { + const string fullns = parseNs(dbname, cmdObj); + + auto status = grid.catalogCache()->getDatabase(dbname); + if (!status.isOK()) { + return appendEmptyResultSet(result, status.getStatus(), fullns); } - virtual bool run(OperationContext* txn, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) { - - const string fullns = parseNs(dbname, cmdObj); + shared_ptr<DBConfig> conf = status.getValue(); - auto status = grid.catalogCache()->getDatabase(dbname); - if (!status.isOK()) { - return appendEmptyResultSet(result, status.getStatus(), fullns); - } - - shared_ptr<DBConfig> conf = status.getValue(); - - // If the system isn't running sharded, or the target collection isn't sharded, pass - // this on to a mongod. - if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { - return aggPassthrough(conf, cmdObj, result, options); - } + // If the system isn't running sharded, or the target collection isn't sharded, pass + // this on to a mongod. + if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { + return aggPassthrough(conf, cmdObj, result, options); + } - intrusive_ptr<ExpressionContext> mergeCtx = - new ExpressionContext(txn, NamespaceString(fullns)); - mergeCtx->inRouter = true; - // explicitly *not* setting mergeCtx->tempDir + intrusive_ptr<ExpressionContext> mergeCtx = + new ExpressionContext(txn, NamespaceString(fullns)); + mergeCtx->inRouter = true; + // explicitly *not* setting mergeCtx->tempDir - // Parse the pipeline specification - intrusive_ptr<Pipeline> pipeline(Pipeline::parseCommand(errmsg, cmdObj, mergeCtx)); - if (!pipeline.get()) { - // There was some parsing error - return false; - } + // Parse the pipeline specification + intrusive_ptr<Pipeline> pipeline(Pipeline::parseCommand(errmsg, cmdObj, mergeCtx)); + if (!pipeline.get()) { + // There was some parsing error + return false; + } - // If the first $match stage is an exact match on the shard key, we only have to send it - // to one shard, so send the command to that shard. - BSONObj firstMatchQuery = pipeline->getInitialQuery(); - ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns); - BSONObj shardKeyMatches = uassertStatusOK( - chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery)); - - // Don't need to split pipeline if the first $match is an exact match on shard key, but - // we can't send the entire pipeline to one shard if there is a $out stage, since that - // shard may not be the primary shard for the database. - bool needSplit = shardKeyMatches.isEmpty() || pipeline->hasOutStage(); - - // Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true, - // 'pipeline' will become the merger side. - intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline->splitForSharded() - : pipeline); - - // Create the command for the shards. The 'fromRouter' field means produce output to - // be merged. - MutableDocument commandBuilder(shardPipeline->serialize()); - if (needSplit) { - commandBuilder.setField("fromRouter", Value(true)); - commandBuilder.setField("cursor", Value(DOC("batchSize" << 0))); - } - else { - commandBuilder.setField("cursor", Value(cmdObj["cursor"])); - } + // If the first $match stage is an exact match on the shard key, we only have to send it + // to one shard, so send the command to that shard. + BSONObj firstMatchQuery = pipeline->getInitialQuery(); + ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns); + BSONObj shardKeyMatches = uassertStatusOK( + chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery)); + + // Don't need to split pipeline if the first $match is an exact match on shard key, but + // we can't send the entire pipeline to one shard if there is a $out stage, since that + // shard may not be the primary shard for the database. + bool needSplit = shardKeyMatches.isEmpty() || pipeline->hasOutStage(); + + // Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true, + // 'pipeline' will become the merger side. + intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline->splitForSharded() : pipeline); + + // Create the command for the shards. The 'fromRouter' field means produce output to + // be merged. + MutableDocument commandBuilder(shardPipeline->serialize()); + if (needSplit) { + commandBuilder.setField("fromRouter", Value(true)); + commandBuilder.setField("cursor", Value(DOC("batchSize" << 0))); + } else { + commandBuilder.setField("cursor", Value(cmdObj["cursor"])); + } - if (cmdObj.hasField("$queryOptions")) { - commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"])); - } + if (cmdObj.hasField("$queryOptions")) { + commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"])); + } - if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) { - commandBuilder.setField(LiteParsedQuery::cmdOptionMaxTimeMS, - Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS])); - } + if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) { + commandBuilder.setField(LiteParsedQuery::cmdOptionMaxTimeMS, + Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS])); + } - BSONObj shardedCommand = commandBuilder.freeze().toBson(); - BSONObj shardQuery = shardPipeline->getInitialQuery(); - - // Run the command on the shards - // TODO need to make sure cursors are killed if a retry is needed - vector<Strategy::CommandResult> shardResults; - Strategy::commandOp(dbname, - shardedCommand, - options, - fullns, - shardQuery, - &shardResults); - - if (pipeline->isExplain()) { - // This must be checked before we start modifying result. - uassertAllShardsSupportExplain(shardResults); - - if (needSplit) { - result << "splitPipeline" - << DOC("shardsPart" << shardPipeline->writeExplainOps() - << "mergerPart" << pipeline->writeExplainOps()); - } - else { - result << "splitPipeline" << BSONNULL; - } + BSONObj shardedCommand = commandBuilder.freeze().toBson(); + BSONObj shardQuery = shardPipeline->getInitialQuery(); - BSONObjBuilder shardExplains(result.subobjStart("shards")); - for (size_t i = 0; i < shardResults.size(); i++) { - shardExplains.append(shardResults[i].shardTargetId, - BSON("host" << shardResults[i].target.toString() << - "stages" << shardResults[i].result["stages"])); - } + // Run the command on the shards + // TODO need to make sure cursors are killed if a retry is needed + vector<Strategy::CommandResult> shardResults; + Strategy::commandOp(dbname, shardedCommand, options, fullns, shardQuery, &shardResults); - return true; - } + if (pipeline->isExplain()) { + // This must be checked before we start modifying result. + uassertAllShardsSupportExplain(shardResults); - if (!needSplit) { - invariant(shardResults.size() == 1); - const auto& reply = shardResults[0].result; - storePossibleCursor(shardResults[0].target.toString(), reply); - result.appendElements(reply); - return reply["ok"].trueValue(); + if (needSplit) { + result << "splitPipeline" + << DOC("shardsPart" << shardPipeline->writeExplainOps() << "mergerPart" + << pipeline->writeExplainOps()); + } else { + result << "splitPipeline" << BSONNULL; } - DocumentSourceMergeCursors::CursorIds cursorIds = parseCursors(shardResults, fullns); - pipeline->addInitialSource(DocumentSourceMergeCursors::create(cursorIds, mergeCtx)); - - MutableDocument mergeCmd(pipeline->serialize()); - mergeCmd["cursor"] = Value(cmdObj["cursor"]); - - if (cmdObj.hasField("$queryOptions")) { - mergeCmd["$queryOptions"] = Value(cmdObj["$queryOptions"]); + BSONObjBuilder shardExplains(result.subobjStart("shards")); + for (size_t i = 0; i < shardResults.size(); i++) { + shardExplains.append(shardResults[i].shardTargetId, + BSON("host" << shardResults[i].target.toString() << "stages" + << shardResults[i].result["stages"])); } - if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) { - mergeCmd[LiteParsedQuery::cmdOptionMaxTimeMS] - = Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]); - } + return true; + } - string outputNsOrEmpty; - if (DocumentSourceOut* out = dynamic_cast<DocumentSourceOut*>(pipeline->output())) { - outputNsOrEmpty = out->getOutputNs().ns(); - } + if (!needSplit) { + invariant(shardResults.size() == 1); + const auto& reply = shardResults[0].result; + storePossibleCursor(shardResults[0].target.toString(), reply); + result.appendElements(reply); + return reply["ok"].trueValue(); + } - // Run merging command on primary shard of database. Need to use ShardConnection so - // that the merging mongod is sent the config servers on connection init. - const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId()); - ShardConnection conn(shard->getConnString(), outputNsOrEmpty); - BSONObj mergedResults = aggRunCommand(conn.get(), - dbname, - mergeCmd.freeze().toBson(), - options); - conn.done(); + DocumentSourceMergeCursors::CursorIds cursorIds = parseCursors(shardResults, fullns); + pipeline->addInitialSource(DocumentSourceMergeCursors::create(cursorIds, mergeCtx)); - // Copy output from merging (primary) shard to the output object from our command. - // Also, propagates errmsg and code if ok == false. - result.appendElements(mergedResults); + MutableDocument mergeCmd(pipeline->serialize()); + mergeCmd["cursor"] = Value(cmdObj["cursor"]); - return mergedResults["ok"].trueValue(); + if (cmdObj.hasField("$queryOptions")) { + mergeCmd["$queryOptions"] = Value(cmdObj["$queryOptions"]); } - private: - DocumentSourceMergeCursors::CursorIds parseCursors( - const vector<Strategy::CommandResult>& shardResults, - const string& fullns); - - void killAllCursors(const vector<Strategy::CommandResult>& shardResults); - void uassertAllShardsSupportExplain(const vector<Strategy::CommandResult>& shardResults); - - // These are temporary hacks because the runCommand method doesn't report the exact - // host the command was run on which is necessary for cursor support. The exact host - // could be different from conn->getServerAddress() for connections that map to - // multiple servers such as for replica sets. These also take care of registering - // returned cursors with mongos's cursorCache. - BSONObj aggRunCommand(DBClientBase* conn, - const string& db, - BSONObj cmd, - int queryOptions); - - bool aggPassthrough(DBConfigPtr conf, - BSONObj cmd, - BSONObjBuilder& result, - int queryOptions); - } clusterPipelineCmd; - - DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors( - const vector<Strategy::CommandResult>& shardResults, - const string& fullns) { - try { - DocumentSourceMergeCursors::CursorIds cursors; - - for (size_t i = 0; i < shardResults.size(); i++) { - BSONObj result = shardResults[i].result; - - if (!result["ok"].trueValue()) { - // If the failure of the sharded command can be accounted to a single error, - // throw a UserException with that error code; otherwise, throw with a - // location uassert code. - int errCode = getUniqueCodeFromCommandResults(shardResults); - if (errCode == 0) { - errCode = 17022; - } - - invariant(errCode == result["code"].numberInt() || errCode == 17022); - uasserted(errCode, str::stream() - << "sharded pipeline failed on shard " - << shardResults[i].shardTargetId << ": " - << result.toString()); - } + if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) { + mergeCmd[LiteParsedQuery::cmdOptionMaxTimeMS] = + Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]); + } - BSONObj cursor = result["cursor"].Obj(); + string outputNsOrEmpty; + if (DocumentSourceOut* out = dynamic_cast<DocumentSourceOut*>(pipeline->output())) { + outputNsOrEmpty = out->getOutputNs().ns(); + } - massert(17023, - str::stream() << "shard " << shardResults[i].shardTargetId - << " returned non-empty first batch", - cursor["firstBatch"].Obj().isEmpty()); + // Run merging command on primary shard of database. Need to use ShardConnection so + // that the merging mongod is sent the config servers on connection init. + const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId()); + ShardConnection conn(shard->getConnString(), outputNsOrEmpty); + BSONObj mergedResults = + aggRunCommand(conn.get(), dbname, mergeCmd.freeze().toBson(), options); + conn.done(); - massert(17024, - str::stream() << "shard " << shardResults[i].shardTargetId - << " returned cursorId 0", - cursor["id"].Long() != 0); + // Copy output from merging (primary) shard to the output object from our command. + // Also, propagates errmsg and code if ok == false. + result.appendElements(mergedResults); - massert(17025, - str::stream() << "shard " << shardResults[i].shardTargetId - << " returned different ns: " << cursor["ns"], - cursor["ns"].String() == fullns); + return mergedResults["ok"].trueValue(); + } - cursors.push_back(std::make_pair(shardResults[i].target, cursor["id"].Long())); - } +private: + DocumentSourceMergeCursors::CursorIds parseCursors( + const vector<Strategy::CommandResult>& shardResults, const string& fullns); - return cursors; - } - catch (...) { - // Need to clean up any cursors we successfully created on the shards - killAllCursors(shardResults); - throw; - } - } + void killAllCursors(const vector<Strategy::CommandResult>& shardResults); + void uassertAllShardsSupportExplain(const vector<Strategy::CommandResult>& shardResults); - void PipelineCommand::uassertAllShardsSupportExplain( - const vector<Strategy::CommandResult>& shardResults) { + // These are temporary hacks because the runCommand method doesn't report the exact + // host the command was run on which is necessary for cursor support. The exact host + // could be different from conn->getServerAddress() for connections that map to + // multiple servers such as for replica sets. These also take care of registering + // returned cursors with mongos's cursorCache. + BSONObj aggRunCommand(DBClientBase* conn, const string& db, BSONObj cmd, int queryOptions); - for (size_t i = 0; i < shardResults.size(); i++) { - uassert(17403, - str::stream() << "Shard " << shardResults[i].target.toString() - << " failed: " << shardResults[i].result, - shardResults[i].result["ok"].trueValue()); - - uassert(17404, - str::stream() << "Shard " << shardResults[i].target.toString() - << " does not support $explain", - shardResults[i].result.hasField("stages")); - } - } + bool aggPassthrough(DBConfigPtr conf, BSONObj cmd, BSONObjBuilder& result, int queryOptions); +} clusterPipelineCmd; - void PipelineCommand::killAllCursors(const vector<Strategy::CommandResult>& shardResults) { - // This function must ignore and log all errors. Callers expect a best-effort attempt at - // cleanup without exceptions. If any cursors aren't cleaned up here, they will be cleaned - // up automatically on the shard after 10 minutes anyway. +DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors( + const vector<Strategy::CommandResult>& shardResults, const string& fullns) { + try { + DocumentSourceMergeCursors::CursorIds cursors; for (size_t i = 0; i < shardResults.size(); i++) { - try { - BSONObj result = shardResults[i].result; - if (!result["ok"].trueValue()) { - continue; - } - - const long long cursor = result["cursor"]["id"].Long(); - if (!cursor) { - continue; + BSONObj result = shardResults[i].result; + + if (!result["ok"].trueValue()) { + // If the failure of the sharded command can be accounted to a single error, + // throw a UserException with that error code; otherwise, throw with a + // location uassert code. + int errCode = getUniqueCodeFromCommandResults(shardResults); + if (errCode == 0) { + errCode = 17022; } - ScopedDbConnection conn(shardResults[i].target); - conn->killCursor(cursor); - conn.done(); - } - catch (const DBException& e) { - log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target - << " due to DBException: " << e.toString(); - } - catch (const std::exception& e) { - log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target - << " due to std::exception: " << e.what(); + invariant(errCode == result["code"].numberInt() || errCode == 17022); + uasserted(errCode, + str::stream() << "sharded pipeline failed on shard " + << shardResults[i].shardTargetId << ": " + << result.toString()); } - catch (...) { - log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target - << " due to non-exception"; - } - } - } - BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn, - const string& db, - BSONObj cmd, - int queryOptions) { + BSONObj cursor = result["cursor"].Obj(); - // Temporary hack. See comment on declaration for details. + massert(17023, + str::stream() << "shard " << shardResults[i].shardTargetId + << " returned non-empty first batch", + cursor["firstBatch"].Obj().isEmpty()); - massert(17016, - "should only be running an aggregate command here", - str::equals(cmd.firstElementFieldName(), "aggregate")); + massert(17024, + str::stream() << "shard " << shardResults[i].shardTargetId + << " returned cursorId 0", + cursor["id"].Long() != 0); - auto cursor = conn->query(db + ".$cmd", - cmd, - -1, // nToReturn - 0, // nToSkip - NULL, // fieldsToReturn - queryOptions); - massert(17014, - str::stream() << "aggregate command didn't return results on host: " - << conn->toString(), - cursor && cursor->more()); + massert(17025, + str::stream() << "shard " << shardResults[i].shardTargetId + << " returned different ns: " << cursor["ns"], + cursor["ns"].String() == fullns); - BSONObj result = cursor->nextSafe().getOwned(); - - if (ErrorCodes::SendStaleConfig == getStatusFromCommandResult(result)) { - throw RecvStaleConfigException("command failed because of stale config", result); + cursors.push_back(std::make_pair(shardResults[i].target, cursor["id"].Long())); } - uassertStatusOK(storePossibleCursor(cursor->originalHost(), result)); - return result; + return cursors; + } catch (...) { + // Need to clean up any cursors we successfully created on the shards + killAllCursors(shardResults); + throw; + } +} + +void PipelineCommand::uassertAllShardsSupportExplain( + const vector<Strategy::CommandResult>& shardResults) { + for (size_t i = 0; i < shardResults.size(); i++) { + uassert(17403, + str::stream() << "Shard " << shardResults[i].target.toString() + << " failed: " << shardResults[i].result, + shardResults[i].result["ok"].trueValue()); + + uassert(17404, + str::stream() << "Shard " << shardResults[i].target.toString() + << " does not support $explain", + shardResults[i].result.hasField("stages")); } +} - bool PipelineCommand::aggPassthrough(DBConfigPtr conf, - BSONObj cmd, - BSONObjBuilder& out, - int queryOptions) { +void PipelineCommand::killAllCursors(const vector<Strategy::CommandResult>& shardResults) { + // This function must ignore and log all errors. Callers expect a best-effort attempt at + // cleanup without exceptions. If any cursors aren't cleaned up here, they will be cleaned + // up automatically on the shard after 10 minutes anyway. - // Temporary hack. See comment on declaration for details. - const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId()); - ShardConnection conn(shard->getConnString(), ""); - BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions); - conn.done(); - out.appendElements(result); - return result["ok"].trueValue(); + for (size_t i = 0; i < shardResults.size(); i++) { + try { + BSONObj result = shardResults[i].result; + if (!result["ok"].trueValue()) { + continue; + } + + const long long cursor = result["cursor"]["id"].Long(); + if (!cursor) { + continue; + } + + ScopedDbConnection conn(shardResults[i].target); + conn->killCursor(cursor); + conn.done(); + } catch (const DBException& e) { + log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target + << " due to DBException: " << e.toString(); + } catch (const std::exception& e) { + log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target + << " due to std::exception: " << e.what(); + } catch (...) { + log() << "Couldn't kill aggregation cursor on shard: " << shardResults[i].target + << " due to non-exception"; + } + } +} + +BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn, + const string& db, + BSONObj cmd, + int queryOptions) { + // Temporary hack. See comment on declaration for details. + + massert(17016, + "should only be running an aggregate command here", + str::equals(cmd.firstElementFieldName(), "aggregate")); + + auto cursor = conn->query(db + ".$cmd", + cmd, + -1, // nToReturn + 0, // nToSkip + NULL, // fieldsToReturn + queryOptions); + massert( + 17014, + str::stream() << "aggregate command didn't return results on host: " << conn->toString(), + cursor && cursor->more()); + + BSONObj result = cursor->nextSafe().getOwned(); + + if (ErrorCodes::SendStaleConfig == getStatusFromCommandResult(result)) { + throw RecvStaleConfigException("command failed because of stale config", result); } -} // namespace -} // namespace mongo + uassertStatusOK(storePossibleCursor(cursor->originalHost(), result)); + return result; +} + +bool PipelineCommand::aggPassthrough(DBConfigPtr conf, + BSONObj cmd, + BSONObjBuilder& out, + int queryOptions) { + // Temporary hack. See comment on declaration for details. + const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId()); + ShardConnection conn(shard->getConnString(), ""); + BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions); + conn.done(); + out.appendElements(result); + return result["ok"].trueValue(); +} + +} // namespace +} // namespace mongo |