summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuoxin Xu <ruoxin.xu@mongodb.com>2021-01-13 23:19:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-20 13:29:03 +0000
commit9686cd9358b744877d43a2d7b71fdb91e40d3de9 (patch)
tree1277e37a62f3994335c537b9594049e07809440d
parent191aa3556e3fecd6cec1336ec346911109dc5223 (diff)
downloadmongo-9686cd9358b744877d43a2d7b71fdb91e40d3de9.tar.gz
SERVER-53311 Add a 'struct' type to represent 'cursor' object in aggregate command input IDL
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp3
-rw-r--r--src/mongo/db/pipeline/aggregate_command.idl18
-rw-r--r--src/mongo/db/pipeline/aggregation_request_helper.cpp66
-rw-r--r--src/mongo/db/pipeline/aggregation_request_helper.h11
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp70
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp6
-rw-r--r--src/mongo/db/query/count_command_test.cpp3
-rw-r--r--src/mongo/db/query/parsed_distinct_test.cpp16
-rw-r--r--src/mongo/db/query/query_request_test.cpp20
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp4
-rw-r--r--src/mongo/db/views/resolved_view.cpp2
-rw-r--r--src/mongo/db/views/resolved_view_test.cpp8
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.cpp2
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp6
14 files changed, 135 insertions, 100 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index b2bef2c6024..28d6abd0c63 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -124,7 +124,8 @@ bool handleCursorCommand(OperationContext* opCtx,
const BSONObj& cmdObj,
rpc::ReplyBuilderInterface* result) {
invariant(!cursors.empty());
- long long batchSize = request.getBatchSize();
+ long long batchSize =
+ request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize);
if (cursors.size() > 1) {
diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl
index a242014f1b9..926ceb42792 100644
--- a/src/mongo/db/pipeline/aggregate_command.idl
+++ b/src/mongo/db/pipeline/aggregate_command.idl
@@ -46,19 +46,19 @@ types:
description: "An array of objects specifying the aggregation pipeline."
cpp_type: "std::vector<mongo::BSONObj>"
deserializer: ::mongo::parsePipelineFromBSON
- batchSize:
+ aggregateCursor:
bson_serialization_type: any
- description: "An int representing the cursor batch size."
- cpp_type: "std::int64_t"
- serializer: ::mongo::serializeBatchSizeToBSON
- deserializer: ::mongo::parseBatchSizeFromBSON
+ description: "A cursor type representing the cursor field of aggregate command."
+ cpp_type: SimpleCursorOptions
+ serializer: ::mongo::serializeAggregateCursorToBSON
+ deserializer: ::mongo::parseAggregateCursorFromBSON
explainVerbosity:
bson_serialization_type: any
description: "The Verbosity value representing explain verbosity."
cpp_type: "mongo::ExplainOptions::Verbosity"
serializer: ::mongo::serializeExplainToBSON
deserializer: ::mongo::parseExplainModeFromBSON
-
+
commands:
aggregate:
description: "Represents the user-supplied options to the aggregate command."
@@ -80,10 +80,8 @@ commands:
type: optionalBool
cursor:
description: "To indicate a cursor with a non-default batch size."
- cpp_name: "batchSize"
- type: batchSize
- default: 101
- validator: { gte: 0 }
+ type: aggregateCursor
+ default: SimpleCursorOptions()
maxTimeMS:
description: "Specifies a time limit in milliseconds for processing operations on a cursor. If you do not specify a value for maxTimeMS, operations will not time out."
type: safeInt64
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.cpp b/src/mongo/db/pipeline/aggregation_request_helper.cpp
index 80300abb773..0a51de0b871 100644
--- a/src/mongo/db/pipeline/aggregation_request_helper.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_helper.cpp
@@ -132,7 +132,7 @@ Document serializeToCommandDoc(const AggregateCommand& request) {
Status validate(const BSONObj& cmdObj,
boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
bool hasAllowDiskUseElem = cmdObj.hasField(AggregateCommand::kAllowDiskUseFieldName);
- bool hasCursorElem = cmdObj.hasField(AggregateCommand::kBatchSizeFieldName);
+ bool hasCursorElem = cmdObj.hasField(AggregateCommand::kCursorFieldName);
bool hasExplainElem = cmdObj.hasField(AggregateCommand::kExplainFieldName);
bool hasExplain =
explainVerbosity || (hasExplainElem && cmdObj[AggregateCommand::kExplainFieldName].Bool());
@@ -144,7 +144,7 @@ Status validate(const BSONObj& cmdObj,
if (!hasCursorElem && !hasExplainElem) {
return {ErrorCodes::FailedToParse,
str::stream()
- << "The '" << AggregateCommand::kBatchSizeFieldName
+ << "The '" << AggregateCommand::kCursorFieldName
<< "' option is required, except for aggregate with the explain argument"};
}
@@ -172,36 +172,6 @@ Status validate(const BSONObj& cmdObj,
// Custom serializers/deserializers for AggregateCommand.
-long long parseBatchSizeFromBSON(const BSONElement& cursorElem) {
- long long batchSize = 101;
-
- if (cursorElem.eoo()) {
- return batchSize;
- }
-
- uassert(ErrorCodes::TypeMismatch,
- "cursor field must be missing or an object",
- cursorElem.type() == mongo::Object);
-
- BSONObj cursor = cursorElem.embeddedObject();
- BSONElement batchSizeElem = cursor[aggregation_request_helper::kBatchSizeField];
-
- const int expectedNumberOfCursorFields = batchSizeElem.eoo() ? 0 : 1;
- uassert(ErrorCodes::BadValue,
- "cursor object can't contain fields other than batchSize",
- cursor.nFields() == expectedNumberOfCursorFields);
-
- if (batchSizeElem.eoo()) {
- return batchSize;
- }
-
- uassert(
- ErrorCodes::TypeMismatch, "cursor.batchSize must be a number", batchSizeElem.isNumber());
- batchSize = batchSizeElem.numberLong();
-
- return batchSize;
-}
-
boost::optional<mongo::ExplainOptions::Verbosity> parseExplainModeFromBSON(
const BSONElement& explainElem) {
uassert(ErrorCodes::TypeMismatch,
@@ -220,16 +190,38 @@ void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain,
BSONObjBuilder* builder) {
// Note that we do not serialize 'explain' field to the command object. This serializer only
// serializes an empty cursor object for field 'cursor' when it is an explain command.
- builder->append(AggregateCommand::kBatchSizeFieldName, BSONObj());
+ builder->append(AggregateCommand::kCursorFieldName, BSONObj());
return;
}
-void serializeBatchSizeToBSON(const std::int64_t& batchSize,
- StringData fieldName,
- BSONObjBuilder* builder) {
+mongo::SimpleCursorOptions parseAggregateCursorFromBSON(const BSONElement& cursorElem) {
+ if (cursorElem.eoo()) {
+ SimpleCursorOptions cursor;
+ cursor.setBatchSize(aggregation_request_helper::kDefaultBatchSize);
+ return cursor;
+ }
+
+ uassert(ErrorCodes::TypeMismatch,
+ "cursor field must be missing or an object",
+ cursorElem.type() == mongo::Object);
+
+ SimpleCursorOptions cursor = SimpleCursorOptions::parse(
+ IDLParserErrorContext(AggregateCommand::kCursorFieldName), cursorElem.embeddedObject());
+ if (!cursor.getBatchSize())
+ cursor.setBatchSize(aggregation_request_helper::kDefaultBatchSize);
+
+ return cursor;
+}
+
+void serializeAggregateCursorToBSON(const mongo::SimpleCursorOptions& cursor,
+ StringData fieldName,
+ BSONObjBuilder* builder) {
if (!builder->hasField(fieldName)) {
- builder->append(fieldName, BSON(aggregation_request_helper::kBatchSizeField << batchSize));
+ builder->append(
+ fieldName,
+ BSON(aggregation_request_helper::kBatchSizeField
+ << cursor.getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize)));
}
return;
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.h b/src/mongo/db/pipeline/aggregation_request_helper.h
index d3e61857a4b..7c04dfea823 100644
--- a/src/mongo/db/pipeline/aggregation_request_helper.h
+++ b/src/mongo/db/pipeline/aggregation_request_helper.h
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/idl/basic_types_gen.h"
namespace mongo {
@@ -109,8 +110,6 @@ Status validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity
* Custom serializers/deserializers for AggregateCommand.
*/
-long long parseBatchSizeFromBSON(const BSONElement& cursorElem);
-
boost::optional<mongo::ExplainOptions::Verbosity> parseExplainModeFromBSON(
const BSONElement& explainElem);
@@ -118,9 +117,11 @@ void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain,
StringData fieldName,
BSONObjBuilder* builder);
-void serializeBatchSizeToBSON(const std::int64_t& batchSize,
- StringData fieldName,
- BSONObjBuilder* builder);
+mongo::SimpleCursorOptions parseAggregateCursorFromBSON(const BSONElement& cursorElem);
+
+void serializeAggregateCursorToBSON(const SimpleCursorOptions& cursor,
+ StringData fieldName,
+ BSONObjBuilder* builder);
/**
* Parse an aggregation pipeline definition from 'pipelineElem'.
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 4492375d67b..95e0e9316f6 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -79,7 +79,9 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
ASSERT_TRUE(request.getNeedsMerge());
ASSERT_TRUE(request.getBypassDocumentValidation().value_or(false));
ASSERT_TRUE(request.getRequestReshardingResumeToken());
- ASSERT_EQ(request.getBatchSize(), 10);
+ ASSERT_EQ(
+ request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize),
+ 10);
ASSERT_BSONOBJ_EQ(request.getHint().value_or(BSONObj()), BSON("a" << 1));
ASSERT_BSONOBJ_EQ(request.getCollation().value_or(BSONObj()),
BSON("locale"
@@ -121,7 +123,9 @@ TEST(AggregationRequestTest, ShouldParseExplicitExplainFalseWithCursorOption) {
"'a'}");
auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson));
ASSERT_FALSE(request.getExplain());
- ASSERT_EQ(request.getBatchSize(), 10);
+ ASSERT_EQ(
+ request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize),
+ 10);
}
TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArg) {
@@ -142,7 +146,9 @@ TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArgAn
nss, inputBson, ExplainOptions::Verbosity::kExecStats));
ASSERT_TRUE(request.getExplain());
ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kExecStats);
- ASSERT_EQ(request.getBatchSize(), 10);
+ ASSERT_EQ(
+ request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize),
+ 10);
}
TEST(AggregationRequestTest, ShouldParseExplainFlagWithReadConcern) {
@@ -170,7 +176,7 @@ TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFields
auto expectedSerialization =
Document{{AggregateCommand::kCommandName, nss.coll()},
{AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
- {AggregateCommand::kBatchSizeFieldName, Value(kDefaultCursorOptionDocument)}};
+ {AggregateCommand::kCursorFieldName, Value(kDefaultCursorOptionDocument)}};
ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
expectedSerialization);
}
@@ -183,7 +189,9 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
request.setNeedsMerge(true);
request.setBypassDocumentValidation(true);
request.setRequestReshardingResumeToken(true);
- request.setBatchSize(10);
+ SimpleCursorOptions cursor;
+ cursor.setBatchSize(10);
+ request.setCursor(cursor);
request.setMaxTimeMS(10u);
const auto hintObj = BSON("a" << 1);
request.setHint(hintObj);
@@ -203,23 +211,23 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
auto uuid = UUID::gen();
request.setCollectionUUID(uuid);
- auto expectedSerialization = Document{
- {AggregateCommand::kCommandName, nss.coll()},
- {AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
- {AggregateCommand::kAllowDiskUseFieldName, true},
- {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))},
- {QueryRequest::cmdOptionMaxTimeMS, 10},
- {AggregateCommand::kBypassDocumentValidationFieldName, true},
- {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj},
- {AggregateCommand::kCollationFieldName, collationObj},
- {AggregateCommand::kHintFieldName, hintObj},
- {AggregateCommand::kLetFieldName, letParamsObj},
- {AggregateCommand::kNeedsMergeFieldName, true},
- {AggregateCommand::kFromMongosFieldName, true},
- {QueryRequest::kUnwrappedReadPrefField, readPrefObj},
- {AggregateCommand::kRequestReshardingResumeTokenFieldName, true},
- {AggregateCommand::kIsMapReduceCommandFieldName, true},
- {AggregateCommand::kCollectionUUIDFieldName, uuid}};
+ auto expectedSerialization =
+ Document{{AggregateCommand::kCommandName, nss.coll()},
+ {AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
+ {AggregateCommand::kAllowDiskUseFieldName, true},
+ {AggregateCommand::kCursorFieldName, Value(Document({{kBatchSizeFieldName, 10}}))},
+ {QueryRequest::cmdOptionMaxTimeMS, 10},
+ {AggregateCommand::kBypassDocumentValidationFieldName, true},
+ {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj},
+ {AggregateCommand::kCollationFieldName, collationObj},
+ {AggregateCommand::kHintFieldName, hintObj},
+ {AggregateCommand::kLetFieldName, letParamsObj},
+ {AggregateCommand::kNeedsMergeFieldName, true},
+ {AggregateCommand::kFromMongosFieldName, true},
+ {QueryRequest::kUnwrappedReadPrefField, readPrefObj},
+ {AggregateCommand::kRequestReshardingResumeTokenFieldName, true},
+ {AggregateCommand::kIsMapReduceCommandFieldName, true},
+ {AggregateCommand::kCollectionUUIDFieldName, uuid}};
ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
expectedSerialization);
}
@@ -227,12 +235,14 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) {
NamespaceString nss("a.collection");
AggregateCommand request(nss, {});
- request.setBatchSize(10);
+ SimpleCursorOptions cursor;
+ cursor.setBatchSize(10);
+ request.setCursor(cursor);
auto expectedSerialization = Document{
{AggregateCommand::kCommandName, nss.coll()},
{AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
- {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}};
+ {AggregateCommand::kCursorFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}};
ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
expectedSerialization);
}
@@ -244,7 +254,7 @@ TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAgg
auto expectedSerialization =
Document{{AggregateCommand::kCommandName, 1},
{AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
- {AggregateCommand::kBatchSizeFieldName,
+ {AggregateCommand::kCursorFieldName,
Value(Document({{aggregation_request_helper::kBatchSizeField,
aggregation_request_helper::kDefaultBatchSize}}))}};
@@ -258,7 +268,9 @@ TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) {
"{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, $db: 'a'}");
auto request = aggregation_request_helper::parseFromBSON(nss, inputBson);
ASSERT_OK(request.getStatus());
- ASSERT_EQ(request.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(request.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
}
TEST(AggregationRequestTest, ShouldAcceptHintAsString) {
@@ -276,13 +288,15 @@ TEST(AggregationRequestTest, ShouldAcceptHintAsString) {
TEST(AggregationRequestTest, ShouldNotSerializeBatchSizeWhenExplainSet) {
NamespaceString nss("a.collection");
AggregateCommand request(nss, {});
- request.setBatchSize(10);
+ SimpleCursorOptions cursor;
+ cursor.setBatchSize(10);
+ request.setCursor(cursor);
request.setExplain(ExplainOptions::Verbosity::kQueryPlanner);
auto expectedSerialization =
Document{{AggregateCommand::kCommandName, nss.coll()},
{AggregateCommand::kPipelineFieldName, std::vector<Value>{}},
- {AggregateCommand::kBatchSizeFieldName, Value(Document())}};
+ {AggregateCommand::kCursorFieldName, Value(Document())}};
ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request),
expectedSerialization);
}
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 8a0fbc10998..ec72dceff96 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -109,7 +109,9 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
<< DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))});
aggReq.setFromMongos(true);
aggReq.setNeedsMerge(true);
- aggReq.setBatchSize(0);
+ SimpleCursorOptions cursor;
+ cursor.setBatchSize(0);
+ aggReq.setCursor(cursor);
auto cmdObjWithRWC =
applyReadWriteConcern(expCtx->opCtx,
true, /* appendRC */
@@ -827,7 +829,7 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
}
}
- targetedCmd[AggregateCommand::kBatchSizeFieldName] =
+ targetedCmd[AggregateCommand::kCursorFieldName] =
Value(DOC(aggregation_request_helper::kBatchSizeField << 0));
targetedCmd[AggregateCommand::kExchangeFieldName] =
diff --git a/src/mongo/db/query/count_command_test.cpp b/src/mongo/db/query/count_command_test.cpp
index 67b1cc64591..65264d2a6e5 100644
--- a/src/mongo/db/query/count_command_test.cpp
+++ b/src/mongo/db/query/count_command_test.cpp
@@ -187,7 +187,8 @@ TEST(CountCommandTest, ConvertToAggregationWithQueryAndFilterAndLimit) {
auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg).body;
auto ar = uassertStatusOK(aggregation_request_helper::parseFromBSON(testns, cmdObj));
- ASSERT_EQ(ar.getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getCollation().value_or(BSONObj()), BSONObj());
diff --git a/src/mongo/db/query/parsed_distinct_test.cpp b/src/mongo/db/query/parsed_distinct_test.cpp
index 17706e9e87a..54bf37496d1 100644
--- a/src/mongo/db/query/parsed_distinct_test.cpp
+++ b/src/mongo/db/query/parsed_distinct_test.cpp
@@ -63,7 +63,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationNoQuery) {
auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty());
@@ -103,7 +105,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationDottedPathNoQuery) {
auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty());
@@ -168,7 +172,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithAllOptions) {
auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()),
BSON("locale"
@@ -215,7 +221,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithQuery) {
auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty());
diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp
index 4a042669352..26d014a996b 100644
--- a/src/mongo/db/query/query_request_test.cpp
+++ b/src/mongo/db/query/query_request_test.cpp
@@ -1280,7 +1280,9 @@ TEST(QueryRequestTest, ConvertToAggregationSucceeds) {
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
ASSERT(ar.getValue().getPipeline().empty());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
}
@@ -1414,7 +1416,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithPipeline) {
auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
@@ -1441,7 +1445,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithBatchSize) {
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
ASSERT_EQ(ar.getValue().getNamespace(), testns);
- ASSERT_EQ(ar.getValue().getBatchSize(), 4LL);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ 4LL);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
}
@@ -1459,7 +1465,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithMaxTimeMS) {
auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd);
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj());
}
@@ -1475,7 +1483,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithCollationSucceeds) {
ASSERT_OK(ar.getStatus());
ASSERT(!ar.getValue().getExplain());
ASSERT(ar.getValue().getPipeline().empty());
- ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespace(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSON("f" << 1));
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index a2713b2c7d3..30b057612b1 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -266,7 +266,9 @@ AggregateCommand ReshardingOplogFetcher::_makeAggregateCommand(Client* client) {
aggRequest.setRequestReshardingResumeToken(true);
if (_initialBatchSize) {
- aggRequest.setBatchSize(_initialBatchSize);
+ SimpleCursorOptions cursor;
+ cursor.setBatchSize(_initialBatchSize);
+ aggRequest.setCursor(cursor);
}
return aggRequest;
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp
index bccc3562d06..d74f0bacbeb 100644
--- a/src/mongo/db/views/resolved_view.cpp
+++ b/src/mongo/db/views/resolved_view.cpp
@@ -100,7 +100,7 @@ AggregateCommand ResolvedView::asExpandedViewAggregation(const AggregateCommand&
if (request.getExplain()) {
expandedRequest.setExplain(request.getExplain());
} else {
- expandedRequest.setBatchSize(request.getBatchSize());
+ expandedRequest.setCursor(request.getCursor());
}
expandedRequest.setHint(request.getHint());
diff --git a/src/mongo/db/views/resolved_view_test.cpp b/src/mongo/db/views/resolved_view_test.cpp
index 0a795870986..95797376f28 100644
--- a/src/mongo/db/views/resolved_view_test.cpp
+++ b/src/mongo/db/views/resolved_view_test.cpp
@@ -90,13 +90,17 @@ TEST(ResolvedViewTest, ExpandingAggRequestPreservesExplain) {
TEST(ResolvedViewTest, ExpandingAggRequestWithCursorAndExplainOnlyPreservesExplain) {
const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation};
AggregateCommand aggRequest{viewNss, {}};
- aggRequest.setBatchSize(10);
+ SimpleCursorOptions cursor;
+ cursor.setBatchSize(10);
+ aggRequest.setCursor(cursor);
aggRequest.setExplain(ExplainOptions::Verbosity::kExecStats);
auto result = resolvedView.asExpandedViewAggregation(aggRequest);
ASSERT(result.getExplain());
ASSERT(*result.getExplain() == ExplainOptions::Verbosity::kExecStats);
- ASSERT_EQ(result.getBatchSize(), aggregation_request_helper::kDefaultBatchSize);
+ ASSERT_EQ(
+ result.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize),
+ aggregation_request_helper::kDefaultBatchSize);
}
TEST(ResolvedViewTest, ExpandingAggRequestPreservesBypassDocumentValidation) {
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index 2af202ec39b..0c786e0d729 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -114,7 +114,7 @@ Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipe
translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll());
translatedCmd[AggregateCommand::kPipelineFieldName] = Value(pipeline->serialize());
- translatedCmd[AggregateCommand::kBatchSizeFieldName] =
+ translatedCmd[AggregateCommand::kCursorFieldName] =
Value(Document{{"batchSize", std::numeric_limits<long long>::max()}});
translatedCmd[AggregateCommand::kAllowDiskUseFieldName] = Value(true);
translatedCmd[AggregateCommand::kFromMongosFieldName] = Value(true);
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index bdbcd306c5e..fa0d714d31c 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -344,7 +344,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return cluster_aggregation_planner::runPipelineOnMongoS(
namespaces,
- request.getBatchSize(),
+ request.getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
std::move(targeter.pipeline),
result,
privileges);
@@ -355,7 +356,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
opCtx,
std::move(targeter),
aggregation_request_helper::serializeToCommandDoc(request),
- request.getBatchSize(),
+ request.getCursor().getBatchSize().value_or(
+ aggregation_request_helper::kDefaultBatchSize),
namespaces,
privileges,
result,