summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2017-01-11 17:14:40 -0500
committerTess Avitabile <tess.avitabile@mongodb.com>2017-01-13 17:56:02 -0500
commit92e599237444912607e70a745fe5c0aa00dd4caf (patch)
tree3241f19d67635438a2ceb142d0584d0bb38b5bce /src/mongo/db/pipeline
parent5c2aac3b24d0680418ee8fab1fa6e53be2a0eede (diff)
downloadmongo-92e599237444912607e70a745fe5c0aa00dd4caf.tar.gz
SERVER-24623 Remove single document aggregation result option
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp17
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h18
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp71
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp24
-rw-r--r--src/mongo/db/pipeline/pipeline.h7
5 files changed, 63 insertions, 74 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index c741c32c4d3..57a8f3644a9 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -58,7 +58,7 @@ const StringData AggregationRequest::kAllowDiskUseName = "allowDiskUse"_sd;
const long long AggregationRequest::kDefaultBatchSize = 101;
AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline)
- : _nss(std::move(nss)), _pipeline(std::move(pipeline)) {}
+ : _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {}
StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(NamespaceString nss,
const BSONObj& cmdObj) {
@@ -85,6 +85,8 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(NamespaceString
kCommandName,
repl::ReadConcernArgs::kReadConcernFieldName};
+ bool hasCursorElem = false;
+
// Parse optional parameters.
for (auto&& elem : cmdObj) {
auto fieldName = elem.fieldNameStringData();
@@ -108,7 +110,7 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(NamespaceString
return status;
}
- request.setCursorCommand(true);
+ hasCursorElem = true;
request.setBatchSize(batchSize);
} else if (kCollationName == fieldName) {
if (elem.type() != BSONType::Object) {
@@ -149,6 +151,14 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(NamespaceString
str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
}
}
+
+ if (!hasCursorElem && !request.isExplain()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "The '" << kCursorName << "' option is required, unless '"
+ << kExplainName
+ << "' is true"};
+ }
+
return request;
}
@@ -165,7 +175,8 @@ Document AggregationRequest::serializeToCommandObj() const {
_bypassDocumentValidation ? Value(true) : Value()},
// Only serialize a collation if one was specified.
{kCollationName, _collation.isEmpty() ? Value() : Value(_collation)},
- {kCursorName, _batchSize ? Value(Document{{kBatchSizeName, _batchSize.get()}}) : Value()}};
+ // Only serialize batchSize when explain is false.
+ {kCursorName, _explain ? Value() : Value(Document{{kBatchSizeName, _batchSize}})}};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 3a8844b00e9..4a3ef4c0223 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -81,7 +81,7 @@ public:
// Getters.
//
- boost::optional<long long> getBatchSize() const {
+ long long getBatchSize() const {
return _batchSize;
}
@@ -96,10 +96,6 @@ public:
return _pipeline;
}
- bool isCursorCommand() const {
- return _cursorCommand;
- }
-
bool isExplain() const {
return _explain;
}
@@ -128,8 +124,7 @@ public:
//
/**
- * Must be either unset or non-negative. Negative batchSize is illegal but batchSize of 0 is
- * allowed.
+ * Negative batchSize is illegal but batchSize of 0 is allowed.
*/
void setBatchSize(long long batchSize) {
uassert(40203, "batchSize must be non-negative", batchSize >= 0);
@@ -140,10 +135,6 @@ public:
_collation = collation.getOwned();
}
- void setCursorCommand(bool isCursorCommand) {
- _cursorCommand = isCursorCommand;
- }
-
void setExplain(bool isExplain) {
_explain = isExplain;
}
@@ -168,9 +159,9 @@ private:
// An unparsed version of the pipeline.
const std::vector<BSONObj> _pipeline;
- // Optional fields.
+ long long _batchSize;
- boost::optional<long long> _batchSize;
+ // Optional fields.
// An owned copy of the user-specified collation object, or an empty object if no collation was
// specified.
@@ -180,6 +171,5 @@ private:
bool _allowDiskUse = false;
bool _fromRouter = false;
bool _bypassDocumentValidation = false;
- bool _cursorCommand = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index acd38732522..4d1a5f33fb6 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -44,6 +44,9 @@
namespace mongo {
namespace {
+const Document kDefaultCursorOptionDocument{
+ {AggregationRequest::kBatchSizeName, AggregationRequest::kDefaultBatchSize}};
+
//
// Parsing
//
@@ -58,8 +61,7 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
ASSERT_TRUE(request.shouldAllowDiskUse());
ASSERT_TRUE(request.isFromRouter());
ASSERT_TRUE(request.shouldBypassDocumentValidation());
- ASSERT_TRUE(request.isCursorCommand());
- ASSERT_EQ(request.getBatchSize().get(), 10);
+ ASSERT_EQ(request.getBatchSize(), 10);
ASSERT_BSONOBJ_EQ(request.getCollation(),
BSON("locale"
<< "en_US"));
@@ -75,7 +77,8 @@ TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFields
auto expectedSerialization =
Document{{AggregationRequest::kCommandName, nss.coll()},
- {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}};
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})},
+ {AggregationRequest::kCursorName, Value(kDefaultCursorOptionDocument)}};
ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
}
@@ -90,7 +93,8 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau
auto expectedSerialization =
Document{{AggregationRequest::kCommandName, nss.coll()},
- {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}};
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})},
+ {AggregationRequest::kCursorName, Value(kDefaultCursorOptionDocument)}};
ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
}
@@ -101,6 +105,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
request.setAllowDiskUse(true);
request.setFromRouter(true);
request.setBypassDocumentValidation(true);
+    request.setBatchSize(10);  // batchSize not serialized when explain is true.
const auto collationObj = BSON("locale"
<< "en_US");
request.setCollation(collationObj);
@@ -116,23 +121,25 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
}
-TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) {
+TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}}");
- auto request = AggregationRequest::parseFromBSON(nss, inputBson);
- ASSERT_OK(request.getStatus());
- ASSERT_TRUE(request.getValue().isCursorCommand());
- ASSERT_TRUE(request.getValue().getBatchSize());
- ASSERT_EQ(request.getValue().getBatchSize().get(), AggregationRequest::kDefaultBatchSize);
+ AggregationRequest request(nss, {});
+ request.setBatchSize(10);
+
+ auto expectedSerialization =
+ Document{{AggregationRequest::kCommandName, nss.coll()},
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})},
+ {AggregationRequest::kCursorName,
+ Value(Document({{AggregationRequest::kBatchSizeName, 10}}))}};
+ ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
}
-TEST(AggregationRequestTest, NoBatchSizeWhenCursorObjectNotSet) {
+TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}]}");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}}");
auto request = AggregationRequest::parseFromBSON(nss, inputBson);
ASSERT_OK(request.getStatus());
- ASSERT_FALSE(request.getValue().isCursorCommand());
- ASSERT_FALSE(request.getValue().getBatchSize());
+ ASSERT_EQ(request.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
}
//
@@ -141,41 +148,51 @@ TEST(AggregationRequestTest, NoBatchSizeWhenCursorObjectNotSet) {
TEST(AggregationRequestTest, ShouldRejectNonArrayPipeline) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: {}}");
+ const BSONObj inputBson = fromjson("{pipeline: {}, cursor: {}}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectPipelineArrayIfAnElementIsNotAnObject) {
NamespaceString nss("a.collection");
- BSONObj inputBson = fromjson("{pipeline: [4]}");
+ BSONObj inputBson = fromjson("{pipeline: [4], cursor: {}}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
- inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}, 4]}");
+ inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}, 4], cursor: {}}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonObjectCollation) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], collation: 1}");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, collation: 1}");
ASSERT_NOT_OK(
AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolExplain) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], explain: 1}");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, explain: 1}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolFromRouter) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], fromRouter: 1}");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], allowDiskUse: 1}");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, allowDiskUse: 1}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNoCursorNoExplain) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}]}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
@@ -185,27 +202,29 @@ TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) {
TEST(AggregationRequestTest, ShouldIgnoreFieldsPrefixedWithDollar) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], $unknown: 1}");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, $unknown: 1}");
ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], writeConcern: 'invalid'}");
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldIgnoreMaxTimeMsOption) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], maxTimeMS: 'invalid'}");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, maxTimeMS: 'invalid'}");
ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldIgnoreReadConcernOption) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], readConcern: 'invalid'}");
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, readConcern: 'invalid'}");
ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 8bb745fba13..5c13f710b40 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -310,30 +310,6 @@ void Pipeline::stitch() {
}
}
-void Pipeline::run(BSONObjBuilder& result) {
- // We should not get here in the explain case.
- verify(!pCtx->isExplain);
-
- // the array in which the aggregation results reside
- // cant use subArrayStart() due to error handling
- BSONArrayBuilder resultArray;
- while (auto next = getNext()) {
- // Add the document to the result set.
- BSONObjBuilder documentBuilder(resultArray.subobjStart());
- next->toBson(&documentBuilder);
- documentBuilder.doneFast();
- // Object will be too large, assert. The extra 1KB is for headers.
- uassert(16389,
- str::stream() << "aggregation result exceeds maximum document size ("
- << BSONObjMaxUserSize / (1024 * 1024)
- << "MB)",
- resultArray.len() < BSONObjMaxUserSize - 1024);
- }
-
- resultArray.done();
- result.appendArray("result", resultArray.arr());
-}
-
boost::optional<Document> Pipeline::getNext() {
invariant(!_sources.empty());
auto nextResult = _sources.back()->getNext();
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index fe4ca1de424..3aceb6c28f0 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -139,13 +139,6 @@ public:
*/
std::vector<Value> serialize() const;
- /**
- Run the Pipeline on the given source.
-
- @param result builder to write the result to
- */
- void run(BSONObjBuilder& result);
-
/// The initial source is special since it varies between mongos and mongod.
void addInitialSource(boost::intrusive_ptr<DocumentSource> source);