summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source.h66
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto_test.cpp57
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp228
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp130
-rw-r--r--src/mongo/db/pipeline/document_source_limit_test.cpp22
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp144
-rw-r--r--src/mongo/db/pipeline/document_source_match_test.cpp23
-rw-r--r--src/mongo/db/pipeline/document_source_mock_test.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_project_test.cpp22
-rw-r--r--src/mongo/db/pipeline/document_source_redact_test.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root_test.cpp21
-rw-r--r--src/mongo/db/pipeline/document_source_sample_test.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_skip_test.cpp68
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp21
-rw-r--r--src/mongo/db/pipeline/document_source_sort_test.cpp118
-rw-r--r--src/mongo/db/pipeline/document_source_unwind_test.cpp24
-rw-r--r--src/mongo/db/pipeline/stub_mongod_interface.h96
-rw-r--r--src/mongo/db/pipeline/tee_buffer.cpp13
-rw-r--r--src/mongo/db/pipeline/tee_buffer_test.cpp1
23 files changed, 1048 insertions, 126 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0176859c6e1..8a4b39f9477 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -131,6 +131,7 @@ env.CppUnitTest(
'document_source_redact_test.cpp',
'document_source_replace_root_test.cpp',
'document_source_sample_test.cpp',
+ 'document_source_skip_test.cpp',
'document_source_sort_by_count_test.cpp',
'document_source_sort_test.cpp',
'document_source_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 49801200e4b..90f158aec1d 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -1418,6 +1418,8 @@ private:
class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource {
public:
+ static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024;
+
// virtuals from DocumentSource
GetNextResult getNext() final;
const char* getSourceName() const final;
@@ -1441,42 +1443,27 @@ public:
boost::intrusive_ptr<DocumentSource> getShardSource() final;
boost::intrusive_ptr<DocumentSource> getMergeSource() final;
- /**
- Add sort key field.
-
- Adds a sort key field to the key being built up. A concatenated
- key is built up by calling this repeatedly.
-
- @param fieldPath the field path to the key component
- @param ascending if true, use the key for an ascending sort,
- otherwise, use it for descending
- */
- void addKey(const std::string& fieldPath, bool ascending);
-
/// Write out a Document whose contents are the sort key.
Document serializeSortKey(bool explain) const;
/**
- Create a sorting DocumentSource from BSON.
-
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $group.
-
- @param pBsonElement the BSONELement that defines the group
- @param pExpCtx the expression context for the pipeline
- @returns the grouping DocumentSource
+ * Parses a $sort stage from the user-supplied BSON.
*/
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- /// Create a DocumentSourceSort with a given sort and (optional) limit
+ /**
+ * Convenience method for creating a $sort stage.
+ */
static boost::intrusive_ptr<DocumentSourceSort> create(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
BSONObj sortOrder,
- long long limit = -1);
+ long long limit = -1,
+ uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
- /// returns -1 for no limit
+ /**
+ * Returns -1 for no limit.
+ */
long long getLimit() const;
/**
@@ -1510,10 +1497,15 @@ private:
explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
Value serialize(bool explain = false) const final {
- verify(false); // should call addToBsonArray instead
+ MONGO_UNREACHABLE; // Should call serializeToArray instead.
}
/**
+ * Helper to add a sort key to this stage.
+ */
+ void addKey(StringData fieldPath, bool ascending);
+
+ /**
* Before returning anything, we have to consume all input and sort it. This method consumes all
* input and prepares the sorted stream '_output'.
*
@@ -1568,6 +1560,7 @@ private:
boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
+ uint64_t _maxMemoryUsageBytes;
bool _done;
bool _mergingPresorted;
std::unique_ptr<MySorter> _sorter;
@@ -1912,7 +1905,8 @@ public:
boost::intrusive_ptr<Expression> startWith,
boost::optional<BSONObj> additionalFilter,
boost::optional<FieldPath> depthField,
- boost::optional<long long> maxDepth);
+ boost::optional<long long> maxDepth,
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc);
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
@@ -1921,15 +1915,17 @@ protected:
void doInjectExpressionContext() final;
private:
- DocumentSourceGraphLookUp(NamespaceString from,
- std::string as,
- std::string connectFromField,
- std::string connectToField,
- boost::intrusive_ptr<Expression> startWith,
- boost::optional<BSONObj> additionalFilter,
- boost::optional<FieldPath> depthField,
- boost::optional<long long> maxDepth,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ DocumentSourceGraphLookUp(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ NamespaceString from,
+ std::string as,
+ std::string connectFromField,
+ std::string connectToField,
+ boost::intrusive_ptr<Expression> startWith,
+ boost::optional<BSONObj> additionalFilter,
+ boost::optional<FieldPath> depthField,
+ boost::optional<long long> maxDepth,
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc);
Value serialize(bool explain = false) const final {
// Should not be called; use serializeToArray instead.
diff --git a/src/mongo/db/pipeline/document_source_add_fields_test.cpp b/src/mongo/db/pipeline/document_source_add_fields_test.cpp
index eabea7ff045..724c896e4c0 100644
--- a/src/mongo/db/pipeline/document_source_add_fields_test.cpp
+++ b/src/mongo/db/pipeline/document_source_add_fields_test.cpp
@@ -132,5 +132,23 @@ TEST_F(AddFieldsTest, ShouldAddReferencedFieldsToDependencies) {
ASSERT_EQUALS(true, dependencies.getNeedTextScore());
}
+TEST_F(AddFieldsTest, ShouldPropagatePauses) {
+ auto addFields = DocumentSourceAddFields::create(BSON("a" << 10), getExpCtx());
+ auto mock = DocumentSourceMock::create({Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution()});
+ addFields->setSource(mock.get());
+
+ ASSERT_TRUE(addFields->getNext().isAdvanced());
+ ASSERT_TRUE(addFields->getNext().isPaused());
+ ASSERT_TRUE(addFields->getNext().isAdvanced());
+ ASSERT_TRUE(addFields->getNext().isPaused());
+
+ ASSERT_TRUE(addFields->getNext().isEOF());
+ ASSERT_TRUE(addFields->getNext().isEOF());
+ ASSERT_TRUE(addFields->getNext().isEOF());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index 17e0c1d7a3b..f572925b42f 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -308,6 +308,39 @@ TEST_F(BucketAutoTests, RespectsCanonicalTypeOrderingOfValues) {
ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 'a', max : 'b'}, count : 2}")));
}
+TEST_F(BucketAutoTests, ShouldPropagatePauses) {
+ auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}");
+ auto bucketAutoStage = createBucketAuto(bucketAutoSpec);
+ auto source = DocumentSourceMock::create({Document{{"x", 1}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"x", 2}},
+ Document{{"x", 3}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"x", 4}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+ bucketAutoStage->setSource(source.get());
+
+ // The $bucketAuto stage needs to consume all inputs before returning any output, so we should
+ // see all three pauses before any advances.
+ ASSERT_TRUE(bucketAutoStage->getNext().isPaused());
+ ASSERT_TRUE(bucketAutoStage->getNext().isPaused());
+ ASSERT_TRUE(bucketAutoStage->getNext().isPaused());
+
+ auto next = bucketAutoStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ Document(fromjson("{_id : {min : 1, max : 3}, count : 2}")));
+
+ next = bucketAutoStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ Document(fromjson("{_id : {min : 3, max : 4}, count : 2}")));
+
+ ASSERT_TRUE(bucketAutoStage->getNext().isEOF());
+ ASSERT_TRUE(bucketAutoStage->getNext().isEOF());
+ ASSERT_TRUE(bucketAutoStage->getNext().isEOF());
+}
+
TEST_F(BucketAutoTests, SourceNameIsBucketAuto) {
auto bucketAuto = createBucketAuto(fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}"));
ASSERT_EQUALS(string(bucketAuto->getSourceName()), "$bucketAuto");
@@ -509,20 +542,38 @@ TEST_F(BucketAutoTests, FailsWithInvalidOutputFieldName) {
ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40236);
}
-TEST_F(BucketAutoTests, FailsWhenBufferingTooManyDocuments) {
+TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) {
+ const uint64_t maxMemoryUsageBytes = 1000;
deque<DocumentSource::GetNextResult> inputs;
- auto largeStr = string(1000, 'b');
+ auto largeStr = string(maxMemoryUsageBytes, 'b');
auto inputDoc = Document{{"a", largeStr}};
- ASSERT_GTE(inputDoc.getApproximateSize(), 1000UL);
+ ASSERT_GTE(inputDoc.getApproximateSize(), maxMemoryUsageBytes);
inputs.emplace_back(std::move(inputDoc));
inputs.emplace_back(Document{{"a", largeStr}});
auto mock = DocumentSourceMock::create(inputs);
+ const int numBuckets = 1;
+ auto bucketAuto =
+ DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes);
+ bucketAuto->setSource(mock.get());
+ ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819);
+}
+
+TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocumentsEvenIfPaused) {
const uint64_t maxMemoryUsageBytes = 1000;
+ auto largeStr = string(maxMemoryUsageBytes / 2, 'b');
+ auto inputDoc = Document{{"a", largeStr}};
+ ASSERT_GT(inputDoc.getApproximateSize(), maxMemoryUsageBytes / 2);
+
+ auto mock = DocumentSourceMock::create({std::move(inputDoc),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", largeStr}}});
+
const int numBuckets = 1;
auto bucketAuto =
DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes);
bucketAuto->setSource(mock.get());
+ ASSERT_TRUE(bucketAuto->getNext().isPaused());
ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819);
}
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 0dda75adb1b..9c8cbce17e0 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -301,6 +302,23 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSub
ASSERT_DOCUMENT_EQ(output.getDocument(), Document(fromjson("{subPipe: [{_id: 0}, {_id: 1}]}")));
}
+// TODO: DocumentSourceFacet will have to propagate pauses if we ever allow nested $facets.
+DEATH_TEST_F(DocumentSourceFacetTest,
+ ShouldFailIfGivenPausedInput,
+ "Invariant failure !input.isPaused()") {
+ auto ctx = getExpCtx();
+
+ auto firstDummy = DocumentSourcePassthrough::create();
+ auto pipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx));
+
+ auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx);
+
+ auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution());
+ facetStage->setSource(mock.get());
+
+ facetStage->getNext(); // This should cause a crash.
+}
+
//
// Miscellaneous.
//
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index b9f3d0aa4e8..daa51ad8294 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -450,6 +450,7 @@ void DocumentSourceGraphLookUp::doReattachToOperationContext(OperationContext* o
}
DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
NamespaceString from,
std::string as,
std::string connectFromField,
@@ -458,7 +459,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
boost::optional<BSONObj> additionalFilter,
boost::optional<FieldPath> depthField,
boost::optional<long long> maxDepth,
- const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc)
: DocumentSourceNeedsMongod(expCtx),
_from(std::move(from)),
_as(std::move(as)),
@@ -469,7 +470,8 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
_depthField(depthField),
_maxDepth(maxDepth),
_visited(ValueComparator::kInstance.makeUnorderedValueMap<BSONObj>()),
- _cache(expCtx->getValueComparator()) {}
+ _cache(expCtx->getValueComparator()),
+ _unwind(unwindSrc) {}
intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create(
const intrusive_ptr<ExpressionContext>& expCtx,
@@ -480,9 +482,11 @@ intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create(
intrusive_ptr<Expression> startWith,
boost::optional<BSONObj> additionalFilter,
boost::optional<FieldPath> depthField,
- boost::optional<long long> maxDepth) {
+ boost::optional<long long> maxDepth,
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc) {
intrusive_ptr<DocumentSourceGraphLookUp> source(
- new DocumentSourceGraphLookUp(std::move(fromNs),
+ new DocumentSourceGraphLookUp(expCtx,
+ std::move(fromNs),
std::move(asField),
std::move(connectFromField),
std::move(connectToField),
@@ -490,7 +494,7 @@ intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create(
additionalFilter,
depthField,
maxDepth,
- expCtx));
+ unwindSrc));
source->_variables.reset(new Variables());
source->injectExpressionContext(expCtx);
@@ -595,7 +599,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson(
!isMissingRequiredField);
intrusive_ptr<DocumentSourceGraphLookUp> newSource(
- new DocumentSourceGraphLookUp(std::move(from),
+ new DocumentSourceGraphLookUp(expCtx,
+ std::move(from),
std::move(as),
std::move(connectFromField),
std::move(connectToField),
@@ -603,7 +608,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson(
additionalFilter,
depthField,
maxDepth,
- expCtx));
+ boost::none));
newSource->_variables.reset(new Variables(idGenerator.getIdCount()));
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 398831cc728..6f74a0d6280 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/stub_mongod_interface.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -60,56 +61,11 @@ using DocumentSourceGraphLookUpTest = AggregationContextFixture;
* A MongodInterface use for testing that supports making pipelines with an initial
* DocumentSourceMock source.
*/
-class MockMongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface {
+class MockMongodImplementation final : public StubMongodInterface {
public:
MockMongodImplementation(std::deque<DocumentSource::GetNextResult> results)
: _results(std::move(results)) {}
- void setOperationContext(OperationContext* opCtx) final {
- MONGO_UNREACHABLE;
- }
-
- DBClientBase* directClient() final {
- MONGO_UNREACHABLE;
- }
-
- bool isSharded(const NamespaceString& ns) final {
- MONGO_UNREACHABLE;
- }
-
- BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
- MONGO_UNREACHABLE;
- }
-
- CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) final {
- MONGO_UNREACHABLE;
- }
-
- void appendLatencyStats(const NamespaceString& nss,
- bool showHistograms,
- BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- Status appendStorageStats(const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- BSONObj getCollectionOptions(const NamespaceString& nss) final {
- MONGO_UNREACHABLE;
- }
-
- Status renameIfOptionsAndIndexesHaveNotChanged(
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) final {
- MONGO_UNREACHABLE;
- }
-
StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
@@ -148,6 +104,7 @@ TEST_F(DocumentSourceGraphLookUpTest,
ExpressionFieldPath::create("_id"),
boost::none,
boost::none,
+ boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
graphLookupStage->injectMongodInterface(
@@ -176,6 +133,7 @@ TEST_F(DocumentSourceGraphLookUpTest,
ExpressionFieldPath::create("_id"),
boost::none,
boost::none,
+ boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
graphLookupStage->injectMongodInterface(
@@ -195,6 +153,7 @@ TEST_F(DocumentSourceGraphLookUpTest,
NamespaceString fromNs("test", "graph_lookup");
expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+ auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none);
auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx,
fromNs,
"results",
@@ -203,16 +162,13 @@ TEST_F(DocumentSourceGraphLookUpTest,
ExpressionFieldPath::create("_id"),
boost::none,
boost::none,
- boost::none);
+ boost::none,
+ unwindStage);
graphLookupStage->injectMongodInterface(
std::make_shared<MockMongodImplementation>(std::move(fromContents)));
+ graphLookupStage->setSource(inputMock.get());
- auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none);
- auto pipeline =
- unittest::assertGet(Pipeline::create({inputMock, graphLookupStage, unwindStage}, expCtx));
- pipeline->optimizePipeline();
-
- ASSERT_THROWS_CODE(pipeline->getNext(), UserException, 40271);
+ ASSERT_THROWS_CODE(graphLookupStage->getNext(), UserException, 40271);
}
bool arrayContains(const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -248,19 +204,20 @@ TEST_F(DocumentSourceGraphLookUpTest,
ExpressionFieldPath::create("_id"),
boost::none,
boost::none,
+ boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
graphLookupStage->injectMongodInterface(
std::make_shared<MockMongodImplementation>(std::move(fromContents)));
- auto pipeline = unittest::assertGet(Pipeline::create({inputMock, graphLookupStage}, expCtx));
+ graphLookupStage->setSource(inputMock.get());
- auto next = pipeline->getNext();
- ASSERT(next);
+ auto next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
- ASSERT_EQ(2U, next->size());
- ASSERT_VALUE_EQ(Value(0), next->getField("_id"));
+ ASSERT_EQ(2U, next.getDocument().size());
+ ASSERT_VALUE_EQ(Value(0), next.getDocument().getField("_id"));
- auto resultsValue = next->getField("results");
+ auto resultsValue = next.getDocument().getField("results");
ASSERT(resultsValue.isArray());
auto resultsArray = resultsValue.getArray();
@@ -271,23 +228,166 @@ TEST_F(DocumentSourceGraphLookUpTest,
ASSERT(arrayContains(expCtx, resultsArray, Value(to1)));
ASSERT_EQ(2U, resultsArray.size());
- next = pipeline->getNext();
- ASSERT(!next);
+ next = graphLookupStage->getNext();
+ ASSERT(next.isEOF());
} else if (arrayContains(expCtx, resultsArray, Value(to0from2))) {
// If 'to0from2' was returned, then we should see 'to2' and nothing else.
ASSERT(arrayContains(expCtx, resultsArray, Value(to2)));
ASSERT_EQ(2U, resultsArray.size());
- next = pipeline->getNext();
- ASSERT(!next);
+ next = graphLookupStage->getNext();
+ ASSERT(next.isEOF());
} else {
FAIL(str::stream() << "Expected either [ " << to0from1.toString() << " ] or [ "
<< to0from2.toString()
<< " ] but found [ "
- << next->toString()
+ << next.getDocument().toString()
<< " ]");
}
}
+TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePauses) {
+ auto expCtx = getExpCtx();
+
+ auto inputMock =
+ DocumentSourceMock::create({Document{{"startPoint", 0}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"startPoint", 0}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+
+ std::deque<DocumentSource::GetNextResult> fromContents{
+ Document{{"_id", "a"}, {"to", 0}, {"from", 1}}, Document{{"_id", "b"}, {"to", 1}}};
+
+ NamespaceString fromNs("test", "foreign");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+ auto graphLookupStage =
+ DocumentSourceGraphLookUp::create(expCtx,
+ fromNs,
+ "results",
+ "from",
+ "to",
+ ExpressionFieldPath::create("startPoint"),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+
+ graphLookupStage->setSource(inputMock.get());
+
+ graphLookupStage->injectMongodInterface(
+ std::make_shared<MockMongodImplementation>(std::move(fromContents)));
+
+ auto next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+
+ // We expect {startPoint: 0, results: [{_id: "a", to: 0, from: 1}, {_id: "b", to: 1}]}, but the
+ // 'results' array can be in any order. So we use arrayContains to assert it has the right
+ // contents.
+ auto result = next.releaseDocument();
+ ASSERT_VALUE_EQ(result["startPoint"], Value(0));
+ ASSERT_EQ(result["results"].getType(), BSONType::Array);
+ ASSERT_EQ(result["results"].getArray().size(), 2UL);
+ ASSERT_TRUE(arrayContains(expCtx,
+ result["results"].getArray(),
+ Value(Document{{"_id", "a"}, {"to", 0}, {"from", 1}})));
+ ASSERT_TRUE(arrayContains(
+ expCtx, result["results"].getArray(), Value(Document{{"_id", "b"}, {"to", 1}})));
+
+ ASSERT_TRUE(graphLookupStage->getNext().isPaused());
+
+ next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ result = next.releaseDocument();
+ ASSERT_VALUE_EQ(result["startPoint"], Value(0));
+ ASSERT_EQ(result["results"].getType(), BSONType::Array);
+ ASSERT_EQ(result["results"].getArray().size(), 2UL);
+ ASSERT_TRUE(arrayContains(expCtx,
+ result["results"].getArray(),
+ Value(Document{{"_id", "a"}, {"to", 0}, {"from", 1}})));
+ ASSERT_TRUE(arrayContains(
+ expCtx, result["results"].getArray(), Value(Document{{"_id", "b"}, {"to", 1}})));
+
+ ASSERT_TRUE(graphLookupStage->getNext().isPaused());
+
+ ASSERT_TRUE(graphLookupStage->getNext().isEOF());
+ ASSERT_TRUE(graphLookupStage->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePausesWhileUnwinding) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $graphLookup stage
+ auto inputMock =
+ DocumentSourceMock::create({Document{{"startPoint", 0}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"startPoint", 0}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+
+ std::deque<DocumentSource::GetNextResult> fromContents{
+ Document{{"_id", "a"}, {"to", 0}, {"from", 1}}, Document{{"_id", "b"}, {"to", 1}}};
+
+ NamespaceString fromNs("test", "foreign");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+
+ const bool preserveNullAndEmptyArrays = false;
+ const boost::optional<std::string> includeArrayIndex = boost::none;
+ auto unwindStage = DocumentSourceUnwind::create(
+ expCtx, "results", preserveNullAndEmptyArrays, includeArrayIndex);
+
+ auto graphLookupStage =
+ DocumentSourceGraphLookUp::create(expCtx,
+ fromNs,
+ "results",
+ "from",
+ "to",
+ ExpressionFieldPath::create("startPoint"),
+ boost::none,
+ boost::none,
+ boost::none,
+ unwindStage);
+
+ graphLookupStage->setSource(inputMock.get());
+
+ graphLookupStage->injectMongodInterface(
+ std::make_shared<MockMongodImplementation>(std::move(fromContents)));
+
+ // Assert it has the expected results. Note the results can be in either order.
+ auto expectedA =
+ Document{{"startPoint", 0}, {"results", Document{{"_id", "a"}, {"to", 0}, {"from", 1}}}};
+ auto expectedB = Document{{"startPoint", 0}, {"results", Document{{"_id", "b"}, {"to", 1}}}};
+ auto next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ if (expCtx->getDocumentComparator().evaluate(next.getDocument() == expectedA)) {
+ next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB);
+ } else {
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB);
+ next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedA);
+ }
+
+ ASSERT_TRUE(graphLookupStage->getNext().isPaused());
+
+ next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ if (expCtx->getDocumentComparator().evaluate(next.getDocument() == expectedA)) {
+ next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB);
+ } else {
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedB);
+ next = graphLookupStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedA);
+ }
+
+ ASSERT_TRUE(graphLookupStage->getNext().isPaused());
+
+ ASSERT_TRUE(graphLookupStage->getNext().isEOF());
+ ASSERT_TRUE(graphLookupStage->getNext().isEOF());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 850fc9589d1..32b6d1f6f87 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -38,15 +38,18 @@
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/json.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/stdx/memory.h"
+#include "mongo/stdx/unordered_set.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
@@ -61,6 +64,133 @@ using std::vector;
static const char* const ns = "unittests.document_source_group_tests";
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceGroupTest = AggregationContextFixture;
+
+TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoading) {
+ auto expCtx = getExpCtx();
+ expCtx->inRouter = true; // Disallow external sort.
+ // This is the only way to do this in a debug build.
+ AccumulationStatement countStatement{"count",
+ AccumulationStatement::getFactory("$sum"),
+ ExpressionConstant::create(expCtx, Value(1))};
+ auto group = DocumentSourceGroup::create(
+ expCtx, ExpressionConstant::create(expCtx, Value(BSONNULL)), {countStatement}, 0);
+ auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document()});
+ group->setSource(mock.get());
+
+ // There were 3 pauses, so we should expect 3 paused results before any results can be returned.
+ ASSERT_TRUE(group->getNext().isPaused());
+ ASSERT_TRUE(group->getNext().isPaused());
+ ASSERT_TRUE(group->getNext().isPaused());
+
+ // There were 4 documents, so we expect a count of 4.
+ auto result = group->getNext();
+ ASSERT_TRUE(result.isAdvanced());
+ ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"_id", BSONNULL}, {"count", 4}}));
+}
+
+TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) {
+ auto expCtx = getExpCtx();
+
+ // Allow the $group stage to spill to disk.
+ TempDir tempDir("DocumentSourceGroupTest");
+ expCtx->tempDir = tempDir.path();
+ expCtx->extSortAllowed = true;
+ const size_t maxMemoryUsageBytes = 1000;
+
+ VariablesIdGenerator idGen;
+ VariablesParseState vps(&idGen);
+ AccumulationStatement pushStatement{"spaceHog",
+ AccumulationStatement::getFactory("$push"),
+ ExpressionFieldPath::parse("$largeStr", vps)};
+ auto groupByExpression = ExpressionFieldPath::parse("$_id", vps);
+ auto group = DocumentSourceGroup::create(
+ expCtx, groupByExpression, {pushStatement}, idGen.getIdCount(), maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"_id", 1}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"_id", 2}, {"largeStr", largeStr}}});
+ group->setSource(mock.get());
+
+ // There were 2 pauses, so we should expect 2 paused results before any results can be returned.
+ ASSERT_TRUE(group->getNext().isPaused());
+ ASSERT_TRUE(group->getNext().isPaused());
+
+ // Now we expect to get the results back, although in no particular order.
+ stdx::unordered_set<int> idSet;
+ for (auto result = group->getNext(); result.isAdvanced(); result = group->getNext()) {
+ idSet.insert(result.releaseDocument()["_id"].coerceToInt());
+ }
+ ASSERT_TRUE(group->getNext().isEOF());
+
+ ASSERT_EQ(idSet.size(), 3UL);
+ ASSERT_EQ(idSet.count(0), 1UL);
+ ASSERT_EQ(idSet.count(1), 1UL);
+ ASSERT_EQ(idSet.count(2), 1UL);
+}
+
+TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) {
+ auto expCtx = getExpCtx();
+ const size_t maxMemoryUsageBytes = 1000;
+ expCtx->inRouter = true; // Disallow external sort.
+ // This is the only way to do this in a debug build.
+
+ VariablesIdGenerator idGen;
+ VariablesParseState vps(&idGen);
+ AccumulationStatement pushStatement{"spaceHog",
+ AccumulationStatement::getFactory("$push"),
+ ExpressionFieldPath::parse("$largeStr", vps)};
+ auto groupByExpression = ExpressionFieldPath::parse("$_id", vps);
+ auto group = DocumentSourceGroup::create(
+ expCtx, groupByExpression, {pushStatement}, idGen.getIdCount(), maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}},
+ Document{{"_id", 1}, {"largeStr", largeStr}}});
+ group->setSource(mock.get());
+
+ ASSERT_THROWS_CODE(group->getNext(), UserException, 16945);
+}
+
+TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
+ auto expCtx = getExpCtx();
+ const size_t maxMemoryUsageBytes = 1000;
+ expCtx->inRouter = true; // Disallow external sort.
+ // This is the only way to do this in a debug build.
+
+ VariablesIdGenerator idGen;
+ VariablesParseState vps(&idGen);
+ AccumulationStatement pushStatement{"spaceHog",
+ AccumulationStatement::getFactory("$push"),
+ ExpressionFieldPath::parse("$largeStr", vps)};
+ auto groupByExpression = ExpressionFieldPath::parse("$_id", vps);
+ auto group = DocumentSourceGroup::create(
+ expCtx, groupByExpression, {pushStatement}, idGen.getIdCount(), maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes / 2, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"_id", 1}, {"largeStr", largeStr}},
+ Document{{"_id", 2}, {"largeStr", largeStr}}});
+ group->setSource(mock.get());
+
+ // The first getNext() should pause.
+ ASSERT_TRUE(group->getNext().isPaused());
+
+ // The next should realize it's used too much memory.
+ ASSERT_THROWS_CODE(group->getNext(), UserException, 16945);
+}
+
BSONObj toBson(const intrusive_ptr<DocumentSource>& source) {
vector<Value> arr;
source->serializeToArray(arr);
diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp
index 7de5bf85a3a..2fc83eed374 100644
--- a/src/mongo/db/pipeline/document_source_limit_test.cpp
+++ b/src/mongo/db/pipeline/document_source_limit_test.cpp
@@ -99,5 +99,27 @@ TEST_F(DocumentSourceLimitTest, ShouldNotIntroduceAnyDependencies) {
ASSERT_EQUALS(false, dependencies.getNeedTextScore());
}
+TEST_F(DocumentSourceLimitTest, ShouldPropagatePauses) {
+ auto limit = DocumentSourceLimit::create(getExpCtx(), 2);
+ auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document()});
+ limit->setSource(mock.get());
+
+ ASSERT_TRUE(limit->getNext().isPaused());
+ ASSERT_TRUE(limit->getNext().isAdvanced());
+ ASSERT_TRUE(limit->getNext().isPaused());
+ ASSERT_TRUE(limit->getNext().isAdvanced());
+
+ // We've reached the limit.
+ ASSERT_TRUE(mock->isDisposed);
+ ASSERT_TRUE(limit->getNext().isEOF());
+ ASSERT_TRUE(limit->getNext().isEOF());
+ ASSERT_TRUE(limit->getNext().isEOF());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 2ab28f3e392..c42eebdb2f1 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
#include <boost/intrusive_ptr.hpp>
+#include <deque>
#include <vector>
#include "mongo/bson/bsonmisc.h"
@@ -37,12 +38,15 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/field_path.h"
+#include "mongo/db/pipeline/stub_mongod_interface.h"
#include "mongo/db/pipeline/value.h"
namespace mongo {
namespace {
using boost::intrusive_ptr;
+using std::deque;
using std::vector;
// This provides access to getExpCtx(), but we'll use a different name for this test suite.
@@ -125,5 +129,145 @@ TEST(MakeMatchStageFromInput, ArrayValueWithRegexUsesOrQuery) {
<< BSONObj()))));
}
+//
+// Execution tests.
+//
+
+/**
+ * A mock MongodInterface which allows mocking a foreign pipeline.
+ */
+class MockMongodInterface final : public StubMongodInterface {
+public:
+ MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults)
+ : _mockResults(std::move(mockResults)) {}
+
+ bool isSharded(const NamespaceString& ns) final {
+ return false;
+ }
+
+ StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults));
+ pipeline.getValue()->injectExpressionContext(expCtx);
+ pipeline.getValue()->optimizePipeline();
+
+ return pipeline;
+ }
+
+private:
+ deque<DocumentSource::GetNextResult> _mockResults;
+};
+
+TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "foreign");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+
+ // Set up the $lookup stage.
+ auto lookupSpec = Document{{"$lookup",
+ Document{{"from", fromNs.coll()},
+ {"localField", "foreignId"},
+ {"foreignField", "_id"},
+ {"as", "foreignDocs"}}}}
+ .toBson();
+ auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx);
+ auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get());
+
+ // Mock its input, pausing every other result.
+ auto mockLocalSource =
+ DocumentSourceMock::create({Document{{"foreignId", 0}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"foreignId", 1}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+
+ lookup->setSource(mockLocalSource.get());
+ lookup->injectExpressionContext(expCtx);
+
+ // Mock out the foreign collection.
+ deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
+ Document{{"_id", 1}}};
+ lookup->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(std::move(mockForeignContents)));
+
+ auto next = lookup->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.releaseDocument(),
+ (Document{{"foreignId", 0}, {"foreignDocs", vector<Value>{Value(Document{{"_id", 0}})}}}));
+
+ ASSERT_TRUE(lookup->getNext().isPaused());
+
+ next = lookup->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.releaseDocument(),
+ (Document{{"foreignId", 1}, {"foreignDocs", vector<Value>{Value(Document{{"_id", 1}})}}}));
+
+ ASSERT_TRUE(lookup->getNext().isPaused());
+
+ ASSERT_TRUE(lookup->getNext().isEOF());
+ ASSERT_TRUE(lookup->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "foreign");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+
+ // Set up the $lookup stage.
+ auto lookupSpec = Document{{"$lookup",
+ Document{{"from", fromNs.coll()},
+ {"localField", "foreignId"},
+ {"foreignField", "_id"},
+ {"as", "foreignDoc"}}}}
+ .toBson();
+ auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx);
+ auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get());
+
+ const bool preserveNullAndEmptyArrays = false;
+ const boost::optional<std::string> includeArrayIndex = boost::none;
+ lookup->setUnwindStage(DocumentSourceUnwind::create(
+ expCtx, "foreignDoc", preserveNullAndEmptyArrays, includeArrayIndex));
+
+ // Mock its input, pausing every other result.
+ auto mockLocalSource =
+ DocumentSourceMock::create({Document{{"foreignId", 0}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"foreignId", 1}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+ lookup->setSource(mockLocalSource.get());
+
+ lookup->injectExpressionContext(expCtx);
+
+ // Mock out the foreign collection.
+ deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
+ Document{{"_id", 1}}};
+ lookup->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(std::move(mockForeignContents)));
+
+ auto next = lookup->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ (Document{{"foreignId", 0}, {"foreignDoc", Document{{"_id", 0}}}}));
+
+ ASSERT_TRUE(lookup->getNext().isPaused());
+
+ next = lookup->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ (Document{{"foreignId", 1}, {"foreignDoc", Document{{"_id", 1}}}}));
+
+ ASSERT_TRUE(lookup->getNext().isPaused());
+
+ ASSERT_TRUE(lookup->getNext().isEOF());
+ ASSERT_TRUE(lookup->getNext().isEOF());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp
index d61262ed1f7..eb4d40bbc97 100644
--- a/src/mongo/db/pipeline/document_source_match_test.cpp
+++ b/src/mongo/db/pipeline/document_source_match_test.cpp
@@ -302,6 +302,29 @@ TEST_F(DocumentSourceMatchTest, MultipleMatchStagesShouldCombineIntoOne) {
"{c:1}]}"));
}
+TEST_F(DocumentSourceMatchTest, ShouldPropagatePauses) {
+ auto match = DocumentSourceMatch::create(BSON("a" << 1), getExpCtx());
+ auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 1}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 2}},
+ Document{{"a", 2}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 1}}});
+ match->setSource(mock.get());
+
+ ASSERT_TRUE(match->getNext().isPaused());
+ ASSERT_TRUE(match->getNext().isAdvanced());
+ ASSERT_TRUE(match->getNext().isPaused());
+
+ // {a: 2} doesn't match, should go directly to the next pause.
+ ASSERT_TRUE(match->getNext().isPaused());
+ ASSERT_TRUE(match->getNext().isAdvanced());
+ ASSERT_TRUE(match->getNext().isEOF());
+ ASSERT_TRUE(match->getNext().isEOF());
+ ASSERT_TRUE(match->getNext().isEOF());
+}
+
TEST(ObjectForMatch, ShouldExtractTopLevelFieldIfDottedFieldNeeded) {
Document input(fromjson("{a: 1, b: {c: 1, d: 1}}"));
BSONObj expected = fromjson("{b: {c: 1, d: 1}}");
diff --git a/src/mongo/db/pipeline/document_source_mock_test.cpp b/src/mongo/db/pipeline/document_source_mock_test.cpp
index acf4f21f3fe..e4826ff5b24 100644
--- a/src/mongo/db/pipeline/document_source_mock_test.cpp
+++ b/src/mongo/db/pipeline/document_source_mock_test.cpp
@@ -43,13 +43,23 @@ TEST(DocumentSourceMockTest, OneDoc) {
ASSERT(source->getNext().isEOF());
}
-TEST(DocumentSourceMockTest, DequeDocuments) {
+TEST(DocumentSourceMockTest, ShouldBeConstructableFromInitializerListOfDocuments) {
auto source = DocumentSourceMock::create({Document{{"a", 1}}, Document{{"a", 2}}});
ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 1}}));
ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 2}}));
ASSERT(source->getNext().isEOF());
}
+TEST(DocumentSourceMockTest, ShouldBeConstructableFromDequeOfResults) {
+ auto source = DocumentSourceMock::create({Document{{"a", 1}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 2}}});
+ ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 1}}));
+ ASSERT_TRUE(source->getNext().isPaused());
+ ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 2}}));
+ ASSERT(source->getNext().isEOF());
+}
+
TEST(DocumentSourceMockTest, StringJSON) {
auto source = DocumentSourceMock::create("{a : 1}");
ASSERT_DOCUMENT_EQ(source->getNext().getDocument(), (Document{{"a", 1}}));
diff --git a/src/mongo/db/pipeline/document_source_project_test.cpp b/src/mongo/db/pipeline/document_source_project_test.cpp
index 12c5f72f087..de2bd12e254 100644
--- a/src/mongo/db/pipeline/document_source_project_test.cpp
+++ b/src/mongo/db/pipeline/document_source_project_test.cpp
@@ -134,6 +134,28 @@ TEST_F(ProjectStageTest, ExclusionShouldBeAbleToProcessMultipleDocuments) {
ASSERT(project->getNext().isEOF());
}
+TEST_F(ProjectStageTest, ShouldPropagatePauses) {
+ auto project = DocumentSourceProject::create(BSON("a" << false), getExpCtx());
+ auto source = DocumentSourceMock::create({Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution()});
+ project->setSource(source.get());
+
+ ASSERT_TRUE(project->getNext().isAdvanced());
+ ASSERT_TRUE(project->getNext().isPaused());
+ ASSERT_TRUE(project->getNext().isAdvanced());
+ ASSERT_TRUE(project->getNext().isPaused());
+ ASSERT_TRUE(project->getNext().isAdvanced());
+ ASSERT_TRUE(project->getNext().isPaused());
+
+ ASSERT(project->getNext().isEOF());
+ ASSERT(project->getNext().isEOF());
+ ASSERT(project->getNext().isEOF());
+}
+
TEST_F(ProjectStageTest, InclusionShouldAddDependenciesOfIncludedAndComputedFields) {
auto project = DocumentSourceProject::create(
fromjson("{a: true, x: '$b', y: {$and: ['$c','$d']}, z: {$meta: 'textScore'}}"),
diff --git a/src/mongo/db/pipeline/document_source_redact_test.cpp b/src/mongo/db/pipeline/document_source_redact_test.cpp
index f4de62feff0..a6a4bfd9f7b 100644
--- a/src/mongo/db/pipeline/document_source_redact_test.cpp
+++ b/src/mongo/db/pipeline/document_source_redact_test.cpp
@@ -57,5 +57,24 @@ TEST_F(DocumentSourceRedactTest, ShouldCopyRedactSafePartOfMatchBeforeItself) {
ASSERT_EQUALS(pipeline.size(), 3U);
ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline.front().get()));
}
+
+TEST_F(DocumentSourceRedactTest, ShouldPropagatePauses) {
+ auto redactSpec = BSON("$redact"
+ << "$$KEEP");
+ auto redact = DocumentSourceRedact::createFromBson(redactSpec.firstElement(), getExpCtx());
+ auto mock = DocumentSourceMock::create({Document{{"_id", 0}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"_id", 1}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+ redact->setSource(mock.get());
+
+ // The $redact is keeping everything, so we should see everything from the mock, then EOF.
+ ASSERT_TRUE(redact->getNext().isAdvanced());
+ ASSERT_TRUE(redact->getNext().isPaused());
+ ASSERT_TRUE(redact->getNext().isAdvanced());
+ ASSERT_TRUE(redact->getNext().isPaused());
+ ASSERT_TRUE(redact->getNext().isEOF());
+ ASSERT_TRUE(redact->getNext().isEOF());
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_replace_root_test.cpp b/src/mongo/db/pipeline/document_source_replace_root_test.cpp
index 8f7797d6890..0e01d4ac7be 100644
--- a/src/mongo/db/pipeline/document_source_replace_root_test.cpp
+++ b/src/mongo/db/pipeline/document_source_replace_root_test.cpp
@@ -182,6 +182,27 @@ TEST_F(ReplaceRootBasics, SystemVariableForNewRootReplacesRootWithThatObject) {
assertExhausted(replaceRoot);
}
+TEST_F(ReplaceRootBasics, ShouldPropagatePauses) {
+ auto replaceRoot = createReplaceRoot(BSON("newRoot"
+ << "$$ROOT"));
+ auto mock = DocumentSourceMock::create({Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ DocumentSource::GetNextResult::makePauseExecution()});
+ replaceRoot->setSource(mock.get());
+
+ ASSERT_TRUE(replaceRoot->getNext().isAdvanced());
+ ASSERT_TRUE(replaceRoot->getNext().isPaused());
+ ASSERT_TRUE(replaceRoot->getNext().isAdvanced());
+ ASSERT_TRUE(replaceRoot->getNext().isAdvanced());
+ ASSERT_TRUE(replaceRoot->getNext().isPaused());
+ ASSERT_TRUE(replaceRoot->getNext().isPaused());
+
+ assertExhausted(replaceRoot);
+}
+
// Verify that when the expression at newRoot does not resolve to an object, as per the spec we
// throw a user assertion.
TEST_F(ReplaceRootBasics, ErrorsWhenNewRootDoesNotEvaluateToAnObject) {
diff --git a/src/mongo/db/pipeline/document_source_sample_test.cpp b/src/mongo/db/pipeline/document_source_sample_test.cpp
index 333b0cc5b16..22d6a4e0f25 100644
--- a/src/mongo/db/pipeline/document_source_sample_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sample_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/tick_source_mock.h"
@@ -182,6 +183,25 @@ TEST_F(SampleBasics, DocsUnmodified) {
assertEOF();
}
+TEST_F(SampleBasics, ShouldPropagatePauses) {
+ createSample(2);
+ source()->queue.push_back(Document());
+ source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+ source()->queue.push_back(Document());
+ source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+ source()->queue.push_back(Document());
+ source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+
+ // The $sample stage needs to populate itself, so should propagate all three pauses before
+ // returning any results.
+ ASSERT_TRUE(sample()->getNext().isPaused());
+ ASSERT_TRUE(sample()->getNext().isPaused());
+ ASSERT_TRUE(sample()->getNext().isPaused());
+ ASSERT_TRUE(sample()->getNext().isAdvanced());
+ ASSERT_TRUE(sample()->getNext().isAdvanced());
+ assertEOF();
+}
+
/**
* Fixture to test error cases of the $sample stage.
*/
@@ -383,5 +403,18 @@ TEST_F(SampleFromRandomCursorBasics, MimicNonOptimized) {
ASSERT_GTE(secondTotal / nTrials, 0.48);
ASSERT_LTE(secondTotal / nTrials, 0.52);
}
+
+DEATH_TEST_F(SampleFromRandomCursorBasics,
+ ShouldFailIfGivenPausedInput,
+ "Invariant failure Hit a MONGO_UNREACHABLE!") {
+ createSample(2);
+ source()->queue.push_back(Document{{"_id", 1}});
+ source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+
+ // Should see the first result, then see a pause and fail.
+ ASSERT_TRUE(sample()->getNext().isAdvanced());
+ sample()->getNext();
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_skip_test.cpp b/src/mongo/db/pipeline/document_source_skip_test.cpp
new file mode 100644
index 00000000000..175efad4788
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_skip_test.cpp
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceSkipTest = AggregationContextFixture;
+
+TEST_F(DocumentSourceSkipTest, ShouldPropagatePauses) {
+ auto skip = DocumentSourceSkip::create(getExpCtx(), 2);
+ auto mock = DocumentSourceMock::create({Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ DocumentSource::GetNextResult::makePauseExecution()});
+ skip->setSource(mock.get());
+
+ // Skip the first document.
+ ASSERT_TRUE(skip->getNext().isPaused());
+
+ // Skip one more, then advance.
+ ASSERT_TRUE(skip->getNext().isAdvanced());
+
+ ASSERT_TRUE(skip->getNext().isPaused());
+ ASSERT_TRUE(skip->getNext().isPaused());
+
+ ASSERT_TRUE(skip->getNext().isEOF());
+ ASSERT_TRUE(skip->getNext().isEOF());
+ ASSERT_TRUE(skip->getNext().isEOF());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index e87002816d1..2091b6a5cf4 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -107,10 +107,10 @@ long long DocumentSourceSort::getLimit() const {
return limitSrc ? limitSrc->getLimit() : -1;
}
-void DocumentSourceSort::addKey(const string& fieldPath, bool ascending) {
+void DocumentSourceSort::addKey(StringData fieldPath, bool ascending) {
VariablesIdGenerator idGenerator;
VariablesParseState vps(&idGenerator);
- vSortKey.push_back(ExpressionFieldPath::parse("$$ROOT." + fieldPath, vps));
+ vSortKey.push_back(ExpressionFieldPath::parse("$$ROOT." + fieldPath.toString(), vps));
vAscending.push_back(ascending);
}
@@ -173,16 +173,19 @@ intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson(
}
intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
- const intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj sortOrder, long long limit) {
- intrusive_ptr<DocumentSourceSort> pSort = new DocumentSourceSort(pExpCtx);
+ const intrusive_ptr<ExpressionContext>& pExpCtx,
+ BSONObj sortOrder,
+ long long limit,
+ uint64_t maxMemoryUsageBytes) {
+ intrusive_ptr<DocumentSourceSort> pSort(new DocumentSourceSort(pExpCtx));
+ pSort->_maxMemoryUsageBytes = maxMemoryUsageBytes;
pSort->injectExpressionContext(pExpCtx);
pSort->_sort = sortOrder.getOwned();
- /* check for then iterate over the sort object */
- BSONForEach(keyField, sortOrder) {
- const char* fieldName = keyField.fieldName();
+ for (auto&& keyField : sortOrder) {
+ auto fieldName = keyField.fieldNameStringData();
- if (str::equals(fieldName, "$mergePresorted")) {
+ if ("$mergePresorted" == fieldName) {
verify(keyField.Bool());
pSort->_mergingPresorted = true;
continue;
@@ -235,7 +238,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const {
if (limitSrc)
opts.limit = limitSrc->getLimit();
- opts.maxMemoryUsageBytes = 100 * 1024 * 1024;
+ opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) {
opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 1d54a9a5618..04864c3cb2d 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -349,5 +350,122 @@ TEST_F(DocumentSourceSortExecutionTest, ExtractArrayValues) {
"[{_id:1,a:[{b:1},{b:1}]},{_id:0,a:[{b:1},{b:2}]}]");
}
+TEST_F(DocumentSourceSortExecutionTest, ShouldPauseWhenAskedTo) {
+ auto sort = DocumentSourceSort::create(getExpCtx(), BSON("a" << 1));
+ auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 0}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+ sort->setSource(mock.get());
+
+ // Should propagate the first pause.
+ ASSERT_TRUE(sort->getNext().isPaused());
+
+ // Should load the single document, then pause.
+ ASSERT_TRUE(sort->getNext().isPaused());
+
+ // Now it should start giving results.
+ auto result = sort->getNext();
+ ASSERT_TRUE(result.isAdvanced());
+ ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"a", 0}}));
+}
+
+TEST_F(DocumentSourceSortExecutionTest, ShouldResumePopulationBetweenPauses) {
+ auto sort = DocumentSourceSort::create(getExpCtx(), BSON("a" << 1));
+ auto mock = DocumentSourceMock::create({Document{{"a", 1}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 0}}});
+ sort->setSource(mock.get());
+
+ // Should load the first document, then propagate the pause.
+ ASSERT_TRUE(sort->getNext().isPaused());
+
+ // Should finish loading and start yielding results in sorted order.
+ auto result = sort->getNext();
+ ASSERT_TRUE(result.isAdvanced());
+ ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"a", 0}}));
+
+ result = sort->getNext();
+ ASSERT_TRUE(result.isAdvanced());
+ ASSERT_DOCUMENT_EQ(result.releaseDocument(), (Document{{"a", 1}}));
+
+ ASSERT_TRUE(sort->getNext().isEOF());
+ ASSERT_TRUE(sort->getNext().isEOF());
+ ASSERT_TRUE(sort->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled) {
+ auto expCtx = getExpCtx();
+
+ // Allow the $sort stage to spill to disk.
+ unittest::TempDir tempDir("DocumentSourceSortTest");
+ expCtx->tempDir = tempDir.path();
+ expCtx->extSortAllowed = true;
+ const size_t maxMemoryUsageBytes = 1000;
+
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"_id", 1}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"_id", 2}, {"largeStr", largeStr}}});
+ sort->setSource(mock.get());
+
+ // There were 2 pauses, so we should expect 2 paused results before any results can be returned.
+ ASSERT_TRUE(sort->getNext().isPaused());
+ ASSERT_TRUE(sort->getNext().isPaused());
+
+ // Now we expect to get the results back, sorted by _id descending.
+ auto next = sort->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_VALUE_EQ(next.releaseDocument()["_id"], Value(2));
+
+ next = sort->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_VALUE_EQ(next.releaseDocument()["_id"], Value(1));
+
+ next = sort->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_VALUE_EQ(next.releaseDocument()["_id"], Value(0));
+}
+
+TEST_F(DocumentSourceSortExecutionTest,
+ ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) {
+ auto expCtx = getExpCtx();
+ expCtx->extSortAllowed = false;
+ const size_t maxMemoryUsageBytes = 1000;
+
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}},
+ Document{{"_id", 1}, {"largeStr", largeStr}}});
+ sort->setSource(mock.get());
+
+ ASSERT_THROWS_CODE(sort->getNext(), UserException, 16819);
+}
+
+TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
+ auto expCtx = getExpCtx();
+ expCtx->extSortAllowed = false;
+ const size_t maxMemoryUsageBytes = 1000;
+
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes / 2, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"_id", 0}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"_id", 1}, {"largeStr", largeStr}},
+ Document{{"_id", 2}, {"largeStr", largeStr}}});
+ sort->setSource(mock.get());
+
+ // The first getNext() should pause.
+ ASSERT_TRUE(sort->getNext().isPaused());
+
+ // The next should realize it's used too much memory.
+ ASSERT_THROWS_CODE(sort->getNext(), UserException, 16819);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_unwind_test.cpp b/src/mongo/db/pipeline/document_source_unwind_test.cpp
index 6be7ed5b703..56ad90dca9a 100644
--- a/src/mongo/db/pipeline/document_source_unwind_test.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind_test.cpp
@@ -693,6 +693,30 @@ TEST_F(UnwindStageTest, TruncatesOutputSortAtUnwoundPath) {
ASSERT_EQUALS(1U, outputSort.count(BSON("a" << 1)));
}
+TEST_F(UnwindStageTest, ShouldPropagatePauses) {
+ const bool includeNullIfEmptyOrMissing = false;
+ const boost::optional<std::string> includeArrayIndex = boost::none;
+ auto unwind = DocumentSourceUnwind::create(
+ getExpCtx(), "array", includeNullIfEmptyOrMissing, includeArrayIndex);
+ auto source =
+ DocumentSourceMock::create({Document{{"array", vector<Value>{Value(1), Value(2)}}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"array", vector<Value>{Value(1), Value(2)}}}});
+
+ unwind->setSource(source.get());
+
+ ASSERT_TRUE(unwind->getNext().isAdvanced());
+ ASSERT_TRUE(unwind->getNext().isAdvanced());
+
+ ASSERT_TRUE(unwind->getNext().isPaused());
+
+ ASSERT_TRUE(unwind->getNext().isAdvanced());
+ ASSERT_TRUE(unwind->getNext().isAdvanced());
+
+ ASSERT_TRUE(unwind->getNext().isEOF());
+ ASSERT_TRUE(unwind->getNext().isEOF());
+}
+
//
// Error cases.
//
diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h
new file mode 100644
index 00000000000..57279650f43
--- /dev/null
+++ b/src/mongo/db/pipeline/stub_mongod_interface.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source.h"
+
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+/**
+ * A stub MongodInterface that can be used for testing. Create a subclass and override methods as
+ * needed.
+ */
+class StubMongodInterface : public DocumentSourceNeedsMongod::MongodInterface {
+public:
+ virtual ~StubMongodInterface() = default;
+
+ void setOperationContext(OperationContext* opCtx) override {
+ MONGO_UNREACHABLE;
+ }
+
+ DBClientBase* directClient() override {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isSharded(const NamespaceString& ns) override {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override {
+ MONGO_UNREACHABLE;
+ }
+
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void appendLatencyStats(const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendStorageStats(const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj getCollectionOptions(const NamespaceString& nss) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status renameIfOptionsAndIndexesHaveNotChanged(
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) override {
+ MONGO_UNREACHABLE;
+ }
+
+ StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) override {
+ MONGO_UNREACHABLE;
+ }
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/tee_buffer.cpp b/src/mongo/db/pipeline/tee_buffer.cpp
index 51ed90241e9..f16a6ffd289 100644
--- a/src/mongo/db/pipeline/tee_buffer.cpp
+++ b/src/mongo/db/pipeline/tee_buffer.cpp
@@ -81,13 +81,6 @@ void TeeBuffer::loadNextBatch() {
auto input = _source->getNext();
for (; input.isAdvanced(); input = _source->getNext()) {
-
- // For the following reasons, we invariant that we never get a paused input:
- // - TeeBuffer is the only place where a paused GetNextReturn will be returned.
- // - The $facet stage is the only stage that uses TeeBuffer.
- // - We currently disallow nested $facet stages.
- invariant(!input.isPaused());
-
bytesInBuffer += input.getDocument().getApproximateSize();
_buffer.push_back(std::move(input));
@@ -96,6 +89,12 @@ void TeeBuffer::loadNextBatch() {
}
}
+ // For the following reasons, we invariant that we never get a paused input:
+ // - TeeBuffer is the only place where a paused GetNextReturn will be returned.
+ // - The $facet stage is the only stage that uses TeeBuffer.
+ // - We currently disallow nested $facet stages.
+ invariant(!input.isPaused());
+
// Populate the pending returns.
for (size_t consumerId = 0; consumerId < _consumers.size(); ++consumerId) {
if (_consumers[consumerId].stillInUse) {
diff --git a/src/mongo/db/pipeline/tee_buffer_test.cpp b/src/mongo/db/pipeline/tee_buffer_test.cpp
index edb52d3fb41..10a692922e0 100644
--- a/src/mongo/db/pipeline/tee_buffer_test.cpp
+++ b/src/mongo/db/pipeline/tee_buffer_test.cpp
@@ -90,6 +90,7 @@ TEST(TeeBufferTest, ShouldProvideAllResultsWithoutPauseIfOnlyOneConsumer) {
teeBuffer->setSource(mock.get());
auto next = teeBuffer->getNext(0);
+ ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(next.getDocument(), inputs.front().getDocument());
next = teeBuffer->getNext(0);