diff options
Diffstat (limited to 'src/mongo')
23 files changed, 1048 insertions, 126 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 0176859c6e1..8a4b39f9477 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -131,6 +131,7 @@ env.CppUnitTest( 'document_source_redact_test.cpp', 'document_source_replace_root_test.cpp', 'document_source_sample_test.cpp', + 'document_source_skip_test.cpp', 'document_source_sort_by_count_test.cpp', 'document_source_sort_test.cpp', 'document_source_test.cpp', diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 49801200e4b..90f158aec1d 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -1418,6 +1418,8 @@ private: class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource { public: + static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024; + // virtuals from DocumentSource GetNextResult getNext() final; const char* getSourceName() const final; @@ -1441,42 +1443,27 @@ public: boost::intrusive_ptr<DocumentSource> getShardSource() final; boost::intrusive_ptr<DocumentSource> getMergeSource() final; - /** - Add sort key field. - - Adds a sort key field to the key being built up. A concatenated - key is built up by calling this repeatedly. - - @param fieldPath the field path to the key component - @param ascending if true, use the key for an ascending sort, - otherwise, use it for descending - */ - void addKey(const std::string& fieldPath, bool ascending); - /// Write out a Document whose contents are the sort key. Document serializeSortKey(bool explain) const; /** - Create a sorting DocumentSource from BSON. - - This is a convenience method that uses the above, and operates on - a BSONElement that has been deteremined to be an Object with an - element named $group. - - @param pBsonElement the BSONELement that defines the group - @param pExpCtx the expression context for the pipeline - @returns the grouping DocumentSource + * Parses a $sort stage from the user-supplied BSON. */ static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - /// Create a DocumentSourceSort with a given sort and (optional) limit + /** + * Convenience method for creating a $sort stage. + */ static boost::intrusive_ptr<DocumentSourceSort> create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj sortOrder, - long long limit = -1); + long long limit = -1, + uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes); - /// returns -1 for no limit + /** + * Returns -1 for no limit. + */ long long getLimit() const; /** @@ -1510,10 +1497,15 @@ private: explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); Value serialize(bool explain = false) const final { - verify(false); // should call addToBsonArray instead + MONGO_UNREACHABLE; // Should call serializeToArray instead. } /** + * Helper to add a sort key to this stage. + */ + void addKey(StringData fieldPath, bool ascending); + + /** * Before returning anything, we have to consume all input and sort it. This method consumes all * input and prepares the sorted stream '_output'. * @@ -1568,6 +1560,7 @@ private: boost::intrusive_ptr<DocumentSourceLimit> limitSrc; + uint64_t _maxMemoryUsageBytes; bool _done; bool _mergingPresorted; std::unique_ptr<MySorter> _sorter; @@ -1912,7 +1905,8 @@ public: boost::intrusive_ptr<Expression> startWith, boost::optional<BSONObj> additionalFilter, boost::optional<FieldPath> depthField, - boost::optional<long long> maxDepth); + boost::optional<long long> maxDepth, + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc); static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); @@ -1921,15 +1915,17 @@ protected: void doInjectExpressionContext() final; private: - DocumentSourceGraphLookUp(NamespaceString from, - std::string as, - std::string connectFromField, - std::string connectToField, - boost::intrusive_ptr<Expression> startWith, - boost::optional<BSONObj> additionalFilter, - boost::optional<FieldPath> depthField, - boost::optional<long long> maxDepth, - const boost::intrusive_ptr<ExpressionContext>& expCtx); + DocumentSourceGraphLookUp( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + NamespaceString from, + std::string as, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<BSONObj> additionalFilter, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc); Value serialize(bool explain = false) const final { // Should not be called; use serializeToArray instead. diff --git a/src/mongo/db/pipeline/document_source_add_fields_test.cpp b/src/mongo/db/pipeline/document_source_add_fields_test.cpp index eabea7ff045..724c896e4c0 100644 --- a/src/mongo/db/pipeline/document_source_add_fields_test.cpp +++ b/src/mongo/db/pipeline/document_source_add_fields_test.cpp @@ -132,5 +132,23 @@ TEST_F(AddFieldsTest, ShouldAddReferencedFieldsToDependencies) { ASSERT_EQUALS(true, dependencies.getNeedTextScore()); } +TEST_F(AddFieldsTest, ShouldPropagatePauses) { + auto addFields = DocumentSourceAddFields::create(BSON("a" << 10), getExpCtx()); + auto mock = DocumentSourceMock::create({Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document(), + DocumentSource::GetNextResult::makePauseExecution()}); + addFields->setSource(mock.get()); + + ASSERT_TRUE(addFields->getNext().isAdvanced()); + ASSERT_TRUE(addFields->getNext().isPaused()); + ASSERT_TRUE(addFields->getNext().isAdvanced()); + ASSERT_TRUE(addFields->getNext().isPaused()); + + ASSERT_TRUE(addFields->getNext().isEOF()); + ASSERT_TRUE(addFields->getNext().isEOF()); + ASSERT_TRUE(addFields->getNext().isEOF()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp index 17e0c1d7a3b..f572925b42f 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp @@ -308,6 +308,39 @@ TEST_F(BucketAutoTests, RespectsCanonicalTypeOrderingOfValues) { ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 'a', max : 'b'}, count : 2}"))); } +TEST_F(BucketAutoTests, ShouldPropagatePauses) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}"); + auto bucketAutoStage = createBucketAuto(bucketAutoSpec); + auto source = DocumentSourceMock::create({Document{{"x", 1}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"x", 2}}, + Document{{"x", 3}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"x", 4}}, + DocumentSource::GetNextResult::makePauseExecution()}); + bucketAutoStage->setSource(source.get()); + + // The $bucketAuto stage needs to consume all inputs before returning any output, so we should + // see all three pauses before any advances. + ASSERT_TRUE(bucketAutoStage->getNext().isPaused()); + ASSERT_TRUE(bucketAutoStage->getNext().isPaused()); + ASSERT_TRUE(bucketAutoStage->getNext().isPaused()); + + auto next = bucketAutoStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + Document(fromjson("{_id : {min : 1, max : 3}, count : 2}"))); + + next = bucketAutoStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + Document(fromjson("{_id : {min : 3, max : 4}, count : 2}"))); + + ASSERT_TRUE(bucketAutoStage->getNext().isEOF()); + ASSERT_TRUE(bucketAutoStage->getNext().isEOF()); + ASSERT_TRUE(bucketAutoStage->getNext().isEOF()); +} + TEST_F(BucketAutoTests, SourceNameIsBucketAuto) { auto bucketAuto = createBucketAuto(fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}")); ASSERT_EQUALS(string(bucketAuto->getSourceName()), "$bucketAuto"); @@ -509,20 +542,38 @@ TEST_F(BucketAutoTests, FailsWithInvalidOutputFieldName) { ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40236); } -TEST_F(BucketAutoTests, FailsWhenBufferingTooManyDocuments) { +TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) { + const uint64_t maxMemoryUsageBytes = 1000; deque<DocumentSource::GetNextResult> inputs; - auto largeStr = string(1000, 'b'); + auto largeStr = string(maxMemoryUsageBytes, 'b'); auto inputDoc = Document{{"a", largeStr}}; - ASSERT_GTE(inputDoc.getApproximateSize(), 1000UL); + ASSERT_GTE(inputDoc.getApproximateSize(), maxMemoryUsageBytes); inputs.emplace_back(std::move(inputDoc)); inputs.emplace_back(Document{{"a", largeStr}}); auto mock = DocumentSourceMock::create(inputs); + const int numBuckets = 1; + auto bucketAuto = + DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes); + bucketAuto->setSource(mock.get()); + ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819); +} + +TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocumentsEvenIfPaused) { const uint64_t maxMemoryUsageBytes = 1000; + auto largeStr = string(maxMemoryUsageBytes / 2, 'b'); + auto inputDoc = Document{{"a", largeStr}}; + ASSERT_GT(inputDoc.getApproximateSize(), maxMemoryUsageBytes / 2); + + auto mock = DocumentSourceMock::create({std::move(inputDoc), + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", largeStr}}}); + const int numBuckets = 1; auto bucketAuto = DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes); bucketAuto->setSource(mock.get()); + ASSERT_TRUE(bucketAuto->getNext().isPaused()); ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819); } diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 0dda75adb1b..9c8cbce17e0 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -301,6 +302,23 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSub ASSERT_DOCUMENT_EQ(output.getDocument(), Document(fromjson("{subPipe: [{_id: 0}, {_id: 1}]}"))); } +// TODO: DocumentSourceFacet will have to propagate pauses if we ever allow nested $facets. +DEATH_TEST_F(DocumentSourceFacetTest, + ShouldFailIfGivenPausedInput, + "Invariant failure !input.isPaused()") { + auto ctx = getExpCtx(); + + auto firstDummy = DocumentSourcePassthrough::create(); + auto pipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); + + auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + + auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution()); + facetStage->setSource(mock.get()); + + facetStage->getNext(); // This should cause a crash. +} + // // Miscellaneous. // diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index b9f3d0aa4e8..daa51ad8294 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -450,6 +450,7 @@ void DocumentSourceGraphLookUp::doReattachToOperationContext(OperationContext* o } DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( + const boost::intrusive_ptr<ExpressionContext>& expCtx, NamespaceString from, std::string as, std::string connectFromField, @@ -458,7 +459,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( boost::optional<BSONObj> additionalFilter, boost::optional<FieldPath> depthField, boost::optional<long long> maxDepth, - const boost::intrusive_ptr<ExpressionContext>& expCtx) + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc) : DocumentSourceNeedsMongod(expCtx), _from(std::move(from)), _as(std::move(as)), @@ -469,7 +470,8 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( _depthField(depthField), _maxDepth(maxDepth), _visited(ValueComparator::kInstance.makeUnorderedValueMap<BSONObj>()), - _cache(expCtx->getValueComparator()) {} + _cache(expCtx->getValueComparator()), + _unwind(unwindSrc) {} intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create( const intrusive_ptr<ExpressionContext>& expCtx, @@ -480,9 +482,11 @@ intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create( intrusive_ptr<Expression> startWith, boost::optional<BSONObj> additionalFilter, boost::optional<FieldPath> depthField, - boost::optional<long long> maxDepth) { + boost::optional<long long> maxDepth, + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc) { intrusive_ptr<DocumentSourceGraphLookUp> source( - new DocumentSourceGraphLookUp(std::move(fromNs), + new DocumentSourceGraphLookUp(expCtx, + std::move(fromNs), std::move(asField), std::move(connectFromField), std::move(connectToField), @@ -490,7 +494,7 @@ intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create( additionalFilter, depthField, maxDepth, - expCtx)); + unwindSrc)); source->_variables.reset(new Variables()); source->injectExpressionContext(expCtx); @@ -595,7 +599,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( !isMissingRequiredField); intrusive_ptr<DocumentSourceGraphLookUp> newSource( - new DocumentSourceGraphLookUp(std::move(from), + new DocumentSourceGraphLookUp(expCtx, + std::move(from), std::move(as), std::move(connectFromField), std::move(connectToField), @@ -603,7 +608,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( additionalFilter, depthField, maxDepth, - expCtx)); + boost::none)); newSource->_variables.reset(new Variables(idGenerator.getIdCount())); diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 398831cc728..6f74a0d6280 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -36,6 +36,7 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/stub_mongod_interface.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -60,56 +61,11 @@ using DocumentSourceGraphLookUpTest = AggregationContextFixture; * A MongodInterface use for testing that supports making pipelines with an initial * DocumentSourceMock source. */ -class MockMongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface { +class MockMongodImplementation final : public StubMongodInterface { public: MockMongodImplementation(std::deque<DocumentSource::GetNextResult> results) : _results(std::move(results)) {} - void setOperationContext(OperationContext* opCtx) final { - MONGO_UNREACHABLE; - } - - DBClientBase* directClient() final { - MONGO_UNREACHABLE; - } - - bool isSharded(const NamespaceString& ns) final { - MONGO_UNREACHABLE; - } - - BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final { - MONGO_UNREACHABLE; - } - - CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) final { - MONGO_UNREACHABLE; - } - - void appendLatencyStats(const NamespaceString& nss, - bool showHistograms, - BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - Status appendStorageStats(const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - BSONObj getCollectionOptions(const NamespaceString& nss) final { - MONGO_UNREACHABLE; - } - - Status renameIfOptionsAndIndexesHaveNotChanged( - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) final { - MONGO_UNREACHABLE; - } - StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx) final { @@ -148,6 +104,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ExpressionFieldPath::create("_id"), boost::none, boost::none, + boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); graphLookupStage->injectMongodInterface( @@ -176,6 +133,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ExpressionFieldPath::create("_id"), boost::none, boost::none, + boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); graphLookupStage->injectMongodInterface( @@ -195,6 +153,7 @@ TEST_F(DocumentSourceGraphLookUpTest, NamespaceString fromNs("test", "graph_lookup"); expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, "results", @@ -203,16 +162,13 @@ TEST_F(DocumentSourceGraphLookUpTest, ExpressionFieldPath::create("_id"), boost::none, boost::none, - boost::none); + boost::none, + unwindStage); graphLookupStage->injectMongodInterface( std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->setSource(inputMock.get()); - auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none); - auto pipeline = - unittest::assertGet(Pipeline::create({inputMock, graphLookupStage, unwindStage}, expCtx)); - pipeline->optimizePipeline(); - - ASSERT_THROWS_CODE(pipeline->getNext(), UserException, 40271); + ASSERT_THROWS_CODE(graphLookupStage->getNext(), UserException, 40271); } bool arrayContains(const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -248,19 +204,20 @@ TEST_F(DocumentSourceGraphLookUpTest, ExpressionFieldPath::create("_id"), boost::none, boost::none, + boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); graphLookupStage->injectMongodInterface( std::make_shared<MockMongodImplementation>(std::move(fromContents))); - auto pipeline = unittest::assertGet(Pipeline::create({inputMock, graphLookupStage}, expCtx)); + graphLookupStage->setSource(inputMock.get()); - auto next = pipeline->getNext(); - ASSERT(next); + auto next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); - ASSERT_EQ(2U, next->size()); - ASSERT_VALUE_EQ(Value(0), next->getField("_id")); + ASSERT_EQ(2U, next.getDocument().size()); + ASSERT_VALUE_EQ(Value(0), next.getDocument().getField("_id")); - auto resultsValue = next->getField("results"); + auto resultsValue = next.getDocument().getField("results"); ASSERT(resultsValue.isArray()); auto resultsArray = resultsValue.getArray(); @@ -271,23 +228,166 @@ TEST_F(DocumentSourceGraphLookUpTest, ASSERT(arrayContains(expCtx, resultsArray, Value(to1))); ASSERT_EQ(2U, resultsArray.size()); - next = pipeline->getNext(); - ASSERT(!next); + next = graphLookupStage->getNext(); + ASSERT(next.isEOF()); } else if (arrayContains(expCtx, resultsArray, Value(to0from2))) { // If 'to0from2' was returned, then we should see 'to2' and nothing else. ASSERT(arrayContains(expCtx, resultsArray, Value(to2))); ASSERT_EQ(2U, resultsArray.size()); - next = pipeline->getNext(); - ASSERT(!next); + next = graphLookupStage->getNext(); + ASSERT(next.isEOF()); } else { FAIL(str::stream() << "Expected either [ " << to0from1.toString() << " ] or [ " << to0from2.toString() << " ] but found [ " - << next->toString() + << next.getDocument().toString() << " ]"); } } +TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePauses) { + auto expCtx = getExpCtx(); + + auto inputMock = + DocumentSourceMock::create({Document{{"startPoint", 0}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"startPoint", 0}}, + DocumentSource::GetNextResult::makePauseExecution()}); + + std::deque<DocumentSource::GetNextResult> fromContents{ + Document{{"_id", "a"}, {"to", 0}, {"from", 1}}, Document{{"_id", "b"}, {"to", 1}}}; + + NamespaceString fromNs("test", "foreign"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + auto graphLookupStage = + DocumentSourceGraphLookUp::create(expCtx, + fromNs, + "results", + "from", + "to", + ExpressionFieldPath::create("startPoint"), + boost::none, + boost::none, + boost::none, + boost::none); + + graphLookupStage->setSource(inputMock.get()); + + graphLookupStage->injectMongodInterface( + std::make_shared<MockMongodImplementation>(std::move(fromContents))); + + auto next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + + // We expect {startPoint: 0, results: [{_id: "a", to: 0, from: 1}, {_id: "b", to: 1}]}, but the + // 'results' array can be in any order. So we use arrayContains to assert it has the right + // contents. + auto result = next.releaseDocument(); + ASSERT_VALUE_EQ(result["startPoint"], Value(0)); + ASSERT_EQ(result["results"].getType(), BSONType::Array); + ASSERT_EQ(result["results"].getArray().size(), 2UL); + ASSERT_TRUE(arrayContains(expCtx, + result["results"].getArray(), + Value(Document{{"_id", "a"}, {"to", 0}, {"from", 1}}))); + ASSERT_TRUE(arrayContains( + expCtx, result["results"].getArray(), Value(Document{{"_id", "b"}, {"to", 1}}))); + + ASSERT_TRUE(graphLookupStage->getNext().isPaused()); + + next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + result = next.releaseDocument(); + ASSERT_VALUE_EQ(result["startPoint"], Value(0)); + ASSERT_EQ(result["results"].getType(), BSONType::Array); + ASSERT_EQ(result["results"].getArray().size(), 2UL); + ASSERT_TRUE(arrayContains(expCtx, + result["results"].getArray(), + Value(Document{{"_id", "a"}, {"to", 0}, {"from", 1}}))); + ASSERT_TRUE(arrayContains( + expCtx, result["results"].getArray(), Value(Document{{"_id", "b"}, {"to", 1}}))); + + ASSERT_TRUE(graphLookupStage->getNext().isPaused()); + + ASSERT_TRUE(graphLookupStage->getNext().isEOF()); + ASSERT_TRUE(graphLookupStage->getNext().isEOF()); +} + +TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePausesWhileUnwinding) { + auto expCtx = getExpCtx(); + + // Set up the $graphLookup stage + auto inputMock = + DocumentSourceMock::create({Document{{"startPoint", 0}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"startPoint", 0}}, + DocumentSource::GetNextResult::makePauseExecution()}); + + std::deque<DocumentSource::GetNextResult> fromContents{ + Document{{"_id", "a"}, {"to", 0}, {"from", 1}}, Document{{"_id", "b"}, {"to", 1}}}; + + NamespaceString fromNs("test", "foreign"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + + const bool preserveNullAndEmptyArrays = false; + const boost::optional<std::string> includeArrayIndex = boost::none; + auto unwindStage = DocumentSourceUnwind::create( + expCtx, "results", preserveNullAndEmptyArrays, includeArrayIndex); + + auto graphLookupStage = + DocumentSourceGraphLookUp::create(expCtx, + fromNs, + "results", + "from", + "to", + ExpressionFieldPath::create("startPoint"), + boost::none, + boost::none, + boost::none, + unwindStage); + + graphLookupStage->setSource(inputMock.get()); + + graphLookupStage->injectMongodInterface( + std::make_shared<MockMongodImplementation>(std::move(fromContents))); + + // Assert it has the expected results. Note the results can be in either order. + auto expectedA = + Document{{"startPoint", 0}, {"results", Document{{"_id", "a"}, {"to", 0}, {"from", 1}}}}; + auto expectedB = Document{{"startPoint", 0}, {"results", Document{{"_id", "b"}, {"to", 1}}}}; + auto next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + if (expCtx->getDocumentComparator().evaluate(next.getDocument() == expectedA)) { + next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB); + } else { + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB); + next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedA); + } + + ASSERT_TRUE(graphLookupStage->getNext().isPaused()); + + next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + if (expCtx->getDocumentComparator().evaluate(next.getDocument() == expectedA)) { + next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB); + } else { + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB); + next = graphLookupStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedA); + } + + ASSERT_TRUE(graphLookupStage->getNext().isPaused()); + + ASSERT_TRUE(graphLookupStage->getNext().isEOF()); + ASSERT_TRUE(graphLookupStage->getNext().isEOF()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 850fc9589d1..32b6d1f6f87 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -38,15 +38,18 @@ #include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/json.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/query/query_test_service_context.h" #include "mongo/dbtests/dbtests.h" #include "mongo/stdx/memory.h" +#include "mongo/stdx/unordered_set.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" @@ -61,6 +64,133 @@ using std::vector; static const char* const ns = "unittests.document_source_group_tests"; +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using DocumentSourceGroupTest = AggregationContextFixture; + +TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoading) { + auto expCtx = getExpCtx(); + expCtx->inRouter = true; // Disallow external sort. + // This is the only way to do this in a debug build. + AccumulationStatement countStatement{"count", + AccumulationStatement::getFactory("$sum"), + ExpressionConstant::create(expCtx, Value(1))}; + auto group = DocumentSourceGroup::create( + expCtx, ExpressionConstant::create(expCtx, Value(BSONNULL)), {countStatement}, 0); + auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(), + Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document(), + Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document()}); + group->setSource(mock.get()); + + // There were 3 pauses, so we should expect 3 paused results before any results can be returned. + ASSERT_TRUE(group->getNext().isPaused()); + ASSERT_TRUE(group->getNext().isPaused()); + ASSERT_TRUE(group->getNext().isPaused()); + + // There were 4 documents, so we expect a count of 4. + auto result = group->getNext(); + ASSERT_TRUE(result.isAdvanced()); + ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"_id", BSONNULL}, {"count", 4}})); +} + +TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) { + auto expCtx = getExpCtx(); + + // Allow the $group stage to spill to disk. + TempDir tempDir("DocumentSourceGroupTest"); + expCtx->tempDir = tempDir.path(); + expCtx->extSortAllowed = true; + const size_t maxMemoryUsageBytes = 1000; + + VariablesIdGenerator idGen; + VariablesParseState vps(&idGen); + AccumulationStatement pushStatement{"spaceHog", + AccumulationStatement::getFactory("$push"), + ExpressionFieldPath::parse("$largeStr", vps)}; + auto groupByExpression = ExpressionFieldPath::parse("$_id", vps); + auto group = DocumentSourceGroup::create( + expCtx, groupByExpression, {pushStatement}, idGen.getIdCount(), maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes, 'x'); + auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"_id", 1}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"_id", 2}, {"largeStr", largeStr}}}); + group->setSource(mock.get()); + + // There were 2 pauses, so we should expect 2 paused results before any results can be returned. + ASSERT_TRUE(group->getNext().isPaused()); + ASSERT_TRUE(group->getNext().isPaused()); + + // Now we expect to get the results back, although in no particular order. + stdx::unordered_set<int> idSet; + for (auto result = group->getNext(); result.isAdvanced(); result = group->getNext()) { + idSet.insert(result.releaseDocument()["_id"].coerceToInt()); + } + ASSERT_TRUE(group->getNext().isEOF()); + + ASSERT_EQ(idSet.size(), 3UL); + ASSERT_EQ(idSet.count(0), 1UL); + ASSERT_EQ(idSet.count(1), 1UL); + ASSERT_EQ(idSet.count(2), 1UL); +} + +TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) { + auto expCtx = getExpCtx(); + const size_t maxMemoryUsageBytes = 1000; + expCtx->inRouter = true; // Disallow external sort. + // This is the only way to do this in a debug build. + + VariablesIdGenerator idGen; + VariablesParseState vps(&idGen); + AccumulationStatement pushStatement{"spaceHog", + AccumulationStatement::getFactory("$push"), + ExpressionFieldPath::parse("$largeStr", vps)}; + auto groupByExpression = ExpressionFieldPath::parse("$_id", vps); + auto group = DocumentSourceGroup::create( + expCtx, groupByExpression, {pushStatement}, idGen.getIdCount(), maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes, 'x'); + auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}}, + Document{{"_id", 1}, {"largeStr", largeStr}}}); + group->setSource(mock.get()); + + ASSERT_THROWS_CODE(group->getNext(), UserException, 16945); +} + +TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { + auto expCtx = getExpCtx(); + const size_t maxMemoryUsageBytes = 1000; + expCtx->inRouter = true; // Disallow external sort. + // This is the only way to do this in a debug build. + + VariablesIdGenerator idGen; + VariablesParseState vps(&idGen); + AccumulationStatement pushStatement{"spaceHog", + AccumulationStatement::getFactory("$push"), + ExpressionFieldPath::parse("$largeStr", vps)}; + auto groupByExpression = ExpressionFieldPath::parse("$_id", vps); + auto group = DocumentSourceGroup::create( + expCtx, groupByExpression, {pushStatement}, idGen.getIdCount(), maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes / 2, 'x'); + auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"_id", 1}, {"largeStr", largeStr}}, + Document{{"_id", 2}, {"largeStr", largeStr}}}); + group->setSource(mock.get()); + + // The first getNext() should pause. + ASSERT_TRUE(group->getNext().isPaused()); + + // The next should realize it's used too much memory. + ASSERT_THROWS_CODE(group->getNext(), UserException, 16945); +} + BSONObj toBson(const intrusive_ptr<DocumentSource>& source) { vector<Value> arr; source->serializeToArray(arr); diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp index 7de5bf85a3a..2fc83eed374 100644 --- a/src/mongo/db/pipeline/document_source_limit_test.cpp +++ b/src/mongo/db/pipeline/document_source_limit_test.cpp @@ -99,5 +99,27 @@ TEST_F(DocumentSourceLimitTest, ShouldNotIntroduceAnyDependencies) { ASSERT_EQUALS(false, dependencies.getNeedTextScore()); } +TEST_F(DocumentSourceLimitTest, ShouldPropagatePauses) { + auto limit = DocumentSourceLimit::create(getExpCtx(), 2); + auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(), + Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document()}); + limit->setSource(mock.get()); + + ASSERT_TRUE(limit->getNext().isPaused()); + ASSERT_TRUE(limit->getNext().isAdvanced()); + ASSERT_TRUE(limit->getNext().isPaused()); + ASSERT_TRUE(limit->getNext().isAdvanced()); + + // We've reached the limit. + ASSERT_TRUE(mock->isDisposed); + ASSERT_TRUE(limit->getNext().isEOF()); + ASSERT_TRUE(limit->getNext().isEOF()); + ASSERT_TRUE(limit->getNext().isEOF()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 2ab28f3e392..c42eebdb2f1 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include <boost/intrusive_ptr.hpp> +#include <deque> #include <vector> #include "mongo/bson/bsonmisc.h" @@ -37,12 +38,15 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/field_path.h" +#include "mongo/db/pipeline/stub_mongod_interface.h" #include "mongo/db/pipeline/value.h" namespace mongo { namespace { using boost::intrusive_ptr; +using std::deque; using std::vector; // This provides access to getExpCtx(), but we'll use a different name for this test suite. @@ -125,5 +129,145 @@ TEST(MakeMatchStageFromInput, ArrayValueWithRegexUsesOrQuery) { << BSONObj())))); } +// +// Execution tests. +// + +/** + * A mock MongodInterface which allows mocking a foreign pipeline. + */ +class MockMongodInterface final : public StubMongodInterface { +public: + MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults) + : _mockResults(std::move(mockResults)) {} + + bool isSharded(const NamespaceString& ns) final { + return false; + } + + StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults)); + pipeline.getValue()->injectExpressionContext(expCtx); + pipeline.getValue()->optimizePipeline(); + + return pipeline; + } + +private: + deque<DocumentSource::GetNextResult> _mockResults; +}; + +TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "foreign"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + + // Set up the $lookup stage. + auto lookupSpec = Document{{"$lookup", + Document{{"from", fromNs.coll()}, + {"localField", "foreignId"}, + {"foreignField", "_id"}, + {"as", "foreignDocs"}}}} + .toBson(); + auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx); + auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get()); + + // Mock its input, pausing every other result. + auto mockLocalSource = + DocumentSourceMock::create({Document{{"foreignId", 0}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"foreignId", 1}}, + DocumentSource::GetNextResult::makePauseExecution()}); + + lookup->setSource(mockLocalSource.get()); + lookup->injectExpressionContext(expCtx); + + // Mock out the foreign collection. + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, + Document{{"_id", 1}}}; + lookup->injectMongodInterface( + std::make_shared<MockMongodInterface>(std::move(mockForeignContents))); + + auto next = lookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ( + next.releaseDocument(), + (Document{{"foreignId", 0}, {"foreignDocs", vector<Value>{Value(Document{{"_id", 0}})}}})); + + ASSERT_TRUE(lookup->getNext().isPaused()); + + next = lookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ( + next.releaseDocument(), + (Document{{"foreignId", 1}, {"foreignDocs", vector<Value>{Value(Document{{"_id", 1}})}}})); + + ASSERT_TRUE(lookup->getNext().isPaused()); + + ASSERT_TRUE(lookup->getNext().isEOF()); + ASSERT_TRUE(lookup->getNext().isEOF()); +} + +TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "foreign"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + + // Set up the $lookup stage. + auto lookupSpec = Document{{"$lookup", + Document{{"from", fromNs.coll()}, + {"localField", "foreignId"}, + {"foreignField", "_id"}, + {"as", "foreignDoc"}}}} + .toBson(); + auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx); + auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get()); + + const bool preserveNullAndEmptyArrays = false; + const boost::optional<std::string> includeArrayIndex = boost::none; + lookup->setUnwindStage(DocumentSourceUnwind::create( + expCtx, "foreignDoc", preserveNullAndEmptyArrays, includeArrayIndex)); + + // Mock its input, pausing every other result. + auto mockLocalSource = + DocumentSourceMock::create({Document{{"foreignId", 0}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"foreignId", 1}}, + DocumentSource::GetNextResult::makePauseExecution()}); + lookup->setSource(mockLocalSource.get()); + + lookup->injectExpressionContext(expCtx); + + // Mock out the foreign collection. + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, + Document{{"_id", 1}}}; + lookup->injectMongodInterface( + std::make_shared<MockMongodInterface>(std::move(mockForeignContents))); + + auto next = lookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + (Document{{"foreignId", 0}, {"foreignDoc", Document{{"_id", 0}}}})); + + ASSERT_TRUE(lookup->getNext().isPaused()); + + next = lookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + (Document{{"foreignId", 1}, {"foreignDoc", Document{{"_id", 1}}}})); + + ASSERT_TRUE(lookup->getNext().isPaused()); + + ASSERT_TRUE(lookup->getNext().isEOF()); + ASSERT_TRUE(lookup->getNext().isEOF()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp index d61262ed1f7..eb4d40bbc97 100644 --- a/src/mongo/db/pipeline/document_source_match_test.cpp +++ b/src/mongo/db/pipeline/document_source_match_test.cpp @@ -302,6 +302,29 @@ TEST_F(DocumentSourceMatchTest, MultipleMatchStagesShouldCombineIntoOne) { "{c:1}]}")); } +TEST_F(DocumentSourceMatchTest, ShouldPropagatePauses) { + auto match = DocumentSourceMatch::create(BSON("a" << 1), getExpCtx()); + auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 1}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 2}}, + Document{{"a", 2}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 1}}}); + match->setSource(mock.get()); + + ASSERT_TRUE(match->getNext().isPaused()); + ASSERT_TRUE(match->getNext().isAdvanced()); + ASSERT_TRUE(match->getNext().isPaused()); + + // {a: 2} doesn't match, should go directly to the next pause. + ASSERT_TRUE(match->getNext().isPaused()); + ASSERT_TRUE(match->getNext().isAdvanced()); + ASSERT_TRUE(match->getNext().isEOF()); + ASSERT_TRUE(match->getNext().isEOF()); + ASSERT_TRUE(match->getNext().isEOF()); +} + TEST(ObjectForMatch, ShouldExtractTopLevelFieldIfDottedFieldNeeded) { Document input(fromjson("{a: 1, b: {c: 1, d: 1}}")); BSONObj expected = fromjson("{b: {c: 1, d: 1}}"); diff --git a/src/mongo/db/pipeline/document_source_mock_test.cpp b/src/mongo/db/pipeline/document_source_mock_test.cpp index acf4f21f3fe..e4826ff5b24 100644 --- a/src/mongo/db/pipeline/document_source_mock_test.cpp +++ b/src/mongo/db/pipeline/document_source_mock_test.cpp @@ -43,13 +43,23 @@ TEST(DocumentSourceMockTest, OneDoc) { ASSERT(source->getNext().isEOF()); } -TEST(DocumentSourceMockTest, DequeDocuments) { +TEST(DocumentSourceMockTest, ShouldBeConstructableFromInitializerListOfDocuments) { auto source = DocumentSourceMock::create({Document{{"a", 1}}, Document{{"a", 2}}}); ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 1}})); ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 2}})); ASSERT(source->getNext().isEOF()); } +TEST(DocumentSourceMockTest, ShouldBeConstructableFromDequeOfResults) { + auto source = DocumentSourceMock::create({Document{{"a", 1}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 2}}}); + ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 1}})); + ASSERT_TRUE(source->getNext().isPaused()); + ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 2}})); + ASSERT(source->getNext().isEOF()); +} + TEST(DocumentSourceMockTest, StringJSON) { auto source = DocumentSourceMock::create("{a : 1}"); ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 1}})); diff --git a/src/mongo/db/pipeline/document_source_project_test.cpp b/src/mongo/db/pipeline/document_source_project_test.cpp index 12c5f72f087..de2bd12e254 100644 --- a/src/mongo/db/pipeline/document_source_project_test.cpp +++ b/src/mongo/db/pipeline/document_source_project_test.cpp @@ -134,6 +134,28 @@ TEST_F(ProjectStageTest, ExclusionShouldBeAbleToProcessMultipleDocuments) { ASSERT(project->getNext().isEOF()); } +TEST_F(ProjectStageTest, ShouldPropagatePauses) { + auto project = DocumentSourceProject::create(BSON("a" << false), getExpCtx()); + auto source = DocumentSourceMock::create({Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document(), + DocumentSource::GetNextResult::makePauseExecution()}); + project->setSource(source.get()); + + ASSERT_TRUE(project->getNext().isAdvanced()); + ASSERT_TRUE(project->getNext().isPaused()); + ASSERT_TRUE(project->getNext().isAdvanced()); + ASSERT_TRUE(project->getNext().isPaused()); + ASSERT_TRUE(project->getNext().isAdvanced()); + ASSERT_TRUE(project->getNext().isPaused()); + + ASSERT(project->getNext().isEOF()); + ASSERT(project->getNext().isEOF()); + ASSERT(project->getNext().isEOF()); +} + TEST_F(ProjectStageTest, InclusionShouldAddDependenciesOfIncludedAndComputedFields) { auto project = DocumentSourceProject::create( fromjson("{a: true, x: '$b', y: {$and: ['$c','$d']}, z: {$meta: 'textScore'}}"), diff --git a/src/mongo/db/pipeline/document_source_redact_test.cpp b/src/mongo/db/pipeline/document_source_redact_test.cpp index f4de62feff0..a6a4bfd9f7b 100644 --- a/src/mongo/db/pipeline/document_source_redact_test.cpp +++ b/src/mongo/db/pipeline/document_source_redact_test.cpp @@ -57,5 +57,24 @@ TEST_F(DocumentSourceRedactTest, ShouldCopyRedactSafePartOfMatchBeforeItself) { ASSERT_EQUALS(pipeline.size(), 3U); ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline.front().get())); } + +TEST_F(DocumentSourceRedactTest, ShouldPropagatePauses) { + auto redactSpec = BSON("$redact" + << "$$KEEP"); + auto redact = DocumentSourceRedact::createFromBson(redactSpec.firstElement(), getExpCtx()); + auto mock = DocumentSourceMock::create({Document{{"_id", 0}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"_id", 1}}, + DocumentSource::GetNextResult::makePauseExecution()}); + redact->setSource(mock.get()); + + // The $redact is keeping everything, so we should see everything from the mock, then EOF. + ASSERT_TRUE(redact->getNext().isAdvanced()); + ASSERT_TRUE(redact->getNext().isPaused()); + ASSERT_TRUE(redact->getNext().isAdvanced()); + ASSERT_TRUE(redact->getNext().isPaused()); + ASSERT_TRUE(redact->getNext().isEOF()); + ASSERT_TRUE(redact->getNext().isEOF()); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_replace_root_test.cpp b/src/mongo/db/pipeline/document_source_replace_root_test.cpp index 8f7797d6890..0e01d4ac7be 100644 --- a/src/mongo/db/pipeline/document_source_replace_root_test.cpp +++ b/src/mongo/db/pipeline/document_source_replace_root_test.cpp @@ -182,6 +182,27 @@ TEST_F(ReplaceRootBasics, SystemVariableForNewRootReplacesRootWithThatObject) { assertExhausted(replaceRoot); } +TEST_F(ReplaceRootBasics, ShouldPropagatePauses) { + auto replaceRoot = createReplaceRoot(BSON("newRoot" + << "$$ROOT")); + auto mock = DocumentSourceMock::create({Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document(), + Document(), + DocumentSource::GetNextResult::makePauseExecution(), + DocumentSource::GetNextResult::makePauseExecution()}); + replaceRoot->setSource(mock.get()); + + ASSERT_TRUE(replaceRoot->getNext().isAdvanced()); + ASSERT_TRUE(replaceRoot->getNext().isPaused()); + ASSERT_TRUE(replaceRoot->getNext().isAdvanced()); + ASSERT_TRUE(replaceRoot->getNext().isAdvanced()); + ASSERT_TRUE(replaceRoot->getNext().isPaused()); + ASSERT_TRUE(replaceRoot->getNext().isPaused()); + + assertExhausted(replaceRoot); +} + // Verify that when the expression at newRoot does not resolve to an object, as per the spec we // throw a user assertion. TEST_F(ReplaceRootBasics, ErrorsWhenNewRootDoesNotEvaluateToAnObject) { diff --git a/src/mongo/db/pipeline/document_source_sample_test.cpp b/src/mongo/db/pipeline/document_source_sample_test.cpp index 333b0cc5b16..22d6a4e0f25 100644 --- a/src/mongo/db/pipeline/document_source_sample_test.cpp +++ b/src/mongo/db/pipeline/document_source_sample_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/tick_source_mock.h" @@ -182,6 +183,25 @@ TEST_F(SampleBasics, DocsUnmodified) { assertEOF(); } +TEST_F(SampleBasics, ShouldPropagatePauses) { + createSample(2); + source()->queue.push_back(Document()); + source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution()); + source()->queue.push_back(Document()); + source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution()); + source()->queue.push_back(Document()); + source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution()); + + // The $sample stage needs to populate itself, so should propagate all three pauses before + // returning any results. + ASSERT_TRUE(sample()->getNext().isPaused()); + ASSERT_TRUE(sample()->getNext().isPaused()); + ASSERT_TRUE(sample()->getNext().isPaused()); + ASSERT_TRUE(sample()->getNext().isAdvanced()); + ASSERT_TRUE(sample()->getNext().isAdvanced()); + assertEOF(); +} + /** * Fixture to test error cases of the $sample stage. */ @@ -383,5 +403,18 @@ TEST_F(SampleFromRandomCursorBasics, MimicNonOptimized) { ASSERT_GTE(secondTotal / nTrials, 0.48); ASSERT_LTE(secondTotal / nTrials, 0.52); } + +DEATH_TEST_F(SampleFromRandomCursorBasics, + ShouldFailIfGivenPausedInput, + "Invariant failure Hit a MONGO_UNREACHABLE!") { + createSample(2); + source()->queue.push_back(Document{{"_id", 1}}); + source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution()); + + // Should see the first result, then see a pause and fail. + ASSERT_TRUE(sample()->getNext().isAdvanced()); + sample()->getNext(); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_skip_test.cpp b/src/mongo/db/pipeline/document_source_skip_test.cpp new file mode 100644 index 00000000000..175efad4788 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_skip_test.cpp @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using DocumentSourceSkipTest = AggregationContextFixture; + +TEST_F(DocumentSourceSkipTest, ShouldPropagatePauses) { + auto skip = DocumentSourceSkip::create(getExpCtx(), 2); + auto mock = DocumentSourceMock::create({Document(), + DocumentSource::GetNextResult::makePauseExecution(), + Document(), + Document(), + DocumentSource::GetNextResult::makePauseExecution(), + DocumentSource::GetNextResult::makePauseExecution()}); + skip->setSource(mock.get()); + + // Skip the first document. + ASSERT_TRUE(skip->getNext().isPaused()); + + // Skip one more, then advance. + ASSERT_TRUE(skip->getNext().isAdvanced()); + + ASSERT_TRUE(skip->getNext().isPaused()); + ASSERT_TRUE(skip->getNext().isPaused()); + + ASSERT_TRUE(skip->getNext().isEOF()); + ASSERT_TRUE(skip->getNext().isEOF()); + ASSERT_TRUE(skip->getNext().isEOF()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index e87002816d1..2091b6a5cf4 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -107,10 +107,10 @@ long long DocumentSourceSort::getLimit() const { return limitSrc ? limitSrc->getLimit() : -1; } -void DocumentSourceSort::addKey(const string& fieldPath, bool ascending) { +void DocumentSourceSort::addKey(StringData fieldPath, bool ascending) { VariablesIdGenerator idGenerator; VariablesParseState vps(&idGenerator); - vSortKey.push_back(ExpressionFieldPath::parse("$$ROOT." + fieldPath, vps)); + vSortKey.push_back(ExpressionFieldPath::parse("$$ROOT." + fieldPath.toString(), vps)); vAscending.push_back(ascending); } @@ -173,16 +173,19 @@ intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson( } intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( - const intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj sortOrder, long long limit) { - intrusive_ptr<DocumentSourceSort> pSort = new DocumentSourceSort(pExpCtx); + const intrusive_ptr<ExpressionContext>& pExpCtx, + BSONObj sortOrder, + long long limit, + uint64_t maxMemoryUsageBytes) { + intrusive_ptr<DocumentSourceSort> pSort(new DocumentSourceSort(pExpCtx)); + pSort->_maxMemoryUsageBytes = maxMemoryUsageBytes; pSort->injectExpressionContext(pExpCtx); pSort->_sort = sortOrder.getOwned(); - /* check for then iterate over the sort object */ - BSONForEach(keyField, sortOrder) { - const char* fieldName = keyField.fieldName(); + for (auto&& keyField : sortOrder) { + auto fieldName = keyField.fieldNameStringData(); - if (str::equals(fieldName, "$mergePresorted")) { + if ("$mergePresorted" == fieldName) { verify(keyField.Bool()); pSort->_mergingPresorted = true; continue; @@ -235,7 +238,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const { if (limitSrc) opts.limit = limitSrc->getLimit(); - opts.maxMemoryUsageBytes = 100 * 1024 * 1024; + opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) { opts.extSortAllowed = true; opts.tempDir = pExpCtx->tempDir; diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 1d54a9a5618..04864c3cb2d 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -349,5 +350,122 @@ TEST_F(DocumentSourceSortExecutionTest, ExtractArrayValues) { "[{_id:1,a:[{b:1},{b:1}]},{_id:0,a:[{b:1},{b:2}]}]"); } +TEST_F(DocumentSourceSortExecutionTest, ShouldPauseWhenAskedTo) { + auto sort = DocumentSourceSort::create(getExpCtx(), BSON("a" << 1)); + auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 0}}, + DocumentSource::GetNextResult::makePauseExecution()}); + sort->setSource(mock.get()); + + // Should propagate the first pause. + ASSERT_TRUE(sort->getNext().isPaused()); + + // Should load the single document, then pause. + ASSERT_TRUE(sort->getNext().isPaused()); + + // Now it should start giving results. + auto result = sort->getNext(); + ASSERT_TRUE(result.isAdvanced()); + ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"a", 0}})); +} + +TEST_F(DocumentSourceSortExecutionTest, ShouldResumePopulationBetweenPauses) { + auto sort = DocumentSourceSort::create(getExpCtx(), BSON("a" << 1)); + auto mock = DocumentSourceMock::create({Document{{"a", 1}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 0}}}); + sort->setSource(mock.get()); + + // Should load the first document, then propagate the pause. + ASSERT_TRUE(sort->getNext().isPaused()); + + // Should finish loading and start yielding results in sorted order. + auto result = sort->getNext(); + ASSERT_TRUE(result.isAdvanced()); + ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"a", 0}})); + + result = sort->getNext(); + ASSERT_TRUE(result.isAdvanced()); + ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"a", 1}})); + + ASSERT_TRUE(sort->getNext().isEOF()); + ASSERT_TRUE(sort->getNext().isEOF()); + ASSERT_TRUE(sort->getNext().isEOF()); +} + +TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled) { + auto expCtx = getExpCtx(); + + // Allow the $sort stage to spill to disk. + unittest::TempDir tempDir("DocumentSourceSortTest"); + expCtx->tempDir = tempDir.path(); + expCtx->extSortAllowed = true; + const size_t maxMemoryUsageBytes = 1000; + + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes, 'x'); + auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"_id", 1}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"_id", 2}, {"largeStr", largeStr}}}); + sort->setSource(mock.get()); + + // There were 2 pauses, so we should expect 2 paused results before any results can be returned. + ASSERT_TRUE(sort->getNext().isPaused()); + ASSERT_TRUE(sort->getNext().isPaused()); + + // Now we expect to get the results back, sorted by _id descending. + auto next = sort->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_VALUE_EQ(next.releaseDocument()["_id"], Value(2)); + + next = sort->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_VALUE_EQ(next.releaseDocument()["_id"], Value(1)); + + next = sort->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_VALUE_EQ(next.releaseDocument()["_id"], Value(0)); +} + +TEST_F(DocumentSourceSortExecutionTest, + ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) { + auto expCtx = getExpCtx(); + expCtx->extSortAllowed = false; + const size_t maxMemoryUsageBytes = 1000; + + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes, 'x'); + auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}}, + Document{{"_id", 1}, {"largeStr", largeStr}}}); + sort->setSource(mock.get()); + + ASSERT_THROWS_CODE(sort->getNext(), UserException, 16819); +} + +TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { + auto expCtx = getExpCtx(); + expCtx->extSortAllowed = false; + const size_t maxMemoryUsageBytes = 1000; + + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes / 2, 'x'); + auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"_id", 1}, {"largeStr", largeStr}}, + Document{{"_id", 2}, {"largeStr", largeStr}}}); + sort->setSource(mock.get()); + + // The first getNext() should pause. + ASSERT_TRUE(sort->getNext().isPaused()); + + // The next should realize it's used too much memory. + ASSERT_THROWS_CODE(sort->getNext(), UserException, 16819); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_unwind_test.cpp b/src/mongo/db/pipeline/document_source_unwind_test.cpp index 6be7ed5b703..56ad90dca9a 100644 --- a/src/mongo/db/pipeline/document_source_unwind_test.cpp +++ b/src/mongo/db/pipeline/document_source_unwind_test.cpp @@ -693,6 +693,30 @@ TEST_F(UnwindStageTest, TruncatesOutputSortAtUnwoundPath) { ASSERT_EQUALS(1U, outputSort.count(BSON("a" << 1))); } +TEST_F(UnwindStageTest, ShouldPropagatePauses) { + const bool includeNullIfEmptyOrMissing = false; + const boost::optional<std::string> includeArrayIndex = boost::none; + auto unwind = DocumentSourceUnwind::create( + getExpCtx(), "array", includeNullIfEmptyOrMissing, includeArrayIndex); + auto source = + DocumentSourceMock::create({Document{{"array", vector<Value>{Value(1), Value(2)}}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"array", vector<Value>{Value(1), Value(2)}}}}); + + unwind->setSource(source.get()); + + ASSERT_TRUE(unwind->getNext().isAdvanced()); + ASSERT_TRUE(unwind->getNext().isAdvanced()); + + ASSERT_TRUE(unwind->getNext().isPaused()); + + ASSERT_TRUE(unwind->getNext().isAdvanced()); + ASSERT_TRUE(unwind->getNext().isAdvanced()); + + ASSERT_TRUE(unwind->getNext().isEOF()); + ASSERT_TRUE(unwind->getNext().isEOF()); +} + // // Error cases. // diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h new file mode 100644 index 00000000000..57279650f43 --- /dev/null +++ b/src/mongo/db/pipeline/stub_mongod_interface.h @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source.h" + +#include "mongo/util/assert_util.h" + +namespace mongo { + +/** + * A stub MongodInterface that can be used for testing. Create a subclass and override methods as + * needed. + */ +class StubMongodInterface : public DocumentSourceNeedsMongod::MongodInterface { +public: + virtual ~StubMongodInterface() = default; + + void setOperationContext(OperationContext* opCtx) override { + MONGO_UNREACHABLE; + } + + DBClientBase* directClient() override { + MONGO_UNREACHABLE; + } + + bool isSharded(const NamespaceString& ns) override { + MONGO_UNREACHABLE; + } + + BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override { + MONGO_UNREACHABLE; + } + + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) override { + MONGO_UNREACHABLE; + } + + void appendLatencyStats(const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const override { + MONGO_UNREACHABLE; + } + + Status appendStorageStats(const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const override { + MONGO_UNREACHABLE; + } + + BSONObj getCollectionOptions(const NamespaceString& nss) override { + MONGO_UNREACHABLE; + } + + Status renameIfOptionsAndIndexesHaveNotChanged( + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) override { + MONGO_UNREACHABLE; + } + + StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx) override { + MONGO_UNREACHABLE; + } +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/tee_buffer.cpp b/src/mongo/db/pipeline/tee_buffer.cpp index 51ed90241e9..f16a6ffd289 100644 --- a/src/mongo/db/pipeline/tee_buffer.cpp +++ b/src/mongo/db/pipeline/tee_buffer.cpp @@ -81,13 +81,6 @@ void TeeBuffer::loadNextBatch() { auto input = _source->getNext(); for (; input.isAdvanced(); input = _source->getNext()) { - - // For the following reasons, we invariant that we never get a paused input: - // - TeeBuffer is the only place where a paused GetNextReturn will be returned. - // - The $facet stage is the only stage that uses TeeBuffer. - // - We currently disallow nested $facet stages. - invariant(!input.isPaused()); - bytesInBuffer += input.getDocument().getApproximateSize(); _buffer.push_back(std::move(input)); @@ -96,6 +89,12 @@ void TeeBuffer::loadNextBatch() { } } + // For the following reasons, we invariant that we never get a paused input: + // - TeeBuffer is the only place where a paused GetNextReturn will be returned. + // - The $facet stage is the only stage that uses TeeBuffer. + // - We currently disallow nested $facet stages. + invariant(!input.isPaused()); + // Populate the pending returns. for (size_t consumerId = 0; consumerId < _consumers.size(); ++consumerId) { if (_consumers[consumerId].stillInUse) { diff --git a/src/mongo/db/pipeline/tee_buffer_test.cpp b/src/mongo/db/pipeline/tee_buffer_test.cpp index edb52d3fb41..10a692922e0 100644 --- a/src/mongo/db/pipeline/tee_buffer_test.cpp +++ b/src/mongo/db/pipeline/tee_buffer_test.cpp @@ -90,6 +90,7 @@ TEST(TeeBufferTest, ShouldProvideAllResultsWithoutPauseIfOnlyOneConsumer) { teeBuffer->setSource(mock.get()); auto next = teeBuffer->getNext(0); + ASSERT_TRUE(next.isAdvanced()); ASSERT_DOCUMENT_EQ(next.getDocument(), inputs.front().getDocument()); next = teeBuffer->getNext(0); |