author     Bernard Gorman <bernard.gorman@gmail.com>  2017-08-30 00:11:28 +0100
committer  Bernard Gorman <bernard.gorman@gmail.com>  2017-09-01 03:56:23 +0100
commit     d34c7cf640a6e12b4f1abe86ef3c96d1216f0654 (patch)
tree       beb83b01ddf04cc33d42e71f65e64314ce7d0e3f
parent     f1530d925c530004d107c56a8dd012d26304038c (diff)
SERVER-30412 Ensure that aggregation splitpoints are not shared between shard and merge pipelines on mongoS
-rw-r--r--  jstests/aggregation/mongos_merge.js                                52
-rw-r--r--  src/mongo/db/pipeline/document_source.h                             8
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_split_pipeline.h    13
-rw-r--r--  src/mongo/db/pipeline/document_source_limit.h                      15
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp                                  6
-rw-r--r--  src/mongo/db/pipeline/pipeline.h                                    6
6 files changed, 68 insertions, 32 deletions
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js
index ea00739d739..fe13b337930 100644
--- a/jstests/aggregation/mongos_merge.js
+++ b/jstests/aggregation/mongos_merge.js
@@ -76,7 +76,7 @@
* - The number of documents returned by the aggregation matches 'expectedCount'.
* - The merge was performed on a mongoS if 'mergeType' is 'mongos', and on a shard otherwise.
*/
- function assertMergeBehaviour({testName, pipeline, mergeType, expectedCount}) {
+ function assertMergeBehaviour({testName, pipeline, mergeType, batchSize, expectedCount}) {
// Verify that the 'mergeOnMongoS' explain() output for this pipeline matches our
// expectation.
assert.eq(
@@ -84,7 +84,11 @@
.mergeType,
mergeType);
- assert.eq(mongosColl.aggregate(pipeline, {comment: testName}).itcount(), expectedCount);
+ assert.eq(
+ mongosColl
+ .aggregate(pipeline, {comment: testName, cursor: {batchSize: (batchSize || 101)}})
+ .itcount(),
+ expectedCount);
// Verify that a $mergeCursors aggregation ran on the primary shard if 'mergeType' is not
// 'mongos', and that no such aggregation ran otherwise.
@@ -103,11 +107,12 @@
* Throws an assertion if the aggregation specified by 'pipeline' does not produce
* 'expectedCount' results, or if the merge phase is not performed on the mongoS.
*/
- function assertMergeOnMongoS({testName, pipeline, expectedCount}) {
+ function assertMergeOnMongoS({testName, pipeline, batchSize, expectedCount}) {
assertMergeBehaviour({
testName: testName,
pipeline: pipeline,
mergeType: "mongos",
+ batchSize: (batchSize || 101),
expectedCount: expectedCount
});
}
@@ -116,11 +121,12 @@
* Throws an assertion if the aggregation specified by 'pipeline' does not produce
* 'expectedCount' results, or if the merge phase was not performed on a shard.
*/
- function assertMergeOnMongoD({testName, pipeline, mergeType, expectedCount}) {
+ function assertMergeOnMongoD({testName, pipeline, mergeType, batchSize, expectedCount}) {
assertMergeBehaviour({
testName: testName,
pipeline: pipeline,
mergeType: (mergeType || "anyShard"),
+ batchSize: (batchSize || 101),
expectedCount: expectedCount
});
}
@@ -135,6 +141,7 @@
assertMergeOnMongoS({
testName: "agg_mongos_merge_match_only",
pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}],
+ batchSize: 10,
expectedCount: 400
});
@@ -142,6 +149,7 @@
assertMergeOnMongoS({
testName: "agg_mongos_merge_sort_presorted",
pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}],
+ batchSize: 10,
expectedCount: 400
});
@@ -149,6 +157,7 @@
assertMergeOnMongoD({
testName: "agg_mongos_merge_sort_in_mem",
pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}],
+ batchSize: 10,
expectedCount: 400
});
@@ -160,41 +169,45 @@
{$_internalSplitPipeline: {mergeType: "primaryShard"}}
],
mergeType: "primaryShard",
+ batchSize: 10,
expectedCount: 400
});
// Test that $skip is merged on mongoS.
assertMergeOnMongoS({
testName: "agg_mongos_merge_skip",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 100}],
- expectedCount: 300
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}],
+ batchSize: 10,
+ expectedCount: 100
});
// Test that $limit is merged on mongoS.
assertMergeOnMongoS({
testName: "agg_mongos_merge_limit",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 50}],
- expectedCount: 50
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}],
+ batchSize: 10,
+ expectedCount: 300
});
// Test that $sample is merged on mongoS.
assertMergeOnMongoS({
testName: "agg_mongos_merge_sample",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 50}}],
- expectedCount: 50
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}],
+ batchSize: 10,
+ expectedCount: 300
});
// Test that merge pipelines containing all mongos-runnable stages produce the expected output.
assertMergeOnMongoS({
testName: "agg_mongos_merge_all_mongos_runnable_stages",
pipeline: [
- {$match: {_id: {$gte: -5, $lte: 100}}},
- {$sort: {_id: -1}},
- {$skip: 95},
- {$limit: 10},
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {$sort: {_id: 1}},
+ {$skip: 150},
+ {$limit: 150},
{$addFields: {d: true}},
{$unwind: "$a"},
- {$sample: {size: 5}},
+ {$sample: {size: 100}},
{$project: {c: 0}},
{
$redact: {
@@ -204,14 +217,17 @@
},
{
$match: {
- _id: {$gte: -4, $lte: 5},
- a: {$gte: -4, $lte: 5},
+ _id: {$gte: -50, $lte: 100},
+ a: {$gte: -50, $lte: 100},
b: {$exists: false},
c: {$exists: false},
d: true
}
}
],
- expectedCount: 5
+ batchSize: 10,
+ expectedCount: 100
});
+
+ st.stop();
})();
\ No newline at end of file
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 8f1c2fef4a0..134a8a86e3d 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -529,14 +529,18 @@ public:
/**
* Returns a source to be run on the shards, or NULL if no work should be done on the shards for
* this stage. Must not mutate the existing source object; if different behaviour is required in
- * the split-pipeline case, a new source should be created and configured appropriately.
+ * the split-pipeline case, a new source should be created and configured appropriately. It is
+ * an error for getShardSource() to return a pointer to the same object as getMergeSource(),
+ * since this can result in the source being stitched into both the shard and merge pipelines
+ * when the latter is executed on mongoS.
*/
virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
/**
* Returns a source that combines results from the shards, or NULL if no work should be done in
* the merge pipeline for this stage. Must not mutate the existing source object; if different
- * behaviour is required, a new source should be created and configured appropriately.
+ * behaviour is required, a new source should be created and configured appropriately. It is an
+ * error for getMergeSource() to return a pointer to the same object as getShardSource().
*/
virtual boost::intrusive_ptr<DocumentSource> getMergeSource() = 0;
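
To make the documented contract concrete, the following is a minimal, self-contained sketch (toy stand-ins, not MongoDB's actual classes; std::shared_ptr is used in place of boost::intrusive_ptr). A splittable stage may reuse itself for one half of the split, but the other half must be a distinct object so the shard and merge pipelines never share a stage.

// Toy illustration only: simplified stand-ins for SplittableDocumentSource.
#include <cassert>
#include <memory>

struct Stage : std::enable_shared_from_this<Stage> {
    virtual ~Stage() = default;
    // The half of the stage to run on each shard (may be null).
    virtual std::shared_ptr<Stage> shardStage() = 0;
    // The half of the stage to run on the merging host (may be null).
    virtual std::shared_ptr<Stage> mergeStage() = 0;
};

class ToyLimitStage : public Stage {
public:
    explicit ToyLimitStage(long long limit) : _limit(limit) {}

    // Running the limit on the shards is an optimization, so reusing the
    // existing object for the shard half is fine...
    std::shared_ptr<Stage> shardStage() override {
        return shared_from_this();
    }

    // ...but the merge half must be a *new* object, never 'this', so that the
    // same stage is not stitched into both the shard and merge pipelines.
    std::shared_ptr<Stage> mergeStage() override {
        return std::make_shared<ToyLimitStage>(_limit);
    }

private:
    long long _limit;
};

int main() {
    auto limit = std::make_shared<ToyLimitStage>(300);
    assert(limit->shardStage() != limit->mergeStage());  // the documented contract
    return 0;
}
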
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index c18a6d301a6..f9ac84c555f 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -49,9 +49,10 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
- DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- HostTypeRequirement mergeType)
- : DocumentSource(expCtx), _mergeType(mergeType) {}
+ static boost::intrusive_ptr<DocumentSource> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, HostTypeRequirement mergeType) {
+ return new DocumentSourceInternalSplitPipeline(expCtx, mergeType);
+ }
const char* getSourceName() const final {
return kStageName.rawData();
@@ -62,7 +63,7 @@ public:
}
boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
+ return DocumentSourceInternalSplitPipeline::create(pExpCtx, _mergeType);
}
StageConstraints constraints() const final {
@@ -74,6 +75,10 @@ public:
GetNextResult getNext() final;
private:
+ DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ HostTypeRequirement mergeType)
+ : DocumentSource(expCtx), _mergeType(mergeType) {}
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard;
};
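
As a usage sketch only (the create() signature is the one introduced in this hunk; 'expCtx' and the enum's exact qualification are assumed from the surrounding tree): with the constructor now private, callers obtain the stage through the static factory, and, per the contract documented in document_source.h, its getMergeSource() now hands back a fresh instance rather than 'this'.

// Sketch, not compiled against the tree: 'expCtx' is a
// boost::intrusive_ptr<ExpressionContext> assumed to be in scope.
boost::intrusive_ptr<DocumentSource> split =
    DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kAnyShard);
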
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index 88acfa8b45a..f5f93fff1a4 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -68,13 +68,22 @@ public:
static boost::intrusive_ptr<DocumentSourceLimit> create(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
- // Virtuals for SplittableDocumentSource
- // Need to run on rounter. Running on shard as well is an optimization.
+ /**
+ * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on
+ * the shards is an optimization, but is not strictly necessary in order to produce correct
+ * pipeline output.
+ */
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return this;
}
+
+ /**
+ * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the
+ * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the
+ * merging host in order to produce correct pipeline output.
+ */
boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
+ return DocumentSourceLimit::create(pExpCtx, _limit);
}
long long getLimit() const {
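
Based on the two overrides above, a sketch of how $limit now splits (illustrative only, not compiled against the tree; 'expCtx' is an ExpressionContext assumed to be in scope): the shard half reuses the existing stage so each shard returns at most N documents, while the merge half is a newly created DocumentSourceLimit with the same limit, re-applied on the merging host.

// Sketch only: 'invariant' and the create()/getter signatures are as shown above.
auto limitStage = DocumentSourceLimit::create(expCtx, 300);

auto shardHalf = limitStage->getShardSource();  // the same object; each shard caps at 300
auto mergeHalf = limitStage->getMergeSource();  // a new DocumentSourceLimit with limit 300

invariant(shardHalf == limitStage);  // reuse on the shards is only an optimization
invariant(mergeHalf != limitStage);  // the merge pipeline never shares the stage object
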
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 5c2ce92237f..89975fc4cf2 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -272,8 +272,7 @@ void Pipeline::dispose(OperationContext* opCtx) {
pCtx->opCtx = opCtx;
// Make sure all stages are connected, in case we are being disposed via an error path and
- // were
- // not stitched at the time of the error.
+ // were not stitched at the time of the error.
stitch();
if (!_sources.empty()) {
@@ -332,6 +331,7 @@ void Pipeline::unsplitFromSharded(
_unsplitSources.reset();
_splitForSharded = false;
+
stitch();
}
@@ -351,6 +351,8 @@ void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipel
// split this source into Merge and Shard _sources
intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource();
+ invariant(shardSource != mergeSource);
+
if (shardSource)
shardPipe->_sources.push_back(shardSource);
if (mergeSource)
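
To illustrate the failure mode this new invariant guards against, here is a toy, self-contained sketch (simplified stand-ins, not the real Pipeline or DocumentSource classes). Stitching wires each stage to its predecessor, so a stage shared between the shard and merge halves would be re-wired by whichever pipeline stitches last, silently corrupting the other half.

// Toy illustration of why a split point must not appear in both pipelines.
#include <cassert>
#include <vector>

struct ToyStage {
    ToyStage* source = nullptr;  // stand-in for DocumentSource::setSource()
};

// Stand-in for Pipeline::stitch(): connect each stage to the one before it.
void stitch(std::vector<ToyStage*>& stages) {
    for (size_t i = 1; i < stages.size(); ++i)
        stages[i]->source = stages[i - 1];
}

int main() {
    ToyStage shardScan, merger, shared;  // 'shared' was returned by both halves
    std::vector<ToyStage*> shardPipe{&shardScan, &shared};
    std::vector<ToyStage*> mergePipe{&merger, &shared};

    stitch(shardPipe);
    stitch(mergePipe);  // re-wires 'shared' away from the shard pipeline

    assert(shared.source == &merger);  // the shard pipeline's link is silently lost
    return 0;
}
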
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 6f46bf75476..35b18954fdd 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -196,7 +196,7 @@ public:
* Returns true if this pipeline is the part of a split pipeline which should be targeted to the
* shards.
*/
- bool isSplitForSharded() {
+ bool isSplitForSharded() const {
return _splitForSharded;
}
@@ -204,7 +204,7 @@ public:
* Returns true if this pipeline is the part of a split pipeline which is responsible for
* merging the results from the shards.
*/
- bool isSplitForMerge() {
+ bool isSplitForMerge() const {
return _splitForMerge;
}
@@ -259,7 +259,7 @@ public:
*/
DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const;
- const SourceContainer& getSources() {
+ const SourceContainer& getSources() const {
return _sources;
}