summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands_public.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands_public.cpp')
-rw-r--r--src/mongo/s/commands_public.cpp54
1 files changed, 41 insertions, 13 deletions
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index f2d3789c2a6..7a5a8281717 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -2525,20 +2525,36 @@ bool PipelineCommand::run(OperationContext* txn,
return aggPassthrough(conf, cmdObj, result, options);
/* split the pipeline into pieces for mongods and this mongos */
- intrusive_ptr<Pipeline> pShardPipeline(pPipeline->splitForSharded());
+ BSONObj firstMatchQuery = pPipeline->getInitialQuery();
+ ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns);
+ StatusWith<BSONObj> swShardKeyMatches =
+ chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery);
+ uassertStatusOK(swShardKeyMatches.getStatus());
+ const BSONObj& shardKeyMatches = swShardKeyMatches.getValue();
+
+ // Don't need to split pipeline if the first $match is an exact match on shard key, unless
+ // there is a stage that needs to be run on the primary shard.
+ const bool needSplit = shardKeyMatches.isEmpty() || pPipeline->hasOutStage();
+ intrusive_ptr<Pipeline> pShardPipeline(needSplit ? pPipeline->splitForSharded() : pPipeline);
// create the command for the shards
MutableDocument commandBuilder(pShardPipeline->serialize());
- commandBuilder.setField("fromRouter", Value(true)); // this means produce output to be merged
+ if (needSplit) {
+ // this means produce output to be merged
+ commandBuilder.setField("fromRouter", Value(true));
- if (cmdObj.hasField("$queryOptions")) {
- commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
+ if (!pPipeline->isExplain()) {
+ // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a
+ // worse error message when talking to 2.4 shards.
+ commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+ }
+ } else {
+ // If we don't need to split, just send the command directly to the shard, as is.
+ commandBuilder.setField("cursor", Value(cmdObj["cursor"]));
}
- if (!pPipeline->isExplain()) {
- // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a
- // worse error message when talking to 2.4 shards.
- commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+ if (cmdObj.hasField("$queryOptions")) {
+ commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
}
if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
@@ -2547,20 +2563,23 @@ bool PipelineCommand::run(OperationContext* txn,
}
BSONObj shardedCommand = commandBuilder.freeze().toBson();
- BSONObj shardQuery = pShardPipeline->getInitialQuery();
// Run the command on the shards
// TODO need to make sure cursors are killed if a retry is needed
vector<Strategy::CommandResult> shardResults;
- STRATEGY->commandOp(dbName, shardedCommand, options, fullns, shardQuery, &shardResults);
+ STRATEGY->commandOp(dbName, shardedCommand, options, fullns, firstMatchQuery, &shardResults);
if (pPipeline->isExplain()) {
// This must be checked before we start modifying result.
uassertAllShardsSupportExplain(shardResults);
- result << "splitPipeline"
- << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart"
- << pPipeline->writeExplainOps());
+ if (needSplit) {
+ result << "splitPipeline"
+ << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart"
+ << pPipeline->writeExplainOps());
+ } else {
+ result << "splitPipeline" << BSONNULL;
+ }
BSONObjBuilder shardExplains(result.subobjStart("shards"));
for (size_t i = 0; i < shardResults.size(); i++) {
@@ -2572,6 +2591,15 @@ bool PipelineCommand::run(OperationContext* txn,
return true;
}
+ if (!needSplit) {
+ invariant(shardResults.size() == 1);
+ invariant(shardResults[0].target.getServers().size() == 1);
+ const BSONObj reply = shardResults[0].result;
+ storePossibleCursor(shardResults[0].target.toString(), reply);
+ result.appendElements(reply);
+ return reply["ok"].trueValue();
+ }
+
if (doAnyShardsNotSupportCursors(shardResults)) {
killAllCursors(shardResults);
noCursorFallback(pShardPipeline, pPipeline, dbName, fullns, options, cmdObj, result);