summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorRuoxin Xu <ruoxin.xu@mongodb.com>2020-10-29 12:14:01 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-07 11:20:50 +0000
commit90c89d33c400d2f1eb8972170b7a17e3315c4198 (patch)
tree2aaee3468e4350950b546b2b24783d9ddc2d8e2e /src/mongo/db
parent66cdb6d0fccf3b65c61a1bea5d6171591d21c9da (diff)
downloadmongo-90c89d33c400d2f1eb8972170b7a17e3315c4198.tar.gz
SERVER-51649 Convert aggregate command input to IDL
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/auth/authorization_session.cpp1
-rw-r--r--src/mongo/db/auth/authorization_session.h4
-rw-r--r--src/mongo/db/auth/authorization_session_impl.cpp14
-rw-r--r--src/mongo/db/auth/authorization_session_impl.h4
-rw-r--r--src/mongo/db/auth/authorization_session_test.cpp172
-rw-r--r--src/mongo/db/commands/SConscript3
-rw-r--r--src/mongo/db/commands/count_cmd.cpp7
-rw-r--r--src/mongo/db/commands/current_op.cpp10
-rw-r--r--src/mongo/db/commands/current_op_common.cpp6
-rw-r--r--src/mongo/db/commands/current_op_common.h6
-rw-r--r--src/mongo/db/commands/distinct.cpp5
-rw-r--r--src/mongo/db/commands/find_cmd.cpp6
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp22
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp57
-rw-r--r--src/mongo/db/commands/run_aggregate.h6
-rw-r--r--src/mongo/db/commands/user_management_commands.cpp7
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp4
-rw-r--r--src/mongo/db/error_labels.cpp5
-rw-r--r--src/mongo/db/error_labels_test.cpp14
-rw-r--r--src/mongo/db/namespace_string.cpp8
-rw-r--r--src/mongo/db/namespace_string.h2
-rw-r--r--src/mongo/db/ops/write_ops_parsers.cpp4
-rw-r--r--src/mongo/db/pipeline/SConscript14
-rw-r--r--src/mongo/db/pipeline/aggregate_command.idl164
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp363
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h381
-rw-r--r--src/mongo/db/pipeline/aggregation_request_helper.cpp237
-rw-r--r--src/mongo/db/pipeline/aggregation_request_helper.h144
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp447
-rw-r--r--src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_merge_spec.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_unwind_test.cpp3
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp14
-rw-r--r--src/mongo/db/pipeline/expression_context.h6
-rw-r--r--src/mongo/db/pipeline/expression_context_for_test.h3
-rw-r--r--src/mongo/db/pipeline/expression_walker_test.cpp7
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h6
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp14
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h12
-rw-r--r--src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp7
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp12
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp78
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h8
-rw-r--r--src/mongo/db/query/SConscript3
-rw-r--r--src/mongo/db/query/count_command_test.cpp31
-rw-r--r--src/mongo/db/query/hint_parser.cpp2
-rw-r--r--src/mongo/db/query/parsed_distinct_test.cpp69
-rw-r--r--src/mongo/db/query/query_request_test.cpp82
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp10
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_txn_cloner.cpp6
-rw-r--r--src/mongo/db/s/shard_local.cpp2
-rw-r--r--src/mongo/db/s/shard_local.h2
-rw-r--r--src/mongo/db/views/SConscript1
-rw-r--r--src/mongo/db/views/resolved_view.cpp12
-rw-r--r--src/mongo/db/views/resolved_view.h4
-rw-r--r--src/mongo/db/views/resolved_view_test.cpp60
-rw-r--r--src/mongo/db/views/view_catalog.cpp4
65 files changed, 1272 insertions, 1355 deletions
diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp
index caed33df1b9..fc5810e7a18 100644
--- a/src/mongo/db/auth/authorization_session.cpp
+++ b/src/mongo/db/auth/authorization_session.cpp
@@ -48,7 +48,6 @@
#include "mongo/db/client.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
diff --git a/src/mongo/db/auth/authorization_session.h b/src/mongo/db/auth/authorization_session.h
index b18d8409099..98360a2b66d 100644
--- a/src/mongo/db/auth/authorization_session.h
+++ b/src/mongo/db/auth/authorization_session.h
@@ -44,7 +44,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops_parsers.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
namespace mongo {
@@ -217,7 +217,7 @@ public:
// Attempts to get the privileges necessary to run the aggregation pipeline specified in
// 'request' on the namespace 'ns' either directly on mongoD or via mongoS.
virtual StatusWith<PrivilegeVector> getPrivilegesForAggregate(const NamespaceString& ns,
- const AggregationRequest& request,
+ const AggregateCommand& request,
bool isMongos) = 0;
// Checks if this connection has the privileges necessary to create 'ns' with the options
diff --git a/src/mongo/db/auth/authorization_session_impl.cpp b/src/mongo/db/auth/authorization_session_impl.cpp
index 0684c536269..8b16d95ae72 100644
--- a/src/mongo/db/auth/authorization_session_impl.cpp
+++ b/src/mongo/db/auth/authorization_session_impl.cpp
@@ -52,7 +52,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
@@ -90,10 +90,10 @@ Status checkAuthForCreateOrModifyView(AuthorizationSession* authzSession,
return Status::OK();
}
- auto status = AggregationRequest::parseFromBSON(viewNs,
- BSON("aggregate" << viewOnNs.coll()
- << "pipeline" << viewPipeline
- << "cursor" << BSONObj()));
+ auto status = aggregation_request_helper::parseFromBSON(
+ viewNs,
+ BSON("aggregate" << viewOnNs.coll() << "pipeline" << viewPipeline << "cursor" << BSONObj()
+ << "$db" << viewOnNs.db()));
if (!status.isOK())
return status.getStatus();
@@ -269,7 +269,7 @@ PrivilegeVector AuthorizationSessionImpl::getDefaultPrivileges() {
}
StatusWith<PrivilegeVector> AuthorizationSessionImpl::getPrivilegesForAggregate(
- const NamespaceString& nss, const AggregationRequest& request, bool isMongos) {
+ const NamespaceString& nss, const AggregateCommand& request, bool isMongos) {
if (!nss.isValid()) {
return Status(ErrorCodes::InvalidNamespace,
str::stream() << "Invalid input namespace, " << nss.ns());
@@ -306,7 +306,7 @@ StatusWith<PrivilegeVector> AuthorizationSessionImpl::getPrivilegesForAggregate(
for (auto&& pipelineStage : pipeline) {
liteParsedDocSource = LiteParsedDocumentSource::parse(nss, pipelineStage);
PrivilegeVector currentPrivs = liteParsedDocSource->requiredPrivileges(
- isMongos, request.shouldBypassDocumentValidation());
+ isMongos, request.getBypassDocumentValidation().value_or(false));
Privilege::addPrivilegesToPrivilegeVector(&privileges, currentPrivs);
}
return privileges;
diff --git a/src/mongo/db/auth/authorization_session_impl.h b/src/mongo/db/auth/authorization_session_impl.h
index b7896daf518..f7f4b0aa596 100644
--- a/src/mongo/db/auth/authorization_session_impl.h
+++ b/src/mongo/db/auth/authorization_session_impl.h
@@ -42,7 +42,7 @@
#include "mongo/db/auth/user_name.h"
#include "mongo/db/auth/user_set.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
namespace mongo {
@@ -118,7 +118,7 @@ public:
UserNameIterator cursorOwner) override;
StatusWith<PrivilegeVector> getPrivilegesForAggregate(const NamespaceString& ns,
- const AggregationRequest& request,
+ const AggregateCommand& request,
bool isMongos) override;
Status checkAuthForCreate(const NamespaceString& ns,
diff --git a/src/mongo/db/auth/authorization_session_test.cpp b/src/mongo/db/auth/authorization_session_test.cpp
index 81dff04b7f3..318a7cd509e 100644
--- a/src/mongo/db/auth/authorization_session_test.cpp
+++ b/src/mongo/db/auth/authorization_session_test.cpp
@@ -47,7 +47,7 @@
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer_mock.h"
@@ -595,10 +595,10 @@ TEST_F(AuthorizationSessionTest, AcquireUserObtainsAndValidatesAuthenticationRes
}
TEST_F(AuthorizationSessionTest, CannotAggregateEmptyPipelineWithoutFindAction) {
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray() << "cursor"
- << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray() << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -607,10 +607,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateEmptyPipelineWithoutFindAction)
TEST_F(AuthorizationSessionTest, CanAggregateEmptyPipelineWithFindAction) {
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray() << "cursor"
- << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray() << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -623,9 +623,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateWithoutFindActionIfFirstStageNot
BSONArray pipeline = BSON_ARRAY(BSON("$limit" << 1) << BSON("$collStats" << BSONObj())
<< BSON("$indexStats" << BSONObj()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -636,9 +637,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateWithFindActionIfPipelineContains
BSONArray pipeline = BSON_ARRAY(BSON("$limit" << 1) << BSON("$collStats" << BSONObj())
<< BSON("$indexStats" << BSONObj()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -648,9 +650,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateCollStatsWithoutCollStatsAction)
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$collStats" << BSONObj()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -660,9 +663,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateCollStatsWithCollStatsAction) {
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::collStats}));
BSONArray pipeline = BSON_ARRAY(BSON("$collStats" << BSONObj()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -672,9 +676,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateIndexStatsWithoutIndexStatsActio
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$indexStats" << BSONObj()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -684,9 +689,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) {
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::indexStats}));
BSONArray pipeline = BSON_ARRAY(BSON("$indexStats" << BSONObj()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -696,9 +702,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprog
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -708,9 +715,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseWithoutInp
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -718,17 +726,19 @@ TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseWithoutInp
TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticatedOnMongoD) {
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
ASSERT_FALSE(authzSession->isAuthenticated());
}
TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticatedOnMongoS) {
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
@@ -739,9 +749,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInpr
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -751,9 +762,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInpr
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -764,9 +776,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogActi
Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -777,9 +790,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogActi
Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -790,9 +804,10 @@ TEST_F(AuthorizationSessionTest, CannotSpoofAllUsersTrueWithoutInprogActionOnMon
BSONArray pipeline =
BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false << "allUsers" << true)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -803,9 +818,10 @@ TEST_F(AuthorizationSessionTest, CannotSpoofAllUsersTrueWithoutInprogActionOnMon
BSONArray pipeline =
BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false << "allUsers" << true)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -816,9 +832,10 @@ TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotVa
BSONArray pipeline = BSON_ARRAY(BSON("$out"
<< ""));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
ASSERT_THROWS_CODE(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false),
AssertionException,
ErrorCodes::InvalidNamespace);
@@ -829,9 +846,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateOutWithoutInsertAndRemoveOnTarge
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -853,17 +871,20 @@ TEST_F(AuthorizationSessionTest, CanAggregateOutWithInsertAndRemoveOnTargetNames
Privilege(testBarCollResource, {ActionType::insert, ActionType::remove})});
BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
- auto aggNoBypassDocumentValidationReq = uassertStatusOK(AggregationRequest::parseFromBSON(
- testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline
- << "bypassDocumentValidation" << false << "cursor" << BSONObj())));
+ auto aggNoBypassDocumentValidationReq =
+ uassertStatusOK(aggregation_request_helper::parseFromBSON(
+ testFooNss,
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline
+ << "bypassDocumentValidation" << false << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
privileges = uassertStatusOK(authzSession->getPrivilegesForAggregate(
testFooNss, aggNoBypassDocumentValidationReq, false));
@@ -877,10 +898,10 @@ TEST_F(AuthorizationSessionTest,
Privilege(testBarCollResource, {ActionType::insert, ActionType::remove})});
BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
- << "bypassDocumentValidation" << true)));
+ << "bypassDocumentValidation" << true << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -895,10 +916,10 @@ TEST_F(AuthorizationSessionTest,
{ActionType::insert, ActionType::remove, ActionType::bypassDocumentValidation})});
BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll()));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
- << "bypassDocumentValidation" << true)));
+ << "bypassDocumentValidation" << true << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -908,9 +929,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateLookupWithoutFindOnJoinedNamespa
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testBarNss.coll())));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -921,9 +943,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateLookupWithFindOnJoinedNamespace) {
Privilege(testBarCollResource, {ActionType::find})});
BSONArray pipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testBarNss.coll())));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -937,9 +960,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateLookupWithoutFindOnNestedJoinedN
BSONArray nestedPipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testQuxNss.coll())));
BSONArray pipeline = BSON_ARRAY(
BSON("$lookup" << BSON("from" << testBarNss.coll() << "pipeline" << nestedPipeline)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -953,9 +977,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateLookupWithFindOnNestedJoinedNamespa
BSONArray nestedPipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testQuxNss.coll())));
BSONArray pipeline = BSON_ARRAY(
BSON("$lookup" << BSON("from" << testBarNss.coll() << "pipeline" << nestedPipeline)));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -997,9 +1022,10 @@ TEST_F(AuthorizationSessionTest, CheckAuthForAggregateWithDeeplyNestedLookup) {
BSONArrayBuilder pipelineBuilder(cmdBuilder.subarrayStart("pipeline"));
addNestedPipeline(&pipelineBuilder, maxLookupDepth);
pipelineBuilder.doneFast();
- cmdBuilder << "cursor" << BSONObj();
+ cmdBuilder << "cursor" << BSONObj() << "$db" << testFooNss.db();
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(testFooNss, cmdBuilder.obj()));
+ auto aggReq =
+ uassertStatusOK(aggregation_request_helper::parseFromBSON(testFooNss, cmdBuilder.obj()));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -1010,9 +1036,10 @@ TEST_F(AuthorizationSessionTest, CannotAggregateGraphLookupWithoutFindOnJoinedNa
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$graphLookup" << BSON("from" << testBarNss.coll())));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -1023,9 +1050,10 @@ TEST_F(AuthorizationSessionTest, CanAggregateGraphLookupWithFindOnJoinedNamespac
Privilege(testBarCollResource, {ActionType::find})});
BSONArray pipeline = BSON_ARRAY(BSON("$graphLookup" << BSON("from" << testBarNss.coll())));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -1039,9 +1067,10 @@ TEST_F(AuthorizationSessionTest,
BSONArray pipeline =
BSON_ARRAY(fromjson("{$facet: {lookup: [{$lookup: {from: 'bar'}}], graphLookup: "
"[{$graphLookup: {from: 'qux'}}]}}"));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, false));
ASSERT_FALSE(authzSession->isAuthorizedForPrivileges(privileges));
@@ -1067,9 +1096,10 @@ TEST_F(AuthorizationSessionTest,
BSON_ARRAY(fromjson("{$facet: {lookup: [{$lookup: {from: 'bar'}}], graphLookup: "
"[{$graphLookup: {from: 'qux'}}]}}"));
- auto aggReq = uassertStatusOK(AggregationRequest::parseFromBSON(
+ auto aggReq = uassertStatusOK(aggregation_request_helper::parseFromBSON(
testFooNss,
- BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj())));
+ BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "cursor" << BSONObj()
+ << "$db" << testFooNss.db())));
PrivilegeVector privileges =
uassertStatusOK(authzSession->getPrivilegesForAggregate(testFooNss, aggReq, true));
ASSERT_TRUE(authzSession->isAuthorizedForPrivileges(privileges));
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 10d508f37a9..7ea953962ef 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -332,6 +332,7 @@ env.Library(
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
+ '$BUILD_DIR/mongo/db/pipeline/aggregation_request_helper',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/query_exec',
@@ -536,7 +537,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/namespace_string',
- '$BUILD_DIR/mongo/db/pipeline/aggregation_request',
+ '$BUILD_DIR/mongo/db/pipeline/aggregation_request_helper',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/service_context',
'test_commands_enabled'
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index bb5b45a5c9f..be6b252de96 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/count.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/query/count_command_as_aggregation_command.h"
#include "mongo/db/query/explain.h"
@@ -157,8 +158,10 @@ public:
return viewAggregation.getStatus();
}
+ auto viewAggCmd =
+ OpMsgRequest::fromDBAndBody(nss.db(), viewAggregation.getValue()).body;
auto viewAggRequest =
- AggregationRequest::parseFromBSON(nss, viewAggregation.getValue(), verbosity);
+ aggregation_request_helper::parseFromBSON(nss, viewAggCmd, verbosity);
if (!viewAggRequest.isOK()) {
return viewAggRequest.getStatus();
}
@@ -166,7 +169,7 @@ public:
// An empty PrivilegeVector is acceptable because these privileges are only checked on
// getMore and explain will not open a cursor.
return runAggregate(opCtx,
- viewAggRequest.getValue().getNamespaceString(),
+ viewAggRequest.getValue().getNamespace(),
viewAggRequest.getValue(),
viewAggregation.getValue(),
PrivilegeVector(),
diff --git a/src/mongo/db/commands/current_op.cpp b/src/mongo/db/commands/current_op.cpp
index a5c2b1946c5..6c6e542bdfc 100644
--- a/src/mongo/db/commands/current_op.cpp
+++ b/src/mongo/db/commands/current_op.cpp
@@ -37,6 +37,8 @@
#include "mongo/db/commands/fsync_locked.h"
#include "mongo/db/commands/run_aggregate.h"
#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/stats/fill_locker_info.h"
namespace mongo {
@@ -64,9 +66,9 @@ public:
return Status(ErrorCodes::Unauthorized, "Unauthorized");
}
- virtual StatusWith<CursorResponse> runAggregation(
- OperationContext* opCtx, const AggregationRequest& request) const final {
- auto aggCmdObj = request.serializeToCommandObj().toBson();
+ virtual StatusWith<CursorResponse> runAggregation(OperationContext* opCtx,
+ const AggregateCommand& request) const final {
+ auto aggCmdObj = aggregation_request_helper::serializeToCommandObj(request);
rpc::OpMsgReplyBuilder replyBuilder;
@@ -76,7 +78,7 @@ public:
}
auto status = runAggregate(opCtx,
- request.getNamespaceString(),
+ request.getNamespace(),
request,
std::move(aggCmdObj),
privileges,
diff --git a/src/mongo/db/commands/current_op_common.cpp b/src/mongo/db/commands/current_op_common.cpp
index d1a08443e20..ccec9928924 100644
--- a/src/mongo/db/commands/current_op_common.cpp
+++ b/src/mongo/db/commands/current_op_common.cpp
@@ -109,9 +109,9 @@ bool CurrentOpCommandBase::run(OperationContext* opCtx,
pipeline.push_back(groupBuilder.obj());
- // Pipeline is complete; create an AggregationRequest for $currentOp.
- const AggregationRequest request(NamespaceString::makeCollectionlessAggregateNSS("admin"),
- std::move(pipeline));
+ // Pipeline is complete; create an AggregateCommand for $currentOp.
+ const AggregateCommand request(NamespaceString::makeCollectionlessAggregateNSS("admin"),
+ std::move(pipeline));
// Run the pipeline and obtain a CursorResponse.
auto aggResults = uassertStatusOK(runAggregation(opCtx, request));
diff --git a/src/mongo/db/commands/current_op_common.h b/src/mongo/db/commands/current_op_common.h
index 9d03ba274b3..8331f44d990 100644
--- a/src/mongo/db/commands/current_op_common.h
+++ b/src/mongo/db/commands/current_op_common.h
@@ -31,7 +31,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/query/cursor_response.h"
namespace mongo {
@@ -70,11 +70,11 @@ private:
virtual void modifyPipeline(std::vector<BSONObj>* pipeline) const {};
/**
- * Runs the aggregation specified by the supplied AggregationRequest, returning a CursorResponse
+ * Runs the aggregation specified by the supplied AggregateCommand, returning a CursorResponse
* if successful or a Status containing the error otherwise.
*/
virtual StatusWith<CursorResponse> runAggregation(OperationContext* opCtx,
- const AggregationRequest& request) const = 0;
+ const AggregateCommand& request) const = 0;
/**
* Allows overriders to optionally write additional data to the response object before the final
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index 78c835cb68d..34b8cddab51 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/explain.h"
@@ -158,8 +159,10 @@ public:
return viewAggregation.getStatus();
}
+ auto viewAggCmd =
+ OpMsgRequest::fromDBAndBody(nss.db(), viewAggregation.getValue()).body;
auto viewAggRequest =
- AggregationRequest::parseFromBSON(nss, viewAggregation.getValue(), verbosity);
+ aggregation_request_helper::parseFromBSON(nss, viewAggCmd, verbosity);
if (!viewAggRequest.isOK()) {
return viewAggRequest.getStatus();
}
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 2e65c35d43c..f7eb40c9025 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/matcher/extensions_callback_real.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/variables.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
@@ -267,16 +268,17 @@ public:
const auto& qr = cq->getQueryRequest();
auto viewAggregationCommand = uassertStatusOK(qr.asAggregationCommand());
+ auto viewAggCmd = OpMsgRequest::fromDBAndBody(_dbName, viewAggregationCommand).body;
// Create the agg request equivalent of the find operation, with the explain
// verbosity included.
auto aggRequest = uassertStatusOK(
- AggregationRequest::parseFromBSON(nss, viewAggregationCommand, verbosity));
+ aggregation_request_helper::parseFromBSON(nss, viewAggCmd, verbosity));
try {
// An empty PrivilegeVector is acceptable because these privileges are only
// checked on getMore and explain will not open a cursor.
uassertStatusOK(runAggregate(
- opCtx, nss, aggRequest, viewAggregationCommand, PrivilegeVector(), result));
+ opCtx, nss, aggRequest, viewAggCmd, PrivilegeVector(), result));
} catch (DBException& error) {
if (error.code() == ErrorCodes::InvalidPipelineOperator) {
uasserted(ErrorCodes::InvalidPipelineOperator,
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 738d161ce65..6535d11c2b1 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/run_aggregate.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
@@ -70,13 +72,13 @@ public:
OperationContext* opCtx,
const OpMsgRequest& opMsgRequest,
boost::optional<ExplainOptions::Verbosity> explainVerbosity) override {
- const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON(
+ const auto aggregationRequest = uassertStatusOK(aggregation_request_helper::parseFromBSON(
opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity));
- auto privileges = uassertStatusOK(
- AuthorizationSession::get(opCtx->getClient())
- ->getPrivilegesForAggregate(
- aggregationRequest.getNamespaceString(), aggregationRequest, false));
+ auto privileges =
+ uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
+ ->getPrivilegesForAggregate(
+ aggregationRequest.getNamespace(), aggregationRequest, false));
return std::make_unique<Invocation>(
this, opMsgRequest, std::move(aggregationRequest), std::move(privileges));
@@ -94,7 +96,7 @@ public:
public:
Invocation(Command* cmd,
const OpMsgRequest& request,
- const AggregationRequest aggregationRequest,
+ const AggregateCommand aggregationRequest,
PrivilegeVector privileges)
: CommandInvocation(cmd),
_request(request),
@@ -133,7 +135,7 @@ public:
opCtx, !Pipeline::aggHasWriteStage(_request.body));
uassertStatusOK(runAggregate(opCtx,
- _aggregationRequest.getNamespaceString(),
+ _aggregationRequest.getNamespace(),
_aggregationRequest,
_liteParsedPipeline,
_request.body,
@@ -142,7 +144,7 @@ public:
}
NamespaceString ns() const override {
- return _aggregationRequest.getNamespaceString();
+ return _aggregationRequest.getNamespace();
}
void explain(OperationContext* opCtx,
@@ -150,7 +152,7 @@ public:
rpc::ReplyBuilderInterface* result) override {
uassertStatusOK(runAggregate(opCtx,
- _aggregationRequest.getNamespaceString(),
+ _aggregationRequest.getNamespace(),
_aggregationRequest,
_liteParsedPipeline,
_request.body,
@@ -167,7 +169,7 @@ public:
const OpMsgRequest& _request;
const std::string _dbName;
- const AggregationRequest _aggregationRequest;
+ const AggregateCommand _aggregationRequest;
const LiteParsedPipeline _liteParsedPipeline;
const PrivilegeVector _privileges;
};
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index c4d80f31c60..c03ec31d6b9 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_exchange.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
@@ -98,14 +99,14 @@ namespace {
*/
bool canOptimizeAwayPipeline(const Pipeline* pipeline,
const PlanExecutor* exec,
- const AggregationRequest& request,
+ const AggregateCommand& request,
bool hasGeoNearStage,
bool hasChangeStreamStage) {
return pipeline && exec && !hasGeoNearStage && !hasChangeStreamStage &&
pipeline->getSources().empty() &&
// For exchange we will create a number of pipelines consisting of a single
// DocumentSourceExchange stage, so cannot optimize it away.
- !request.getExchangeSpec();
+ !request.getExchange();
}
/**
@@ -118,7 +119,7 @@ bool handleCursorCommand(OperationContext* opCtx,
boost::intrusive_ptr<ExpressionContext> expCtx,
const NamespaceString& nsForCursor,
std::vector<ClientCursor*> cursors,
- const AggregationRequest& request,
+ const AggregateCommand& request,
const BSONObj& cmdObj,
rpc::ReplyBuilderInterface* result) {
invariant(!cursors.empty());
@@ -258,7 +259,7 @@ bool handleCursorCommand(OperationContext* opCtx,
}
StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNamespaces(
- OperationContext* opCtx, const AggregationRequest& request) {
+ OperationContext* opCtx, const AggregateCommand& request) {
const LiteParsedPipeline liteParsedPipeline(request);
const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
@@ -274,7 +275,7 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
// resolution of the view definitions below might lead into an endless cycle if any are allowed
// to change.
auto viewCatalog =
- DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, request.getNamespaceString().db());
+ DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, request.getNamespace().db());
std::deque<NamespaceString> involvedNamespacesQueue(pipelineInvolvedNamespaces.begin(),
pipelineInvolvedNamespaces.end());
@@ -314,14 +315,14 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
// If the involved namespace is not in the same database as the aggregation, it must be
// from an $out or a $merge to a collection in a different database.
- if (involvedNs.db() != request.getNamespaceString().db()) {
+ if (involvedNs.db() != request.getNamespace().db()) {
// SERVER-51886: It is not correct to assume that we are reading from a collection
// because the collection targeted by $out/$merge on a given database can have the same
// name as a view on the source database. As such, we determine whether the collection
// name references a view on the aggregation request's database. Note that the inverse
// scenario (mistaking a view for a collection) is not an issue because $merge/$out
// cannot target a view.
- auto nssToCheck = NamespaceString(request.getNamespaceString().db(), involvedNs.coll());
+ auto nssToCheck = NamespaceString(request.getNamespace().db(), involvedNs.coll());
if (viewCatalog && viewCatalog->lookup(opCtx, nssToCheck.ns())) {
auto status = resolveViewDefinition(nssToCheck);
if (!status.isOK()) {
@@ -391,18 +392,18 @@ Status collatorCompatibleWithPipeline(OperationContext* opCtx,
// versioned. This can happen in the case where we are running in a cluster with a 4.4 mongoS, which
// does not set any shard version on a $mergeCursors pipeline.
void setIgnoredShardVersionForMergeCursors(OperationContext* opCtx,
- const AggregationRequest& request) {
- auto isMergeCursors = request.isFromMongos() && request.getPipeline().size() > 0 &&
+ const AggregateCommand& request) {
+ auto isMergeCursors = request.getFromMongos() && request.getPipeline().size() > 0 &&
request.getPipeline().front().firstElementFieldNameStringData() == "$mergeCursors"_sd;
if (isMergeCursors && !OperationShardingState::isOperationVersioned(opCtx)) {
OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- request.getNamespaceString(), ChunkVersion::IGNORED(), boost::none);
+ request.getNamespace(), ChunkVersion::IGNORED(), boost::none);
}
}
boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
OperationContext* opCtx,
- const AggregationRequest& request,
+ const AggregateCommand& request,
std::unique_ptr<CollatorInterface> collator,
boost::optional<UUID> uuid) {
setIgnoredShardVersionForMergeCursors(opCtx, request);
@@ -462,14 +463,14 @@ void _adjustChangeStreamReadConcern(OperationContext* opCtx) {
std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createExchangePipelinesIfNeeded(
OperationContext* opCtx,
boost::intrusive_ptr<ExpressionContext> expCtx,
- const AggregationRequest& request,
+ const AggregateCommand& request,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
boost::optional<UUID> uuid) {
std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> pipelines;
- if (request.getExchangeSpec() && !expCtx->explain) {
+ if (request.getExchange() && !expCtx->explain) {
boost::intrusive_ptr<Exchange> exchange =
- new Exchange(request.getExchangeSpec().get(), std::move(pipeline));
+ new Exchange(request.getExchange().get(), std::move(pipeline));
for (size_t idx = 0; idx < exchange->getConsumers(); ++idx) {
// For every new pipeline we have to create a new ExpressionContext as the context
@@ -498,7 +499,7 @@ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createExchangePipelinesI
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregationRequest& request,
+ const AggregateCommand& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
@@ -507,13 +508,13 @@ Status runAggregate(OperationContext* opCtx,
Status runAggregate(OperationContext* opCtx,
const NamespaceString& origNss,
- const AggregationRequest& request,
+ const AggregateCommand& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
// For operations on views, this will be the underlying namespace.
- NamespaceString nss = request.getNamespaceString();
+ NamespaceString nss = request.getNamespace();
// The collation to use for this aggregation. boost::optional to distinguish between the case
// where the collation has not yet been resolved, and where it has been resolved to nullptr.
@@ -546,7 +547,7 @@ Status runAggregate(OperationContext* opCtx,
// If this is a change stream, perform special checks and change the execution namespace.
if (liteParsedPipeline.hasChangeStream()) {
uassert(4928900,
- str::stream() << AggregationRequest::kCollectionUUIDName
+ str::stream() << AggregateCommand::kCollectionUUIDFieldName
<< " is not supported for a change stream",
!request.getCollectionUUID());
@@ -569,14 +570,14 @@ Status runAggregate(OperationContext* opCtx,
// If the user specified an explicit collation, adopt it; otherwise, use the simple
// collation. We do not inherit the collection's default collation or UUID, since
// the stream may be resuming from a point before the current UUID existed.
- collatorToUse.emplace(
- PipelineD::resolveCollator(opCtx, request.getCollation(), nullptr));
+ collatorToUse.emplace(PipelineD::resolveCollator(
+ opCtx, request.getCollation().get_value_or(BSONObj()), nullptr));
// Obtain collection locks on the execution namespace; that is, the oplog.
ctx.emplace(opCtx, nss, AutoGetCollectionViewMode::kViewsForbidden);
} else if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) {
uassert(4928901,
- str::stream() << AggregationRequest::kCollectionUUIDName
+ str::stream() << AggregateCommand::kCollectionUUIDFieldName
<< " is not supported for a collectionless aggregation",
!request.getCollectionUUID());
@@ -586,13 +587,13 @@ Status runAggregate(OperationContext* opCtx,
Top::LockType::NotLocked,
AutoStatsTracker::LogMode::kUpdateTopAndCurOp,
0);
- collatorToUse.emplace(
- PipelineD::resolveCollator(opCtx, request.getCollation(), nullptr));
+ collatorToUse.emplace(PipelineD::resolveCollator(
+ opCtx, request.getCollation().get_value_or(BSONObj()), nullptr));
} else {
// This is a regular aggregation. Lock the collection or view.
ctx.emplace(opCtx, nss, AutoGetCollectionViewMode::kViewsPermitted);
- collatorToUse.emplace(
- PipelineD::resolveCollator(opCtx, request.getCollation(), ctx->getCollection()));
+ collatorToUse.emplace(PipelineD::resolveCollator(
+ opCtx, request.getCollation().get_value_or(BSONObj()), ctx->getCollection()));
if (ctx->getCollection()) {
uuid = ctx->getCollection()->uuid();
}
@@ -609,13 +610,13 @@ Status runAggregate(OperationContext* opCtx,
invariant(nss != NamespaceString::kRsOplogNamespace);
invariant(!nss.isCollectionlessAggregateNS());
uassert(ErrorCodes::OptionNotSupportedOnView,
- str::stream() << AggregationRequest::kCollectionUUIDName
+ str::stream() << AggregateCommand::kCollectionUUIDFieldName
<< " is not supported against a view",
!request.getCollectionUUID());
// Check that the default collation of 'view' is compatible with the operation's
// collation. The check is skipped if the request did not specify a collation.
- if (!request.getCollation().isEmpty()) {
+ if (!request.getCollation().get_value_or(BSONObj()).isEmpty()) {
invariant(collatorToUse); // Should already be resolved at this point.
if (!CollatorInterface::collatorsMatch(ctx->getView()->defaultCollator(),
collatorToUse->get())) {
@@ -637,7 +638,7 @@ Status runAggregate(OperationContext* opCtx,
// Parse the resolved view into a new aggregation request.
auto newRequest = resolvedView.asExpandedViewAggregation(request);
- auto newCmd = newRequest.serializeToCommandObj().toBson();
+ auto newCmd = aggregation_request_helper::serializeToCommandObj(newRequest);
auto status = runAggregate(opCtx, origNss, newRequest, newCmd, privileges, result);
diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h
index 059c48a599b..30376367e83 100644
--- a/src/mongo/db/commands/run_aggregate.h
+++ b/src/mongo/db/commands/run_aggregate.h
@@ -34,7 +34,7 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
@@ -53,7 +53,7 @@ namespace mongo {
*/
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregationRequest& request,
+ const AggregateCommand& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
@@ -64,7 +64,7 @@ Status runAggregate(OperationContext* opCtx,
*/
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregationRequest& request,
+ const AggregateCommand& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result);
diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp
index f32b686f5a6..0d3387048fb 100644
--- a/src/mongo/db/commands/user_management_commands.cpp
+++ b/src/mongo/db/commands/user_management_commands.cpp
@@ -65,6 +65,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
@@ -1417,13 +1418,13 @@ UsersInfoReply CmdUMCTyped<UsersInfoCommand, UsersInfoReply, UMCInfoParams>::Inv
DBDirectClient client(opCtx);
rpc::OpMsgReplyBuilder replyBuilder;
- AggregationRequest aggRequest(AuthorizationManager::usersCollectionNamespace,
- std::move(pipeline));
+ AggregateCommand aggRequest(AuthorizationManager::usersCollectionNamespace,
+ std::move(pipeline));
// Impose no cursor privilege requirements, as cursor is drained internally
uassertStatusOK(runAggregate(opCtx,
AuthorizationManager::usersCollectionNamespace,
aggRequest,
- aggRequest.serializeToCommandObj().toBson(),
+ aggregation_request_helper::serializeToCommandObj(aggRequest),
PrivilegeVector(),
&replyBuilder));
auto bodyBuilder = replyBuilder.getBodyBuilder();
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index 6421054ae15..a73bdeea9ad 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/get_executor.h"
@@ -762,8 +763,7 @@ private:
// which stages were being used.
auto& updateMod = update.getU();
if (updateMod.type() == write_ops::UpdateModification::Type::kPipeline) {
- AggregationRequest request(_batch.getNamespace(),
- updateMod.getUpdatePipeline());
+ AggregateCommand request(_batch.getNamespace(), updateMod.getUpdatePipeline());
LiteParsedPipeline pipeline(request);
pipeline.tickGlobalStageCounters();
_updateMetrics->incrementExecutedWithAggregationPipeline();
diff --git a/src/mongo/db/error_labels.cpp b/src/mongo/db/error_labels.cpp
index d6413300548..7ac35e006fd 100644
--- a/src/mongo/db/error_labels.cpp
+++ b/src/mongo/db/error_labels.cpp
@@ -30,7 +30,7 @@
#include "mongo/db/error_labels.h"
#include "mongo/db/commands.h"
#include "mongo/db/curop.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
namespace mongo {
@@ -101,7 +101,8 @@ bool ErrorLabelBuilder::isResumableChangeStreamError() const {
// Do enough parsing to confirm that this is a well-formed pipeline with a $changeStream.
const auto swLitePipe = [&nss, &cmdObj]() -> StatusWith<LiteParsedPipeline> {
try {
- auto aggRequest = uassertStatusOK(AggregationRequest::parseFromBSON(nss, cmdObj));
+ auto aggRequest =
+ uassertStatusOK(aggregation_request_helper::parseFromBSON(nss, cmdObj));
return LiteParsedPipeline(aggRequest);
} catch (const DBException& ex) {
return ex.toStatus();
diff --git a/src/mongo/db/error_labels_test.cpp b/src/mongo/db/error_labels_test.cpp
index 2bf321fab1c..8a2545cec10 100644
--- a/src/mongo/db/error_labels_test.cpp
+++ b/src/mongo/db/error_labels_test.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/curop.h"
#include "mongo/db/error_labels.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -290,8 +292,8 @@ TEST_F(ErrorLabelBuilderTest, ResumableChangeStreamErrorAppliesToChangeStreamAgg
// is the only factor that determines the success or failure of isResumableChangeStreamError().
auto cmdObj = BSON("aggregate" << nss().coll() << "pipeline"
<< BSON_ARRAY(BSON("$changeStream" << BSONObj())) << "cursor"
- << BSONObj());
- auto aggRequest = uassertStatusOK(AggregationRequest::parseFromBSON(nss(), cmdObj));
+ << BSONObj() << "$db" << nss().db());
+ auto aggRequest = uassertStatusOK(aggregation_request_helper::parseFromBSON(nss(), cmdObj));
ASSERT_TRUE(LiteParsedPipeline(aggRequest).hasChangeStream());
// The label applies to a $changeStream "aggregate" command.
@@ -314,8 +316,8 @@ TEST_F(ErrorLabelBuilderTest, ResumableChangeStreamErrorDoesNotApplyToNonResumab
// is the only factor that determines the success or failure of isResumableChangeStreamError().
auto cmdObj = BSON("aggregate" << nss().coll() << "pipeline"
<< BSON_ARRAY(BSON("$changeStream" << BSONObj())) << "cursor"
- << BSONObj());
- auto aggRequest = uassertStatusOK(AggregationRequest::parseFromBSON(nss(), cmdObj));
+ << BSONObj() << "$db" << nss().db());
+ auto aggRequest = uassertStatusOK(aggregation_request_helper::parseFromBSON(nss(), cmdObj));
ASSERT_TRUE(LiteParsedPipeline(aggRequest).hasChangeStream());
// The label does not apply to a ChangeStreamFatalError error on a $changeStream aggregation.
@@ -338,8 +340,8 @@ TEST_F(ErrorLabelBuilderTest, ResumableChangeStreamErrorDoesNotApplyToNonChangeS
// is the only factor that determines the success or failure of isResumableChangeStreamError().
auto cmdObj =
BSON("aggregate" << nss().coll() << "pipeline" << BSON_ARRAY(BSON("$match" << BSONObj()))
- << "cursor" << BSONObj());
- auto aggRequest = uassertStatusOK(AggregationRequest::parseFromBSON(nss(), cmdObj));
+ << "cursor" << BSONObj() << "$db" << nss().db());
+ auto aggRequest = uassertStatusOK(aggregation_request_helper::parseFromBSON(nss(), cmdObj));
ASSERT_FALSE(LiteParsedPipeline(aggRequest).hasChangeStream());
// The label does not apply to a non-$changeStream "aggregate" command.
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 6b72b07a5dd..b0d876cde44 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -187,6 +187,14 @@ std::string NamespaceString::getSisterNS(StringData local) const {
return db().toString() + "." + local.toString();
}
+void NamespaceString::serializeCollectionName(BSONObjBuilder* builder, StringData fieldName) const {
+ if (isCollectionlessAggregateNS()) {
+ builder->append(fieldName, 1);
+ } else {
+ builder->append(fieldName, coll());
+ }
+}
+
bool NamespaceString::isDropPendingNamespace() const {
return coll().startsWith(dropPendingNSPrefix);
}
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 01f863c5e20..c264421dc13 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -395,6 +395,8 @@ public:
return {db(), "$cmd"};
}
+ void serializeCollectionName(BSONObjBuilder* builder, StringData fieldName) const;
+
/**
* @return true if the ns is an oplog one, otherwise false.
*/
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index 7c7ef01f16e..295a16e257d 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -33,7 +33,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/ops/write_ops.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/db/update/update_oplog_entry_version.h"
#include "mongo/util/assert_util.h"
@@ -259,7 +259,7 @@ write_ops::UpdateModification::UpdateModification(BSONElement update) {
"Update argument must be either an object or an array",
type == BSONType::Array);
- _update = PipelineUpdate{uassertStatusOK(AggregationRequest::parsePipelineFromBSON(update))};
+ _update = PipelineUpdate{parsePipelineFromBSON(update)};
}
write_ops::UpdateModification::UpdateModification(const BSONObj& update, ClassicTag) {
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 9a5adb2ce1f..6423fa960ef 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -19,7 +19,7 @@ env.Library(
'aggregation.cpp',
],
LIBDEPS=[
- 'aggregation_request',
+ 'aggregation_request_helper',
'expression_context',
'pipeline',
]
@@ -36,9 +36,10 @@ env.Library(
)
env.Library(
- target='aggregation_request',
+ target='aggregation_request_helper',
source=[
- 'aggregation_request.cpp',
+ 'aggregation_request_helper.cpp',
+ env.Idlc('aggregate_command.idl')[0]
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -50,6 +51,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/read_concern_args',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/db/write_concern_options',
+ '$BUILD_DIR/mongo/idl/idl_parser',
'document_sources_idl',
]
)
@@ -89,7 +91,7 @@ env.Library(
'$BUILD_DIR/mongo/util/intrusive_counter',
'$BUILD_DIR/mongo/util/regex_util',
'$BUILD_DIR/mongo/util/summation',
- 'aggregation_request',
+ 'aggregation_request_helper',
'dependencies',
'variable_validation',
],
@@ -183,7 +185,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/stats/counters',
- 'aggregation_request',
+ 'aggregation_request_helper',
]
)
@@ -431,7 +433,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
'$BUILD_DIR/mongo/util/clock_source_mock',
'accumulator',
- 'aggregation_request',
+ 'aggregation_request_helper',
'document_source_mock',
'document_sources_idl',
'expression_context',
diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl
new file mode 100644
index 00000000000..a242014f1b9
--- /dev/null
+++ b/src/mongo/db/pipeline/aggregate_command.idl
@@ -0,0 +1,164 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/db/pipeline/aggregation_request_helper.h"
+ - "mongo/db/query/count_request.h"
+ - "mongo/db/query/explain_options.h"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+ - "mongo/db/pipeline/exchange_spec.idl"
+ - "mongo/db/pipeline/legacy_runtime_constants.idl"
+ - "mongo/db/query/hint.idl"
+ - "mongo/db/write_concern_options.idl"
+
+types:
+ pipeline:
+ bson_serialization_type: any
+ description: "An array of objects specifying the aggregation pipeline."
+ cpp_type: "std::vector<mongo::BSONObj>"
+ deserializer: ::mongo::parsePipelineFromBSON
+ batchSize:
+ bson_serialization_type: any
+ description: "An int representing the cursor batch size."
+ cpp_type: "std::int64_t"
+ serializer: ::mongo::serializeBatchSizeToBSON
+ deserializer: ::mongo::parseBatchSizeFromBSON
+ explainVerbosity:
+ bson_serialization_type: any
+ description: "The Verbosity value representing explain verbosity."
+ cpp_type: "mongo::ExplainOptions::Verbosity"
+ serializer: ::mongo::serializeExplainToBSON
+ deserializer: ::mongo::parseExplainModeFromBSON
+
+commands:
+ aggregate:
+ description: "Represents the user-supplied options to the aggregate command."
+ cpp_name: AggregateCommand
+ command_name: aggregate
+ strict: true
+ namespace: concatenate_with_db
+ allow_global_collection_name: true
+ fields:
+ pipeline:
+ description: "An unparsed version of the pipeline."
+ type: pipeline
+ explain:
+ description: "Specifies to return the information on the processing of the pipeline."
+ type: explainVerbosity
+ optional: true
+ allowDiskUse:
+ description: "Enables writing to temporary files."
+ type: optionalBool
+ cursor:
+ description: "To indicate a cursor with a non-default batch size."
+ cpp_name: "batchSize"
+ type: batchSize
+ default: 101
+ validator: { gte: 0 }
+ maxTimeMS:
+ description: "Specifies a time limit in milliseconds for processing operations on a cursor. If you do not specify a value for maxTimeMS, operations will not time out."
+ type: safeInt64
+ validator: { gte: 0 }
+ optional: true
+ bypassDocumentValidation:
+ description: "True if this should bypass the document validation."
+ type: safeBool
+ optional: true
+ readConcern:
+ description: "Specifies the read concern."
+ type: object_owned
+ optional: true
+ collation:
+ description: "Specifies the collation to use for the operation."
+ type: object_owned
+ optional: true
+ hint:
+ description: "The index name to use or the index specification document."
+ type: indexHint
+ optional: true
+ writeConcern:
+ description: "A document that expresses the write concern to use with the $out or $merge stage."
+ type: WriteConcern
+ optional: true
+ let:
+ description: "A document containing user-specified let parameter constants; i.e. values that do not change once computed."
+ type: object_owned
+ optional: true
+ needsMerge:
+ description: "True if this request represents the shards part of a split pipeline, and should produce mergeable output."
+ type: optionalBool
+ fromMongos:
+ description: "True if this request originated from a mongoS."
+ type: optionalBool
+
+ # TODO: Mark the undocumented parameters below as 'unstable'.
+ $queryOptions:
+ description: "The unwrapped readPreference object, if one was given to us by the mongos command processor. This object will be empty when no readPreference is specified or if the request does not originate from mongos."
+ cpp_name: unwrappedReadPref
+ type: object_owned
+ optional: true
+ $_requestReshardingResumeToken:
 description: "True if this requests a resharding resume token."
+ cpp_name: requestReshardingResumeToken
+ type: optionalBool
+ exchange:
+ description: "An optional exchange specification for this request. If set it means that the request represents a producer running as a part of the exchange machinery. This is an internal option; we do not expect it to be set on requests from users or drivers."
+ type: ExchangeSpec
+ optional: true
+ runtimeConstants:
+ description: "A legacy way to specify constant variables available during execution. 'let' is now preferred."
+ type: LegacyRuntimeConstants
+ cpp_name: legacyRuntimeConstants
+ optional: true
+ isMapReduceCommand:
+ description: "True if an aggregation was invoked by the MapReduce command."
+ type: optionalBool
+ collectionUUID:
+ description: "The expected UUID of the namespace the aggregation executes on."
+ type: uuid
+ optional: true
+ use44SortKeys:
+ # TODO SERVER-47065: A 5.0 node still has to accept the 'use44SortKeys' field, since it
+ # could be included in a command sent from a 4.4 mongos or 4.4 mongod. In 5.1, this
+ # code to tolerate the 'use44SortKeys' field can be deleted.
+ description: "If true, this aggregation will use the 4.4 sort key format."
+ type: bool
+ ignore: true
+ useNewUpsert:
+ # TODO SERVER-46751: we must retain the ability to ingest the 'useNewUpsert' field for
+ # 5.0 upgrade purposes, since a 4.4 mongoS will always send {useNewUpsert:true} to the
+ # shards. We do nothing with it because useNewUpsert will be automatically used in 5.0
+ # when appropriate. Remove this final vestige of useNewUpsert when 5.0 becomes
+ # last-lts.
 description: "A flag to indicate whether or not to use the new upsert option."
+ type: bool
+ ignore: true
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
deleted file mode 100644
index 2e53d5fc8e7..00000000000
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ /dev/null
@@ -1,363 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/pipeline/aggregation_request.h"
-
-#include <algorithm>
-
-#include "mongo/base/error_codes.h"
-#include "mongo/base/status_with.h"
-#include "mongo/base/string_data.h"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/catalog/document_validation.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/exec/document_value/document.h"
-#include "mongo/db/exec/document_value/value.h"
-#include "mongo/db/query/cursor_request.h"
-#include "mongo/db/query/query_request.h"
-#include "mongo/db/repl/read_concern_args.h"
-#include "mongo/db/storage/storage_options.h"
-#include "mongo/idl/command_generic_argument.h"
-
-namespace mongo {
-
-StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
- const std::string& dbName,
- const BSONObj& cmdObj,
- boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
- return parseFromBSON(parseNs(dbName, cmdObj), cmdObj, explainVerbosity);
-}
-
-StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
- NamespaceString nss,
- const BSONObj& cmdObj,
- boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
- // Parse required parameters.
- auto pipelineElem = cmdObj[kPipelineName];
- auto pipeline = AggregationRequest::parsePipelineFromBSON(pipelineElem);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
-
- AggregationRequest request(std::move(nss), std::move(pipeline.getValue()));
-
- const std::initializer_list<StringData> optionsParsedElseWhere = {kPipelineName, kCommandName};
-
- bool hasCursorElem = false;
- bool hasExplainElem = false;
-
- bool hasFromMongosElem = false;
- bool hasNeedsMergeElem = false;
-
- // Parse optional parameters.
- for (auto&& elem : cmdObj) {
- auto fieldName = elem.fieldNameStringData();
-
- if (QueryRequest::kUnwrappedReadPrefField == fieldName) {
- // We expect this field to be validated elsewhere.
- request.setUnwrappedReadPref(elem.embeddedObject());
- } else if (std::find(optionsParsedElseWhere.begin(),
- optionsParsedElseWhere.end(),
- fieldName) != optionsParsedElseWhere.end()) {
- // Ignore options that are parsed elsewhere.
- } else if (kCursorName == fieldName) {
- long long batchSize;
- auto status =
- CursorRequest::parseCommandCursorOptions(cmdObj, kDefaultBatchSize, &batchSize);
- if (!status.isOK()) {
- return status;
- }
-
- hasCursorElem = true;
- request.setBatchSize(batchSize);
- } else if (kCollationName == fieldName) {
- if (elem.type() != BSONType::Object) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << kCollationName << " must be an object, not a "
- << typeName(elem.type())};
- }
- request.setCollation(elem.embeddedObject().getOwned());
- } else if (QueryRequest::cmdOptionMaxTimeMS == fieldName) {
- auto maxTimeMs = QueryRequest::parseMaxTimeMS(elem);
- if (!maxTimeMs.isOK()) {
- return maxTimeMs.getStatus();
- }
-
- request.setMaxTimeMS(maxTimeMs.getValue());
- } else if (repl::ReadConcernArgs::kReadConcernFieldName == fieldName) {
- if (elem.type() != BSONType::Object) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << repl::ReadConcernArgs::kReadConcernFieldName
- << " must be an object, not a " << typeName(elem.type())};
- }
- request.setReadConcern(elem.embeddedObject().getOwned());
- } else if (kHintName == fieldName) {
- if (BSONType::Object == elem.type()) {
- request.setHint(elem.embeddedObject());
- } else if (BSONType::String == elem.type()) {
- request.setHint(BSON("$hint" << elem.valueStringData()));
- } else {
- return Status(ErrorCodes::FailedToParse,
- str::stream()
- << kHintName
- << " must be specified as a string representing an index"
- << " name, or an object representing an index's key pattern");
- }
- } else if (kExplainName == fieldName) {
- if (elem.type() != BSONType::Bool) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << kExplainName << " must be a boolean, not a "
- << typeName(elem.type())};
- }
-
- hasExplainElem = true;
- if (elem.Bool()) {
- request.setExplain(ExplainOptions::Verbosity::kQueryPlanner);
- }
- } else if (kFromMongosName == fieldName) {
- if (elem.type() != BSONType::Bool) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << kFromMongosName << " must be a boolean, not a "
- << typeName(elem.type())};
- }
-
- hasFromMongosElem = true;
- request.setFromMongos(elem.Bool());
- } else if (kNeedsMergeName == fieldName) {
- if (elem.type() != BSONType::Bool) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << kNeedsMergeName << " must be a boolean, not a "
- << typeName(elem.type())};
- }
-
- hasNeedsMergeElem = true;
- request.setNeedsMerge(elem.Bool());
- } else if (kAllowDiskUseName == fieldName) {
- if (storageGlobalParams.readOnly) {
- return {ErrorCodes::IllegalOperation,
- str::stream() << "The '" << kAllowDiskUseName
- << "' option is not permitted in read-only mode."};
- } else if (elem.type() != BSONType::Bool) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << kAllowDiskUseName << " must be a boolean, not a "
- << typeName(elem.type())};
- }
- request.setAllowDiskUse(elem.Bool());
- } else if (kExchangeName == fieldName) {
- try {
- IDLParserErrorContext ctx("internalExchange");
- request.setExchangeSpec(ExchangeSpec::parse(ctx, elem.Obj()));
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
- } else if (bypassDocumentValidationCommandOption() == fieldName) {
- request.setBypassDocumentValidation(elem.trueValue());
- } else if (kRequestReshardingResumeToken == fieldName) {
- if (elem.type() != BSONType::Bool) {
- return {ErrorCodes::TypeMismatch,
- str::stream()
- << fieldName << "must be a boolean, not a " << typeName(elem.type())};
- }
-
- request.setRequestReshardingResumeToken(elem.Bool());
-
- if (request.getRequestReshardingResumeToken() &&
- !request.getNamespaceString().isOplog()) {
- return {ErrorCodes::FailedToParse,
- str::stream()
- << fieldName << " must only be set for the oplog namespace, not "
- << request.getNamespaceString()};
- }
- } else if (WriteConcernOptions::kWriteConcernField == fieldName) {
- if (elem.type() != BSONType::Object) {
- return {ErrorCodes::TypeMismatch,
- str::stream()
- << fieldName << " must be an object, not a " << typeName(elem.type())};
- }
-
- auto writeConcern = uassertStatusOK(WriteConcernOptions::parse(elem.embeddedObject()));
- request.setWriteConcern(writeConcern);
- } else if (kLegacyRuntimeConstantsName == fieldName) {
- // TODO SERVER-46384: Remove 'runtimeConstants' in 4.7 since it is redundant with 'let'
- try {
- IDLParserErrorContext ctx("internalLegacyRuntimeConstants");
- request.setLegacyRuntimeConstants(LegacyRuntimeConstants::parse(ctx, elem.Obj()));
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
- } else if (kLetName == fieldName) {
- if (elem.type() != BSONType::Object)
- return {ErrorCodes::TypeMismatch,
- str::stream()
- << fieldName << " must be an object, not a " << typeName(elem.type())};
- auto bob = BSONObjBuilder{request.getLetParameters()};
- bob.appendElementsUnique(elem.embeddedObject());
- request._letParameters = bob.obj();
- } else if (kCollectionUUIDName == fieldName) {
- auto collectionUUIDSW = UUID::parse(elem);
- if (!collectionUUIDSW.isOK()) {
- return collectionUUIDSW.getStatus();
- }
-
- request.setCollectionUUID(collectionUUIDSW.getValue());
- } else if (fieldName == kUse44SortKeysName) {
- if (elem.type() != BSONType::Bool) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << kUse44SortKeysName << " must be a boolean, not a "
- << typeName(elem.type())};
- }
- // TODO SERVER-47065: A 4.7+ node still has to accept the 'use44SortKeys' field, since
- // it could be included in a command sent from a 4.4 mongos or 4.4 mongod. When 5.0
- // becomes last-lts, this code to tolerate the 'use44SortKeys' field can be deleted.
- } else if (fieldName == "useNewUpsert"_sd) {
- // TODO SERVER-46751: we must retain the ability to ingest the 'useNewUpsert' field for
- // 4.7+ upgrade purposes, since a 4.4 mongoS will always send {useNewUpsert:true} to the
- // shards. We do nothing with it because useNewUpsert will be automatically used in 4.7+
- // when appropriate. Remove this final vestige of useNewUpsert when 5.0 becomes
- // last-lts.
- } else if (fieldName == kIsMapReduceCommandName) {
- if (elem.type() != BSONType::Bool) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << kIsMapReduceCommandName << " must be a boolean, not a "
- << typeName(elem.type())};
- }
- request.setIsMapReduceCommand(elem.boolean());
- } else if (isMongocryptdArgument(fieldName)) {
- return {ErrorCodes::FailedToParse,
- str::stream() << "unrecognized field '" << elem.fieldName()
- << "'. This command may be meant for a mongocryptd process."};
- } else if (!isGenericArgument(fieldName)) {
- return {ErrorCodes::FailedToParse,
- str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
- }
- }
-
- if (explainVerbosity) {
- if (hasExplainElem) {
- return {
- ErrorCodes::FailedToParse,
- str::stream() << "The '" << kExplainName
- << "' option is illegal when a explain verbosity is also provided"};
- }
-
- request.setExplain(explainVerbosity);
- }
-
- // 'hasExplainElem' implies an aggregate command-level explain option, which does not require
- // a cursor argument.
- if (!hasCursorElem && !hasExplainElem) {
- return {ErrorCodes::FailedToParse,
- str::stream()
- << "The '" << kCursorName
- << "' option is required, except for aggregate with the explain argument"};
- }
-
- if (request.getExplain() && cmdObj[WriteConcernOptions::kWriteConcernField]) {
- return {ErrorCodes::FailedToParse,
- str::stream() << "Aggregation explain does not support the'"
- << WriteConcernOptions::kWriteConcernField << "' option"};
- }
-
- if (hasNeedsMergeElem && !hasFromMongosElem) {
- return {ErrorCodes::FailedToParse,
- str::stream() << "Cannot specify '" << kNeedsMergeName << "' without '"
- << kFromMongosName << "'"};
- }
-
- return request;
-} // namespace mongo
-
-NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) {
- auto firstElement = cmdObj.firstElement();
-
- if (firstElement.isNumber()) {
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Invalid command format: the '"
- << firstElement.fieldNameStringData()
- << "' field must specify a collection name or 1",
- firstElement.number() == 1);
- return NamespaceString::makeCollectionlessAggregateNSS(dbname);
- } else {
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "collection name has invalid type: "
- << typeName(firstElement.type()),
- firstElement.type() == BSONType::String);
-
- const NamespaceString nss(dbname, firstElement.valueStringData());
-
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
- nss.isValid() && !nss.isCollectionlessAggregateNS());
-
- return nss;
- }
-}
-
-Document AggregationRequest::serializeToCommandObj() const {
- return Document{
- {kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))},
- {kPipelineName, _pipeline},
- // Only serialize booleans if different than their default.
- {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
- {kFromMongosName, _fromMongos ? Value(true) : Value()},
- {kNeedsMergeName, _needsMerge ? Value(true) : Value()},
- {bypassDocumentValidationCommandOption(),
- _bypassDocumentValidation ? Value(true) : Value()},
- {kRequestReshardingResumeToken, _requestReshardingResumeToken ? Value(true) : Value()},
- // Only serialize a collation if one was specified.
- {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)},
- // Only serialize batchSize if not an explain, otherwise serialize an empty cursor object.
- {kCursorName,
- _explainMode ? Value(Document()) : Value(Document{{kBatchSizeName, _batchSize}})},
- // Only serialize a hint if one was specified.
- {kHintName, _hint.isEmpty() ? Value() : Value(_hint)},
- // Only serialize readConcern if specified.
- {repl::ReadConcernArgs::kReadConcernFieldName,
- _readConcern.isEmpty() ? Value() : Value(_readConcern)},
- // Only serialize the unwrapped read preference if specified.
- {QueryRequest::kUnwrappedReadPrefField,
- _unwrappedReadPref.isEmpty() ? Value() : Value(_unwrappedReadPref)},
- // Only serialize maxTimeMs if specified.
- {QueryRequest::cmdOptionMaxTimeMS,
- _maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))},
- {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()},
- {WriteConcernOptions::kWriteConcernField,
- _writeConcern ? Value(_writeConcern->toBSON()) : Value()},
- // Only serialize runtime constants if any were specified.
- {kLegacyRuntimeConstantsName,
- _legacyRuntimeConstants ? Value(_legacyRuntimeConstants->toBSON()) : Value()},
- {kIsMapReduceCommandName, _isMapReduceCommand ? Value(true) : Value()},
- {kLetName, !_letParameters.isEmpty() ? Value(_letParameters) : Value()},
- // Only serialize collection UUID if one was specified.
- {kCollectionUUIDName, _collectionUUID ? Value(*_collectionUUID) : Value()},
- };
-}
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
deleted file mode 100644
index 1dfab801323..00000000000
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/optional.hpp>
-#include <vector>
-
-#include "mongo/bson/bsonelement.h"
-#include "mongo/bson/bsonobj.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/exchange_spec_gen.h"
-#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
-#include "mongo/db/query/explain_options.h"
-#include "mongo/db/write_concern_options.h"
-
-namespace mongo {
-
-template <typename T>
-class StatusWith;
-class Document;
-
-/**
- * Represents the user-supplied options to the aggregate command.
- */
-class AggregationRequest {
-public:
- static constexpr StringData kCommandName = "aggregate"_sd;
- static constexpr StringData kCursorName = "cursor"_sd;
- static constexpr StringData kBatchSizeName = "batchSize"_sd;
- static constexpr StringData kFromMongosName = "fromMongos"_sd;
- static constexpr StringData kNeedsMergeName = "needsMerge"_sd;
- static constexpr StringData kPipelineName = "pipeline"_sd;
- static constexpr StringData kCollationName = "collation"_sd;
- static constexpr StringData kExplainName = "explain"_sd;
- static constexpr StringData kAllowDiskUseName = "allowDiskUse"_sd;
- static constexpr StringData kHintName = "hint"_sd;
- static constexpr StringData kExchangeName = "exchange"_sd;
- static constexpr StringData kLegacyRuntimeConstantsName = "runtimeConstants"_sd;
- static constexpr StringData kUse44SortKeysName = "use44SortKeys"_sd;
- static constexpr StringData kIsMapReduceCommandName = "isMapReduceCommand"_sd;
- static constexpr StringData kLetName = "let"_sd;
- static constexpr StringData kCollectionUUIDName = "collectionUUID"_sd;
- static constexpr StringData kRequestReshardingResumeToken = "$_requestReshardingResumeToken"_sd;
-
- static constexpr long long kDefaultBatchSize = 101;
-
- /**
- * Parse an aggregation pipeline definition from 'pipelineElem'. Returns a non-OK status if
- * pipeline is not an array or if any of the array elements are not objects.
- */
- static StatusWith<std::vector<BSONObj>> parsePipelineFromBSON(BSONElement pipelineElem) {
- std::vector<BSONObj> pipeline;
- if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) {
- return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"};
- }
-
- for (auto elem : pipelineElem.Obj()) {
- if (elem.type() != BSONType::Object) {
- return {ErrorCodes::TypeMismatch,
- "Each element of the 'pipeline' array must be an object"};
- }
- pipeline.push_back(elem.embeddedObject().getOwned());
- }
-
- return pipeline;
- }
-
-
- /**
- * Create a new instance of AggregationRequest by parsing the raw command object. Returns a
- * non-OK status if a required field was missing, if there was an unrecognized field name or if
- * there was a bad value for one of the fields.
- *
- * If we are parsing a request for an explained aggregation with an explain verbosity provided,
- * then 'explainVerbosity' contains this information. In this case, 'cmdObj' may not itself
- * contain the explain specifier. Otherwise, 'explainVerbosity' should be boost::none.
- */
- static StatusWith<AggregationRequest> parseFromBSON(
- NamespaceString nss,
- const BSONObj& cmdObj,
- boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
-
- /**
- * Convenience overload which constructs the request's NamespaceString from the given database
- * name and command object.
- */
- static StatusWith<AggregationRequest> parseFromBSON(
- const std::string& dbName,
- const BSONObj& cmdObj,
- boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
-
- /*
- * The first field in 'cmdObj' must be a string representing a valid collection name, or the
- * number 1. In the latter case, returns a reserved namespace that does not represent a user
- * collection. See 'NamespaceString::makeCollectionlessAggregateNSS()'.
- */
- static NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj);
-
- /**
- * Constructs an AggregationRequest over the given namespace with the given pipeline. All
- * options aside from the pipeline assume their default values.
- */
- AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline)
- : _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {}
-
- /**
- * Serializes the options to a Document. Note that this serialization includes the original
- * pipeline object, as specified. Callers will likely want to override this field with a
- * serialization of a parsed and optimized Pipeline object.
- *
- * The explain option is not serialized. Since the explain command format is {explain:
- * {aggregate: ...}, ...}, explain options are not part of the aggregate command object.
- */
- Document serializeToCommandObj() const;
-
- //
- // Getters.
- //
-
- long long getBatchSize() const {
- return _batchSize;
- }
-
- const NamespaceString& getNamespaceString() const {
- return _nss;
- }
-
- /**
- * An unparsed version of the pipeline. All BSONObjs are owned.
- */
- const std::vector<BSONObj>& getPipeline() const {
- return _pipeline;
- }
-
- /**
- * Returns true if this request originated from a mongoS.
- */
- bool isFromMongos() const {
- return _fromMongos;
- }
-
- /**
- * Returns true if this request represents the shards part of a split pipeline, and should
- * produce mergeable output.
- */
- bool needsMerge() const {
- return _needsMerge;
- }
-
- bool shouldAllowDiskUse() const {
- return _allowDiskUse;
- }
-
- bool shouldBypassDocumentValidation() const {
- return _bypassDocumentValidation;
- }
-
- bool getRequestReshardingResumeToken() const {
- return _requestReshardingResumeToken;
- }
-
- /**
- * Returns an empty object if no collation was specified.
- */
- BSONObj getCollation() const {
- return _collation;
- }
-
- BSONObj getHint() const {
- return _hint;
- }
-
- boost::optional<ExplainOptions::Verbosity> getExplain() const {
- return _explainMode;
- }
-
- unsigned int getMaxTimeMS() const {
- return _maxTimeMS;
- }
-
- const BSONObj& getReadConcern() const {
- return _readConcern;
- }
-
- const BSONObj& getUnwrappedReadPref() const {
- return _unwrappedReadPref;
- }
-
- const auto& getExchangeSpec() const {
- return _exchangeSpec;
- }
-
- boost::optional<WriteConcernOptions> getWriteConcern() const {
- return _writeConcern;
- }
-
- const auto& getLegacyRuntimeConstants() const {
- return _legacyRuntimeConstants;
- }
-
- const auto& getLetParameters() const {
- return _letParameters;
- }
-
- bool getIsMapReduceCommand() const {
- return _isMapReduceCommand;
- }
-
- const auto& getCollectionUUID() const {
- return _collectionUUID;
- }
-
- //
- // Setters for optional fields.
- //
-
- /**
- * Negative batchSize is illegal but batchSize of 0 is allowed.
- */
- void setBatchSize(long long batchSize) {
- uassert(40203, "batchSize must be non-negative", batchSize >= 0);
- _batchSize = batchSize;
- }
-
- void setCollation(BSONObj collation) {
- _collation = collation.getOwned();
- }
-
- void setHint(BSONObj hint) {
- _hint = hint.getOwned();
- }
-
- void setExplain(boost::optional<ExplainOptions::Verbosity> verbosity) {
- _explainMode = verbosity;
- }
-
- void setAllowDiskUse(bool allowDiskUse) {
- _allowDiskUse = allowDiskUse;
- }
-
- void setFromMongos(bool isFromMongos) {
- _fromMongos = isFromMongos;
- }
-
- void setNeedsMerge(bool needsMerge) {
- _needsMerge = needsMerge;
- }
-
- void setBypassDocumentValidation(bool shouldBypassDocumentValidation) {
- _bypassDocumentValidation = shouldBypassDocumentValidation;
- }
-
- void setRequestReshardingResumeToken(bool requestReshardingResumeToken) {
- _requestReshardingResumeToken = requestReshardingResumeToken;
- }
-
- void setMaxTimeMS(unsigned int maxTimeMS) {
- _maxTimeMS = maxTimeMS;
- }
-
- void setReadConcern(BSONObj readConcern) {
- _readConcern = readConcern.getOwned();
- }
-
- void setUnwrappedReadPref(BSONObj unwrappedReadPref) {
- _unwrappedReadPref = unwrappedReadPref.getOwned();
- }
-
- void setExchangeSpec(ExchangeSpec spec) {
- _exchangeSpec = std::move(spec);
- }
-
- void setWriteConcern(WriteConcernOptions writeConcern) {
- _writeConcern = writeConcern;
- }
-
- void setLegacyRuntimeConstants(LegacyRuntimeConstants runtimeConstants) {
- _legacyRuntimeConstants = std::move(runtimeConstants);
- }
-
- void setLetParameters(BSONObj letParameters) {
- _letParameters = letParameters.getOwned();
- }
-
- void setIsMapReduceCommand(bool isMapReduce) {
- _isMapReduceCommand = isMapReduce;
- }
-
- void setCollectionUUID(UUID collectionUUID) {
- _collectionUUID = std::move(collectionUUID);
- }
-
-private:
- // Required fields.
- const NamespaceString _nss;
-
- // An unparsed version of the pipeline.
- const std::vector<BSONObj> _pipeline;
-
- long long _batchSize;
-
- // Optional fields.
-
- // An owned copy of the user-specified collation object, or an empty object if no collation was
- // specified.
- BSONObj _collation;
-
- // The hint provided, if any. If the hint was by index key pattern, the value of '_hint' is
- // the key pattern hinted. If the hint was by index name, the value of '_hint' is
- // {$hint: <String>}, where <String> is the index name hinted.
- BSONObj _hint;
-
- BSONObj _readConcern;
-
- // The unwrapped readPreference object, if one was given to us by the mongos command processor.
- // This object will be empty when no readPreference is specified or if the request does not
- // originate from mongos.
- BSONObj _unwrappedReadPref;
-
- // The explain mode to use, or boost::none if this is not a request for an aggregation explain.
- boost::optional<ExplainOptions::Verbosity> _explainMode;
-
- bool _allowDiskUse = false;
- bool _fromMongos = false;
- bool _needsMerge = false;
- bool _bypassDocumentValidation = false;
- bool _requestReshardingResumeToken = false;
-
- // A user-specified maxTimeMS limit, or a value of '0' if not specified.
- unsigned int _maxTimeMS = 0;
-
- // An optional exchange specification for this request. If set it means that the request
- // represents a producer running as a part of the exchange machinery.
- // This is an internal option; we do not expect it to be set on requests from users or drivers.
- boost::optional<ExchangeSpec> _exchangeSpec;
-
- // The explicit writeConcern for the operation or boost::none if the user did not specifiy one.
- boost::optional<WriteConcernOptions> _writeConcern;
-
- // A document containing runtime constants; i.e. values that do not change once computed (e.g.
- // $$NOW).
- boost::optional<LegacyRuntimeConstants> _legacyRuntimeConstants;
-
- // The expected UUID of the namespace the aggregation executes on.
- boost::optional<UUID> _collectionUUID;
-
- // A document containing user-specified let parameter constants; i.e. values that do not change
- // once computed.
- BSONObj _letParameters;
-
- // True when an aggregation was invoked by the MapReduce command.
- bool _isMapReduceCommand = false;
-};
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.cpp b/src/mongo/db/pipeline/aggregation_request_helper.cpp
new file mode 100644
index 00000000000..80300abb773
--- /dev/null
+++ b/src/mongo/db/pipeline/aggregation_request_helper.cpp
@@ -0,0 +1,237 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/aggregation_request_helper.h"
+#include "mongo/base/error_codes.h"
+#include "mongo/base/status_with.h"
+#include "mongo/base/string_data.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/query/cursor_request.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/storage/storage_options.h"
+#include "mongo/idl/command_generic_argument.h"
+#include "mongo/platform/basic.h"
+
+namespace mongo {
+namespace aggregation_request_helper {
+
+StatusWith<AggregateCommand> parseFromBSON(
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+ return parseFromBSON(parseNs(dbName, cmdObj), cmdObj, explainVerbosity);
+}
+
+StatusWith<AggregateCommand> parseFromBSON(
+ NamespaceString nss,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+
+    // If the command object lacks the 'aggregate' or '$db' field, we will use the namespace in 'nss'.
+ bool cmdObjChanged = false;
+ auto cmdObjBob = BSONObjBuilder{BSON(AggregateCommand::kCommandName << nss.coll())};
+ if (!cmdObj.hasField(AggregateCommand::kCommandName) ||
+ !cmdObj.hasField(AggregateCommand::kDbNameFieldName)) {
+ cmdObjBob.append("$db", nss.db());
+ cmdObjBob.appendElementsUnique(cmdObj);
+ cmdObjChanged = true;
+ }
+
+ AggregateCommand request(nss);
+ try {
+ request = AggregateCommand::parse(IDLParserErrorContext("aggregate"),
+ cmdObjChanged ? cmdObjBob.obj() : cmdObj);
+ } catch (const AssertionException&) {
+ return exceptionToStatus();
+ }
+
+ if (explainVerbosity) {
+ if (cmdObj.hasField(AggregateCommand::kExplainFieldName)) {
+ return {
+ ErrorCodes::FailedToParse,
+ str::stream() << "The '" << AggregateCommand::kExplainFieldName
+                << "' option is illegal when an explain verbosity is also provided"};
+ }
+
+ request.setExplain(explainVerbosity);
+ }
+
+ auto status = validate(cmdObj, explainVerbosity);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return request;
+}
+
+NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj) {
+ auto firstElement = cmdObj.firstElement();
+
+ if (firstElement.isNumber()) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Invalid command format: the '"
+ << firstElement.fieldNameStringData()
+ << "' field must specify a collection name or 1",
+ firstElement.number() == 1);
+ return NamespaceString::makeCollectionlessAggregateNSS(dbname);
+ } else {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "collection name has invalid type: "
+ << typeName(firstElement.type()),
+ firstElement.type() == BSONType::String);
+
+ const NamespaceString nss(dbname, firstElement.valueStringData());
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
+ nss.isValid() && !nss.isCollectionlessAggregateNS());
+
+ return nss;
+ }
+}
+
+BSONObj serializeToCommandObj(const AggregateCommand& request) {
+ return request.toBSON(BSONObj());
+}
+
+Document serializeToCommandDoc(const AggregateCommand& request) {
+ return Document(request.toBSON(BSONObj()).getOwned());
+}
+
+Status validate(const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+ bool hasAllowDiskUseElem = cmdObj.hasField(AggregateCommand::kAllowDiskUseFieldName);
+ bool hasCursorElem = cmdObj.hasField(AggregateCommand::kBatchSizeFieldName);
+ bool hasExplainElem = cmdObj.hasField(AggregateCommand::kExplainFieldName);
+ bool hasExplain =
+ explainVerbosity || (hasExplainElem && cmdObj[AggregateCommand::kExplainFieldName].Bool());
+ bool hasFromMongosElem = cmdObj.hasField(AggregateCommand::kFromMongosFieldName);
+ bool hasNeedsMergeElem = cmdObj.hasField(AggregateCommand::kNeedsMergeFieldName);
+
+ // 'hasExplainElem' implies an aggregate command-level explain option, which does not require
+ // a cursor argument.
+ if (!hasCursorElem && !hasExplainElem) {
+ return {ErrorCodes::FailedToParse,
+ str::stream()
+ << "The '" << AggregateCommand::kBatchSizeFieldName
+ << "' option is required, except for aggregate with the explain argument"};
+ }
+
+ if (hasExplain && cmdObj[WriteConcernOptions::kWriteConcernField]) {
+ return {ErrorCodes::FailedToParse,
+                str::stream() << "Aggregation explain does not support the '"
+ << WriteConcernOptions::kWriteConcernField << "' option"};
+ }
+
+ if (hasNeedsMergeElem && !hasFromMongosElem) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "Cannot specify '" << AggregateCommand::kNeedsMergeFieldName
+ << "' without '" << AggregateCommand::kFromMongosFieldName << "'"};
+ }
+
+ if (hasAllowDiskUseElem && storageGlobalParams.readOnly) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << "The '" << AggregateCommand::kAllowDiskUseFieldName
+ << "' option is not permitted in read-only mode."};
+ }
+
+ return Status::OK();
+}
+} // namespace aggregation_request_helper
+
+// Custom serializers/deserializers for AggregateCommand.
+
+long long parseBatchSizeFromBSON(const BSONElement& cursorElem) {
+ long long batchSize = 101;
+
+ if (cursorElem.eoo()) {
+ return batchSize;
+ }
+
+ uassert(ErrorCodes::TypeMismatch,
+ "cursor field must be missing or an object",
+ cursorElem.type() == mongo::Object);
+
+ BSONObj cursor = cursorElem.embeddedObject();
+ BSONElement batchSizeElem = cursor[aggregation_request_helper::kBatchSizeField];
+
+ const int expectedNumberOfCursorFields = batchSizeElem.eoo() ? 0 : 1;
+ uassert(ErrorCodes::BadValue,
+ "cursor object can't contain fields other than batchSize",
+ cursor.nFields() == expectedNumberOfCursorFields);
+
+ if (batchSizeElem.eoo()) {
+ return batchSize;
+ }
+
+ uassert(
+ ErrorCodes::TypeMismatch, "cursor.batchSize must be a number", batchSizeElem.isNumber());
+ batchSize = batchSizeElem.numberLong();
+
+ return batchSize;
+}
+
+boost::optional<mongo::ExplainOptions::Verbosity> parseExplainModeFromBSON(
+ const BSONElement& explainElem) {
+ uassert(ErrorCodes::TypeMismatch,
+ "explain must be a boolean",
+ explainElem.type() == BSONType::Bool);
+
+ if (explainElem.Bool()) {
+ return ExplainOptions::Verbosity::kQueryPlanner;
+ }
+
+ return boost::none;
+}
+
+void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain,
+ StringData fieldName,
+ BSONObjBuilder* builder) {
+    // Note that we do not serialize the 'explain' field to the command object. This serializer
+    // only serializes an empty cursor object for the 'cursor' field when it is an explain command.
+ builder->append(AggregateCommand::kBatchSizeFieldName, BSONObj());
+
+ return;
+}
+
+void serializeBatchSizeToBSON(const std::int64_t& batchSize,
+ StringData fieldName,
+ BSONObjBuilder* builder) {
+ if (!builder->hasField(fieldName)) {
+ builder->append(fieldName, BSON(aggregation_request_helper::kBatchSizeField << batchSize));
+ }
+
+ return;
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.h b/src/mongo/db/pipeline/aggregation_request_helper.h
new file mode 100644
index 00000000000..d3e61857a4b
--- /dev/null
+++ b/src/mongo/db/pipeline/aggregation_request_helper.h
@@ -0,0 +1,144 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/bson/bsonelement.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/exchange_spec_gen.h"
+#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
+#include "mongo/db/query/explain_options.h"
+#include "mongo/db/write_concern_options.h"
+
+namespace mongo {
+
+template <typename T>
+class StatusWith;
+class Document;
+class AggregateCommand;
+
+namespace aggregation_request_helper {
+
+/**
+ * Helpers to serialize/deserialize AggregateCommand.
+ */
+static constexpr StringData kBatchSizeField = "batchSize"_sd;
+static constexpr long long kDefaultBatchSize = 101;
+
+/**
+ * Create a new instance of AggregateCommand by parsing the raw command object. Returns a
+ * non-OK status if a required field was missing, if there was an unrecognized field name or if
+ * there was a bad value for one of the fields.
+ *
+ * If we are parsing a request for an explained aggregation with an explain verbosity provided,
+ * then 'explainVerbosity' contains this information. In this case, 'cmdObj' may not itself
+ * contain the explain specifier. Otherwise, 'explainVerbosity' should be boost::none.
+ */
+StatusWith<AggregateCommand> parseFromBSON(
+ NamespaceString nss,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
+
+/**
+ * Convenience overload which constructs the request's NamespaceString from the given database
+ * name and command object.
+ */
+StatusWith<AggregateCommand> parseFromBSON(
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
+
+/**
+ * The first field in 'cmdObj' must be a string representing a valid collection name, or the
+ * number 1. In the latter case, returns a reserved namespace that does not represent a user
+ * collection. See 'NamespaceString::makeCollectionlessAggregateNSS()'.
+ */
+NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj);
+
+/**
+ * Serializes the options to a Document. Note that this serialization includes the original
+ * pipeline object, as specified. Callers will likely want to override this field with a
+ * serialization of a parsed and optimized Pipeline object.
+ *
+ * The explain option is not serialized. The preferred way to send an explain is with the explain
+ * command, like: {explain: {aggregate: ...}, ...}; explain options are not part of the aggregate
+ * command object.
+ */
+Document serializeToCommandDoc(const AggregateCommand& request);
+
+BSONObj serializeToCommandObj(const AggregateCommand& request);
+
+/**
+ * Validate the aggregate command object.
+ */
+Status validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> explainVerbosity);
+} // namespace aggregation_request_helper
+
+/**
+ * Custom serializers/deserializers for AggregateCommand.
+ */
+
+long long parseBatchSizeFromBSON(const BSONElement& cursorElem);
+
+boost::optional<mongo::ExplainOptions::Verbosity> parseExplainModeFromBSON(
+ const BSONElement& explainElem);
+
+void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain,
+ StringData fieldName,
+ BSONObjBuilder* builder);
+
+void serializeBatchSizeToBSON(const std::int64_t& batchSize,
+ StringData fieldName,
+ BSONObjBuilder* builder);
+
+/**
+ * Parse an aggregation pipeline definition from 'pipelineElem'.
+ */
+static std::vector<BSONObj> parsePipelineFromBSON(const BSONElement& pipelineElem) {
+ std::vector<BSONObj> pipeline;
+
+ uassert(ErrorCodes::TypeMismatch,
+ "'pipeline' option must be specified as an array",
+ !pipelineElem.eoo() && pipelineElem.type() == BSONType::Array);
+
+ for (auto elem : pipelineElem.Obj()) {
+ uassert(ErrorCodes::TypeMismatch,
+ "Each element of the 'pipeline' array must be an object",
+ elem.type() == BSONType::Object);
+ pipeline.push_back(elem.embeddedObject().getOwned());
+ }
+
+ return pipeline;
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 225be1a950e..4492375d67b 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -29,7 +29,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
@@ -47,8 +47,10 @@
namespace mongo {
namespace {
+static constexpr auto kBatchSizeFieldName = "batchSize"_sd;
+
const Document kDefaultCursorOptionDocument{
- {AggregationRequest::kBatchSizeName, AggregationRequest::kDefaultBatchSize}};
+ {kBatchSizeFieldName, aggregation_request_helper::kDefaultBatchSize}};
//
// Parsing
@@ -56,70 +58,77 @@ const Document kDefaultCursorOptionDocument{
TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
// Using oplog namespace so that validation of $_requestReshardingResumeToken succeeds.
- NamespaceString nss("local.oplog.rs");
+ NamespaceString nss("oplog.rs");
BSONObj inputBson = fromjson(
- "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, "
+ "{aggregate: 'oplog.rs', pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: "
+ "true, fromMongos: true, "
"needsMerge: true, bypassDocumentValidation: true, $_requestReshardingResumeToken: true, "
"collation: {locale: 'en_US'}, cursor: {batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, "
"readConcern: {level: 'linearizable'}, $queryOptions: {$readPreference: 'nearest'}, "
- "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}, isMapReduceCommand: true}");
+ "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}, isMapReduceCommand: true, $db: "
+ "'local'}");
auto uuid = UUID::gen();
BSONObjBuilder uuidBob;
- uuid.appendToBuilder(&uuidBob, AggregationRequest::kCollectionUUIDName);
+ uuid.appendToBuilder(&uuidBob, AggregateCommand::kCollectionUUIDFieldName);
inputBson = inputBson.addField(uuidBob.obj().firstElement());
- auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson));
ASSERT_FALSE(request.getExplain());
- ASSERT_TRUE(request.shouldAllowDiskUse());
- ASSERT_TRUE(request.isFromMongos());
- ASSERT_TRUE(request.needsMerge());
- ASSERT_TRUE(request.shouldBypassDocumentValidation());
+ ASSERT_TRUE(request.getAllowDiskUse());
+ ASSERT_TRUE(request.getFromMongos());
+ ASSERT_TRUE(request.getNeedsMerge());
+ ASSERT_TRUE(request.getBypassDocumentValidation().value_or(false));
ASSERT_TRUE(request.getRequestReshardingResumeToken());
ASSERT_EQ(request.getBatchSize(), 10);
- ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1));
- ASSERT_BSONOBJ_EQ(request.getCollation(),
+ ASSERT_BSONOBJ_EQ(request.getHint().value_or(BSONObj()), BSON("a" << 1));
+ ASSERT_BSONOBJ_EQ(request.getCollation().value_or(BSONObj()),
BSON("locale"
<< "en_US"));
- ASSERT_EQ(request.getMaxTimeMS(), 100u);
- ASSERT_BSONOBJ_EQ(request.getReadConcern(),
+ ASSERT_EQ(*request.getMaxTimeMS(), 100u);
+ ASSERT_BSONOBJ_EQ(*request.getReadConcern(),
BSON("level"
<< "linearizable"));
- ASSERT_BSONOBJ_EQ(request.getUnwrappedReadPref(),
+ ASSERT_BSONOBJ_EQ(request.getUnwrappedReadPref().value_or(BSONObj()),
BSON("$readPreference"
<< "nearest"));
- ASSERT_TRUE(request.getExchangeSpec().is_initialized());
+ ASSERT_TRUE(request.getExchange().is_initialized());
ASSERT_TRUE(request.getIsMapReduceCommand());
ASSERT_EQ(*request.getCollectionUUID(), uuid);
}
TEST(AggregationRequestTest, ShouldParseExplicitRequestReshardingResumeTokenFalseForNonOplog) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [], $_requestReshardingResumeToken: false, cursor: {}}");
- auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: false, cursor: "
+ "{}, $db: 'a'}");
+ auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson));
ASSERT_FALSE(request.getRequestReshardingResumeToken());
}
TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}");
- auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [], explain: true, cursor: {}, $db: 'a'}");
+ auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson));
ASSERT_TRUE(request.getExplain());
ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner);
}
TEST(AggregationRequestTest, ShouldParseExplicitExplainFalseWithCursorOption) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], explain: false, cursor: {batchSize: 10}}");
- auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], explain: false, cursor: {batchSize: 10}, $db: "
+ "'a'}");
+ auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson));
ASSERT_FALSE(request.getExplain());
ASSERT_EQ(request.getBatchSize(), 10);
}
TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArg) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], cursor: {}}");
- auto request = unittest::assertGet(AggregationRequest::parseFromBSON(
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [], cursor: {}, $db: 'a'}");
+ auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(
nss, inputBson, ExplainOptions::Verbosity::kQueryPlanner));
ASSERT_TRUE(request.getExplain());
ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner);
@@ -127,9 +136,10 @@ TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArg)
TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArgAndCursorOption) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], cursor: {batchSize: 10}}");
- auto request = unittest::assertGet(
- AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats));
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [], cursor: {batchSize: 10}, $db: 'a'}");
+ auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(
+ nss, inputBson, ExplainOptions::Verbosity::kExecStats));
ASSERT_TRUE(request.getExplain());
ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kExecStats);
ASSERT_EQ(request.getBatchSize(), 10);
@@ -138,12 +148,13 @@ TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArgAn
TEST(AggregationRequestTest, ShouldParseExplainFlagWithReadConcern) {
NamespaceString nss("a.collection");
// Non-local readConcern should not be allowed with the explain flag, but this is checked
- // elsewhere to avoid having to parse the readConcern in AggregationRequest.
- const BSONObj inputBson =
- fromjson("{pipeline: [], explain: true, readConcern: {level: 'majority'}}");
- auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ // elsewhere to avoid having to parse the readConcern in AggregationCommand.
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], explain: true, readConcern: {level: 'majority'}, "
+ "$db: 'a'}");
+ auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson));
ASSERT_TRUE(request.getExplain());
- ASSERT_BSONOBJ_EQ(request.getReadConcern(),
+ ASSERT_BSONOBJ_EQ(*request.getReadConcern(),
BSON("level"
<< "majority"));
}
@@ -154,41 +165,19 @@ TEST(AggregationRequestTest, ShouldParseExplainFlagWithReadConcern) {
TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFieldsAreSpecified) {
NamespaceString nss("a.collection");
- AggregationRequest request(nss, {});
+ AggregateCommand request(nss, {});
auto expectedSerialization =
- Document{{AggregationRequest::kCommandName, nss.coll()},
- {AggregationRequest::kPipelineName, std::vector<Value>{}},
- {AggregationRequest::kCursorName, Value(kDefaultCursorOptionDocument)}};
- ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
-}
-
-TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefault) {
- NamespaceString nss("a.collection");
- AggregationRequest request(nss, {});
- request.setExplain(boost::none);
- request.setAllowDiskUse(false);
- request.setFromMongos(false);
- request.setNeedsMerge(false);
- request.setBypassDocumentValidation(false);
- request.setRequestReshardingResumeToken(false);
- request.setCollation(BSONObj());
- request.setHint(BSONObj());
- request.setMaxTimeMS(0u);
- request.setUnwrappedReadPref(BSONObj());
- request.setReadConcern(BSONObj());
- request.setIsMapReduceCommand(false);
-
- auto expectedSerialization =
- Document{{AggregationRequest::kCommandName, nss.coll()},
- {AggregationRequest::kPipelineName, std::vector<Value>{}},
- {AggregationRequest::kCursorName, Value(kDefaultCursorOptionDocument)}};
- ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
+ Document{{AggregateCommand::kCommandName, nss.coll()},
+ {AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
+ {AggregateCommand::kBatchSizeFieldName, Value(kDefaultCursorOptionDocument)}};
+ ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
+ expectedSerialization);
}
TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
NamespaceString nss("a.collection");
- AggregationRequest request(nss, {});
+ AggregateCommand request(nss, {});
request.setAllowDiskUse(true);
request.setFromMongos(true);
request.setNeedsMerge(true);
@@ -210,87 +199,92 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
request.setIsMapReduceCommand(true);
const auto letParamsObj = BSON("foo"
<< "bar");
- request.setLetParameters(letParamsObj);
+ request.setLet(letParamsObj);
auto uuid = UUID::gen();
request.setCollectionUUID(uuid);
- auto expectedSerialization =
- Document{{AggregationRequest::kCommandName, nss.coll()},
- {AggregationRequest::kPipelineName, std::vector<Value>{}},
- {AggregationRequest::kAllowDiskUseName, true},
- {AggregationRequest::kFromMongosName, true},
- {AggregationRequest::kNeedsMergeName, true},
- {bypassDocumentValidationCommandOption(), true},
- {AggregationRequest::kRequestReshardingResumeToken, true},
- {AggregationRequest::kCollationName, collationObj},
- {AggregationRequest::kCursorName,
- Value(Document({{AggregationRequest::kBatchSizeName, 10}}))},
- {AggregationRequest::kHintName, hintObj},
- {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj},
- {QueryRequest::kUnwrappedReadPrefField, readPrefObj},
- {QueryRequest::cmdOptionMaxTimeMS, 10},
- {AggregationRequest::kIsMapReduceCommandName, true},
- {AggregationRequest::kLetName, letParamsObj},
- {AggregationRequest::kCollectionUUIDName, uuid}};
- ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
+ auto expectedSerialization = Document{
+ {AggregateCommand::kCommandName, nss.coll()},
+ {AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
+ {AggregateCommand::kAllowDiskUseFieldName, true},
+ {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))},
+ {QueryRequest::cmdOptionMaxTimeMS, 10},
+ {AggregateCommand::kBypassDocumentValidationFieldName, true},
+ {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj},
+ {AggregateCommand::kCollationFieldName, collationObj},
+ {AggregateCommand::kHintFieldName, hintObj},
+ {AggregateCommand::kLetFieldName, letParamsObj},
+ {AggregateCommand::kNeedsMergeFieldName, true},
+ {AggregateCommand::kFromMongosFieldName, true},
+ {QueryRequest::kUnwrappedReadPrefField, readPrefObj},
+ {AggregateCommand::kRequestReshardingResumeTokenFieldName, true},
+ {AggregateCommand::kIsMapReduceCommandFieldName, true},
+ {AggregateCommand::kCollectionUUIDFieldName, uuid}};
+ ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
+ expectedSerialization);
}
TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) {
NamespaceString nss("a.collection");
- AggregationRequest request(nss, {});
+ AggregateCommand request(nss, {});
request.setBatchSize(10);
- auto expectedSerialization =
- Document{{AggregationRequest::kCommandName, nss.coll()},
- {AggregationRequest::kPipelineName, std::vector<Value>{}},
- {AggregationRequest::kCursorName,
- Value(Document({{AggregationRequest::kBatchSizeName, 10}}))}};
- ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
+ auto expectedSerialization = Document{
+ {AggregateCommand::kCommandName, nss.coll()},
+ {AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
+ {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}};
+ ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
+ expectedSerialization);
}
TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAggregateOneNSS) {
NamespaceString nss = NamespaceString::makeCollectionlessAggregateNSS("a");
- AggregationRequest request(nss, {});
+ AggregateCommand request(nss, {});
auto expectedSerialization =
- Document{{AggregationRequest::kCommandName, 1},
- {AggregationRequest::kPipelineName, std::vector<Value>{}},
- {AggregationRequest::kCursorName,
- Value(Document({{AggregationRequest::kBatchSizeName,
- AggregationRequest::kDefaultBatchSize}}))}};
+ Document{{AggregateCommand::kCommandName, 1},
+ {AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
+ {AggregateCommand::kBatchSizeFieldName,
+ Value(Document({{aggregation_request_helper::kBatchSizeField,
+ aggregation_request_helper::kDefaultBatchSize}}))}};
- ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
+ ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
+ expectedSerialization);
}
TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}}");
- auto request = AggregationRequest::parseFromBSON(nss, inputBson);
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, $db: 'a'}");
+ auto request = aggregation_request_helper::parseFromBSON(nss, inputBson);
ASSERT_OK(request.getStatus());
- ASSERT_EQ(request.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
+ ASSERT_EQ(request.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
}
TEST(AggregationRequestTest, ShouldAcceptHintAsString) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], hint: 'a_1', cursor: {}}");
- auto request = AggregationRequest::parseFromBSON(nss, inputBson);
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], hint: 'a_1', cursor: {}, $db: "
+ "'a'}");
+ auto request = aggregation_request_helper::parseFromBSON(nss, inputBson);
ASSERT_OK(request.getStatus());
- ASSERT_BSONOBJ_EQ(request.getValue().getHint(),
+ ASSERT_BSONOBJ_EQ(request.getValue().getHint().value_or(BSONObj()),
BSON("$hint"
<< "a_1"));
}
TEST(AggregationRequestTest, ShouldNotSerializeBatchSizeWhenExplainSet) {
NamespaceString nss("a.collection");
- AggregationRequest request(nss, {});
+ AggregateCommand request(nss, {});
request.setBatchSize(10);
request.setExplain(ExplainOptions::Verbosity::kQueryPlanner);
- auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()},
- {AggregationRequest::kPipelineName, std::vector<Value>{}},
- {AggregationRequest::kCursorName, Value(Document())}};
- ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
+ auto expectedSerialization =
+ Document{{AggregateCommand::kCommandName, nss.coll()},
+ {AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
+ {AggregateCommand::kBatchSizeFieldName, Value(Document())}};
+ ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
+ expectedSerialization);
}
//
@@ -299,169 +293,196 @@ TEST(AggregationRequestTest, ShouldNotSerializeBatchSizeWhenExplainSet) {
TEST(AggregationRequestTest, ShouldRejectNonArrayPipeline) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: {}, cursor: {}}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: {}, cursor: {}, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectPipelineArrayIfAnElementIsNotAnObject) {
NamespaceString nss("a.collection");
- BSONObj inputBson = fromjson("{pipeline: [4], cursor: {}}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ BSONObj inputBson = fromjson("{aggregate: 'collection', pipeline: [4], cursor: {}, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
- inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}, 4], cursor: {}}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}, 4], cursor: {}, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonObjectCollation) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, collation: 1}");
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, collation: 1, "
+ "$db: 'a'}");
ASSERT_NOT_OK(
- AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus());
+ aggregation_request_helper::parseFromBSON(NamespaceString("a.collection"), inputBson)
+ .getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonStringNonObjectHint) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, hint: 1}");
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, hint: 1, $db: "
+ "'a'}");
ASSERT_NOT_OK(
- AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus());
+ aggregation_request_helper::parseFromBSON(NamespaceString("a.collection"), inputBson)
+ .getStatus());
}
TEST(AggregationRequestTest, ShouldRejectHintAsArray) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, hint: []}]}");
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, hint: [], $db: "
+ "'a'}]}");
ASSERT_NOT_OK(
- AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus());
+ aggregation_request_helper::parseFromBSON(NamespaceString("a.collection"), inputBson)
+ .getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExplainIfNumber) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, explain: 1}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, explain: 1, $db: "
+ "'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExplainIfObject) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, explain: {}}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, explain: {}, $db: "
+ "'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolFromMongos) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: 1}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: 1, "
+ "$db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: 1, fromMongos: true}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: 1, "
+ "fromMongos: true, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfFromMongosNotPresent) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true, "
+ "$db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge34) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1, "
+ "$db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfNeedsMerge34AlsoPresent) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson(
- "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true, fromMongos: true, "
- "fromRouter: true}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true, "
+ "fromMongos: true, "
+ "fromRouter: true, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectFromMongosIfNeedsMerge34AlsoPresent) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson(
- "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: true, fromRouter: true}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: true, "
+ "fromRouter: true, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, allowDiskUse: 1}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, allowDiskUse: 1, "
+ "$db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolIsMapReduceCommand) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, isMapReduceCommand: 1}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, "
+ "isMapReduceCommand: 1, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNoCursorNoExplain) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}]}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExplainTrueWithSeparateExplainArg) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], explain: true}");
- ASSERT_NOT_OK(
- AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats)
- .getStatus());
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [], explain: true, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(
+ nss, inputBson, ExplainOptions::Verbosity::kExecStats)
+ .getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExplainFalseWithSeparateExplainArg) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], explain: false}");
- ASSERT_NOT_OK(
- AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats)
- .getStatus());
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [], explain: false, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(
+ nss, inputBson, ExplainOptions::Verbosity::kExecStats)
+ .getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithReadConcernMajority) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], readConcern: {level: 'majority'}}");
- ASSERT_NOT_OK(
- AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats)
- .getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], readConcern: {level: 'majority'}, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(
+ nss, inputBson, ExplainOptions::Verbosity::kExecStats)
+ .getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExplainWithWriteConcernMajority) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [], explain: true, writeConcern: {w: 'majority'}}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], explain: true, writeConcern: {w: 'majority'}, "
+ "$db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConcernMajority) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], writeConcern: {w: 'majority'}}");
- ASSERT_NOT_OK(
- AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats)
- .getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], writeConcern: {w: 'majority'}, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(
+ nss, inputBson, ExplainOptions::Verbosity::kExecStats)
+ .getStatus());
}
TEST(AggregationRequestTest, ShouldRejectRequestReshardingResumeTokenIfNonOplogNss) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], $_requestReshardingResumeToken: true}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: true, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, CannotParseNeedsMerge34) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: true}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: true, "
+ "$db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ParseNSShouldReturnAggregateOneNSIfAggregateFieldIsOne) {
@@ -470,72 +491,81 @@ TEST(AggregationRequestTest, ParseNSShouldReturnAggregateOneNSIfAggregateFieldIs
for (auto& one : ones) {
const BSONObj inputBSON =
- fromjson(str::stream() << "{aggregate: " << one << ", pipeline: []}");
- ASSERT(AggregationRequest::parseNs("a", inputBSON).isCollectionlessAggregateNS());
+ fromjson(str::stream() << "{aggregate: " << one << ", pipeline: [], $db: 'a'}");
+ ASSERT(aggregation_request_helper::parseNs("a", inputBSON).isCollectionlessAggregateNS());
}
}
TEST(AggregationRequestTest, ParseNSShouldRejectNumericNSIfAggregateFieldIsNotOne) {
- const BSONObj inputBSON = fromjson("{aggregate: 2, pipeline: []}");
- ASSERT_THROWS_CODE(
- AggregationRequest::parseNs("a", inputBSON), AssertionException, ErrorCodes::FailedToParse);
+ const BSONObj inputBSON = fromjson("{aggregate: 2, pipeline: [], $db: 'a'}");
+ ASSERT_THROWS_CODE(aggregation_request_helper::parseNs("a", inputBSON),
+ AssertionException,
+ ErrorCodes::FailedToParse);
}
TEST(AggregationRequestTest, ParseNSShouldRejectNonStringNonNumericNS) {
- const BSONObj inputBSON = fromjson("{aggregate: {}, pipeline: []}");
- ASSERT_THROWS_CODE(
- AggregationRequest::parseNs("a", inputBSON), AssertionException, ErrorCodes::TypeMismatch);
+ const BSONObj inputBSON = fromjson("{aggregate: {}, pipeline: [], $db: 'a'}");
+ ASSERT_THROWS_CODE(aggregation_request_helper::parseNs("a", inputBSON),
+ AssertionException,
+ ErrorCodes::TypeMismatch);
}
TEST(AggregationRequestTest, ParseNSShouldRejectAggregateOneStringAsCollectionName) {
- const BSONObj inputBSON = fromjson("{aggregate: '$cmd.aggregate', pipeline: []}");
- ASSERT_THROWS_CODE(AggregationRequest::parseNs("a", inputBSON),
+ const BSONObj inputBSON = fromjson("{aggregate: '$cmd.aggregate', pipeline: [], $db: 'a'}");
+ ASSERT_THROWS_CODE(aggregation_request_helper::parseNs("a", inputBSON),
AssertionException,
ErrorCodes::InvalidNamespace);
}
TEST(AggregationRequestTest, ParseNSShouldRejectInvalidCollectionName) {
- const BSONObj inputBSON = fromjson("{aggregate: '', pipeline: []}");
- ASSERT_THROWS_CODE(AggregationRequest::parseNs("a", inputBSON),
+ const BSONObj inputBSON = fromjson("{aggregate: '', pipeline: [], $db: 'a'}");
+ ASSERT_THROWS_CODE(aggregation_request_helper::parseNs("a", inputBSON),
AssertionException,
ErrorCodes::InvalidNamespace);
}
TEST(AggregationRequestTest, ParseFromBSONOverloadsShouldProduceIdenticalRequests) {
- const BSONObj inputBSON =
- fromjson("{aggregate: 'collection', pipeline: [{$match: {}}, {$project: {}}], cursor: {}}");
+ const BSONObj inputBSON = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {}}, {$project: {}}], cursor: {}, $db: "
+ "'a'}");
NamespaceString nss("a.collection");
- auto aggReqDBName = unittest::assertGet(AggregationRequest::parseFromBSON("a", inputBSON));
- auto aggReqNSS = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBSON));
+ auto aggReqDBName =
+ unittest::assertGet(aggregation_request_helper::parseFromBSON("a", inputBSON));
+ auto aggReqNSS = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBSON));
- ASSERT_DOCUMENT_EQ(aggReqDBName.serializeToCommandObj(), aggReqNSS.serializeToCommandObj());
+ ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(aggReqDBName),
+ aggregation_request_helper::serializeToCommandDoc(aggReqNSS));
}
TEST(AggregationRequestTest, ShouldRejectExchangeNotObject) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], exchage: '42'}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [], exchage: '42', $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) {
NamespaceString nss("a.collection");
- const BSONObj inputBson = fromjson("{pipeline: [], exchage: {}}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson =
+ fromjson("{aggregate: 'collection', pipeline: [], exchage: {}, $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
- ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: "
+ "'invalid', $db: 'a'}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectInvalidCollectionUUID) {
NamespaceString nss("a.collection");
- const BSONObj inputBSON = fromjson("{pipeline: [{$match: {}}], collectionUUID: 2}");
- ASSERT_EQUALS(AggregationRequest::parseFromBSON(nss, inputBSON).getStatus().code(),
- ErrorCodes::InvalidUUID);
+ const BSONObj inputBSON = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {}}], collectionUUID: 2, $db: 'a'}");
+ ASSERT_EQUALS(aggregation_request_helper::parseFromBSON(nss, inputBSON).getStatus().code(),
+ ErrorCodes::TypeMismatch);
}
//
@@ -544,9 +574,10 @@ TEST(AggregationRequestTest, ShouldRejectInvalidCollectionUUID) {
TEST(AggregationRequestTest, ShouldIgnoreQueryOptions) {
NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, $queryOptions: {}}");
- ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, $queryOptions: "
+ "{}, $db: 'a'}");
+ ASSERT_OK(aggregation_request_helper::parseFromBSON(nss, inputBson).getStatus());
}
} // namespace
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
index 325215b6717..9d296c32dfa 100644
--- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
+++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/s/query/sharded_agg_test_fixture.h"
#include "mongo/s/stale_shard_version_helpers.h"
@@ -52,7 +53,7 @@ TEST_F(DispatchShardPipelineTest, DoesNotSplitPipelineIfTargetingOneShard) {
auto pipeline = Pipeline::create(
{parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
const Document serializedCommand =
- AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ aggregation_request_helper::serializeToCommandDoc(AggregateCommand(expCtx()->ns, stages));
const bool hasChangeStream = false;
auto future = launchAsync([&] {
@@ -83,7 +84,7 @@ TEST_F(DispatchShardPipelineTest, DoesSplitPipelineIfMatchSpansTwoShards) {
auto pipeline = Pipeline::create(
{parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
const Document serializedCommand =
- AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ aggregation_request_helper::serializeToCommandDoc(AggregateCommand(expCtx()->ns, stages));
const bool hasChangeStream = false;
auto future = launchAsync([&] {
@@ -117,7 +118,7 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineRetriesOnNetworkError) {
auto pipeline = Pipeline::create(
{parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
const Document serializedCommand =
- AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ aggregation_request_helper::serializeToCommandDoc(AggregateCommand(expCtx()->ns, stages));
const bool hasChangeStream = false;
auto future = launchAsync([&] {
// Shouldn't throw.
@@ -162,7 +163,7 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineDoesNotRetryOnStaleConfig
auto pipeline = Pipeline::create(
{parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
const Document serializedCommand =
- AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ aggregation_request_helper::serializeToCommandDoc(AggregateCommand(expCtx()->ns, stages));
const bool hasChangeStream = false;
auto future = launchAsync([&] {
ASSERT_THROWS_CODE(sharded_agg_helpers::dispatchShardPipeline(
@@ -191,7 +192,7 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
auto pipeline = Pipeline::create(
{parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
const Document serializedCommand =
- AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ aggregation_request_helper::serializeToCommandDoc(AggregateCommand(expCtx()->ns, stages));
const bool hasChangeStream = false;
auto future = launchAsync([&] {
// Shouldn't throw.
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 099e6cb6183..b9c89f39bc7 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -61,13 +61,12 @@
namespace mongo {
-class AggregationRequest;
class Document;
/**
* Registers a DocumentSource to have the name 'key'.
*
- * 'liteParser' takes an AggregationRequest and a BSONElement and returns a
+ * 'liteParser' takes an AggregateCommand and a BSONElement and returns a
* LiteParsedDocumentSource. This is used for checks that need to happen before a full parse,
* such as checks about which namespaces are referenced by this aggregation.
*
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b99c8fa6ddb..e9a31a8912a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -512,7 +512,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(BSONObj originalCmdObj,
Document resumeToken) {
Document originalCmd(originalCmdObj);
- auto pipeline = originalCmd[AggregationRequest::kPipelineName].getArray();
+ auto pipeline = originalCmd[AggregateCommand::kPipelineFieldName].getArray();
// A $changeStream must be the first element of the pipeline in order to be able
// to replace (or add) a resume token.
invariant(!pipeline[0][DocumentSourceChangeStream::kStageName].missing());
@@ -527,7 +527,7 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(BSONObj original
pipeline[0] =
Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
MutableDocument newCmd(std::move(originalCmd));
- newCmd[AggregationRequest::kPipelineName] = Value(pipeline);
+ newCmd[AggregateCommand::kPipelineFieldName] = Value(pipeline);
return newCmd.freeze().toBson();
}
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 568f31e08ff..6ce0d370318 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -42,8 +42,9 @@
#include "mongo/bson/json.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/exec/document_value/value_comparator.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -256,7 +257,7 @@ public:
Base()
: _opCtx(makeOperationContext()),
_ctx(new ExpressionContextForTest(_opCtx.get(),
- AggregationRequest(NamespaceString(ns), {}))),
+ AggregateCommand(NamespaceString(ns), {}))),
_tempDir("DocumentSourceGroupTest") {}
protected:
@@ -265,7 +266,7 @@ protected:
BSONElement specElement = namedSpec.firstElement();
intrusive_ptr<ExpressionContextForTest> expressionContext =
- new ExpressionContextForTest(_opCtx.get(), AggregationRequest(NamespaceString(ns), {}));
+ new ExpressionContextForTest(_opCtx.get(), AggregateCommand(NamespaceString(ns), {}));
// For $group, 'inShard' implies 'fromMongos' and 'needsMerge'.
expressionContext->fromMongos = expressionContext->needsMerge = inShard;
expressionContext->inMongos = inMongos;
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 099109366fc..d57097ec5af 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/expression_algo.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
#include "mongo/db/pipeline/expression.h"
@@ -194,7 +195,7 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars
auto pipelineElem = specObj["pipeline"];
boost::optional<LiteParsedPipeline> liteParsedPipeline;
if (pipelineElem) {
- auto pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(pipelineElem));
+ auto pipeline = parsePipelineFromBSON(pipelineElem);
liteParsedPipeline = LiteParsedPipeline(fromNss, pipeline);
}
@@ -877,13 +878,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
const auto argName = argument.fieldNameStringData();
if (argName == "pipeline") {
- auto result = AggregationRequest::parsePipelineFromBSON(argument);
- if (!result.isOK()) {
- uasserted(ErrorCodes::FailedToParse,
- str::stream() << "invalid $lookup pipeline definition: "
- << result.getStatus().toString());
- }
- pipeline = std::move(result.getValue());
+ pipeline = parsePipelineFromBSON(argument);
hasPipeline = true;
continue;
}
diff --git a/src/mongo/db/pipeline/document_source_merge_spec.cpp b/src/mongo/db/pipeline/document_source_merge_spec.cpp
index fe6123261b4..2ca9a21edf7 100644
--- a/src/mongo/db/pipeline/document_source_merge_spec.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_spec.cpp
@@ -34,7 +34,7 @@
#include <fmt/format.h>
#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_source_merge.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
@@ -110,8 +110,7 @@ MergeWhenMatchedPolicy mergeWhenMatchedParseFromBSON(const BSONElement& elem) {
elem.type() == BSONType::String || elem.type() == BSONType::Array);
if (elem.type() == BSONType::Array) {
- return {MergeWhenMatchedModeEnum::kPipeline,
- uassertStatusOK(AggregationRequest::parsePipelineFromBSON(elem))};
+ return {MergeWhenMatchedModeEnum::kPipeline, parsePipelineFromBSON(elem)};
}
invariant(elem.type() == BSONType::String);
diff --git a/src/mongo/db/pipeline/document_source_unwind_test.cpp b/src/mongo/db/pipeline/document_source_unwind_test.cpp
index bd287fd384b..fa4519a6832 100644
--- a/src/mongo/db/pipeline/document_source_unwind_test.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/bson/json.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/exec/document_value/value_comparator.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -71,7 +72,7 @@ public:
: _queryServiceContext(std::make_unique<QueryTestServiceContext>()),
_opCtx(_queryServiceContext->makeOperationContext()),
_ctx(new ExpressionContextForTest(_opCtx.get(),
- AggregationRequest(NamespaceString(ns), {}))) {}
+ AggregateCommand(NamespaceString(ns), {}))) {}
virtual ~CheckResultsBase() {}
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 0ca09f7a27c..01d34d83cac 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -46,7 +46,7 @@ ExpressionContext::ResolvedNamespace::ResolvedNamespace(NamespaceString ns,
: ns(std::move(ns)), pipeline(std::move(pipeline)) {}
ExpressionContext::ExpressionContext(OperationContext* opCtx,
- const AggregationRequest& request,
+ const AggregateCommand& request,
std::unique_ptr<CollatorInterface> collator,
std::shared_ptr<MongoProcessInterface> processInterface,
StringMap<ResolvedNamespace> resolvedNamespaces,
@@ -54,18 +54,18 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
bool mayDbProfile)
: ExpressionContext(opCtx,
request.getExplain(),
- request.isFromMongos(),
- request.needsMerge(),
- request.shouldAllowDiskUse(),
- request.shouldBypassDocumentValidation(),
+ request.getFromMongos(),
+ request.getNeedsMerge(),
+ request.getAllowDiskUse(),
+ request.getBypassDocumentValidation().value_or(false),
request.getIsMapReduceCommand(),
- request.getNamespaceString(),
+ request.getNamespace(),
request.getLegacyRuntimeConstants(),
std::move(collator),
std::move(processInterface),
std::move(resolvedNamespaces),
std::move(collUUID),
- request.getLetParameters(),
+ request.getLet(),
mayDbProfile) {
if (request.getIsMapReduceCommand()) {
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 721cf6ffe01..67fa0adcd50 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -42,7 +42,7 @@
#include "mongo/db/exec/document_value/value_comparator.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/javascript_execution.h"
#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
@@ -104,7 +104,7 @@ public:
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
*/
ExpressionContext(OperationContext* opCtx,
- const AggregationRequest& request,
+ const AggregateCommand& request,
std::unique_ptr<CollatorInterface> collator,
std::shared_ptr<MongoProcessInterface> mongoProcessInterface,
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces,
@@ -113,7 +113,7 @@ public:
/**
* Constructs an ExpressionContext to be used for Pipeline parsing and evaluation. This version
- * requires finer-grained parameters but does not require an AggregationRequest.
+ * requires finer-grained parameters but does not require an AggregateCommand.
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
*/
ExpressionContext(OperationContext* opCtx,
diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h
index d5100825daa..85d8dbf7bde 100644
--- a/src/mongo/db/pipeline/expression_context_for_test.h
+++ b/src/mongo/db/pipeline/expression_context_for_test.h
@@ -31,6 +31,7 @@
#include <boost/optional.hpp>
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h"
#include "mongo/db/query/datetime/date_time_support.h"
@@ -140,7 +141,7 @@ public:
* Constructor which sets the given OperationContext on the ExpressionContextForTest. This will
* also resolve the ExpressionContextForTest's ServiceContext from the OperationContext.
*/
- ExpressionContextForTest(OperationContext* opCtx, const AggregationRequest& request)
+ ExpressionContextForTest(OperationContext* opCtx, const AggregateCommand& request)
: ExpressionContext(
opCtx, request, nullptr, std::make_shared<StubMongoProcessInterface>(), {}, {}),
_serviceContext(opCtx->getServiceContext()) {
diff --git a/src/mongo/db/pipeline/expression_walker_test.cpp b/src/mongo/db/pipeline/expression_walker_test.cpp
index 60185ad9834..447ae1e3c9a 100644
--- a/src/mongo/db/pipeline/expression_walker_test.cpp
+++ b/src/mongo/db/pipeline/expression_walker_test.cpp
@@ -34,7 +34,9 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/json.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_walker.h"
#include "mongo/unittest/unittest.h"
@@ -48,10 +50,9 @@ protected:
const auto inputBson = fromjson("{pipeline: " + jsonArray + "}");
ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array);
- auto rawPipeline =
- uassertStatusOK(AggregationRequest::parsePipelineFromBSON(inputBson["pipeline"]));
+ auto rawPipeline = parsePipelineFromBSON(inputBson["pipeline"]);
NamespaceString testNss("test", "collection");
- AggregationRequest request(testNss, rawPipeline);
+ AggregateCommand request(testNss, rawPipeline);
return Pipeline::parse(request.getPipeline(), getExpCtx());
}
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index 0af48f8b74f..7dc717d507e 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -35,7 +35,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/read_concern_support_result.h"
@@ -53,8 +53,8 @@ public:
* May throw a AssertionException if there is an invalid stage specification, although full
* validation happens later, during Pipeline construction.
*/
- LiteParsedPipeline(const AggregationRequest& request)
- : LiteParsedPipeline(request.getNamespaceString(), request.getPipeline()) {}
+ LiteParsedPipeline(const AggregateCommand& request)
+ : LiteParsedPipeline(request.getNamespace(), request.getPipeline()) {}
LiteParsedPipeline(const NamespaceString& nss, const std::vector<BSONObj>& pipelineStages) {
_stageSpecs.reserve(pipelineStages.size());
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 2915c9c99f6..b2f9bb5491f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -196,7 +196,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
BSONObj sortObj,
SkipThenLimit skipThenLimit,
boost::optional<std::string> groupIdForDistinctScan,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
const size_t plannerOpts,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures) {
auto qr = std::make_unique<QueryRequest>(nss);
@@ -208,7 +208,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
qr->setLimit(skipThenLimit.getLimit());
if (aggRequest) {
qr->setExplain(static_cast<bool>(aggRequest->getExplain()));
- qr->setHint(aggRequest->getHint());
+ qr->setHint(aggRequest->getHint().value_or(BSONObj()));
}
// The collation on the ExpressionContext has been resolved to either the user-specified
@@ -318,7 +318,7 @@ StringData extractGeoNearFieldFromIndexes(OperationContext* opCtx,
std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline) {
auto expCtx = pipeline->getContext();
@@ -389,7 +389,7 @@ void PipelineD::attachInnerQueryExecutorToPipeline(
void PipelineD::buildAndAttachInnerQueryExecutorToPipeline(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline) {
auto callback = PipelineD::buildInnerQueryExecutor(collection, nss, aggRequest, pipeline);
@@ -510,7 +510,7 @@ auto buildProjectionForPushdown(const DepsTracker& deps, Pipeline* pipeline) {
std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
PipelineD::buildInnerQueryExecutorGeneric(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline) {
// Make a last effort to optimize pipeline stages before potentially detaching them to be pushed
// down into the query executor.
@@ -599,7 +599,7 @@ PipelineD::buildInnerQueryExecutorGeneric(const CollectionPtr& collection,
std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
PipelineD::buildInnerQueryExecutorGeoNear(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline) {
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "$geoNear requires a geo index to run, but " << nss.ns()
@@ -667,7 +667,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
QueryMetadataBitSet unavailableMetadata,
const BSONObj& queryObj,
SkipThenLimit skipThenLimit,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
bool* hasNoRequirements) {
invariant(hasNoRequirements);
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 3a3f4d1f9e9..1bfc2b77922 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -34,7 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/document_source_group.h"
@@ -92,7 +92,7 @@ public:
static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
buildInnerQueryExecutor(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline);
/**
@@ -116,7 +116,7 @@ public:
*/
static void buildAndAttachInnerQueryExecutorToPipeline(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline);
static Timestamp getLatestOplogTimestamp(const Pipeline* pipeline);
@@ -148,7 +148,7 @@ private:
static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
buildInnerQueryExecutorGeneric(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline);
/**
@@ -159,7 +159,7 @@ private:
static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
buildInnerQueryExecutorGeoNear(const CollectionPtr& collection,
const NamespaceString& nss,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
Pipeline* pipeline);
/**
@@ -184,7 +184,7 @@ private:
QueryMetadataBitSet metadataAvailable,
const BSONObj& queryObj,
SkipThenLimit skipThenLimit,
- const AggregationRequest* aggRequest,
+ const AggregateCommand* aggRequest,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
bool* hasNoRequirements);
};
diff --git a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
index fb60cfdf0f5..53b88c3113c 100644
--- a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
@@ -41,7 +41,7 @@
#include "mongo/bson/json.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_source_bucket_auto.h"
#include "mongo/db/pipeline/document_source_facet.h"
#include "mongo/db/pipeline/document_source_graph_lookup.h"
@@ -77,10 +77,9 @@ protected:
const auto inputBson = fromjson("{pipeline: " + jsonArray + "}");
ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array);
- auto rawPipeline =
- uassertStatusOK(AggregationRequest::parsePipelineFromBSON(inputBson["pipeline"]));
+ auto rawPipeline = parsePipelineFromBSON(inputBson["pipeline"]);
NamespaceString testNss("test", "collection");
- AggregationRequest request(testNss, rawPipeline);
+ AggregateCommand request(testNss, rawPipeline);
getExpCtx()->ns = testNss;
return Pipeline::parse(request.getPipeline(), getExpCtx());
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index d1dac07330b..6d69adcc05c 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -122,7 +122,7 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
ASSERT_EQUALS(stageElem.type(), BSONType::Object);
rawPipeline.push_back(stageElem.embeddedObject());
}
- AggregationRequest request(kTestNss, rawPipeline);
+ AggregateCommand request(kTestNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(opCtx.get(), request);
ctx->mongoProcessInterface = std::make_shared<StubExplainInterface>();
@@ -2374,7 +2374,7 @@ public:
ASSERT_EQUALS(stageElem.type(), BSONType::Object);
rawPipeline.push_back(stageElem.embeddedObject());
}
- AggregationRequest request(kTestNss, rawPipeline);
+ AggregateCommand request(kTestNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx = createExpressionContext(request);
TempDir tempDir("PipelineTest");
ctx->tempDir = tempDir.path();
@@ -2404,7 +2404,7 @@ public:
virtual ~Base() {}
virtual intrusive_ptr<ExpressionContextForTest> createExpressionContext(
- const AggregationRequest& request) {
+ const AggregateCommand& request) {
return new ExpressionContextForTest(&_opCtx, request);
}
@@ -2926,7 +2926,7 @@ class MergeWithUnshardedCollection : public ShardMergerBase {
class MergeWithShardedCollection : public ShardMergerBase {
intrusive_ptr<ExpressionContextForTest> createExpressionContext(
- const AggregationRequest& request) override {
+ const AggregateCommand& request) override {
class ProcessInterface : public StubMongoProcessInterface {
bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
return true;
@@ -3160,7 +3160,7 @@ TEST(PipelineInitialSource, GeoNearInitialQuery) {
const std::vector<BSONObj> rawPipeline = {
fromjson("{$geoNear: {distanceField: 'd', near: [0, 0], query: {a: 1}}}")};
intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(
- &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline));
+ &_opCtx, AggregateCommand(NamespaceString("a.collection"), rawPipeline));
auto pipe = Pipeline::parse(rawPipeline, ctx);
ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 1));
}
@@ -3169,7 +3169,7 @@ TEST(PipelineInitialSource, MatchInitialQuery) {
OperationContextNoop _opCtx;
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {'a': 4}}")};
intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(
- &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline));
+ &_opCtx, AggregateCommand(NamespaceString("a.collection"), rawPipeline));
auto pipe = Pipeline::parse(rawPipeline, ctx);
ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4));
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 533c9bc5d47..db56faecdbf 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -33,6 +33,8 @@
#include "sharded_agg_helpers.h"
#include "mongo/db/curop.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_group.h"
@@ -99,7 +101,7 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
Timestamp startMonitoringAtTime) {
const auto& configShard = Grid::get(expCtx->opCtx)->shardRegistry()->getConfigShard();
// Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}}
- AggregationRequest aggReq(
+ AggregateCommand aggReq(
ShardType::ConfigNS,
{BSON(DocumentSourceChangeStream::kStageName
<< BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
@@ -108,13 +110,14 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
aggReq.setFromMongos(true);
aggReq.setNeedsMerge(true);
aggReq.setBatchSize(0);
- auto cmdObjWithRWC = applyReadWriteConcern(expCtx->opCtx,
- true, /* appendRC */
- !expCtx->explain, /* appendWC */
- aggReq.serializeToCommandObj().toBson());
+ auto cmdObjWithRWC =
+ applyReadWriteConcern(expCtx->opCtx,
+ true, /* appendRC */
+ !expCtx->explain, /* appendWC */
+ aggregation_request_helper::serializeToCommandObj(aggReq));
auto configCursor = establishCursors(expCtx->opCtx,
expCtx->mongoProcessInterface->taskExecutor,
- aggReq.getNamespaceString(),
+ aggReq.getNamespace(),
ReadPreferenceSetting{ReadPreference::SecondaryPreferred},
{{configShard->getId(), cmdObjWithRWC}},
false);
@@ -141,19 +144,19 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
auto [legacyRuntimeConstants, unusedSerializedVariables] =
expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables);
- cmdForShards[AggregationRequest::kLegacyRuntimeConstantsName] =
+ cmdForShards[AggregateCommand::kLegacyRuntimeConstantsFieldName] =
Value(legacyRuntimeConstants.toBSON());
} else {
// Either this is a "modern" cluster or we are a mongos and can assume the shards are
// "modern" and will understand the 'let' parameter.
- cmdForShards[AggregationRequest::kLetName] =
+ cmdForShards[AggregateCommand::kLetFieldName] =
Value(expCtx->variablesParseState.serialize(expCtx->variables));
}
- cmdForShards[AggregationRequest::kFromMongosName] = Value(expCtx->inMongos);
+ cmdForShards[AggregateCommand::kFromMongosFieldName] = Value(expCtx->inMongos);
if (!collationObj.isEmpty()) {
- cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
+ cmdForShards[AggregateCommand::kCollationFieldName] = Value(collationObj);
}
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
@@ -597,16 +600,15 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) {
std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregationRequest> targetRequest) {
+ stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest) {
auto&& [aggRequest, pipeline] = [&] {
return stdx::visit(
visit_helper::Overloaded{
[&](std::unique_ptr<Pipeline, PipelineDeleter>&& pipeline) {
- return std::make_pair(
- AggregationRequest(expCtx->ns, pipeline->serializeToBson()),
- std::move(pipeline));
+ return std::make_pair(AggregateCommand(expCtx->ns, pipeline->serializeToBson()),
+ std::move(pipeline));
},
- [&](AggregationRequest&& aggRequest) {
+ [&](AggregateCommand&& aggRequest) {
auto rawPipeline = aggRequest.getPipeline();
return std::make_pair(std::move(aggRequest),
Pipeline::parse(std::move(rawPipeline), expCtx));
@@ -617,7 +619,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
- // The default value for 'allowDiskUse' and 'maxTimeMS' in the AggregationRequest may not match
+ // The default value for 'allowDiskUse' and 'maxTimeMS' in the AggregateCommand may not match
// what was set on the originating command, so copy it from the ExpressionContext.
aggRequest.setAllowDiskUse(expCtx->allowDiskUse);
@@ -628,8 +630,10 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
LiteParsedPipeline liteParsedPipeline(aggRequest);
auto hasChangeStream = liteParsedPipeline.hasChangeStream();
- auto shardDispatchResults = dispatchShardPipeline(
- aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline));
+ auto shardDispatchResults =
+ dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest),
+ hasChangeStream,
+ std::move(pipeline));
std::vector<ShardId> targetedShards;
targetedShards.reserve(shardDispatchResults.remoteCursors.size());
@@ -660,36 +664,36 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
std::unique_ptr<Pipeline, PipelineDeleter> runPipelineDirectlyOnSingleShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- AggregationRequest request,
+ AggregateCommand request,
ShardId shardId) {
invariant(!request.getExplain());
- auto readPreference =
- uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.getUnwrappedReadPref()));
+ auto readPreference = uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(
+ request.getUnwrappedReadPref().value_or(BSONObj())));
auto* opCtx = expCtx->opCtx;
auto* catalogCache = Grid::get(opCtx)->catalogCache();
- auto cm = uassertStatusOK(
- catalogCache->getCollectionRoutingInfo(opCtx, request.getNamespaceString()));
+ auto cm =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, request.getNamespace()));
auto versionedCmdObj = [&] {
if (cm.isSharded()) {
- return appendShardVersion(request.serializeToCommandObj().toBson(),
+ return appendShardVersion(aggregation_request_helper::serializeToCommandObj(request),
cm.getVersion(shardId));
} else {
// The collection is unsharded. Don't append shard version info when contacting the
// config servers.
const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId)
- ? appendShardVersion(request.serializeToCommandObj().toBson(),
+ ? appendShardVersion(aggregation_request_helper::serializeToCommandObj(request),
ChunkVersion::UNSHARDED())
- : request.serializeToCommandObj().toBson();
+ : aggregation_request_helper::serializeToCommandObj(request);
return appendDbVersionIfPresent(std::move(cmdObjWithShardVersion), cm.dbVersion());
}
}();
auto cursors = establishCursors(opCtx,
expCtx->mongoProcessInterface->taskExecutor,
- request.getNamespaceString(),
+ request.getNamespace(),
std::move(readPreference),
{{shardId, versionedCmdObj}},
false /* allowPartialResults */,
@@ -784,7 +788,7 @@ BSONObj createPassthroughCommandForShard(
// Create the command for the shards.
MutableDocument targetedCmd(serializedCommand);
if (pipeline) {
- targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ targetedCmd[AggregateCommand::kPipelineFieldName] = Value(pipeline->serialize());
}
return genericTransformForShards(
@@ -802,12 +806,12 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
// has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
// have detected a logged in user and appended that user name to the $listSessions spec to
// send to the shards.
- targetedCmd[AggregationRequest::kPipelineName] =
+ targetedCmd[AggregateCommand::kPipelineFieldName] =
Value(splitPipeline.shardsPipeline->serialize());
// When running on many shards with the exchange we may not need merging.
if (needsMerge) {
- targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+ targetedCmd[AggregateCommand::kNeedsMergeFieldName] = Value(true);
// If there aren't any stages like $out in the pipeline being sent to the shards, remove the
// write concern. The write concern should only be applied when there are writes performed
@@ -820,10 +824,10 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
}
}
- targetedCmd[AggregationRequest::kCursorName] =
- Value(DOC(AggregationRequest::kBatchSizeName << 0));
+ targetedCmd[AggregateCommand::kBatchSizeFieldName] =
+ Value(DOC(QueryRequest::kBatchSizeField << 0));
- targetedCmd[AggregationRequest::kExchangeName] =
+ targetedCmd[AggregateCommand::kExchangeFieldName] =
exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
return genericTransformForShards(
@@ -1156,11 +1160,13 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) {
return stages;
}();
- AggregationRequest aggRequest(expCtx->ns, rawStages);
+ AggregateCommand aggRequest(expCtx->ns, rawStages);
LiteParsedPipeline liteParsedPipeline(aggRequest);
auto hasChangeStream = liteParsedPipeline.hasChangeStream();
- auto shardDispatchResults = dispatchShardPipeline(
- aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline));
+ auto shardDispatchResults =
+ dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest),
+ hasChangeStream,
+ std::move(pipeline));
BSONObjBuilder explainBuilder;
auto appendStatus =
appendExplainResults(std::move(shardDispatchResults), expCtx, &explainBuilder);
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index f0f78a5fa4d..b2be0661aab 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -198,13 +198,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
* beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might
* be this node itself.
*
- * Use the AggregationRequest alternative for 'targetRequest' to explicitly specify command options
+ * Use the AggregateCommand alternative for 'targetRequest' to explicitly specify command options
* (e.g. read concern) to the shards when establishing remote cursors. Note that doing so incurs the
* cost of parsing the pipeline.
*/
std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregationRequest> targetRequest);
+ stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest);
/**
* For a sharded or unsharded collection, establishes a remote cursor on only the specified shard,
@@ -215,11 +215,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
* especially useful for reading from unsharded collections such as config.transactions and
* local.oplog.rs that cannot be targeted by targetShardsAndAddMergeCursors().
*
- * Note that the specified AggregationRequest must not be for an explain command.
+ * Note that the specified AggregateCommand must not be for an explain command.
*/
std::unique_ptr<Pipeline, PipelineDeleter> runPipelineDirectlyOnSingleShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- AggregationRequest request,
+ AggregateCommand request,
ShardId shardId);
} // namespace sharded_agg_helpers
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 16af82ee571..8a7eed3fde0 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -105,6 +105,7 @@ env.Library(
'distinct_command.idl',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/idl/idl_parser',
],
)
@@ -345,7 +346,7 @@ env.CppUnitTest(
"$BUILD_DIR/mongo/db/auth/authmocks",
"$BUILD_DIR/mongo/db/concurrency/lock_manager",
"$BUILD_DIR/mongo/db/exec/sbe/sbe_plan_stage_test",
- "$BUILD_DIR/mongo/db/pipeline/aggregation_request",
+ "$BUILD_DIR/mongo/db/pipeline/aggregation_request_helper",
"$BUILD_DIR/mongo/db/query_exec",
"$BUILD_DIR/mongo/db/service_context_d",
"$BUILD_DIR/mongo/db/service_context_d_test_fixture",
diff --git a/src/mongo/db/query/count_command_test.cpp b/src/mongo/db/query/count_command_test.cpp
index 7fc17516694..67b1cc64591 100644
--- a/src/mongo/db/query/count_command_test.cpp
+++ b/src/mongo/db/query/count_command_test.cpp
@@ -32,7 +32,7 @@
#include <algorithm>
#include "mongo/bson/json.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/query/count_command_as_aggregation_command.h"
#include "mongo/db/query/count_command_gen.h"
#include "mongo/unittest/unittest.h"
@@ -162,9 +162,10 @@ TEST(CountCommandTest, ConvertToAggregationWithHint) {
<< "hint" << BSON("x" << 1));
auto countCmd = CountCommand::parse(ctxt, commandObj);
auto agg = uassertStatusOK(countCommandAsAggregationCommand(countCmd, testns));
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg).body;
- auto ar = uassertStatusOK(AggregationRequest::parseFromBSON(testns, agg));
- ASSERT_BSONOBJ_EQ(ar.getHint(), BSON("x" << 1));
+ auto ar = uassertStatusOK(aggregation_request_helper::parseFromBSON(testns, cmdObj));
+ ASSERT_BSONOBJ_EQ(ar.getHint().value_or(BSONObj()), BSON("x" << 1));
std::vector<BSONObj> expectedPipeline{BSON("$count"
<< "count")};
@@ -183,11 +184,12 @@ TEST(CountCommandTest, ConvertToAggregationWithQueryAndFilterAndLimit) {
<< "limit" << 200 << "skip" << 300 << "query" << BSON("x" << 7));
auto countCmd = CountCommand::parse(ctxt, commandObj);
auto agg = uassertStatusOK(countCommandAsAggregationCommand(countCmd, testns));
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg).body;
- auto ar = uassertStatusOK(AggregationRequest::parseFromBSON(testns, agg));
- ASSERT_EQ(ar.getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getCollation(), BSONObj());
+ auto ar = uassertStatusOK(aggregation_request_helper::parseFromBSON(testns, cmdObj));
+ ASSERT_EQ(ar.getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getCollation().value_or(BSONObj()), BSONObj());
std::vector<BSONObj> expectedPipeline{BSON("$match" << BSON("x" << 7)),
BSON("$skip" << 300),
@@ -207,9 +209,10 @@ TEST(CountCommandTest, ConvertToAggregationWithMaxTimeMS) {
<< "maxTimeMS" << 100 << "$db"
<< "TestDB"));
auto agg = uassertStatusOK(countCommandAsAggregationCommand(countCmd, testns));
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg).body;
- auto ar = uassertStatusOK(AggregationRequest::parseFromBSON(testns, agg));
- ASSERT_EQ(ar.getMaxTimeMS(), 100u);
+ auto ar = uassertStatusOK(aggregation_request_helper::parseFromBSON(testns, cmdObj));
+ ASSERT_EQ(ar.getMaxTimeMS().value_or(0), 100u);
std::vector<BSONObj> expectedPipeline{BSON("$count"
<< "count")};
@@ -229,9 +232,10 @@ TEST(CountCommandTest, ConvertToAggregationWithQueryOptions) {
countCmd.setQueryOptions(BSON("readPreference"
<< "secondary"));
auto agg = uassertStatusOK(countCommandAsAggregationCommand(countCmd, testns));
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg).body;
- auto ar = uassertStatusOK(AggregationRequest::parseFromBSON(testns, agg));
- ASSERT_BSONOBJ_EQ(ar.getUnwrappedReadPref(),
+ auto ar = uassertStatusOK(aggregation_request_helper::parseFromBSON(testns, cmdObj));
+ ASSERT_BSONOBJ_EQ(ar.getUnwrappedReadPref().value_or(BSONObj()),
BSON("readPreference"
<< "secondary"));
@@ -253,9 +257,10 @@ TEST(CountCommandTest, ConvertToAggregationWithReadConcern) {
countCmd.setReadConcern(BSON("level"
<< "linearizable"));
auto agg = uassertStatusOK(countCommandAsAggregationCommand(countCmd, testns));
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg).body;
- auto ar = uassertStatusOK(AggregationRequest::parseFromBSON(testns, agg));
- ASSERT_BSONOBJ_EQ(ar.getReadConcern(),
+ auto ar = uassertStatusOK(aggregation_request_helper::parseFromBSON(testns, cmdObj));
+ ASSERT_BSONOBJ_EQ(ar.getReadConcern().value_or(BSONObj()),
BSON("level"
<< "linearizable"));
diff --git a/src/mongo/db/query/hint_parser.cpp b/src/mongo/db/query/hint_parser.cpp
index 1ecd3c5ee26..d1372d86f55 100644
--- a/src/mongo/db/query/hint_parser.cpp
+++ b/src/mongo/db/query/hint_parser.cpp
@@ -37,7 +37,7 @@ BSONObj parseHint(const BSONElement& element) {
if (element.type() == BSONType::String) {
return BSON("$hint" << element.valueStringData());
} else if (element.type() == BSONType::Object) {
- return element.Obj();
+ return element.Obj().getOwned();
} else {
uasserted(ErrorCodes::FailedToParse, "Hint must be a string or an object");
}
diff --git a/src/mongo/db/query/parsed_distinct_test.cpp b/src/mongo/db/query/parsed_distinct_test.cpp
index 0f0efd7a18a..17706e9e87a 100644
--- a/src/mongo/db/query/parsed_distinct_test.cpp
+++ b/src/mongo/db/query/parsed_distinct_test.cpp
@@ -33,7 +33,7 @@
#include "mongo/bson/json.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/query/parsed_distinct.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/unittest/unittest.h"
@@ -59,15 +59,16 @@ TEST(ParsedDistinctTest, ConvertToAggregationNoQuery) {
auto agg = pd.getValue().asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
- ASSERT(ar.getValue().getReadConcern().isEmpty());
- ASSERT(ar.getValue().getUnwrappedReadPref().isEmpty());
- ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 0u);
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
+ ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty());
+ ASSERT(ar.getValue().getUnwrappedReadPref().value_or(BSONObj()).isEmpty());
+ ASSERT_EQUALS(ar.getValue().getMaxTimeMS().value_or(0), 0u);
std::vector<BSONObj> expectedPipeline{
BSON("$unwind" << BSON("path"
@@ -98,15 +99,16 @@ TEST(ParsedDistinctTest, ConvertToAggregationDottedPathNoQuery) {
auto agg = pd.getValue().asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
- ASSERT(ar.getValue().getReadConcern().isEmpty());
- ASSERT(ar.getValue().getUnwrappedReadPref().isEmpty());
- ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 0u);
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
+ ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty());
+ ASSERT(ar.getValue().getUnwrappedReadPref().value_or(BSONObj()).isEmpty());
+ ASSERT_EQUALS(ar.getValue().getMaxTimeMS().value_or(0), 0u);
std::vector<BSONObj> expectedPipeline{
BSON("$unwind" << BSON("path"
@@ -162,21 +164,22 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithAllOptions) {
auto agg = pd.getValue().asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(),
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()),
BSON("locale"
<< "en_US"));
- ASSERT_BSONOBJ_EQ(ar.getValue().getReadConcern(),
+ ASSERT_BSONOBJ_EQ(ar.getValue().getReadConcern().value_or(BSONObj()),
BSON("level"
<< "linearizable"));
- ASSERT_BSONOBJ_EQ(ar.getValue().getUnwrappedReadPref(),
+ ASSERT_BSONOBJ_EQ(ar.getValue().getUnwrappedReadPref().value_or(BSONObj()),
BSON("readPreference"
<< "secondary"));
- ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 100u);
+ ASSERT_EQUALS(ar.getValue().getMaxTimeMS().value_or(0), 100u);
std::vector<BSONObj> expectedPipeline{
BSON("$unwind" << BSON("path"
@@ -208,15 +211,16 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithQuery) {
auto agg = pd.getValue().asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
- ASSERT(ar.getValue().getReadConcern().isEmpty());
- ASSERT(ar.getValue().getUnwrappedReadPref().isEmpty());
- ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 0u);
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
+ ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty());
+ ASSERT(ar.getValue().getUnwrappedReadPref().value_or(BSONObj()).isEmpty());
+ ASSERT_EQUALS(ar.getValue().getMaxTimeMS().value_or(0), 0u);
std::vector<BSONObj> expectedPipeline{
BSON("$match" << BSON("z" << 7)),
@@ -250,11 +254,12 @@ TEST(ParsedDistinctTest, ExplainNotIncludedWhenConvertingToAggregationCommand) {
ASSERT_FALSE(agg.getValue().hasField("explain"));
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
std::vector<BSONObj> expectedPipeline{
BSON("$unwind" << BSON("path"
diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp
index 4c12adf9ea5..49692f72f5c 100644
--- a/src/mongo/db/query/query_request_test.cpp
+++ b/src/mongo/db/query/query_request_test.cpp
@@ -38,7 +38,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/query/query_request.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -1288,13 +1288,14 @@ TEST(QueryRequestTest, ConvertToAggregationSucceeds) {
auto agg = qr.asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
ASSERT(ar.getValue().getPipeline().empty());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
}
TEST(QueryRequestTest, ConvertToAggregationOmitsExplain) {
@@ -1303,23 +1304,25 @@ TEST(QueryRequestTest, ConvertToAggregationOmitsExplain) {
auto agg = qr.asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT_FALSE(ar.getValue().getExplain());
ASSERT(ar.getValue().getPipeline().empty());
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
}
TEST(QueryRequestTest, ConvertToAggregationWithHintSucceeds) {
QueryRequest qr(testns);
qr.setHint(fromjson("{a_1: -1}"));
- const auto aggCmd = qr.asAggregationCommand();
- ASSERT_OK(aggCmd);
+ const auto agg = qr.asAggregationCommand();
+ ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, aggCmd.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
- ASSERT_BSONOBJ_EQ(qr.getHint(), ar.getValue().getHint());
+ ASSERT_BSONOBJ_EQ(qr.getHint(), ar.getValue().getHint().value_or(BSONObj()));
}
TEST(QueryRequestTest, ConvertToAggregationWithMinFails) {
@@ -1420,12 +1423,13 @@ TEST(QueryRequestTest, ConvertToAggregationWithPipeline) {
auto agg = qr.asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
std::vector<BSONObj> expectedPipeline{BSON("$match" << BSON("x" << 1)),
BSON("$sort" << BSON("y" << -1)),
@@ -1445,12 +1449,13 @@ TEST(QueryRequestTest, ConvertToAggregationWithBatchSize) {
auto agg = qr.asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_EQ(ar.getValue().getBatchSize(), 4LL);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
}
TEST(QueryRequestTest, ConvertToAggregationWithMaxTimeMS) {
@@ -1463,12 +1468,13 @@ TEST(QueryRequestTest, ConvertToAggregationWithMaxTimeMS) {
const BSONObj cmdObj = agg.getValue();
ASSERT_EQ(cmdObj["maxTimeMS"].Int(), 9);
- auto ar = AggregationRequest::parseFromBSON(testns, cmdObj);
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), cmdObj).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
}
TEST(QueryRequestTest, ConvertToAggregationWithCollationSucceeds) {
@@ -1477,13 +1483,14 @@ TEST(QueryRequestTest, ConvertToAggregationWithCollationSucceeds) {
auto agg = qr.asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
ASSERT(ar.getValue().getPipeline().empty());
- ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
- ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
- ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSON("f" << 1));
+ ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespace(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSON("f" << 1));
}
TEST(QueryRequestTest, ConvertToAggregationWithReadOnceFails) {
@@ -1507,7 +1514,8 @@ TEST(QueryRequestTest, ConvertToAggregationWithLegacyRuntimeConstantsSucceeds) {
auto agg = qr.asAggregationCommand();
ASSERT_OK(agg);
- auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(ar.getValue().getLegacyRuntimeConstants().has_value());
ASSERT_EQ(ar.getValue().getLegacyRuntimeConstants()->getLocalNow(), rtc.getLocalNow());
@@ -1517,23 +1525,25 @@ TEST(QueryRequestTest, ConvertToAggregationWithLegacyRuntimeConstantsSucceeds) {
TEST(QueryRequestTest, ConvertToAggregationWithAllowDiskUseTrueSucceeds) {
QueryRequest qr(testns);
qr.setAllowDiskUse(true);
- const auto aggCmd = qr.asAggregationCommand();
- ASSERT_OK(aggCmd.getStatus());
+ const auto agg = qr.asAggregationCommand();
+ ASSERT_OK(agg.getStatus());
- auto ar = AggregationRequest::parseFromBSON(testns, aggCmd.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
- ASSERT_EQ(true, ar.getValue().shouldAllowDiskUse());
+ ASSERT_EQ(true, ar.getValue().getAllowDiskUse());
}
TEST(QueryRequestTest, ConvertToAggregationWithAllowDiskUseFalseSucceeds) {
QueryRequest qr(testns);
qr.setAllowDiskUse(false);
- const auto aggCmd = qr.asAggregationCommand();
- ASSERT_OK(aggCmd.getStatus());
+ const auto agg = qr.asAggregationCommand();
+ ASSERT_OK(agg.getStatus());
- auto ar = AggregationRequest::parseFromBSON(testns, aggCmd.getValue());
+ auto aggCmd = OpMsgRequest::fromDBAndBody(testns.db(), agg.getValue()).body;
+ auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
- ASSERT_EQ(false, ar.getValue().shouldAllowDiskUse());
+ ASSERT_EQ(false, ar.getValue().getAllowDiskUse());
}
TEST(QueryRequestTest, ConvertToFindWithAllowDiskUseTrueSucceeds) {
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 1897346b598..95094cf06c8 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -117,6 +117,7 @@ env.Library(
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/op_observer_impl',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
+ '$BUILD_DIR/mongo/db/pipeline/aggregation_request_helper',
'$BUILD_DIR/mongo/db/repl/abstract_async_component',
'$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/oplog_application',
diff --git a/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp b/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp
index 831f4575c92..699d03671dd 100644
--- a/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp
+++ b/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp
@@ -141,8 +141,8 @@ void PeriodicShardedIndexConsistencyChecker::_launchShardedIndexConsistencyCheck
continue;
}
- auto request =
- uassertStatusOK(AggregationRequest::parseFromBSON(nss, aggRequestBSON));
+ auto request = uassertStatusOK(
+ aggregation_request_helper::parseFromBSON(nss, aggRequestBSON));
auto catalogCache = Grid::get(opCtx)->catalogCache();
shardVersionRetry(
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 4470aec360b..5819b96c22f 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -43,7 +43,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_replace_root.h"
@@ -161,7 +161,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest(
OperationContext* opCtx, const Pipeline& pipeline) {
- AggregationRequest request(_sourceNss, pipeline.serializeToBson());
+ AggregateCommand request(_sourceNss, pipeline.serializeToBson());
request.setCollectionUUID(_sourceUUID);
request.setHint(BSON("_id" << 1));
request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index f1860457ef4..a2713b2c7d3 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -42,6 +42,7 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/s/resharding_util.h"
@@ -238,7 +239,7 @@ void ReshardingOplogFetcher::_ensureCollection(Client* client, const NamespaceSt
});
}
-AggregationRequest ReshardingOplogFetcher::_makeAggregationRequest(Client* client) {
+AggregateCommand ReshardingOplogFetcher::_makeAggregateCommand(Client* client) {
auto opCtxRaii = client->makeOperationContext();
auto opCtx = opCtxRaii.get();
auto expCtx = _makeExpressionContext(opCtx);
@@ -248,8 +249,7 @@ AggregationRequest ReshardingOplogFetcher::_makeAggregationRequest(Client* clien
expCtx, _startAt, _collUUID, _recipientShard, _doesDonorOwnMinKeyChunk)
->serializeToBson();
- AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace,
- std::move(serializedPipeline));
+ AggregateCommand aggRequest(NamespaceString::kRsOplogNamespace, std::move(serializedPipeline));
if (_useReadConcern) {
auto readConcernArgs = repl::ReadConcernArgs(
boost::optional<LogicalTime>(_startAt.getTs()),
@@ -261,7 +261,7 @@ AggregationRequest ReshardingOplogFetcher::_makeAggregationRequest(Client* clien
ReadPreferenceSetting::kMinimalMaxStalenessValue);
aggRequest.setUnwrappedReadPref(readPref.toContainingBSON());
- aggRequest.setWriteConcern({});
+ aggRequest.setWriteConcern(WriteConcernOptions());
aggRequest.setHint(BSON("$natural" << 1));
aggRequest.setRequestReshardingResumeToken(true);
@@ -275,7 +275,7 @@ AggregationRequest ReshardingOplogFetcher::_makeAggregationRequest(Client* clien
bool ReshardingOplogFetcher::consume(Client* client, Shard* shard) {
_ensureCollection(client, _toWriteInto);
- auto aggRequest = _makeAggregationRequest(client);
+ auto aggRequest = _makeAggregateCommand(client);
auto opCtxRaii = client->makeOperationContext();
int batchesProcessed = 0;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index 88a1206562d..ee70e2a5bdc 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -119,7 +119,7 @@ private:
* Returns true if there's more work to do and the task should be rescheduled.
*/
void _ensureCollection(Client* client, const NamespaceString nss);
- AggregationRequest _makeAggregationRequest(Client* client);
+ AggregateCommand _makeAggregateCommand(Client* client);
ExecutorFuture<void> _reschedule(std::shared_ptr<executor::TaskExecutor> executor,
const CancelationToken& cancelToken);
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
index b8b20c15e02..8ffd81fbedf 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
@@ -125,14 +125,14 @@ boost::optional<LogicalSessionId> ReshardingTxnCloner::_fetchProgressLsid(Operat
std::unique_ptr<Pipeline, PipelineDeleter> ReshardingTxnCloner::_targetAggregationRequest(
OperationContext* opCtx, const Pipeline& pipeline) {
- AggregationRequest request(NamespaceString::kSessionTransactionsTableNamespace,
- pipeline.serializeToBson());
+ AggregateCommand request(NamespaceString::kSessionTransactionsTableNamespace,
+ pipeline.serializeToBson());
request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName
<< repl::readConcernLevels::kMajorityName
<< repl::ReadConcernArgs::kAfterClusterTimeFieldName
<< _fetchTimestamp));
- request.setWriteConcern({});
+ request.setWriteConcern(WriteConcernOptions());
request.setHint(BSON(SessionTxnRecord::kSessionIdFieldName << 1));
request.setUnwrappedReadPref(ReadPreferenceSetting{ReadPreference::Nearest}.toContainingBSON());
diff --git a/src/mongo/db/s/shard_local.cpp b/src/mongo/db/s/shard_local.cpp
index 7cfa1c011d4..1467b1cdd65 100644
--- a/src/mongo/db/s/shard_local.cpp
+++ b/src/mongo/db/s/shard_local.cpp
@@ -212,7 +212,7 @@ void ShardLocal::runFireAndForgetCommand(OperationContext* opCtx,
}
Status ShardLocal::runAggregation(OperationContext* opCtx,
- const AggregationRequest& aggRequest,
+ const AggregateCommand& aggRequest,
std::function<bool(const std::vector<BSONObj>& batch)> callback) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/s/shard_local.h b/src/mongo/db/s/shard_local.h
index 696bd665c3d..ef50595971c 100644
--- a/src/mongo/db/s/shard_local.h
+++ b/src/mongo/db/s/shard_local.h
@@ -71,7 +71,7 @@ public:
const BSONObj& cmdObj) override;
Status runAggregation(OperationContext* opCtx,
- const AggregationRequest& aggRequest,
+ const AggregateCommand& aggRequest,
std::function<bool(const std::vector<BSONObj>& batch)> callback);
private:
diff --git a/src/mongo/db/views/SConscript b/src/mongo/db/views/SConscript
index a03a9cd2fad..602a5710255 100644
--- a/src/mongo/db/views/SConscript
+++ b/src/mongo/db/views/SConscript
@@ -42,6 +42,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/pipeline/aggregation_request_helper',
]
)
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp
index 5bb17093d21..bccc3562d06 100644
--- a/src/mongo/db/views/resolved_view.cpp
+++ b/src/mongo/db/views/resolved_view.cpp
@@ -33,7 +33,6 @@
#include "mongo/base/init.h"
#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
namespace mongo {
@@ -87,8 +86,7 @@ std::shared_ptr<const ErrorExtraInfo> ResolvedView::parse(const BSONObj& cmdRepl
return std::make_shared<ResolvedView>(fromBSON(cmdReply));
}
-AggregationRequest ResolvedView::asExpandedViewAggregation(
- const AggregationRequest& request) const {
+AggregateCommand ResolvedView::asExpandedViewAggregation(const AggregateCommand& request) const {
// Perform the aggregation on the resolved namespace. The new pipeline consists of two parts:
// first, 'pipeline' in this ResolvedView; then, the pipeline in 'request'.
std::vector<BSONObj> resolvedPipeline;
@@ -97,7 +95,7 @@ AggregationRequest ResolvedView::asExpandedViewAggregation(
resolvedPipeline.insert(
resolvedPipeline.end(), request.getPipeline().begin(), request.getPipeline().end());
- AggregationRequest expandedRequest{_namespace, resolvedPipeline};
+ AggregateCommand expandedRequest{_namespace, resolvedPipeline};
if (request.getExplain()) {
expandedRequest.setExplain(request.getExplain());
@@ -109,10 +107,10 @@ AggregationRequest ResolvedView::asExpandedViewAggregation(
expandedRequest.setMaxTimeMS(request.getMaxTimeMS());
expandedRequest.setReadConcern(request.getReadConcern());
expandedRequest.setUnwrappedReadPref(request.getUnwrappedReadPref());
- expandedRequest.setBypassDocumentValidation(request.shouldBypassDocumentValidation());
- expandedRequest.setAllowDiskUse(request.shouldAllowDiskUse());
+ expandedRequest.setBypassDocumentValidation(request.getBypassDocumentValidation());
+ expandedRequest.setAllowDiskUse(request.getAllowDiskUse());
expandedRequest.setIsMapReduceCommand(request.getIsMapReduceCommand());
- expandedRequest.setLetParameters(request.getLetParameters());
+ expandedRequest.setLet(request.getLet());
// Operations on a view must always use the default collation of the view. We must have already
// checked that if the user's request specifies a collation, it matches the collation of the
diff --git a/src/mongo/db/views/resolved_view.h b/src/mongo/db/views/resolved_view.h
index 9ab61fe105e..aa931189c2a 100644
--- a/src/mongo/db/views/resolved_view.h
+++ b/src/mongo/db/views/resolved_view.h
@@ -34,7 +34,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
namespace mongo {
@@ -57,7 +57,7 @@ public:
* Convert an aggregation command on a view to the equivalent command against the view's
* underlying collection.
*/
- AggregationRequest asExpandedViewAggregation(const AggregationRequest& aggRequest) const;
+ AggregateCommand asExpandedViewAggregation(const AggregateCommand& aggRequest) const;
const NamespaceString& getNamespace() const {
return _namespace;
diff --git a/src/mongo/db/views/resolved_view_test.cpp b/src/mongo/db/views/resolved_view_test.cpp
index a3a60d9e102..0a795870986 100644
--- a/src/mongo/db/views/resolved_view_test.cpp
+++ b/src/mongo/db/views/resolved_view_test.cpp
@@ -37,7 +37,7 @@
#include "mongo/bson/json.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/unittest/unittest.h"
@@ -48,36 +48,38 @@ namespace {
const NamespaceString viewNss("testdb.testview");
const NamespaceString backingNss("testdb.testcoll");
const std::vector<BSONObj> emptyPipeline;
-const BSONObj kDefaultCursorOptionDocument =
- BSON(AggregationRequest::kBatchSizeName << AggregationRequest::kDefaultBatchSize);
+const BSONObj kDefaultCursorOptionDocument = BSON(aggregation_request_helper::kBatchSizeField
+ << aggregation_request_helper::kDefaultBatchSize);
const BSONObj kSimpleCollation;
TEST(ResolvedViewTest, ExpandingAggRequestWithEmptyPipelineOnNoOpViewYieldsEmptyPipeline) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest requestOnView{viewNss, emptyPipeline};
+ AggregateCommand requestOnView{viewNss, emptyPipeline};
auto result = resolvedView.asExpandedViewAggregation(requestOnView);
- BSONObj expected = BSON("aggregate" << backingNss.coll() << "pipeline" << BSONArray()
- << "cursor" << kDefaultCursorOptionDocument);
- ASSERT_BSONOBJ_EQ(result.serializeToCommandObj().toBson(), expected);
+ BSONObj expected =
+ BSON("aggregate" << backingNss.coll() << "pipeline" << BSONArray() << "cursor"
+ << kDefaultCursorOptionDocument << "collation" << BSONObj());
+ ASSERT_BSONOBJ_EQ(aggregation_request_helper::serializeToCommandObj(result), expected);
}
TEST(ResolvedViewTest, ExpandingAggRequestWithNonemptyPipelineAppendsToViewPipeline) {
std::vector<BSONObj> viewPipeline{BSON("skip" << 7)};
const ResolvedView resolvedView{backingNss, viewPipeline, kSimpleCollation};
- AggregationRequest requestOnView{viewNss, std::vector<BSONObj>{BSON("limit" << 3)}};
+ AggregateCommand requestOnView{viewNss, std::vector<BSONObj>{BSON("limit" << 3)}};
auto result = resolvedView.asExpandedViewAggregation(requestOnView);
- BSONObj expected = BSON("aggregate" << backingNss.coll() << "pipeline"
- << BSON_ARRAY(BSON("skip" << 7) << BSON("limit" << 3))
- << "cursor" << kDefaultCursorOptionDocument);
- ASSERT_BSONOBJ_EQ(result.serializeToCommandObj().toBson(), expected);
+ BSONObj expected =
+ BSON("aggregate" << backingNss.coll() << "pipeline"
+ << BSON_ARRAY(BSON("skip" << 7) << BSON("limit" << 3)) << "cursor"
+ << kDefaultCursorOptionDocument << "collation" << BSONObj());
+ ASSERT_BSONOBJ_EQ(aggregation_request_helper::serializeToCommandObj(result), expected);
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesExplain) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest{viewNss, {}};
+ AggregateCommand aggRequest{viewNss, {}};
aggRequest.setExplain(ExplainOptions::Verbosity::kExecStats);
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
@@ -87,74 +89,74 @@ TEST(ResolvedViewTest, ExpandingAggRequestPreservesExplain) {
TEST(ResolvedViewTest, ExpandingAggRequestWithCursorAndExplainOnlyPreservesExplain) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest{viewNss, {}};
+ AggregateCommand aggRequest{viewNss, {}};
aggRequest.setBatchSize(10);
aggRequest.setExplain(ExplainOptions::Verbosity::kExecStats);
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
ASSERT(result.getExplain());
ASSERT(*result.getExplain() == ExplainOptions::Verbosity::kExecStats);
- ASSERT_EQ(result.getBatchSize(), AggregationRequest::kDefaultBatchSize);
+ ASSERT_EQ(result.getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesBypassDocumentValidation) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest(viewNss, {});
+ AggregateCommand aggRequest(viewNss, {});
aggRequest.setBypassDocumentValidation(true);
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
- ASSERT_TRUE(result.shouldBypassDocumentValidation());
+ ASSERT_TRUE(result.getBypassDocumentValidation().value_or(false));
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesAllowDiskUse) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest(viewNss, {});
+ AggregateCommand aggRequest(viewNss, {});
aggRequest.setAllowDiskUse(true);
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
- ASSERT_TRUE(result.shouldAllowDiskUse());
+ ASSERT_TRUE(result.getAllowDiskUse());
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesHint) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest(viewNss, {});
+ AggregateCommand aggRequest(viewNss, {});
aggRequest.setHint(BSON("a" << 1));
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
- ASSERT_BSONOBJ_EQ(result.getHint(), BSON("a" << 1));
+ ASSERT_BSONOBJ_EQ(result.getHint().value_or(BSONObj()), BSON("a" << 1));
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesReadPreference) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest(viewNss, {});
+ AggregateCommand aggRequest(viewNss, {});
aggRequest.setUnwrappedReadPref(BSON("$readPreference"
<< "nearest"));
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
- ASSERT_BSONOBJ_EQ(result.getUnwrappedReadPref(),
+ ASSERT_BSONOBJ_EQ(result.getUnwrappedReadPref().value_or(BSONObj()),
BSON("$readPreference"
<< "nearest"));
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesReadConcern) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest(viewNss, {});
+ AggregateCommand aggRequest(viewNss, {});
aggRequest.setReadConcern(BSON("level"
<< "linearizable"));
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
- ASSERT_BSONOBJ_EQ(result.getReadConcern(),
+ ASSERT_BSONOBJ_EQ(result.getReadConcern().value_or(BSONObj()),
BSON("level"
<< "linearizable"));
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesMaxTimeMS) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
- AggregationRequest aggRequest(viewNss, {});
+ AggregateCommand aggRequest(viewNss, {});
aggRequest.setMaxTimeMS(100u);
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
- ASSERT_EQ(result.getMaxTimeMS(), 100u);
+ ASSERT_EQ(result.getMaxTimeMS().value_or(0), 100u);
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesDefaultCollationOfView) {
@@ -165,10 +167,10 @@ TEST(ResolvedViewTest, ExpandingAggRequestPreservesDefaultCollationOfView) {
ASSERT_BSONOBJ_EQ(resolvedView.getDefaultCollation(),
BSON("locale"
<< "fr_CA"));
- AggregationRequest aggRequest(viewNss, {});
+ AggregateCommand aggRequest(viewNss, {});
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
- ASSERT_BSONOBJ_EQ(result.getCollation(),
+ ASSERT_BSONOBJ_EQ(result.getCollation().value_or(BSONObj()),
BSON("locale"
<< "fr_CA"));
}
diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp
index d90c180bb97..359799159eb 100644
--- a/src/mongo/db/views/view_catalog.cpp
+++ b/src/mongo/db/views/view_catalog.cpp
@@ -43,7 +43,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -404,7 +404,7 @@ StatusWith<stdx::unordered_set<NamespaceString>> ViewCatalog::_validatePipeline(
}
boost::intrusive_ptr<ExpressionContext> expCtx =
new ExpressionContext(opCtx,
- AggregationRequest(viewDef.viewOn(), viewDef.pipeline()),
+ AggregateCommand(viewDef.viewOn(), viewDef.pipeline()),
CollatorInterface::cloneCollator(viewDef.defaultCollator()),
// We can use a stub MongoProcessInterface because we are only parsing
// the Pipeline for validation here. We won't do anything with the