diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2015-12-21 13:46:17 -0500 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2016-01-13 12:04:23 -0500 |
commit | 3eeeaec1d1fbb41d1914e82deca2cac4fccdddc2 (patch) | |
tree | fa1fa46a7b9c37a0dd4db622a8311b0cf48c4594 /src/mongo/s/commands_public.cpp | |
parent | f8f5dab11c2ed931add8d34e5e5dcc666e1ed5c8 (diff) | |
download | mongo-3eeeaec1d1fbb41d1914e82deca2cac4fccdddc2.tar.gz |
SERVER-7656 Execute aggregation command on targeted shard if first stage is an exact match on shard key
Diffstat (limited to 'src/mongo/s/commands_public.cpp')
-rw-r--r-- | src/mongo/s/commands_public.cpp | 54 |
1 files changed, 41 insertions, 13 deletions
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp index f2d3789c2a6..7a5a8281717 100644 --- a/src/mongo/s/commands_public.cpp +++ b/src/mongo/s/commands_public.cpp @@ -2525,20 +2525,36 @@ bool PipelineCommand::run(OperationContext* txn, return aggPassthrough(conf, cmdObj, result, options); /* split the pipeline into pieces for mongods and this mongos */ - intrusive_ptr<Pipeline> pShardPipeline(pPipeline->splitForSharded()); + BSONObj firstMatchQuery = pPipeline->getInitialQuery(); + ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns); + StatusWith<BSONObj> swShardKeyMatches = + chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery); + uassertStatusOK(swShardKeyMatches.getStatus()); + const BSONObj& shardKeyMatches = swShardKeyMatches.getValue(); + + // Don't need to split pipeline if the first $match is an exact match on shard key, unless + // there is a stage that needs to be run on the primary shard. + const bool needSplit = shardKeyMatches.isEmpty() || pPipeline->hasOutStage(); + intrusive_ptr<Pipeline> pShardPipeline(needSplit ? pPipeline->splitForSharded() : pPipeline); // create the command for the shards MutableDocument commandBuilder(pShardPipeline->serialize()); - commandBuilder.setField("fromRouter", Value(true)); // this means produce output to be merged + if (needSplit) { + // this means produce output to be merged + commandBuilder.setField("fromRouter", Value(true)); - if (cmdObj.hasField("$queryOptions")) { - commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"])); + if (!pPipeline->isExplain()) { + // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a + // worse error message when talking to 2.4 shards. + commandBuilder.setField("cursor", Value(DOC("batchSize" << 0))); + } + } else { + // If we don't need to split, just send the command directly to the shard, as is. + commandBuilder.setField("cursor", Value(cmdObj["cursor"])); } - if (!pPipeline->isExplain()) { - // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a - // worse error message when talking to 2.4 shards. - commandBuilder.setField("cursor", Value(DOC("batchSize" << 0))); + if (cmdObj.hasField("$queryOptions")) { + commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"])); } if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) { @@ -2547,20 +2563,23 @@ bool PipelineCommand::run(OperationContext* txn, } BSONObj shardedCommand = commandBuilder.freeze().toBson(); - BSONObj shardQuery = pShardPipeline->getInitialQuery(); // Run the command on the shards // TODO need to make sure cursors are killed if a retry is needed vector<Strategy::CommandResult> shardResults; - STRATEGY->commandOp(dbName, shardedCommand, options, fullns, shardQuery, &shardResults); + STRATEGY->commandOp(dbName, shardedCommand, options, fullns, firstMatchQuery, &shardResults); if (pPipeline->isExplain()) { // This must be checked before we start modifying result. uassertAllShardsSupportExplain(shardResults); - result << "splitPipeline" - << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart" - << pPipeline->writeExplainOps()); + if (needSplit) { + result << "splitPipeline" + << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart" + << pPipeline->writeExplainOps()); + } else { + result << "splitPipeline" << BSONNULL; + } BSONObjBuilder shardExplains(result.subobjStart("shards")); for (size_t i = 0; i < shardResults.size(); i++) { @@ -2572,6 +2591,15 @@ bool PipelineCommand::run(OperationContext* txn, return true; } + if (!needSplit) { + invariant(shardResults.size() == 1); + invariant(shardResults[0].target.getServers().size() == 1); + const BSONObj reply = shardResults[0].result; + storePossibleCursor(shardResults[0].target.toString(), reply); + result.appendElements(reply); + return reply["ok"].trueValue(); + } + if (doAnyShardsNotSupportCursors(shardResults)) { killAllCursors(shardResults); noCursorFallback(pShardPipeline, pPipeline, dbName, fullns, options, cmdObj, result); |