From 3eeeaec1d1fbb41d1914e82deca2cac4fccdddc2 Mon Sep 17 00:00:00 2001
From: Charlie Swanson
Date: Mon, 21 Dec 2015 13:46:17 -0500
Subject: SERVER-7656 Execute aggregation command on targeted shard if first stage is an exact match on shard key

---
 jstests/aggregation/testshard1.js  | 39 ++++++++++++++++++++++++++-
 src/mongo/db/pipeline/pipeline.cpp | 14 ++++++++--
 src/mongo/db/pipeline/pipeline.h   |  5 ++++
 src/mongo/s/commands_public.cpp    | 54 +++++++++++++++++++++++++++++---------
 4 files changed, 96 insertions(+), 16 deletions(-)

diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js
index 58e6fdbbfb4..6b08e7b39a3 100644
--- a/jstests/aggregation/testshard1.js
+++ b/jstests/aggregation/testshard1.js
@@ -77,7 +77,8 @@ var nItems = 200000;
 var bulk = db.ts1.initializeUnorderedBulkOp();
 for(i = 1; i <= nItems; ++i) {
     bulk.insert(
-        {counter: ++count, number: strings[i % 20], random: Math.random(),
+        {_id: i,
+         counter: ++count, number: strings[i % 20], random: Math.random(),
          filler: "0123456789012345678901234567890123456789"});
 }
 assert.writeOK(bulk.execute());
@@ -232,6 +233,42 @@ for (var shardName in res.shards) {
     assert("stages" in res.shards[shardName]);
 }
 
+(function() {
+    jsTestLog('Testing a $match stage on the shard key.');
+
+    var outCollection = 'testShardKeyMatchOut';
+
+    // Point query.
+    var targetId = Math.floor(nItems * Math.random());
+    var pipeline = [{$match: {_id: targetId}}, {$project: {_id: 1}}, {$sort: {_id: 1}}];
+    var expectedDocs = [{_id: targetId}];
+    // Normal pipeline.
+    assert.eq(aggregateOrdered(db.ts1, pipeline), expectedDocs);
+    // With $out.
+    db[outCollection].drop();
+    pipeline.push({$out: outCollection});
+    db.ts1.aggregate(pipeline);
+    assert.eq(db[outCollection].find().toArray(), expectedDocs);
+
+    // Range query.
+    var range = 500;
+    var targetStart = Math.floor((nItems - range) * Math.random());
+    pipeline = [{$match: {_id: {$gte: targetStart, $lt: targetStart + range}}},
+                {$project: {_id: 1}},
+                {$sort: {_id: 1}}];
+    expectedDocs = [];
+    for (var i = targetStart; i < targetStart + range; i++) {
+        expectedDocs.push({_id: i});
+    }
+    // Normal pipeline.
+    assert.eq(aggregateOrdered(db.ts1, pipeline), expectedDocs);
+    // With $out.
+    db[outCollection].drop();
+    pipeline.push({$out: outCollection});
+    db.ts1.aggregate(pipeline);
+    assert.eq(db[outCollection].find().toArray(), expectedDocs);
+}());
+
 // Call sub-tests designed to work sharded and unsharded.
 // They check for this variable to know to shard their collections.
 RUNNING_IN_SHARDED_AGG_TEST = true; // global
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index a02389556f2..fa3727e94d3 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -61,7 +61,6 @@ const char Pipeline::mongosPipelineName[] = "mongosPipeline";
 Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx)
     : explain(false), pCtx(pTheCtx) {}
 
-
 /* this structure is used to make a lookup table of operators */
 struct StageDesc {
     const char* pName;
@@ -417,6 +416,17 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin
         BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx));
 }
 
+bool Pipeline::hasOutStage() const {
+    if (sources.empty()) {
+        return false;
+    }
+
+    // The $out stage must be the last one in the pipeline, so check if the last
+    // stage is $out.
+    return dynamic_cast<DocumentSourceOut*>(sources.back().get());
+}
+
+
 BSONObj Pipeline::getInitialQuery() const {
     if (sources.empty())
         return BSONObj();
@@ -512,7 +522,7 @@ bool Pipeline::canRunInMongos() const {
     if (explain)
         return false;
 
-    if (!sources.empty() && dynamic_cast<DocumentSourceOut*>(sources.back().get()))
+    if (hasOutStage())
         return false;
 
     return true;
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index c682d49cc62..0f9e93984dd 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -88,6 +88,11 @@ public:
      */
     BSONObj getInitialQuery() const;
 
+    /**
+     * Returns true if the pipeline has an $out stage, and false otherwise.
+     */
+    bool hasOutStage() const;
+
     /**
        Write the Pipeline as a BSONObj command.  This should be the inverse of parseCommand().
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index f2d3789c2a6..7a5a8281717 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -2525,20 +2525,36 @@ bool PipelineCommand::run(OperationContext* txn,
         return aggPassthrough(conf, cmdObj, result, options);
 
     /* split the pipeline into pieces for mongods and this mongos */
-    intrusive_ptr<Pipeline> pShardPipeline(pPipeline->splitForSharded());
+    BSONObj firstMatchQuery = pPipeline->getInitialQuery();
+    ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns);
+    StatusWith<BSONObj> swShardKeyMatches =
+        chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery);
+    uassertStatusOK(swShardKeyMatches.getStatus());
+    const BSONObj& shardKeyMatches = swShardKeyMatches.getValue();
+
+    // Don't need to split pipeline if the first $match is an exact match on shard key, unless
+    // there is a stage that needs to be run on the primary shard.
+    const bool needSplit = shardKeyMatches.isEmpty() || pPipeline->hasOutStage();
+    intrusive_ptr<Pipeline> pShardPipeline(needSplit ? pPipeline->splitForSharded() : pPipeline);
 
     // create the command for the shards
     MutableDocument commandBuilder(pShardPipeline->serialize());
-    commandBuilder.setField("fromRouter", Value(true));  // this means produce output to be merged
+    if (needSplit) {
+        // this means produce output to be merged
+        commandBuilder.setField("fromRouter", Value(true));
 
-    if (cmdObj.hasField("$queryOptions")) {
-        commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
+        if (!pPipeline->isExplain()) {
+            // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a
+            // worse error message when talking to 2.4 shards.
+            commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+        }
+    } else {
+        // If we don't need to split, just send the command directly to the shard, as is.
+        commandBuilder.setField("cursor", Value(cmdObj["cursor"]));
     }
 
-    if (!pPipeline->isExplain()) {
-        // "cursor" is ignored by 2.6 shards when doing explain, but including it leads to a
-        // worse error message when talking to 2.4 shards.
-        commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+    if (cmdObj.hasField("$queryOptions")) {
+        commandBuilder.setField("$queryOptions", Value(cmdObj["$queryOptions"]));
     }
 
     if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
@@ -2547,20 +2563,23 @@ bool PipelineCommand::run(OperationContext* txn,
     }
 
     BSONObj shardedCommand = commandBuilder.freeze().toBson();
-    BSONObj shardQuery = pShardPipeline->getInitialQuery();
 
     // Run the command on the shards
     // TODO need to make sure cursors are killed if a retry is needed
     vector<Strategy::CommandResult> shardResults;
-    STRATEGY->commandOp(dbName, shardedCommand, options, fullns, shardQuery, &shardResults);
+    STRATEGY->commandOp(dbName, shardedCommand, options, fullns, firstMatchQuery, &shardResults);
 
     if (pPipeline->isExplain()) {
         // This must be checked before we start modifying result.
         uassertAllShardsSupportExplain(shardResults);
 
-        result << "splitPipeline"
-               << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart"
-                                   << pPipeline->writeExplainOps());
+        if (needSplit) {
+            result << "splitPipeline"
+                   << DOC("shardsPart" << pShardPipeline->writeExplainOps() << "mergerPart"
+                                       << pPipeline->writeExplainOps());
+        } else {
+            result << "splitPipeline" << BSONNULL;
+        }
 
         BSONObjBuilder shardExplains(result.subobjStart("shards"));
         for (size_t i = 0; i < shardResults.size(); i++) {
@@ -2572,6 +2591,15 @@ bool PipelineCommand::run(OperationContext* txn,
         return true;
     }
 
+    if (!needSplit) {
+        invariant(shardResults.size() == 1);
+        invariant(shardResults[0].target.getServers().size() == 1);
+        const BSONObj reply = shardResults[0].result;
+        storePossibleCursor(shardResults[0].target.toString(), reply);
+        result.appendElements(reply);
+        return reply["ok"].trueValue();
+    }
+
    if (doAnyShardsNotSupportCursors(shardResults)) {
        killAllCursors(shardResults);
        noCursorFallback(pShardPipeline, pPipeline, dbName, fullns, options, cmdObj, result);
-- 
cgit v1.2.1
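
A minimal shell sketch of the behavior this patch enables (not part of the commit; it assumes the sharded setup from testshard1.js above, where db.ts1 is sharded on {_id: 1}, and the matched _id value is arbitrary):

    // With an exact shard-key $match and no $out stage, needSplit is false in
    // PipelineCommand::run(), so mongos sends the unsplit pipeline straight to
    // the one shard that owns the key.
    var explain = db.ts1.aggregate([{$match: {_id: 42}}, {$project: {_id: 1}}],
                                   {explain: true});
    printjson(explain.splitPipeline);                  // expected: null
    assert.eq(Object.keys(explain.shards).length, 1);  // only the owning shard ran it

A pipeline ending in $out would still be split and run through the primary shard, since hasOutStage() forces needSplit to true.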