diff options
-rw-r--r-- | jstests/aggregation/mongos_merge.js | 52 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_split_pipeline.h | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_limit.h | 15 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 6 |
6 files changed, 68 insertions, 32 deletions
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js index ea00739d739..fe13b337930 100644 --- a/jstests/aggregation/mongos_merge.js +++ b/jstests/aggregation/mongos_merge.js @@ -76,7 +76,7 @@ * - The number of documents returned by the aggregation matches 'expectedCount'. * - The merge was performed on a mongoS if 'mergeType' is 'mongos', and on a shard otherwise. */ - function assertMergeBehaviour({testName, pipeline, mergeType, expectedCount}) { + function assertMergeBehaviour({testName, pipeline, mergeType, batchSize, expectedCount}) { // Verify that the 'mergeOnMongoS' explain() output for this pipeline matches our // expectation. assert.eq( @@ -84,7 +84,11 @@ .mergeType, mergeType); - assert.eq(mongosColl.aggregate(pipeline, {comment: testName}).itcount(), expectedCount); + assert.eq( + mongosColl + .aggregate(pipeline, {comment: testName, cursor: {batchSize: (batchSize || 101)}}) + .itcount(), + expectedCount); // Verify that a $mergeCursors aggregation ran on the primary shard if 'mergeType' is not // 'mongos', and that no such aggregation ran otherwise. @@ -103,11 +107,12 @@ * Throws an assertion if the aggregation specified by 'pipeline' does not produce * 'expectedCount' results, or if the merge phase is not performed on the mongoS. */ - function assertMergeOnMongoS({testName, pipeline, expectedCount}) { + function assertMergeOnMongoS({testName, pipeline, batchSize, expectedCount}) { assertMergeBehaviour({ testName: testName, pipeline: pipeline, mergeType: "mongos", + batchSize: (batchSize || 101), expectedCount: expectedCount }); } @@ -116,11 +121,12 @@ * Throws an assertion if the aggregation specified by 'pipeline' does not produce * 'expectedCount' results, or if the merge phase was not performed on a shard. */ - function assertMergeOnMongoD({testName, pipeline, mergeType, expectedCount}) { + function assertMergeOnMongoD({testName, pipeline, mergeType, batchSize, expectedCount}) { assertMergeBehaviour({ testName: testName, pipeline: pipeline, mergeType: (mergeType || "anyShard"), + batchSize: (batchSize || 101), expectedCount: expectedCount }); } @@ -135,6 +141,7 @@ assertMergeOnMongoS({ testName: "agg_mongos_merge_match_only", pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}], + batchSize: 10, expectedCount: 400 }); @@ -142,6 +149,7 @@ assertMergeOnMongoS({ testName: "agg_mongos_merge_sort_presorted", pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}], + batchSize: 10, expectedCount: 400 }); @@ -149,6 +157,7 @@ assertMergeOnMongoD({ testName: "agg_mongos_merge_sort_in_mem", pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}], + batchSize: 10, expectedCount: 400 }); @@ -160,41 +169,45 @@ {$_internalSplitPipeline: {mergeType: "primaryShard"}} ], mergeType: "primaryShard", + batchSize: 10, expectedCount: 400 }); // Test that $skip is merged on mongoS. assertMergeOnMongoS({ testName: "agg_mongos_merge_skip", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 100}], - expectedCount: 300 + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}], + batchSize: 10, + expectedCount: 100 }); // Test that $limit is merged on mongoS. assertMergeOnMongoS({ testName: "agg_mongos_merge_limit", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 50}], - expectedCount: 50 + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}], + batchSize: 10, + expectedCount: 300 }); // Test that $sample is merged on mongoS. assertMergeOnMongoS({ testName: "agg_mongos_merge_sample", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 50}}], - expectedCount: 50 + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}], + batchSize: 10, + expectedCount: 300 }); // Test that merge pipelines containing all mongos-runnable stages produce the expected output. assertMergeOnMongoS({ testName: "agg_mongos_merge_all_mongos_runnable_stages", pipeline: [ - {$match: {_id: {$gte: -5, $lte: 100}}}, - {$sort: {_id: -1}}, - {$skip: 95}, - {$limit: 10}, + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$sort: {_id: 1}}, + {$skip: 150}, + {$limit: 150}, {$addFields: {d: true}}, {$unwind: "$a"}, - {$sample: {size: 5}}, + {$sample: {size: 100}}, {$project: {c: 0}}, { $redact: { @@ -204,14 +217,17 @@ }, { $match: { - _id: {$gte: -4, $lte: 5}, - a: {$gte: -4, $lte: 5}, + _id: {$gte: -50, $lte: 100}, + a: {$gte: -50, $lte: 100}, b: {$exists: false}, c: {$exists: false}, d: true } } ], - expectedCount: 5 + batchSize: 10, + expectedCount: 100 }); + + st.stop(); })();
\ No newline at end of file diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 8f1c2fef4a0..134a8a86e3d 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -529,14 +529,18 @@ public: /** * Returns a source to be run on the shards, or NULL if no work should be done on the shards for * this stage. Must not mutate the existing source object; if different behaviour is required in - * the split-pipeline case, a new source should be created and configured appropriately. + * the split-pipeline case, a new source should be created and configured appropriately. It is + * an error for getShardSource() to return a pointer to the same object as getMergeSource(), + * since this can result in the source being stitched into both the shard and merge pipelines + * when the latter is executed on mongoS. */ virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0; /** * Returns a source that combines results from the shards, or NULL if no work should be done in * the merge pipeline for this stage. Must not mutate the existing source object; if different - * behaviour is required, a new source should be created and configured appropriately. + * behaviour is required, a new source should be created and configured appropriately. It is an + * error for getMergeSource() to return a pointer to the same object as getShardSource(). */ virtual boost::intrusive_ptr<DocumentSource> getMergeSource() = 0; diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index c18a6d301a6..f9ac84c555f 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -49,9 +49,10 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement, const boost::intrusive_ptr<ExpressionContext>&); - DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - HostTypeRequirement mergeType) - : DocumentSource(expCtx), _mergeType(mergeType) {} + static boost::intrusive_ptr<DocumentSource> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, HostTypeRequirement mergeType) { + return new DocumentSourceInternalSplitPipeline(expCtx, mergeType); + } const char* getSourceName() const final { return kStageName.rawData(); @@ -62,7 +63,7 @@ public: } boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; + return DocumentSourceInternalSplitPipeline::create(pExpCtx, _mergeType); } StageConstraints constraints() const final { @@ -74,6 +75,10 @@ public: GetNextResult getNext() final; private: + DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + HostTypeRequirement mergeType) + : DocumentSource(expCtx), _mergeType(mergeType) {} + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard; }; diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 88acfa8b45a..f5f93fff1a4 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -68,13 +68,22 @@ public: static boost::intrusive_ptr<DocumentSourceLimit> create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); - // Virtuals for SplittableDocumentSource - // Need to run on rounter. Running on shard as well is an optimization. + /** + * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on + * the shards is an optimization, but is not strictly necessary in order to produce correct + * pipeline output. + */ boost::intrusive_ptr<DocumentSource> getShardSource() final { return this; } + + /** + * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the + * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the + * merging host in order to produce correct pipeline output. + */ boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; + return DocumentSourceLimit::create(pExpCtx, _limit); } long long getLimit() const { diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 5c2ce92237f..89975fc4cf2 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -272,8 +272,7 @@ void Pipeline::dispose(OperationContext* opCtx) { pCtx->opCtx = opCtx; // Make sure all stages are connected, in case we are being disposed via an error path and - // were - // not stitched at the time of the error. + // were not stitched at the time of the error. stitch(); if (!_sources.empty()) { @@ -332,6 +331,7 @@ void Pipeline::unsplitFromSharded( _unsplitSources.reset(); _splitForSharded = false; + stitch(); } @@ -351,6 +351,8 @@ void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipel // split this source into Merge and Shard _sources intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource(); + invariant(shardSource != mergeSource); + if (shardSource) shardPipe->_sources.push_back(shardSource); if (mergeSource) diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 6f46bf75476..35b18954fdd 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -196,7 +196,7 @@ public: * Returns true if this pipeline is the part of a split pipeline which should be targeted to the * shards. */ - bool isSplitForSharded() { + bool isSplitForSharded() const { return _splitForSharded; } @@ -204,7 +204,7 @@ public: * Returns true if this pipeline is the part of a split pipeline which is responsible for * merging the results from the shards. */ - bool isSplitForMerge() { + bool isSplitForMerge() const { return _splitForMerge; } @@ -259,7 +259,7 @@ public: */ DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const; - const SourceContainer& getSources() { + const SourceContainer& getSources() const { return _sources; } |