author     Charlie Swanson <charlie.swanson@mongodb.com>  2015-12-21 13:46:17 -0500
committer  Charlie Swanson <charlie.swanson@mongodb.com>  2016-01-13 12:04:23 -0500
commit     3eeeaec1d1fbb41d1914e82deca2cac4fccdddc2 (patch)
tree       fa1fa46a7b9c37a0dd4db622a8311b0cf48c4594
parent     f8f5dab11c2ed931add8d34e5e5dcc666e1ed5c8 (diff)
download   mongo-3eeeaec1d1fbb41d1914e82deca2cac4fccdddc2.tar.gz

SERVER-7656 Execute aggregation command on targeted shard if first stage is an exact match on shard key
-rw-r--r--  jstests/aggregation/testshard1.js   39
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp  14
-rw-r--r--  src/mongo/db/pipeline/pipeline.h     5
-rw-r--r--  src/mongo/s/commands_public.cpp     54
4 files changed, 96 insertions(+), 16 deletions(-)
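
What this commit does: when the pipeline's first $match can be resolved to an exact
shard key value, mongos forwards the entire, unsplit pipeline to the one shard that
owns that key, instead of splitting it into a shards part and a merger part. A
minimal shell sketch of the new behavior (assumes a sharded cluster where db.ts1 is
sharded on {_id: 1}, matching the test below):

    // The first stage pins the shard key to a single value, so after this
    // change mongos sends the whole pipeline to one shard and relays the
    // shard's reply directly, with no merging step on mongos.
    db.ts1.aggregate([
        {$match: {_id: 42}},        // exact match on the shard key
        {$project: {counter: 1}}    // runs on the targeted shard as well
    ]);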
diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js
index 58e6fdbbfb4..6b08e7b39a3 100644
--- a/jstests/aggregation/testshard1.js
+++ b/jstests/aggregation/testshard1.js
@@ -77,7 +77,8 @@ var nItems = 200000;
var bulk = db.ts1.initializeUnorderedBulkOp();
for(i = 1; i <= nItems; ++i) {
bulk.insert(
- {counter: ++count, number: strings[i % 20], random: Math.random(),
+ {_id: i,
+ counter: ++count, number: strings[i % 20], random: Math.random(),
filler: "0123456789012345678901234567890123456789"});
}
assert.writeOK(bulk.execute());
@@ -232,6 +233,42 @@ for (var shardName in res.shards) {
assert("stages" in res.shards[shardName]);
}
+(function() {
+ jsTestLog('Testing a $match stage on the shard key.');
+
+ var outCollection = 'testShardKeyMatchOut';
+
+ // Point query.
+ var targetId = Math.floor(nItems * Math.random());
+ var pipeline = [{$match: {_id: targetId}}, {$project: {_id: 1}}, {$sort: {_id: 1}}];
+ var expectedDocs = [{_id: targetId}];
+ // Normal pipeline.
+ assert.eq(aggregateOrdered(db.ts1, pipeline), expectedDocs);
+ // With $out.
+ db[outCollection].drop();
+ pipeline.push({$out: outCollection});
+ db.ts1.aggregate(pipeline);
+ assert.eq(db[outCollection].find().toArray(), expectedDocs);
+
+ // Range query.
+ var range = 500;
+ var targetStart = Math.floor((nItems - range) * Math.random());
+ pipeline = [{$match: {_id: {$gte: targetStart, $lt: targetStart + range}}},
+ {$project: {_id: 1}},
+ {$sort: {_id: 1}}];
+ expectedDocs = [];
+ for (var i = targetStart; i < targetStart + range; i++) {
+ expectedDocs.push({_id: i});
+ }
+ // Normal pipeline.
+ assert.eq(aggregateOrdered(db.ts1, pipeline), expectedDocs);
+ // With $out.
+ db[outCollection].drop();
+ pipeline.push({$out: outCollection});
+ db.ts1.aggregate(pipeline);
+ assert.eq(db[outCollection].find().toArray(), expectedDocs);
+}());
+
// Call sub-tests designed to work sharded and unsharded.
// They check for this variable to know to shard their collections.
RUNNING_IN_SHARDED_AGG_TEST = true; // global
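
The explain changes in commands_public.cpp below make the routing decision
observable: splitPipeline is null when the command was targeted to a single shard,
and a {shardsPart, mergerPart} document when it was split. A sketch against the same
test collection (the exact output shape may vary by version):

    // Exact match on the shard key: not split, so splitPipeline is null.
    var res = db.runCommand(
        {aggregate: 'ts1', pipeline: [{$match: {_id: 42}}], explain: true});
    printjson(res.splitPipeline);  // null

    // A range extracts no exact shard key, so the pipeline is still split.
    res = db.runCommand({aggregate: 'ts1',
                         pipeline: [{$match: {_id: {$gte: 0, $lt: 500}}}],
                         explain: true});
    printjson(res.splitPipeline);  // {shardsPart: [...], mergerPart: [...]}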
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index a02389556f2..fa3727e94d3 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -61,7 +61,6 @@ const char Pipeline::mongosPipelineName[] = "mongosPipeline";
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx)
: explain(false), pCtx(pTheCtx) {}
-
/* this structure is used to make a lookup table of operators */
struct StageDesc {
const char* pName;
@@ -417,6 +416,17 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin
BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx));
}
+bool Pipeline::hasOutStage() const {
+ if (sources.empty()) {
+ return false;
+ }
+
+ // The $out stage must be the last one in the pipeline, so check if the last
+ // stage is $out.
+ return dynamic_cast<DocumentSourceOut*>(sources.back().get());
+}
+
+
BSONObj Pipeline::getInitialQuery() const {
if (sources.empty())
return BSONObj();
@@ -512,7 +522,7 @@ bool Pipeline::canRunInMongos() const {
if (explain)
return false;
- if (!sources.empty() && dynamic_cast<DocumentSourceNeedsMongod*>(sources.back().get()))
+ if (hasOutStage())
return false;
return true;
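
hasOutStage() can get away with inspecting only the last stage because the server
accepts $out only as the final stage of a pipeline; and as the needSplit logic below
shows, a pipeline ending in $out takes the split path even when its $match is an
exact shard key match, since $out must run on the primary shard. In shell terms (a
sketch; 'matched' is a hypothetical output collection):

    // Legal: $out is the last stage. Targeted $match or not, this pipeline
    // is still split so that $out can run on the primary shard.
    db.ts1.aggregate([{$match: {_id: 42}}, {$out: 'matched'}]);

    // Rejected by the server: $out must be the final stage.
    // db.ts1.aggregate([{$out: 'matched'}, {$match: {_id: 42}}]);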
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index c682d49cc62..0f9e93984dd 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -89,6 +89,11 @@ public:
BSONObj getInitialQuery() const;
/**
+ * Returns true if the pipeline has an $out stage, and false otherwise.
+ */
+ bool hasOutStage() const;
+
+ /**
Write the Pipeline as a BSONObj command. This should be the
inverse of parseCommand().
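
The targeting decision added to commands_public.cpp below hinges on
ShardKeyPattern::extractShardKeyFromQuery(): it yields a non-empty result only when
the query from getInitialQuery() pins every shard key field to a single value. The
results are identical either way; only the routing differs. A sketch for the test
collection sharded on {_id: 1}, which holds _id values 1 through nItems:

    // Extracts an exact shard key, so mongos targets one shard:
    assert.eq(db.ts1.aggregate([{$match: {_id: 42}}]).toArray().length, 1);
    // A range match extracts nothing, so the pipeline is split as before:
    assert.eq(db.ts1.aggregate([{$match: {_id: {$lt: 10}}}]).toArray().length, 9);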
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index f2d3789c2a6..7a5a8281717 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -2525,20 +2525,36 @@ bool PipelineCommand::run(OperationContext* txn,
return aggPassthrough(conf, cmdObj, result, options);
/* split the pipeline into pieces for mongods and this mongos */
- intrusive_ptr<Pipeline> pShardPipeline(pPipeline->splitForSharded());
+ BSONObj firstMatchQuery = pPipeline->getInitialQuery();
+ ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns);
+ StatusWith<BSONObj> swShardKeyMatches =
+ chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery);
+ uassertStatusOK(swShardKeyMatches.getStatus());
+ const BSONObj& shardKeyMatches = swShardKeyMatches.getValue();
+
+ // Don't need to split pipeline if the first $match is an exact match on shard key, unless
+ // there is a stage that needs to be run on the primary shard.
+ const bool needSplit = shardKeyMatches.isEmpty() || pPipeline->hasOutStage();
+ intrusive_ptr<Pipeline> pShardPipeline(needSplit ? pPipeline->splitForSharded() : pPipeline);
// create the command for the shards
MutableDocument commandBuilder(pShardPipeline->serialize());
- commandBuilder.setField("fromRouter", Value(true)); // this means produce output to be merged
+ if (needSplit) {
+ // this means produce output to be merged
+ commandBuilder.setField("fromRouter", Value(true));
- if (cmdObj.hasField("$queryOptions")) {
- commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
+ if (!pPipeline->isExplain()) {
+ // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a
+ // worse error message when talking to 2.4 shards.
+ commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+ }
+ } else {
+ // If we don't need to split, just send the command directly to the shard, as is.
+ commandBuilder.setField("cursor", Value(cmdObj["cursor"]));
}
- if (!pPipeline->isExplain()) {
- // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a
- // worse error message when talking to 2.4 shards.
- commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+ if (cmdObj.hasField("$queryOptions")) {
+ commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
}
if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
@@ -2547,20 +2563,23 @@ bool PipelineCommand::run(OperationContext* txn,
}
BSONObj shardedCommand = commandBuilder.freeze().toBson();
- BSONObj shardQuery = pShardPipeline->getInitialQuery();
// Run the command on the shards
// TODO need to make sure cursors are killed if a retry is needed
vector<Strategy::CommandResult> shardResults;
- STRATEGY->commandOp(dbName, shardedCommand, options, fullns, shardQuery, &shardResults);
+ STRATEGY->commandOp(dbName, shardedCommand, options, fullns, firstMatchQuery, &shardResults);
if (pPipeline->isExplain()) {
// This must be checked before we start modifying result.
uassertAllShardsSupportExplain(shardResults);
- result << "splitPipeline"
- << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart"
- << pPipeline->writeExplainOps());
+ if (needSplit) {
+ result << "splitPipeline"
+ << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart"
+ << pPipeline->writeExplainOps());
+ } else {
+ result << "splitPipeline" << BSONNULL;
+ }
BSONObjBuilder shardExplains(result.subobjStart("shards"));
for (size_t i = 0; i < shardResults.size(); i++) {
@@ -2572,6 +2591,15 @@ bool PipelineCommand::run(OperationContext* txn,
return true;
}
+ if (!needSplit) {
+ invariant(shardResults.size() == 1);
+ invariant(shardResults[0].target.getServers().size() == 1);
+ const BSONObj reply = shardResults[0].result;
+ storePossibleCursor(shardResults[0].target.toString(), reply);
+ result.appendElements(reply);
+ return reply["ok"].trueValue();
+ }
+
if (doAnyShardsNotSupportCursors(shardResults)) {
killAllCursors(shardResults);
noCursorFallback(pShardPipeline, pPipeline, dbName, fullns, options, cmdObj, result);
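
On the new no-split path above, the single shard's reply is appended to the client
result verbatim, and storePossibleCursor() records any cursor id it contains so that
subsequent getMore requests through this mongos are routed back to the same shard.
Why that matters, as a sketch (assumes a hypothetical collection db.orders sharded on
a non-unique key {number: 1}, where one shard key value can match many documents):

    // The exact match targets a single shard but may match many documents,
    // so the first reply can hold only a partial batch ...
    var cursor = db.orders.aggregate([{$match: {number: 7}}],
                                     {cursor: {batchSize: 2}});
    // ... and each next() past that batch issues a getMore, which mongos must
    // route to the shard whose cursor was stored.
    while (cursor.hasNext()) {
        printjson(cursor.next());
    }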