From 9686cd9358b744877d43a2d7b71fdb91e40d3de9 Mon Sep 17 00:00:00 2001 From: Ruoxin Xu Date: Wed, 13 Jan 2021 23:19:33 +0000 Subject: SERVER-53311 Add a 'struct' type to represent 'cursor' object in aggregate command input IDL --- src/mongo/db/commands/run_aggregate.cpp | 3 +- src/mongo/db/pipeline/aggregate_command.idl | 18 +++--- .../db/pipeline/aggregation_request_helper.cpp | 66 +++++++++----------- src/mongo/db/pipeline/aggregation_request_helper.h | 11 ++-- src/mongo/db/pipeline/aggregation_request_test.cpp | 70 +++++++++++++--------- src/mongo/db/pipeline/sharded_agg_helpers.cpp | 6 +- src/mongo/db/query/count_command_test.cpp | 3 +- src/mongo/db/query/parsed_distinct_test.cpp | 16 +++-- src/mongo/db/query/query_request_test.cpp | 20 +++++-- .../db/s/resharding/resharding_oplog_fetcher.cpp | 4 +- src/mongo/db/views/resolved_view.cpp | 2 +- src/mongo/db/views/resolved_view_test.cpp | 8 ++- src/mongo/s/commands/cluster_map_reduce_agg.cpp | 2 +- src/mongo/s/query/cluster_aggregate.cpp | 6 +- 14 files changed, 135 insertions(+), 100 deletions(-) diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index b2bef2c6024..28d6abd0c63 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -124,7 +124,8 @@ bool handleCursorCommand(OperationContext* opCtx, const BSONObj& cmdObj, rpc::ReplyBuilderInterface* result) { invariant(!cursors.empty()); - long long batchSize = request.getBatchSize(); + long long batchSize = + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize); if (cursors.size() > 1) { diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index a242014f1b9..926ceb42792 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -46,19 +46,19 @@ types: description: "An array of objects specifying the aggregation pipeline." cpp_type: "std::vector" deserializer: ::mongo::parsePipelineFromBSON - batchSize: + aggregateCursor: bson_serialization_type: any - description: "An int representing the cursor batch size." - cpp_type: "std::int64_t" - serializer: ::mongo::serializeBatchSizeToBSON - deserializer: ::mongo::parseBatchSizeFromBSON + description: "A cursor type representing the cursor field of aggregate command." + cpp_type: SimpleCursorOptions + serializer: ::mongo::serializeAggregateCursorToBSON + deserializer: ::mongo::parseAggregateCursorFromBSON explainVerbosity: bson_serialization_type: any description: "The Verbosity value representing explain verbosity." cpp_type: "mongo::ExplainOptions::Verbosity" serializer: ::mongo::serializeExplainToBSON deserializer: ::mongo::parseExplainModeFromBSON - + commands: aggregate: description: "Represents the user-supplied options to the aggregate command." @@ -80,10 +80,8 @@ commands: type: optionalBool cursor: description: "To indicate a cursor with a non-default batch size." - cpp_name: "batchSize" - type: batchSize - default: 101 - validator: { gte: 0 } + type: aggregateCursor + default: SimpleCursorOptions() maxTimeMS: description: "Specifies a time limit in milliseconds for processing operations on a cursor. If you do not specify a value for maxTimeMS, operations will not time out." type: safeInt64 diff --git a/src/mongo/db/pipeline/aggregation_request_helper.cpp b/src/mongo/db/pipeline/aggregation_request_helper.cpp index 80300abb773..0a51de0b871 100644 --- a/src/mongo/db/pipeline/aggregation_request_helper.cpp +++ b/src/mongo/db/pipeline/aggregation_request_helper.cpp @@ -132,7 +132,7 @@ Document serializeToCommandDoc(const AggregateCommand& request) { Status validate(const BSONObj& cmdObj, boost::optional explainVerbosity) { bool hasAllowDiskUseElem = cmdObj.hasField(AggregateCommand::kAllowDiskUseFieldName); - bool hasCursorElem = cmdObj.hasField(AggregateCommand::kBatchSizeFieldName); + bool hasCursorElem = cmdObj.hasField(AggregateCommand::kCursorFieldName); bool hasExplainElem = cmdObj.hasField(AggregateCommand::kExplainFieldName); bool hasExplain = explainVerbosity || (hasExplainElem && cmdObj[AggregateCommand::kExplainFieldName].Bool()); @@ -144,7 +144,7 @@ Status validate(const BSONObj& cmdObj, if (!hasCursorElem && !hasExplainElem) { return {ErrorCodes::FailedToParse, str::stream() - << "The '" << AggregateCommand::kBatchSizeFieldName + << "The '" << AggregateCommand::kCursorFieldName << "' option is required, except for aggregate with the explain argument"}; } @@ -172,36 +172,6 @@ Status validate(const BSONObj& cmdObj, // Custom serializers/deserializers for AggregateCommand. -long long parseBatchSizeFromBSON(const BSONElement& cursorElem) { - long long batchSize = 101; - - if (cursorElem.eoo()) { - return batchSize; - } - - uassert(ErrorCodes::TypeMismatch, - "cursor field must be missing or an object", - cursorElem.type() == mongo::Object); - - BSONObj cursor = cursorElem.embeddedObject(); - BSONElement batchSizeElem = cursor[aggregation_request_helper::kBatchSizeField]; - - const int expectedNumberOfCursorFields = batchSizeElem.eoo() ? 0 : 1; - uassert(ErrorCodes::BadValue, - "cursor object can't contain fields other than batchSize", - cursor.nFields() == expectedNumberOfCursorFields); - - if (batchSizeElem.eoo()) { - return batchSize; - } - - uassert( - ErrorCodes::TypeMismatch, "cursor.batchSize must be a number", batchSizeElem.isNumber()); - batchSize = batchSizeElem.numberLong(); - - return batchSize; -} - boost::optional parseExplainModeFromBSON( const BSONElement& explainElem) { uassert(ErrorCodes::TypeMismatch, @@ -220,16 +190,38 @@ void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain, BSONObjBuilder* builder) { // Note that we do not serialize 'explain' field to the command object. This serializer only // serializes an empty cursor object for field 'cursor' when it is an explain command. - builder->append(AggregateCommand::kBatchSizeFieldName, BSONObj()); + builder->append(AggregateCommand::kCursorFieldName, BSONObj()); return; } -void serializeBatchSizeToBSON(const std::int64_t& batchSize, - StringData fieldName, - BSONObjBuilder* builder) { +mongo::SimpleCursorOptions parseAggregateCursorFromBSON(const BSONElement& cursorElem) { + if (cursorElem.eoo()) { + SimpleCursorOptions cursor; + cursor.setBatchSize(aggregation_request_helper::kDefaultBatchSize); + return cursor; + } + + uassert(ErrorCodes::TypeMismatch, + "cursor field must be missing or an object", + cursorElem.type() == mongo::Object); + + SimpleCursorOptions cursor = SimpleCursorOptions::parse( + IDLParserErrorContext(AggregateCommand::kCursorFieldName), cursorElem.embeddedObject()); + if (!cursor.getBatchSize()) + cursor.setBatchSize(aggregation_request_helper::kDefaultBatchSize); + + return cursor; +} + +void serializeAggregateCursorToBSON(const mongo::SimpleCursorOptions& cursor, + StringData fieldName, + BSONObjBuilder* builder) { if (!builder->hasField(fieldName)) { - builder->append(fieldName, BSON(aggregation_request_helper::kBatchSizeField << batchSize)); + builder->append( + fieldName, + BSON(aggregation_request_helper::kBatchSizeField + << cursor.getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize))); } return; diff --git a/src/mongo/db/pipeline/aggregation_request_helper.h b/src/mongo/db/pipeline/aggregation_request_helper.h index d3e61857a4b..7c04dfea823 100644 --- a/src/mongo/db/pipeline/aggregation_request_helper.h +++ b/src/mongo/db/pipeline/aggregation_request_helper.h @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/legacy_runtime_constants_gen.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/write_concern_options.h" +#include "mongo/idl/basic_types_gen.h" namespace mongo { @@ -109,8 +110,6 @@ Status validate(const BSONObj& cmdObj, boost::optional parseExplainModeFromBSON( const BSONElement& explainElem); @@ -118,9 +117,11 @@ void serializeExplainToBSON(const mongo::ExplainOptions::Verbosity& explain, StringData fieldName, BSONObjBuilder* builder); -void serializeBatchSizeToBSON(const std::int64_t& batchSize, - StringData fieldName, - BSONObjBuilder* builder); +mongo::SimpleCursorOptions parseAggregateCursorFromBSON(const BSONElement& cursorElem); + +void serializeAggregateCursorToBSON(const SimpleCursorOptions& cursor, + StringData fieldName, + BSONObjBuilder* builder); /** * Parse an aggregation pipeline definition from 'pipelineElem'. diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index 4492375d67b..95e0e9316f6 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -79,7 +79,9 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { ASSERT_TRUE(request.getNeedsMerge()); ASSERT_TRUE(request.getBypassDocumentValidation().value_or(false)); ASSERT_TRUE(request.getRequestReshardingResumeToken()); - ASSERT_EQ(request.getBatchSize(), 10); + ASSERT_EQ( + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + 10); ASSERT_BSONOBJ_EQ(request.getHint().value_or(BSONObj()), BSON("a" << 1)); ASSERT_BSONOBJ_EQ(request.getCollation().value_or(BSONObj()), BSON("locale" @@ -121,7 +123,9 @@ TEST(AggregationRequestTest, ShouldParseExplicitExplainFalseWithCursorOption) { "'a'}"); auto request = unittest::assertGet(aggregation_request_helper::parseFromBSON(nss, inputBson)); ASSERT_FALSE(request.getExplain()); - ASSERT_EQ(request.getBatchSize(), 10); + ASSERT_EQ( + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + 10); } TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArg) { @@ -142,7 +146,9 @@ TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArgAn nss, inputBson, ExplainOptions::Verbosity::kExecStats)); ASSERT_TRUE(request.getExplain()); ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kExecStats); - ASSERT_EQ(request.getBatchSize(), 10); + ASSERT_EQ( + request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize), + 10); } TEST(AggregationRequestTest, ShouldParseExplainFlagWithReadConcern) { @@ -170,7 +176,7 @@ TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFields auto expectedSerialization = Document{{AggregateCommand::kCommandName, nss.coll()}, {AggregateCommand::kPipelineFieldName, std::vector{}}, - {AggregateCommand::kBatchSizeFieldName, Value(kDefaultCursorOptionDocument)}}; + {AggregateCommand::kCursorFieldName, Value(kDefaultCursorOptionDocument)}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } @@ -183,7 +189,9 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { request.setNeedsMerge(true); request.setBypassDocumentValidation(true); request.setRequestReshardingResumeToken(true); - request.setBatchSize(10); + SimpleCursorOptions cursor; + cursor.setBatchSize(10); + request.setCursor(cursor); request.setMaxTimeMS(10u); const auto hintObj = BSON("a" << 1); request.setHint(hintObj); @@ -203,23 +211,23 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { auto uuid = UUID::gen(); request.setCollectionUUID(uuid); - auto expectedSerialization = Document{ - {AggregateCommand::kCommandName, nss.coll()}, - {AggregateCommand::kPipelineFieldName, std::vector{}}, - {AggregateCommand::kAllowDiskUseFieldName, true}, - {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}, - {QueryRequest::cmdOptionMaxTimeMS, 10}, - {AggregateCommand::kBypassDocumentValidationFieldName, true}, - {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj}, - {AggregateCommand::kCollationFieldName, collationObj}, - {AggregateCommand::kHintFieldName, hintObj}, - {AggregateCommand::kLetFieldName, letParamsObj}, - {AggregateCommand::kNeedsMergeFieldName, true}, - {AggregateCommand::kFromMongosFieldName, true}, - {QueryRequest::kUnwrappedReadPrefField, readPrefObj}, - {AggregateCommand::kRequestReshardingResumeTokenFieldName, true}, - {AggregateCommand::kIsMapReduceCommandFieldName, true}, - {AggregateCommand::kCollectionUUIDFieldName, uuid}}; + auto expectedSerialization = + Document{{AggregateCommand::kCommandName, nss.coll()}, + {AggregateCommand::kPipelineFieldName, std::vector{}}, + {AggregateCommand::kAllowDiskUseFieldName, true}, + {AggregateCommand::kCursorFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}, + {QueryRequest::cmdOptionMaxTimeMS, 10}, + {AggregateCommand::kBypassDocumentValidationFieldName, true}, + {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj}, + {AggregateCommand::kCollationFieldName, collationObj}, + {AggregateCommand::kHintFieldName, hintObj}, + {AggregateCommand::kLetFieldName, letParamsObj}, + {AggregateCommand::kNeedsMergeFieldName, true}, + {AggregateCommand::kFromMongosFieldName, true}, + {QueryRequest::kUnwrappedReadPrefField, readPrefObj}, + {AggregateCommand::kRequestReshardingResumeTokenFieldName, true}, + {AggregateCommand::kIsMapReduceCommandFieldName, true}, + {AggregateCommand::kCollectionUUIDFieldName, uuid}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } @@ -227,12 +235,14 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) { NamespaceString nss("a.collection"); AggregateCommand request(nss, {}); - request.setBatchSize(10); + SimpleCursorOptions cursor; + cursor.setBatchSize(10); + request.setCursor(cursor); auto expectedSerialization = Document{ {AggregateCommand::kCommandName, nss.coll()}, {AggregateCommand::kPipelineFieldName, std::vector{}}, - {AggregateCommand::kBatchSizeFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}}; + {AggregateCommand::kCursorFieldName, Value(Document({{kBatchSizeFieldName, 10}}))}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } @@ -244,7 +254,7 @@ TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAgg auto expectedSerialization = Document{{AggregateCommand::kCommandName, 1}, {AggregateCommand::kPipelineFieldName, std::vector{}}, - {AggregateCommand::kBatchSizeFieldName, + {AggregateCommand::kCursorFieldName, Value(Document({{aggregation_request_helper::kBatchSizeField, aggregation_request_helper::kDefaultBatchSize}}))}}; @@ -258,7 +268,9 @@ TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) { "{aggregate: 'collection', pipeline: [{$match: {a: 'abc'}}], cursor: {}, $db: 'a'}"); auto request = aggregation_request_helper::parseFromBSON(nss, inputBson); ASSERT_OK(request.getStatus()); - ASSERT_EQ(request.getValue().getBatchSize(), aggregation_request_helper::kDefaultBatchSize); + ASSERT_EQ(request.getValue().getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), + aggregation_request_helper::kDefaultBatchSize); } TEST(AggregationRequestTest, ShouldAcceptHintAsString) { @@ -276,13 +288,15 @@ TEST(AggregationRequestTest, ShouldAcceptHintAsString) { TEST(AggregationRequestTest, ShouldNotSerializeBatchSizeWhenExplainSet) { NamespaceString nss("a.collection"); AggregateCommand request(nss, {}); - request.setBatchSize(10); + SimpleCursorOptions cursor; + cursor.setBatchSize(10); + request.setCursor(cursor); request.setExplain(ExplainOptions::Verbosity::kQueryPlanner); auto expectedSerialization = Document{{AggregateCommand::kCommandName, nss.coll()}, {AggregateCommand::kPipelineFieldName, std::vector{}}, - {AggregateCommand::kBatchSizeFieldName, Value(Document())}}; + {AggregateCommand::kCursorFieldName, Value(Document())}}; ASSERT_DOCUMENT_EQ(aggregation_request_helper::serializeToCommandDoc(request), expectedSerialization); } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 8a0fbc10998..ec72dceff96 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -109,7 +109,9 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptropCtx, true, /* appendRC */ @@ -827,7 +829,7 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptrserialize()); - translatedCmd[AggregateCommand::kBatchSizeFieldName] = + translatedCmd[AggregateCommand::kCursorFieldName] = Value(Document{{"batchSize", std::numeric_limits::max()}}); translatedCmd[AggregateCommand::kAllowDiskUseFieldName] = Value(true); translatedCmd[AggregateCommand::kFromMongosFieldName] = Value(true); diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index bdbcd306c5e..fa0d714d31c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -344,7 +344,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return cluster_aggregation_planner::runPipelineOnMongoS( namespaces, - request.getBatchSize(), + request.getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), std::move(targeter.pipeline), result, privileges); @@ -355,7 +356,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, opCtx, std::move(targeter), aggregation_request_helper::serializeToCommandDoc(request), - request.getBatchSize(), + request.getCursor().getBatchSize().value_or( + aggregation_request_helper::kDefaultBatchSize), namespaces, privileges, result, -- cgit v1.2.1