diff options
author | Ruoxin Xu <ruoxin.xu@mongodb.com> | 2021-01-13 23:19:33 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-20 13:29:03 +0000 |
commit | 9686cd9358b744877d43a2d7b71fdb91e40d3de9 (patch) | |
tree | 1277e37a62f3994335c537b9594049e07809440d | |
parent | 191aa3556e3fecd6cec1336ec346911109dc5223 (diff) | |
download | mongo-9686cd9358b744877d43a2d7b71fdb91e40d3de9.tar.gz |
SERVER-53311 Add a 'struct' type to represent 'cursor' object in aggregate command input IDL
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregate_command.idl | 18 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request_helper.cpp | 66 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request_helper.h | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request_test.cpp | 70 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/query/count_command_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/parsed_distinct_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/query/query_request_test.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/views/resolved_view.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/views/resolved_view_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_agg.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 6 |
14 files changed, 135 insertions, 100 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index b2bef2c6024..28d6abd0c63 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -124,7 +124,8 @@ bool handleCursorCommand(OperationContext* opCtx, const BSONObj& cmdObj, rpc::ReplyBuilderInterface* result) { invariant(!cursors.empty()); - long long batchSize = request.getBatchSize(); + long long batchSize = + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize); if (cursors.size() > 1) { diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index a242014f1b9..926ceb42792 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -46,19 +46,19 @@ types: description: "An array of objects specifying the aggregation pipeline." cpp_type: "std::vector<mongo::BSONObj>" deserializer: ::mongo::parsePipelineFromBSON - batchSize: + aggregateCursor: bson_serialization_type: any - description: "An int representing the cursor batch size." - cpp_type: "std::int64_t" - serializer: ::mongo::serializeBatchSizeToBSON - deserializer: ::mongo::parseBatchSizeFromBSON + description: "A cursor type representing the cursor field of aggregate command." + cpp_type: SimpleCursorOptions + serializer: ::mongo::serializeAggregateCursorToBSON + deserializer: ::mongo::parseAggregateCursorFromBSON explainVerbosity: bson_serialization_type: any description: "The Verbosity value representing explain verbosity." cpp_type: "mongo::ExplainOptions::Verbosity" serializer: ::mongo::serializeExplainToBSON deserializer: ::mongo::parseExplainModeFromBSON - + commands: aggregate: description: "Represents the user-supplied options to the aggregate command." @@ -80,10 +80,8 @@ commands: type: optionalBool cursor: description: "To indicate a cursor with a non-default batch size." - cpp_name: "batchSize" - type: batchSize - default: 101 - validator: { gte: 0 } + type: aggregateCursor + default: SimpleCursorOptions() maxTimeMS: description: "Specifies a time limit in milliseconds for processing operations on a cursor. If you do not specify a value for maxTimeMS, operations will not time out." type: safeInt64 diff --git a/src/mongo/db/pipeline/aggregation_request_helper.cpp b/src/mongo/db/pipeline/aggregation_request_helper.cpp index 80300abb773..0a51de0b871 100644 --- a/src/mongo/db/pipeline/aggregation_request_helper.cpp +++ b/src/mongo/db/pipeline/aggregation_request_helper.cpp @@ -132,7 +132,7 @@ Document serializeToCommandDoc(const AggregateCommand& request) { Status validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> explainVerbosity) { bool hasAllowDiskUseElem = cmdObj.hasField(AggregateCommand::kAllowDiskUseFieldName); - bool hasCursorElem = cmdObj.hasField(AggregateCommand::kBatchSizeFieldName); + bool hasCursorElem = cmdObj.hasField(AggregateCommand::kCursorFieldName); bool hasExplainElem = cmdObj.hasField(AggregateCommand::kExplainFieldName); bool hasExplain = explainVerbosity || (hasExplainElem && cmdObj[AggregateCommand::kExplainFieldName].Bool()); @@ -144,7 +144,7 @@ Status validate(const BSONObj& cmdObj, if (!hasCursorElem && !hasExplainElem) { return {ErrorCodes::FailedToParse, str::stream() - << "The '" << AggregateCommand::kBatchSizeFieldName + << "The '" << AggregateCommand::kCursorFieldName << "' option is required, except for aggregate with the explain argument"}; } @@ -172,36 +172,6 @@ Status validate(const BSONObj& cmdObj, // Custom serializers/deserializers for AggregateCommand. -long long parseBatchSizeFromBSON(const BSONElement& cursorElem) { - long long batchSize = 101; - - if (cursorElem.eoo()) { - return batchSize; - } - - uassert(ErrorCodes::TypeMismatch, - "cursor field must be missing or an object", - cursorElem.type() == mongo::Object); - - BSONObj cursor = cursorElem.embeddedObject(); - BSONElement batchSizeElem = cursor[aggregation_request_helper::kBatchSizeField]; - - const int expectedNumberOfCursorFields = batchSizeElem.eoo() ? 0 : 1; - uassert(ErrorCodes::BadValue, - "cursor object can't contain fields other than batchSize", - cursor.nFields() == expectedNumberOfCursorFields); - - if (batchSizeElem.eoo()) { - return batchSize; - } - - uassert( - ErrorCodes::TypeMismatch, "cursor.batchSize must be a number", batchSizeElem.isNumber()); - batchSize = batchSizeElem.numberLong(); - - return batchSize; -} - boost::optional<mongo::ExplainOptions::Verbosity> parseExplainModeFromBSON( const BSONElement& explainElem) { uassert(ErrorCodes::TypeMismatch, @@ -220,16 +190,38 @@ void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain, BSONObjBuilder* builder) { // Note that we do not serialize 'explain' field to the command object. This serializer only // serializes an empty cursor object for field 'cursor' when it is an explain command. - builder->append(AggregateCommand::kBatchSizeFieldName, BSONObj()); + builder->append(AggregateCommand::kCursorFieldName, BSONObj()); return; } -void serializeBatchSizeToBSON(const std::int64_t& batchSize, - StringData fieldName, - BSONObjBuilder* builder) { +mongo::SimpleCursorOptions parseAggregateCursorFromBSON(const BSONElement& cursorElem) { + if (cursorElem.eoo()) { + SimpleCursorOptions cursor; + cursor.setBatchSize(aggregation_request_helper::kDefaultBatchSize); + return cursor; + } + + uassert(ErrorCodes::TypeMismatch, + "cursor field must be missing or an object", + cursorElem.type() == mongo::Object); + + SimpleCursorOptions cursor = SimpleCursorOptions::parse( + IDLParserErrorContext(AggregateCommand::kCursorFieldName), cursorElem.embeddedObject()); + if (!cursor.getBatchSize()) + cursor.setBatchSize(aggregation_request_helper::kDefaultBatchSize); + + return cursor; +} + +void serializeAggregateCursorToBSON(const mongo::SimpleCursorOptions& cursor, + StringData fieldName, + BSONObjBuilder* builder) { if (!builder->hasField(fieldName)) { - builder->append(fieldName, BSON(aggregation_request_helper::kBatchSizeField << batchSize)); + builder->append( + fieldName, + BSON(aggregation_request_helper::kBatchSizeField + << cursor.getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize))); } return; diff --git a/src/mongo/db/pipeline/aggregation_request_helper.h b/src/mongo/db/pipeline/aggregation_request_helper.h index d3e61857a4b..7c04dfea823 100644 --- a/src/mongo/db/pipeline/aggregation_request_helper.h +++ b/src/mongo/db/pipeline/aggregation_request_helper.h @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/legacy_runtime_constants_gen.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/write_concern_options.h" +#include "mongo/idl/basic_types_gen.h" namespace mongo { @@ -109,8 +110,6 @@ Status validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity * Custom serializers/deserializers for AggregateCommand. */ -long long parseBatchSizeFromBSON(const BSONElement& cursorElem); - boost::optional<mongo::ExplainOptions::Verbosity> parseExplainModeFromBSON( const BSONElement& explainElem); @@ -118,9 +117,11 @@ void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain, StringData fieldName, BSONObjBuilder* builder); -void serializeBatchSizeToBSON(const std::int64_t& batchSize, - StringData fieldName, - BSONObjBuilder* builder); +mongo::SimpleCursorOptions parseAggregateCursorFromBSON(const BSONElement& cursorElem); + +void serializeAggregateCursorToBSON(const SimpleCursorOptions& cursor, + StringData fieldName, + BSONObjBuilder* builder); /** * Parse an aggregation pipeline definition from 'pipelineElem'. diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index 4492375d67b..95e0e9316f6 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -79,7 +79,9 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { ASSERT_TRUE(request.getNeedsMerge()); ASSERT_TRUE(request.getBypassDocumentValidation().value_or(false)); ASSERT_TRUE(request.getRequestReshardingResumeToken()); - ASSERT_EQ(request.getBatchSize(), 10); + ASSERT_EQ( + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + 10); ASSERT_BSONOBJ_EQ(request.getHint().value_or(BSONObj()), BSON("a" << 1)); ASSERT_BSONOBJ_EQ(request.getCollation().value_or(BSONObj()), BSON("locale" @@ -121,7 +123,9 @@ TEST(AggregationRequestTest, ShouldParseExplicitExplainFalseWithCursorOption) { "'a'}"); auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson)); ASSERT_FALSE(request.getExplain()); - ASSERT_EQ(request.getBatchSize(), 10); + ASSERT_EQ( + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + 10); } TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArg) { @@ -142,7 +146,9 @@ TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArgAn nss, inputBson, ExplainOptions::Verbosity::kExecStats)); ASSERT_TRUE(request.getExplain()); ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kExecStats); - ASSERT_EQ(request.getBatchSize(), 10); + ASSERT_EQ( + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + 10); } TEST(AggregationRequestTest, ShouldParseExplainFlagWithReadConcern) { @@ -170,7 +176,7 @@ TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFields auto expectedSerialization = Document{{AggregateCommand::kCommandName, nss.coll()}, {AggregateCommand::kPipelineFieldName, std::vector<Value>{}}, - {AggregateCommand::kBatchSizeFieldName, Value(kDefaultCursorOptionDocument)}}; + {AggregateCommand::kCursorFieldName, Value(kDefaultCursorOptionDocument)}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } @@ -183,7 +189,9 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { request.setNeedsMerge(true); request.setBypassDocumentValidation(true); request.setRequestReshardingResumeToken(true); - request.setBatchSize(10); + SimpleCursorOptions cursor; + cursor.setBatchSize(10); + request.setCursor(cursor); request.setMaxTimeMS(10u); const auto hintObj = BSON("a" << 1); request.setHint(hintObj); @@ -203,23 +211,23 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { auto uuid = UUID::gen(); request.setCollectionUUID(uuid); - auto expectedSerialization = Document{ - {AggregateCommand::kCommandName, nss.coll()}, - {AggregateCommand::kPipelineFieldName, std::vector<Value>{}}, - {AggregateCommand::kAllowDiskUseFieldName, true}, - {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}, - {QueryRequest::cmdOptionMaxTimeMS, 10}, - {AggregateCommand::kBypassDocumentValidationFieldName, true}, - {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj}, - {AggregateCommand::kCollationFieldName, collationObj}, - {AggregateCommand::kHintFieldName, hintObj}, - {AggregateCommand::kLetFieldName, letParamsObj}, - {AggregateCommand::kNeedsMergeFieldName, true}, - {AggregateCommand::kFromMongosFieldName, true}, - {QueryRequest::kUnwrappedReadPrefField, readPrefObj}, - {AggregateCommand::kRequestReshardingResumeTokenFieldName, true}, - {AggregateCommand::kIsMapReduceCommandFieldName, true}, - {AggregateCommand::kCollectionUUIDFieldName, uuid}}; + auto expectedSerialization = + Document{{AggregateCommand::kCommandName, nss.coll()}, + {AggregateCommand::kPipelineFieldName, std::vector<Value>{}}, + {AggregateCommand::kAllowDiskUseFieldName, true}, + {AggregateCommand::kCursorFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}, + {QueryRequest::cmdOptionMaxTimeMS, 10}, + {AggregateCommand::kBypassDocumentValidationFieldName, true}, + {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj}, + {AggregateCommand::kCollationFieldName, collationObj}, + {AggregateCommand::kHintFieldName, hintObj}, + {AggregateCommand::kLetFieldName, letParamsObj}, + {AggregateCommand::kNeedsMergeFieldName, true}, + {AggregateCommand::kFromMongosFieldName, true}, + {QueryRequest::kUnwrappedReadPrefField, readPrefObj}, + {AggregateCommand::kRequestReshardingResumeTokenFieldName, true}, + {AggregateCommand::kIsMapReduceCommandFieldName, true}, + {AggregateCommand::kCollectionUUIDFieldName, uuid}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } @@ -227,12 +235,14 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) { NamespaceString nss("a.collection"); AggregateCommand request(nss, {}); - request.setBatchSize(10); + SimpleCursorOptions cursor; + cursor.setBatchSize(10); + request.setCursor(cursor); auto expectedSerialization = Document{ {AggregateCommand::kCommandName, nss.coll()}, {AggregateCommand::kPipelineFieldName, std::vector<Value>{}}, - {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}}; + {AggregateCommand::kCursorFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } @@ -244,7 +254,7 @@ TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAgg auto expectedSerialization = Document{{AggregateCommand::kCommandName, 1}, {AggregateCommand::kPipelineFieldName, std::vector<Value>{}}, - {AggregateCommand::kBatchSizeFieldName, + {AggregateCommand::kCursorFieldName, Value(Document({{aggregation_request_helper::kBatchSizeField, aggregation_request_helper::kDefaultBatchSize}}))}}; @@ -258,7 +268,9 @@ TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) { "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, $db: 'a'}"); auto request = aggregation_request_helper::parseFromBSON(nss, inputBson); ASSERT_OK(request.getStatus()); - ASSERT_EQ(request.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(request.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); } TEST(AggregationRequestTest, ShouldAcceptHintAsString) { @@ -276,13 +288,15 @@ TEST(AggregationRequestTest, ShouldAcceptHintAsString) { TEST(AggregationRequestTest, ShouldNotSerializeBatchSizeWhenExplainSet) { NamespaceString nss("a.collection"); AggregateCommand request(nss, {}); - request.setBatchSize(10); + SimpleCursorOptions cursor; + cursor.setBatchSize(10); + request.setCursor(cursor); request.setExplain(ExplainOptions::Verbosity::kQueryPlanner); auto expectedSerialization = Document{{AggregateCommand::kCommandName, nss.coll()}, {AggregateCommand::kPipelineFieldName, std::vector<Value>{}}, - {AggregateCommand::kBatchSizeFieldName, Value(Document())}}; + {AggregateCommand::kCursorFieldName, Value(Document())}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 8a0fbc10998..ec72dceff96 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -109,7 +109,9 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))}); aggReq.setFromMongos(true); aggReq.setNeedsMerge(true); - aggReq.setBatchSize(0); + SimpleCursorOptions cursor; + cursor.setBatchSize(0); + aggReq.setCursor(cursor); auto cmdObjWithRWC = applyReadWriteConcern(expCtx->opCtx, true, /* appendRC */ @@ -827,7 +829,7 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont } } - targetedCmd[AggregateCommand::kBatchSizeFieldName] = + targetedCmd[AggregateCommand::kCursorFieldName] = Value(DOC(aggregation_request_helper::kBatchSizeField << 0)); targetedCmd[AggregateCommand::kExchangeFieldName] = diff --git a/src/mongo/db/query/count_command_test.cpp b/src/mongo/db/query/count_command_test.cpp index 67b1cc64591..65264d2a6e5 100644 --- a/src/mongo/db/query/count_command_test.cpp +++ b/src/mongo/db/query/count_command_test.cpp @@ -187,7 +187,8 @@ TEST(CountCommandTest, ConvertToAggregationWithQueryAndFilterAndLimit) { auto cmdObj = OpMsgRequest::fromDBAndBody(testns.db(), agg).body; auto ar = uassertStatusOK(aggregation_request_helper::parseFromBSON(testns, cmdObj)); - ASSERT_EQ(ar.getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getCollation().value_or(BSONObj()), BSONObj()); diff --git a/src/mongo/db/query/parsed_distinct_test.cpp b/src/mongo/db/query/parsed_distinct_test.cpp index 17706e9e87a..54bf37496d1 100644 --- a/src/mongo/db/query/parsed_distinct_test.cpp +++ b/src/mongo/db/query/parsed_distinct_test.cpp @@ -63,7 +63,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationNoQuery) { auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj); ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj()); ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty()); @@ -103,7 +105,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationDottedPathNoQuery) { auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj); ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj()); ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty()); @@ -168,7 +172,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithAllOptions) { auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj); ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSON("locale" @@ -215,7 +221,9 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithQuery) { auto ar = aggregation_request_helper::parseFromBSON(testns, cmdObj); ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj()); ASSERT(ar.getValue().getReadConcern().value_or(BSONObj()).isEmpty()); diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp index 4a042669352..26d014a996b 100644 --- a/src/mongo/db/query/query_request_test.cpp +++ b/src/mongo/db/query/query_request_test.cpp @@ -1280,7 +1280,9 @@ TEST(QueryRequestTest, ConvertToAggregationSucceeds) { ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); ASSERT(ar.getValue().getPipeline().empty()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj()); } @@ -1414,7 +1416,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithPipeline) { auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd); ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj()); @@ -1441,7 +1445,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithBatchSize) { ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); ASSERT_EQ(ar.getValue().getNamespace(), testns); - ASSERT_EQ(ar.getValue().getBatchSize(), 4LL); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + 4LL); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj()); } @@ -1459,7 +1465,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithMaxTimeMS) { auto ar = aggregation_request_helper::parseFromBSON(testns, aggCmd); ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSONObj()); } @@ -1475,7 +1483,9 @@ TEST(QueryRequestTest, ConvertToAggregationWithCollationSucceeds) { ASSERT_OK(ar.getStatus()); ASSERT(!ar.getValue().getExplain()); ASSERT(ar.getValue().getPipeline().empty()); - ASSERT_EQ(ar.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespace(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation().value_or(BSONObj()), BSON("f" << 1)); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index a2713b2c7d3..30b057612b1 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -266,7 +266,9 @@ AggregateCommand ReshardingOplogFetcher::_makeAggregateCommand(Client* client) { aggRequest.setRequestReshardingResumeToken(true); if (_initialBatchSize) { - aggRequest.setBatchSize(_initialBatchSize); + SimpleCursorOptions cursor; + cursor.setBatchSize(_initialBatchSize); + aggRequest.setCursor(cursor); } return aggRequest; diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp index bccc3562d06..d74f0bacbeb 100644 --- a/src/mongo/db/views/resolved_view.cpp +++ b/src/mongo/db/views/resolved_view.cpp @@ -100,7 +100,7 @@ AggregateCommand ResolvedView::asExpandedViewAggregation(const AggregateCommand& if (request.getExplain()) { expandedRequest.setExplain(request.getExplain()); } else { - expandedRequest.setBatchSize(request.getBatchSize()); + expandedRequest.setCursor(request.getCursor()); } expandedRequest.setHint(request.getHint()); diff --git a/src/mongo/db/views/resolved_view_test.cpp b/src/mongo/db/views/resolved_view_test.cpp index 0a795870986..95797376f28 100644 --- a/src/mongo/db/views/resolved_view_test.cpp +++ b/src/mongo/db/views/resolved_view_test.cpp @@ -90,13 +90,17 @@ TEST(ResolvedViewTest, ExpandingAggRequestPreservesExplain) { TEST(ResolvedViewTest, ExpandingAggRequestWithCursorAndExplainOnlyPreservesExplain) { const ResolvedView resolvedView{backingNss, emptyPipeline, kSimpleCollation}; AggregateCommand aggRequest{viewNss, {}}; - aggRequest.setBatchSize(10); + SimpleCursorOptions cursor; + cursor.setBatchSize(10); + aggRequest.setCursor(cursor); aggRequest.setExplain(ExplainOptions::Verbosity::kExecStats); auto result = resolvedView.asExpandedViewAggregation(aggRequest); ASSERT(result.getExplain()); ASSERT(*result.getExplain() == ExplainOptions::Verbosity::kExecStats); - ASSERT_EQ(result.getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ( + result.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); } TEST(ResolvedViewTest, ExpandingAggRequestPreservesBypassDocumentValidation) { diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 2af202ec39b..0c786e0d729 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -114,7 +114,7 @@ Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipe translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll()); translatedCmd[AggregateCommand::kPipelineFieldName] = Value(pipeline->serialize()); - translatedCmd[AggregateCommand::kBatchSizeFieldName] = + translatedCmd[AggregateCommand::kCursorFieldName] = Value(Document{{"batchSize", std::numeric_limits<long long>::max()}}); translatedCmd[AggregateCommand::kAllowDiskUseFieldName] = Value(true); translatedCmd[AggregateCommand::kFromMongosFieldName] = Value(true); diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index bdbcd306c5e..fa0d714d31c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -344,7 +344,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return cluster_aggregation_planner::runPipelineOnMongoS( namespaces, - request.getBatchSize(), + request.getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), std::move(targeter.pipeline), result, privileges); @@ -355,7 +356,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, opCtx, std::move(targeter), aggregation_request_helper::serializeToCommandDoc(request), - request.getBatchSize(), + request.getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), namespaces, privileges, result, |