diff options
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 1 | ||||
-rw-r--r-- | jstests/sharding/view_rewrite.js | 213 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.h | 34 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request_test.cpp | 54 | ||||
-rw-r--r-- | src/mongo/db/query/count_request.cpp | 81 | ||||
-rw-r--r-- | src/mongo/db/query/count_request.h | 55 | ||||
-rw-r--r-- | src/mongo/db/query/count_request_test.cpp | 123 | ||||
-rw-r--r-- | src/mongo/db/query/parsed_distinct.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/query/parsed_distinct.h | 1 | ||||
-rw-r--r-- | src/mongo/db/query/parsed_distinct_test.cpp | 72 | ||||
-rw-r--r-- | src/mongo/db/query/query_request.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/query/query_request.h | 15 | ||||
-rw-r--r-- | src/mongo/db/query/query_request_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/views/resolved_view.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/views/resolved_view_test.cpp | 42 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 32 |
17 files changed, 746 insertions, 112 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 44e324a0600..0f9e6d36c4d 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -16,6 +16,7 @@ selector: - jstests/sharding/linearizable_read_concern.js # Enable when 3.6 becomes last-stable. - jstests/sharding/views.js + - jstests/sharding/view_rewrite.js # New feature in v3.6 mongos - jstests/sharding/logical_time_metadata.js diff --git a/jstests/sharding/view_rewrite.js b/jstests/sharding/view_rewrite.js new file mode 100644 index 00000000000..5e45a3905cc --- /dev/null +++ b/jstests/sharding/view_rewrite.js @@ -0,0 +1,213 @@ +/** + * Tests that query options are not dropped by mongos when a query against a view is rewritten as an + * aggregation against the underlying collection. + */ +(function() { + "use strict"; + + load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow. + + const st = new ShardingTest({ + name: "view_rewrite", + shards: 2, + other: { + rs0: { + nodes: [ + {rsConfig: {priority: 1}}, + {rsConfig: {priority: 0, tags: {"tag": "secondary"}}} + ] + }, + rs1: { + nodes: [ + {rsConfig: {priority: 1}}, + {rsConfig: {priority: 0, tags: {"tag": "secondary"}}} + ] + }, + enableBalancer: false + } + }); + + const mongos = st.s; + const config = mongos.getDB("config"); + const mongosDB = mongos.getDB("view_rewrite"); + assert.commandWorked(mongosDB.dropDatabase()); + + const coll = mongosDB.getCollection("coll"); + + assert.commandWorked(config.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), "view_rewrite-rs0"); + const rs0Secondary = st.rs0.getSecondary(); + const rs1Primary = st.rs1.getPrimary(); + const rs1Secondary = st.rs1.getSecondary(); + + for (let i = 0; i < 10; ++i) { + assert.writeOK(coll.insert({a: i})); + } + + assert.commandWorked(mongosDB.createView("view", coll.getName(), [])); + const view = mongosDB.getCollection("view"); + + // + // Confirms that queries run against views on mongos result in execution of a rewritten + // aggregation that contains all expected query options. + // + function confirmOptionsInProfiler(shardPrimary) { + assert.commandWorked(shardPrimary.setProfilingLevel(2)); + + // Aggregation + assert.commandWorked(mongosDB.runCommand({ + aggregate: "view", + pipeline: [], + comment: "agg_rewrite", + maxTimeMS: 5 * 60 * 1000, + readConcern: {level: "linearizable"}, + cursor: {} + })); + + profilerHasSingleMatchingEntryOrThrow(shardPrimary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "agg_rewrite", + "command.maxTimeMS": 5 * 60 * 1000, + "command.readConcern": {level: "linearizable"}, + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + // Find + assert.commandWorked(mongosDB.runCommand({ + find: "view", + comment: "find_rewrite", + maxTimeMS: 5 * 60 * 1000, + readConcern: {level: "linearizable"} + })); + + profilerHasSingleMatchingEntryOrThrow(shardPrimary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "find_rewrite", + "command.maxTimeMS": 5 * 60 * 1000, + "command.readConcern": {level: "linearizable"}, + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + // Count + assert.commandWorked(mongosDB.runCommand({ + count: "view", + comment: "count_rewrite", + maxTimeMS: 5 * 60 * 1000, + readConcern: {level: "linearizable"} + })); + + profilerHasSingleMatchingEntryOrThrow(shardPrimary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "count_rewrite", + "command.maxTimeMS": 5 * 60 * 1000, + "command.readConcern": {level: "linearizable"}, + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + // Distinct + assert.commandWorked(mongosDB.runCommand({ + distinct: "view", + key: "a", + comment: "distinct_rewrite", + maxTimeMS: 5 * 60 * 1000, + readConcern: {level: "linearizable"} + })); + + profilerHasSingleMatchingEntryOrThrow(shardPrimary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "distinct_rewrite", + "command.maxTimeMS": 5 * 60 * 1000, + "command.readConcern": {level: "linearizable"}, + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + assert.commandWorked(shardPrimary.setProfilingLevel(0)); + shardPrimary.system.profile.drop(); + } + + // + // Confirms that queries run against views on mongos are executed against a tagged secondary, as + // per readPreference setting. + // + function confirmReadPreference(shardSecondary) { + assert.commandWorked(shardSecondary.setProfilingLevel(2)); + + // Aggregation + assert.commandWorked(mongosDB.runCommand({ + query: {aggregate: "view", pipeline: [], comment: "agg_readPref", cursor: {}}, + $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]} + })); + + profilerHasSingleMatchingEntryOrThrow(shardSecondary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "agg_readPref", + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + // Find + assert.commandWorked(mongosDB.runCommand({ + query: {find: "view", comment: "find_readPref", maxTimeMS: 5 * 60 * 1000}, + $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]} + })); + + profilerHasSingleMatchingEntryOrThrow(shardSecondary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "find_readPref", + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + // Count + assert.commandWorked(mongosDB.runCommand({ + query: {count: "view", comment: "count_readPref"}, + $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]} + })); + + profilerHasSingleMatchingEntryOrThrow(shardSecondary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "count_readPref", + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + // Distinct + assert.commandWorked(mongosDB.runCommand({ + query: {distinct: "view", key: "a", comment: "distinct_readPref"}, + $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]} + })); + + profilerHasSingleMatchingEntryOrThrow(shardSecondary, { + "ns": coll.getFullName(), + "command.aggregate": coll.getName(), + "command.comment": "distinct_readPref", + "command.pipeline.$mergeCursors": {"$exists": false} + }); + + assert.commandWorked(shardSecondary.setProfilingLevel(0)); + } + + // + // Test rewrite for queries run against an unsharded collection. + // + confirmOptionsInProfiler(st.rs0.getPrimary().getDB(mongosDB.getName())); + confirmReadPreference(st.rs0.getSecondary().getDB(mongosDB.getName())); + + // + // Test rewrite for queries run against a sharded collection. + // + assert.commandWorked(coll.createIndex({a: 1})); + assert.commandWorked(config.adminCommand({shardCollection: coll.getFullName(), key: {a: 1}})); + assert.commandWorked(mongos.adminCommand({split: coll.getFullName(), middle: {a: 6}})); + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: coll.getFullName(), find: {a: 25}, to: "view_rewrite-rs1"})); + // Sharded tests are run against the non-primary shard for the "view_rewrite" db. + confirmOptionsInProfiler(st.rs1.getPrimary().getDB(mongosDB.getName())); + confirmReadPreference(st.rs1.getSecondary().getDB(mongosDB.getName())); + + st.stop(); +})(); diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 214c134837f..43adbb213ad 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -84,11 +84,7 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( AggregationRequest request(std::move(nss), std::move(pipeline)); const std::initializer_list<StringData> optionsParsedElseWhere = { - QueryRequest::cmdOptionMaxTimeMS, - WriteConcernOptions::kWriteConcernField, - kPipelineName, - kCommandName, - repl::ReadConcernArgs::kReadConcernFieldName}; + WriteConcernOptions::kWriteConcernField, kPipelineName, kCommandName}; bool hasCursorElem = false; bool hasExplainElem = false; @@ -97,6 +93,12 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( for (auto&& elem : cmdObj) { auto fieldName = elem.fieldNameStringData(); + // $queryOptions is the exception to our '$' filter. We expect this field to be validated + // elsewhere. + if (QueryRequest::kUnwrappedReadPrefField == fieldName) { + request.setUnwrappedReadPref(elem.embeddedObject()); + } + // Ignore top-level fields prefixed with $. They are for the command processor, not us. if (fieldName[0] == '$') { continue; @@ -125,6 +127,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( << typeName(elem.type())}; } request.setCollation(elem.embeddedObject().getOwned()); + } else if (QueryRequest::cmdOptionMaxTimeMS == fieldName) { + auto maxTimeMs = QueryRequest::parseMaxTimeMS(elem); + if (!maxTimeMs.isOK()) { + return maxTimeMs.getStatus(); + } + + request.setMaxTimeMS(maxTimeMs.getValue()); + } else if (repl::ReadConcernArgs::kReadConcernFieldName == fieldName) { + if (elem.type() != BSONType::Object) { + return {ErrorCodes::TypeMismatch, + str::stream() << repl::ReadConcernArgs::kReadConcernFieldName + << " must be an object, not a " + << typeName(elem.type())}; + } + request.setReadConcern(elem.embeddedObject().getOwned()); } else if (kHintName == fieldName) { if (BSONType::Object == elem.type()) { request.setHint(elem.embeddedObject()); @@ -198,7 +215,7 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( << "' option is required, except for aggregation explain"}; } - if (request.getExplain() && cmdObj[repl::ReadConcernArgs::kReadConcernFieldName]) { + if (request.getExplain() && !request.getReadConcern().isEmpty()) { return {ErrorCodes::FailedToParse, str::stream() << "Aggregation explain does not support the '" << repl::ReadConcernArgs::kReadConcernFieldName @@ -232,7 +249,16 @@ Document AggregationRequest::serializeToCommandObj() const { // Only serialize a hint if one was specified. {kHintName, _hint.isEmpty() ? Value() : Value(_hint)}, // Only serialize a comment if one was specified. - {kCommentName, _comment.empty() ? Value() : Value(_comment)}}; + {kCommentName, _comment.empty() ? Value() : Value(_comment)}, + // Only serialize readConcern if specified. + {repl::ReadConcernArgs::kReadConcernFieldName, + _readConcern.isEmpty() ? Value() : Value(_readConcern)}, + // Only serialize the unwrapped read preference if specified. + {QueryRequest::kUnwrappedReadPrefField, + _unwrappedReadPref.isEmpty() ? Value() : Value(_unwrappedReadPref)}, + // Only serialize maxTimeMs if specified. + {QueryRequest::cmdOptionMaxTimeMS, + _maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))}}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 935b98fed97..1f290f26a90 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -140,6 +140,18 @@ public: return _explainMode; } + unsigned int getMaxTimeMS() const { + return _maxTimeMS; + } + + const BSONObj& getReadConcern() const { + return _readConcern; + } + + const BSONObj& getUnwrappedReadPref() const { + return _unwrappedReadPref; + } + // // Setters for optional fields. // @@ -180,6 +192,18 @@ public: _bypassDocumentValidation = shouldBypassDocumentValidation; } + void setMaxTimeMS(unsigned int maxTimeMS) { + _maxTimeMS = maxTimeMS; + } + + void setReadConcern(BSONObj readConcern) { + _readConcern = readConcern.getOwned(); + } + + void setUnwrappedReadPref(BSONObj unwrappedReadPref) { + _unwrappedReadPref = unwrappedReadPref.getOwned(); + } + private: // Required fields. const NamespaceString _nss; @@ -203,11 +227,21 @@ private: // The comment parameter attached to this aggregation. std::string _comment; + BSONObj _readConcern; + + // The unwrapped readPreference object, if one was given to us by the mongos command processor. + // This object will be empty when no readPreference is specified or if the request does not + // originate from mongos. + BSONObj _unwrappedReadPref; + // The explain mode to use, or boost::none if this is not a request for an aggregation explain. boost::optional<ExplainOptions::Verbosity> _explainMode; bool _allowDiskUse = false; bool _fromRouter = false; bool _bypassDocumentValidation = false; + + // A user-specified maxTimeMS limit, or a value of '0' if not specified. + unsigned int _maxTimeMS = 0; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index e148cef9b25..ba036c71bf4 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -38,6 +38,8 @@ #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/value.h" +#include "mongo/db/query/query_request.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -54,12 +56,12 @@ const Document kDefaultCursorOptionDocument{ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson( - "{pipeline: [{$match: {a: 'abc'}}], explain: true, allowDiskUse: true, fromRouter: true, " + "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromRouter: true, " "bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: {batchSize: 10}, " - "hint: {a: 1}, comment: 'agg_comment'}"); + "hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, " + "$queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment'}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); - ASSERT_TRUE(request.getExplain()); - ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner); + ASSERT_FALSE(request.getExplain()); ASSERT_TRUE(request.shouldAllowDiskUse()); ASSERT_TRUE(request.isFromRouter()); ASSERT_TRUE(request.shouldBypassDocumentValidation()); @@ -69,6 +71,21 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { ASSERT_BSONOBJ_EQ(request.getCollation(), BSON("locale" << "en_US")); + ASSERT_EQ(request.getMaxTimeMS(), 100u); + ASSERT_BSONOBJ_EQ(request.getReadConcern(), + BSON("level" + << "linearizable")); + ASSERT_BSONOBJ_EQ(request.getUnwrappedReadPref(), + BSON("$readPreference" + << "nearest")); +} + +TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}"); + auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); + ASSERT_TRUE(request.getExplain()); + ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner); } TEST(AggregationRequestTest, ShouldParseExplicitExplainFalseWithCursorOption) { @@ -123,6 +140,9 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau request.setCollation(BSONObj()); request.setHint(BSONObj()); request.setComment(""); + request.setMaxTimeMS(0u); + request.setUnwrappedReadPref(BSONObj()); + request.setReadConcern(BSONObj()); auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()}, @@ -138,6 +158,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { request.setFromRouter(true); request.setBypassDocumentValidation(true); request.setBatchSize(10); + request.setMaxTimeMS(10u); const auto hintObj = BSON("a" << 1); request.setHint(hintObj); const auto comment = std::string("agg_comment"); @@ -145,6 +166,12 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { const auto collationObj = BSON("locale" << "en_US"); request.setCollation(collationObj); + const auto readPrefObj = BSON("$readPreference" + << "nearest"); + request.setUnwrappedReadPref(readPrefObj); + const auto readConcernObj = BSON("level" + << "linearizable"); + request.setReadConcern(readConcernObj); auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()}, @@ -156,7 +183,10 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { {AggregationRequest::kCursorName, Value(Document({{AggregationRequest::kBatchSizeName, 10}}))}, {AggregationRequest::kHintName, hintObj}, - {AggregationRequest::kCommentName, comment}}; + {AggregationRequest::kCommentName, comment}, + {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj}, + {QueryRequest::kUnwrappedReadPrefField, readPrefObj}, + {QueryRequest::cmdOptionMaxTimeMS, 10}}; ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } @@ -352,19 +382,5 @@ TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) { ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } -TEST(AggregationRequestTest, ShouldIgnoreMaxTimeMsOption) { - NamespaceString nss("a.collection"); - const BSONObj inputBson = - fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, maxTimeMS: 'invalid'}"); - ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); -} - -TEST(AggregationRequestTest, ShouldIgnoreReadConcernOption) { - NamespaceString nss("a.collection"); - const BSONObj inputBson = - fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, readConcern: 'invalid'}"); - ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/query/count_request.cpp b/src/mongo/db/query/count_request.cpp index 3451cd7989d..2515cb4b382 100644 --- a/src/mongo/db/query/count_request.cpp +++ b/src/mongo/db/query/count_request.cpp @@ -30,6 +30,7 @@ #include "mongo/db/query/count_request.h" +#include "mongo/db/query/query_request.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -42,7 +43,9 @@ const char kSkipField[] = "skip"; const char kHintField[] = "hint"; const char kCollationField[] = "collation"; const char kExplainField[] = "explain"; - +const char kCommentField[] = "comment"; +const char kMaxTimeMSField[] = "maxTimeMS"; +const char kReadConcernField[] = "readConcern"; } // namespace CountRequest::CountRequest(NamespaceString nss, BSONObj query) @@ -56,31 +59,6 @@ void CountRequest::setCollation(BSONObj collation) { _collation = collation.getOwned(); } -BSONObj CountRequest::toBSON() const { - BSONObjBuilder builder; - - builder.append(kCmdName, _nss.ns()); - builder.append(kQueryField, _query); - - if (_limit) { - builder.append(kLimitField, _limit.get()); - } - - if (_skip) { - builder.append(kSkipField, _skip.get()); - } - - if (_hint) { - builder.append(kHintField, _hint.get()); - } - - if (_collation) { - builder.append(kCollationField, _collation.get()); - } - - return builder.obj(); -} - StatusWith<CountRequest> CountRequest::parseFromBSON(const std::string& dbname, const BSONObj& cmdObj, bool isExplain) { @@ -121,8 +99,17 @@ StatusWith<CountRequest> CountRequest::parseFromBSON(const std::string& dbname, return Status(ErrorCodes::BadValue, "skip value is not a valid number"); } + // maxTimeMS + if (cmdObj[kMaxTimeMSField].ok()) { + auto maxTimeMS = QueryRequest::parseMaxTimeMS(cmdObj[kMaxTimeMSField]); + if (!maxTimeMS.isOK()) { + return maxTimeMS.getStatus(); + } + request.setMaxTimeMS(static_cast<unsigned int>(maxTimeMS.getValue())); + } + // Hint - if (Object == cmdObj[kHintField].type()) { + if (BSONType::Object == cmdObj[kHintField].type()) { request.setHint(cmdObj[kHintField].Obj()); } else if (String == cmdObj[kHintField].type()) { const std::string hint = cmdObj.getStringField(kHintField); @@ -130,12 +117,34 @@ StatusWith<CountRequest> CountRequest::parseFromBSON(const std::string& dbname, } // Collation - if (Object == cmdObj[kCollationField].type()) { + if (BSONType::Object == cmdObj[kCollationField].type()) { request.setCollation(cmdObj[kCollationField].Obj()); } else if (cmdObj[kCollationField].ok()) { return Status(ErrorCodes::BadValue, "collation value is not a document"); } + // readConcern + if (BSONType::Object == cmdObj[kReadConcernField].type()) { + request.setReadConcern(cmdObj[kReadConcernField].Obj()); + } else if (cmdObj[kReadConcernField].ok()) { + return Status(ErrorCodes::BadValue, "readConcern value is not a document"); + } + + // unwrappedReadPref + if (BSONType::Object == cmdObj[QueryRequest::kUnwrappedReadPrefField].type()) { + request.setUnwrappedReadPref(cmdObj[QueryRequest::kUnwrappedReadPrefField].Obj()); + } else if (cmdObj[QueryRequest::kUnwrappedReadPrefField].ok()) { + return Status(ErrorCodes::BadValue, "readPreference value is not a document"); + } + + // Comment + if (BSONType::String == cmdObj[kCommentField].type()) { + request.setComment(cmdObj[kCommentField].valueStringData()); + } else if (cmdObj[kCommentField].ok()) { + return Status(ErrorCodes::BadValue, "comment value is not a string"); + } + + // Explain request.setExplain(isExplain); @@ -179,6 +188,22 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const { aggregationBuilder.append(kHintField, *_hint); } + if (!_comment.empty()) { + aggregationBuilder.append(kCommentField, _comment); + } + + if (_maxTimeMS > 0) { + aggregationBuilder.append(kMaxTimeMSField, _maxTimeMS); + } + + if (!_readConcern.isEmpty()) { + aggregationBuilder.append(kReadConcernField, _readConcern); + } + + if (!_unwrappedReadPref.isEmpty()) { + aggregationBuilder.append(QueryRequest::kUnwrappedReadPrefField, _unwrappedReadPref); + } + // The 'cursor' option is always specified so that aggregation uses the cursor interface. aggregationBuilder.append("cursor", BSONObj()); diff --git a/src/mongo/db/query/count_request.h b/src/mongo/db/query/count_request.h index 0fd466ab93c..e1b951cb7de 100644 --- a/src/mongo/db/query/count_request.h +++ b/src/mongo/db/query/count_request.h @@ -54,7 +54,7 @@ public: return _nss; } - const BSONObj getQuery() const { + BSONObj getQuery() const { return _query; } @@ -74,13 +74,13 @@ public: _skip = skip; } - const BSONObj getHint() const { + BSONObj getHint() const { return _hint.value_or(BSONObj()); } void setHint(BSONObj hint); - const BSONObj getCollation() const { + BSONObj getCollation() const { return _collation.value_or(BSONObj()); } @@ -94,11 +94,37 @@ public: _explain = explain; } - /** - * Constructs a BSON representation of this request, which can be used for sending it in - * commands. - */ - BSONObj toBSON() const; + const std::string& getComment() const { + return _comment; + } + + void setComment(StringData comment) { + _comment = comment.toString(); + } + + unsigned int getMaxTimeMS() const { + return _maxTimeMS; + } + + void setMaxTimeMS(unsigned int maxTimeMS) { + _maxTimeMS = maxTimeMS; + } + + BSONObj getReadConcern() const { + return _readConcern; + } + + void setReadConcern(BSONObj readConcern) { + _readConcern = readConcern.getOwned(); + } + + BSONObj getUnwrappedReadPref() const { + return _unwrappedReadPref; + } + + void setUnwrappedReadPref(BSONObj unwrappedReadPref) { + _unwrappedReadPref = unwrappedReadPref.getOwned(); + } /** * Converts this CountRequest into an aggregation. @@ -133,6 +159,19 @@ private: // Optional. The collation used to compare strings. boost::optional<BSONObj> _collation; + BSONObj _readConcern; + + // The unwrapped readPreference object, if one was given to us by the mongos command processor. + // This object will be empty when no readPreference is specified or if the request does not + // originate from mongos. + BSONObj _unwrappedReadPref; + + // When non-empty, represents a user comment. + std::string _comment; + + // A user-specified maxTimeMS limit, or a value of '0' if not specified. + unsigned int _maxTimeMS = 0; + // If true, generate an explain plan instead of the actual count. bool _explain = false; }; diff --git a/src/mongo/db/query/count_request_test.cpp b/src/mongo/db/query/count_request_test.cpp index 8508c75ead6..509e55236ed 100644 --- a/src/mongo/db/query/count_request_test.cpp +++ b/src/mongo/db/query/count_request_test.cpp @@ -61,8 +61,12 @@ TEST(CountRequest, ParseDefaults) { // Defaults ASSERT_EQUALS(countRequest.getLimit(), 0); ASSERT_EQUALS(countRequest.getSkip(), 0); + ASSERT_EQUALS(countRequest.getMaxTimeMS(), 0u); ASSERT(countRequest.getHint().isEmpty()); ASSERT(countRequest.getCollation().isEmpty()); + ASSERT(countRequest.getReadConcern().isEmpty()); + ASSERT(countRequest.getUnwrappedReadPref().isEmpty()); + ASSERT(countRequest.getComment().empty()); } TEST(CountRequest, ParseComplete) { @@ -81,7 +85,17 @@ TEST(CountRequest, ParseComplete) { << BSON("b" << 5) << "collation" << BSON("locale" - << "en_US")), + << "en_US") + << "readConcern" + << BSON("level" + << "linearizable") + << "$queryOptions" + << BSON("$readPreference" + << "secondary") + << "comment" + << "aComment" + << "maxTimeMS" + << 10000), isExplain); ASSERT_OK(countRequestStatus.getStatus()); @@ -92,8 +106,13 @@ TEST(CountRequest, ParseComplete) { ASSERT_BSONOBJ_EQ(countRequest.getQuery(), fromjson("{ a : { '$gte' : 11 } }")); ASSERT_EQUALS(countRequest.getLimit(), 100); ASSERT_EQUALS(countRequest.getSkip(), 1000); + ASSERT_EQUALS(countRequest.getMaxTimeMS(), 10000u); + ASSERT_EQUALS(countRequest.getComment(), "aComment"); ASSERT_BSONOBJ_EQ(countRequest.getHint(), fromjson("{ b : 5 }")); ASSERT_BSONOBJ_EQ(countRequest.getCollation(), fromjson("{ locale : 'en_US' }")); + ASSERT_BSONOBJ_EQ(countRequest.getReadConcern(), fromjson("{ level: 'linearizable' }")); + ASSERT_BSONOBJ_EQ(countRequest.getUnwrappedReadPref(), + fromjson("{ $readPreference: 'secondary' }")); } TEST(CountRequest, ParseWithExplain) { @@ -189,26 +208,6 @@ TEST(CountRequest, FailParseBadCollationValue) { ASSERT_EQUALS(countRequestStatus.getStatus(), ErrorCodes::BadValue); } -TEST(CountRequest, ToBSON) { - CountRequest countRequest(NamespaceString("TestDB.TestColl"), BSON("a" << BSON("$gte" << 11))); - countRequest.setLimit(100); - countRequest.setSkip(1000); - countRequest.setHint(BSON("b" << 5)); - countRequest.setCollation(BSON("locale" - << "en_US")); - - BSONObj actualObj = countRequest.toBSON(); - BSONObj expectedObj( - fromjson("{ count : 'TestDB.TestColl'," - " query : { a : { '$gte' : 11 } }," - " limit : 100," - " skip : 1000," - " hint : { b : 5 }," - " collation : { locale : 'en_US' } },")); - - ASSERT_BSONOBJ_EQ(actualObj, expectedObj); -} - TEST(CountRequest, ConvertToAggregationWithHint) { CountRequest countRequest(testns, BSONObj()); countRequest.setHint(BSON("x" << 1)); @@ -271,6 +270,88 @@ TEST(CountRequest, ConvertToAggregationWithQueryAndFilterAndLimit) { SimpleBSONObjComparator::kInstance.makeEqualTo())); } +TEST(CountRequest, ConvertToAggregationWithMaxTimeMS) { + CountRequest countRequest(testns, BSONObj()); + countRequest.setMaxTimeMS(100); + auto agg = countRequest.asAggregationCommand(); + ASSERT_OK(agg); + + auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue()); + ASSERT_OK(ar.getStatus()); + ASSERT_EQ(ar.getValue().getMaxTimeMS(), 100u); + + std::vector<BSONObj> expectedPipeline{BSON("$count" + << "count")}; + ASSERT(std::equal(expectedPipeline.begin(), + expectedPipeline.end(), + ar.getValue().getPipeline().begin(), + ar.getValue().getPipeline().end(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); +} + +TEST(CountRequest, ConvertToAggregationWithQueryOptions) { + CountRequest countRequest(testns, BSONObj()); + countRequest.setUnwrappedReadPref(BSON("readPreference" + << "secondary")); + auto agg = countRequest.asAggregationCommand(); + ASSERT_OK(agg); + + auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue()); + ASSERT_OK(ar.getStatus()); + ASSERT_BSONOBJ_EQ(ar.getValue().getUnwrappedReadPref(), + BSON("readPreference" + << "secondary")); + + std::vector<BSONObj> expectedPipeline{BSON("$count" + << "count")}; + ASSERT(std::equal(expectedPipeline.begin(), + expectedPipeline.end(), + ar.getValue().getPipeline().begin(), + ar.getValue().getPipeline().end(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); +} + +TEST(CountRequest, ConvertToAggregationWithComment) { + CountRequest countRequest(testns, BSONObj()); + countRequest.setComment("aComment"); + auto agg = countRequest.asAggregationCommand(); + ASSERT_OK(agg); + + auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue()); + ASSERT_OK(ar.getStatus()); + ASSERT_EQ(ar.getValue().getComment(), "aComment"); + + std::vector<BSONObj> expectedPipeline{BSON("$count" + << "count")}; + ASSERT(std::equal(expectedPipeline.begin(), + expectedPipeline.end(), + ar.getValue().getPipeline().begin(), + ar.getValue().getPipeline().end(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); +} + +TEST(CountRequest, ConvertToAggregationWithReadConcern) { + CountRequest countRequest(testns, BSONObj()); + countRequest.setReadConcern(BSON("level" + << "linearizable")); + auto agg = countRequest.asAggregationCommand(); + ASSERT_OK(agg); + + auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue()); + ASSERT_OK(ar.getStatus()); + ASSERT_BSONOBJ_EQ(ar.getValue().getReadConcern(), + BSON("level" + << "linearizable")); + + std::vector<BSONObj> expectedPipeline{BSON("$count" + << "count")}; + ASSERT(std::equal(expectedPipeline.begin(), + expectedPipeline.end(), + ar.getValue().getPipeline().begin(), + ar.getValue().getPipeline().end(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); +} + TEST(CountRequest, ConvertToAggregationOmitsExplain) { CountRequest countRequest(testns, BSONObj()); countRequest.setExplain(true); diff --git a/src/mongo/db/query/parsed_distinct.cpp b/src/mongo/db/query/parsed_distinct.cpp index 3c3a75c62cd..8b024533326 100644 --- a/src/mongo/db/query/parsed_distinct.cpp +++ b/src/mongo/db/query/parsed_distinct.cpp @@ -42,6 +42,7 @@ namespace mongo { const char ParsedDistinct::kKeyField[] = "key"; const char ParsedDistinct::kQueryField[] = "query"; const char ParsedDistinct::kCollationField[] = "collation"; +const char ParsedDistinct::kCommentField[] = "comment"; StatusWith<BSONObj> ParsedDistinct::asAggregationCommand() const { BSONObjBuilder aggregationBuilder; @@ -88,6 +89,23 @@ StatusWith<BSONObj> ParsedDistinct::asAggregationCommand() const { aggregationBuilder.append(kCollationField, qr.getCollation()); + if (qr.getMaxTimeMS() > 0) { + aggregationBuilder.append(QueryRequest::cmdOptionMaxTimeMS, qr.getMaxTimeMS()); + } + + if (!qr.getReadConcern().isEmpty()) { + aggregationBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, + qr.getReadConcern()); + } + + if (!qr.getUnwrappedReadPref().isEmpty()) { + aggregationBuilder.append(QueryRequest::kUnwrappedReadPrefField, qr.getUnwrappedReadPref()); + } + + if (!qr.getComment().empty()) { + aggregationBuilder.append(kCommentField, qr.getComment()); + } + // Specify the 'cursor' option so that aggregation uses the cursor interface. aggregationBuilder.append("cursor", BSONObj()); @@ -137,6 +155,50 @@ StatusWith<ParsedDistinct> ParsedDistinct::parse(OperationContext* opCtx, qr->setCollation(collationElt.embeddedObject()); } + if (BSONElement readConcernElt = cmdObj[repl::ReadConcernArgs::kReadConcernFieldName]) { + if (readConcernElt.type() != BSONType::Object) { + return Status(ErrorCodes::TypeMismatch, + str::stream() << "\"" << repl::ReadConcernArgs::kReadConcernFieldName + << "\" had the wrong type. Expected " + << typeName(BSONType::Object) + << ", found " + << typeName(readConcernElt.type())); + } + qr->setReadConcern(readConcernElt.embeddedObject()); + } + + if (BSONElement commentElt = cmdObj[kCommentField]) { + if (commentElt.type() != BSONType::String) { + return Status(ErrorCodes::TypeMismatch, + str::stream() << "\"" << kCommentField + << "\" had the wrong type. Expected " + << typeName(BSONType::String) + << ", found " + << typeName(commentElt.type())); + } + qr->setComment(commentElt.str()); + } + + if (BSONElement queryOptionsElt = cmdObj[QueryRequest::kUnwrappedReadPrefField]) { + if (queryOptionsElt.type() != BSONType::Object) { + return Status(ErrorCodes::TypeMismatch, + str::stream() << "\"" << QueryRequest::kUnwrappedReadPrefField + << "\" had the wrong type. Expected " + << typeName(BSONType::Object) + << ", found " + << typeName(queryOptionsElt.type())); + } + qr->setUnwrappedReadPref(queryOptionsElt.embeddedObject()); + } + + if (BSONElement maxTimeMSElt = cmdObj[QueryRequest::cmdOptionMaxTimeMS]) { + auto maxTimeMS = QueryRequest::parseMaxTimeMS(maxTimeMSElt); + if (!maxTimeMS.isOK()) { + return maxTimeMS.getStatus(); + } + qr->setMaxTimeMS(static_cast<unsigned int>(maxTimeMS.getValue())); + } + qr->setExplain(isExplain); auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback); diff --git a/src/mongo/db/query/parsed_distinct.h b/src/mongo/db/query/parsed_distinct.h index eb1c4e2b0ab..2294b2e0650 100644 --- a/src/mongo/db/query/parsed_distinct.h +++ b/src/mongo/db/query/parsed_distinct.h @@ -49,6 +49,7 @@ public: static const char kKeyField[]; static const char kQueryField[]; static const char kCollationField[]; + static const char kCommentField[]; ParsedDistinct(std::unique_ptr<CanonicalQuery> query, const std::string key) : _query(std::move(query)), _key(std::move(key)) {} diff --git a/src/mongo/db/query/parsed_distinct_test.cpp b/src/mongo/db/query/parsed_distinct_test.cpp index 3a4f6ccb42e..68554d2637c 100644 --- a/src/mongo/db/query/parsed_distinct_test.cpp +++ b/src/mongo/db/query/parsed_distinct_test.cpp @@ -64,6 +64,74 @@ TEST(ParsedDistinctTest, ConvertToAggregationNoQuery) { ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespaceString(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj()); + ASSERT(ar.getValue().getReadConcern().isEmpty()); + ASSERT(ar.getValue().getUnwrappedReadPref().isEmpty()); + ASSERT(ar.getValue().getComment().empty()); + ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 0u); + + std::vector<BSONObj> expectedPipeline{ + BSON("$unwind" << BSON("path" + << "$x" + << "preserveNullAndEmptyArrays" + << true)), + BSON("$group" << BSON("_id" << BSONNULL << "distinct" << BSON("$addToSet" + << "$x")))}; + ASSERT(std::equal(expectedPipeline.begin(), + expectedPipeline.end(), + ar.getValue().getPipeline().begin(), + ar.getValue().getPipeline().end(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); +} + +TEST(ParsedDistinctTest, ConvertToAggregationWithAllOptions) { + QueryTestServiceContext serviceContext; + auto uniqueTxn = serviceContext.makeOperationContext(); + OperationContext* opCtx = uniqueTxn.get(); + + auto pd = ParsedDistinct::parse(opCtx, + testns, + BSON("distinct" + << "testcoll" + << "key" + << "x" + << "hint" + << BSON("b" << 5) + << "collation" + << BSON("locale" + << "en_US") + << "readConcern" + << BSON("level" + << "linearizable") + << "$queryOptions" + << BSON("readPreference" + << "secondary") + << "comment" + << "aComment" + << "maxTimeMS" + << 100), + ExtensionsCallbackDisallowExtensions(), + !isExplain); + ASSERT_OK(pd.getStatus()); + + auto agg = pd.getValue().asAggregationCommand(); + ASSERT_OK(agg); + + auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue()); + ASSERT_OK(ar.getStatus()); + ASSERT(!ar.getValue().getExplain()); + ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize); + ASSERT_EQ(ar.getValue().getNamespaceString(), testns); + ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), + BSON("locale" + << "en_US")); + ASSERT_BSONOBJ_EQ(ar.getValue().getReadConcern(), + BSON("level" + << "linearizable")); + ASSERT_BSONOBJ_EQ(ar.getValue().getUnwrappedReadPref(), + BSON("readPreference" + << "secondary")); + ASSERT_EQUALS(ar.getValue().getComment(), "aComment"); + ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 100u); std::vector<BSONObj> expectedPipeline{ BSON("$unwind" << BSON("path" @@ -100,6 +168,10 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithQuery) { ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize); ASSERT_EQ(ar.getValue().getNamespaceString(), testns); ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj()); + ASSERT(ar.getValue().getReadConcern().isEmpty()); + ASSERT(ar.getValue().getUnwrappedReadPref().isEmpty()); + ASSERT(ar.getValue().getComment().empty()); + ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 0u); std::vector<BSONObj> expectedPipeline{ BSON("$match" << BSON("z" << 7)), diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index 04c4154f741..467f57f41f0 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -168,6 +168,14 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::makeFromFindCommand(Namespace } qr->_readConcern = el.Obj().getOwned(); + } else if (str::equals(fieldName, QueryRequest::kUnwrappedReadPrefField.c_str())) { + // Read preference parsing is handled elsewhere, but we store a copy here. + Status status = checkFieldType(el, Object); + if (!status.isOK()) { + return status; + } + + qr->setUnwrappedReadPref(el.Obj()); } else if (str::equals(fieldName, kCollationField)) { // Collation parsing is handled elsewhere, but we store a copy here. Status status = checkFieldType(el, Object); @@ -1063,6 +1071,12 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const { if (!_comment.empty()) { aggregationBuilder.append("comment", _comment); } + if (!_readConcern.isEmpty()) { + aggregationBuilder.append("readConcern", _readConcern); + } + if (!_unwrappedReadPref.isEmpty()) { + aggregationBuilder.append(QueryRequest::kUnwrappedReadPrefField, _unwrappedReadPref); + } return StatusWith<BSONObj>(aggregationBuilder.obj()); } } // namespace mongo diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h index 376d5ece98c..a8ca956c43b 100644 --- a/src/mongo/db/query/query_request.h +++ b/src/mongo/db/query/query_request.h @@ -250,6 +250,14 @@ public: _comment = comment; } + const BSONObj& getUnwrappedReadPref() const { + return _unwrappedReadPref; + } + + void setUnwrappedReadPref(BSONObj unwrappedReadPref) { + _unwrappedReadPref = unwrappedReadPref.getOwned(); + } + int getMaxScan() const { return _maxScan; } @@ -449,6 +457,11 @@ private: // The collation is parsed elsewhere. BSONObj _collation; + // The unwrapped readPreference object, if one was given to us by the mongos command processor. + // This object will be empty when no readPreference is specified or if the request does not + // originate from mongos. + BSONObj _unwrappedReadPref; + bool _wantMore = true; // Must be either unset or positive. Negative skip is illegal and a skip of zero received from @@ -473,6 +486,8 @@ private: std::string _comment; int _maxScan = 0; + + // A user-specified maxTimeMS limit, or a value of '0' if not specified. int _maxTimeMS = 0; BSONObj _min; diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp index 0834cd6e4c8..f64a5429faf 100644 --- a/src/mongo/db/query/query_request_test.cpp +++ b/src/mongo/db/query/query_request_test.cpp @@ -473,6 +473,7 @@ TEST(QueryRequestTest, ParseFromCommandAllNonOptionFields) { "projection: {c: 1}," "hint: {d: 1}," "readConcern: {e: 1}," + "$queryOptions: {$readPreference: 'secondary'}," "collation: {f: 1}," "limit: 3," "skip: 5," @@ -494,6 +495,9 @@ TEST(QueryRequestTest, ParseFromCommandAllNonOptionFields) { ASSERT_EQUALS(0, expectedHint.woCompare(qr->getHint())); BSONObj expectedReadConcern = BSON("e" << 1); ASSERT_EQUALS(0, expectedReadConcern.woCompare(qr->getReadConcern())); + BSONObj expectedUnwrappedReadPref = BSON("$readPreference" + << "secondary"); + ASSERT_EQUALS(0, expectedUnwrappedReadPref.woCompare(qr->getUnwrappedReadPref())); BSONObj expectedCollation = BSON("f" << 1); ASSERT_EQUALS(0, expectedCollation.woCompare(qr->getCollation())); ASSERT_EQUALS(3, *qr->getLimit()); @@ -589,6 +593,7 @@ TEST(QueryRequestTest, ParseFromCommandSkipWrongType) { ASSERT_NOT_OK(result.getStatus()); } + TEST(QueryRequestTest, ParseFromCommandLimitWrongType) { BSONObj cmdObj = fromjson( "{find: 'testns'," @@ -624,6 +629,17 @@ TEST(QueryRequestTest, ParseFromCommandCommentWrongType) { ASSERT_NOT_OK(result.getStatus()); } +TEST(QueryRequestTest, ParseFromCommandUnwrappedReadPrefWrongType) { + BSONObj cmdObj = fromjson( + "{find: 'testns'," + "filter: {a: 1}," + "$queryOptions: 1}"); + const NamespaceString nss("test.testns"); + bool isExplain = false; + auto result = QueryRequest::makeFromFindCommand(nss, cmdObj, isExplain); + ASSERT_NOT_OK(result.getStatus()); +} + TEST(QueryRequestTest, ParseFromCommandMaxScanWrongType) { BSONObj cmdObj = fromjson( "{find: 'testns'," diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp index 3376c05570d..fa0564113f8 100644 --- a/src/mongo/db/views/resolved_view.cpp +++ b/src/mongo/db/views/resolved_view.cpp @@ -85,6 +85,9 @@ AggregationRequest ResolvedView::asExpandedViewAggregation( expandedRequest.setHint(request.getHint()); expandedRequest.setComment(request.getComment()); + expandedRequest.setMaxTimeMS(request.getMaxTimeMS()); + expandedRequest.setReadConcern(request.getReadConcern()); + expandedRequest.setUnwrappedReadPref(request.getUnwrappedReadPref()); expandedRequest.setBypassDocumentValidation(request.shouldBypassDocumentValidation()); expandedRequest.setAllowDiskUse(request.shouldAllowDiskUse()); expandedRequest.setCollation(request.getCollation()); diff --git a/src/mongo/db/views/resolved_view_test.cpp b/src/mongo/db/views/resolved_view_test.cpp index cd34a8e6cf4..05c47d3fd51 100644 --- a/src/mongo/db/views/resolved_view_test.cpp +++ b/src/mongo/db/views/resolved_view_test.cpp @@ -113,6 +113,48 @@ TEST(ResolvedViewTest, ExpandingAggRequestPreservesAllowDiskUse) { ASSERT_TRUE(result.shouldAllowDiskUse()); } +TEST(ResolvedViewTest, ExpandingAggRequestPreservesHint) { + const ResolvedView resolvedView{backingNss, emptyPipeline}; + AggregationRequest aggRequest(viewNss, {}); + aggRequest.setHint(BSON("a" << 1)); + + auto result = resolvedView.asExpandedViewAggregation(aggRequest); + ASSERT_BSONOBJ_EQ(result.getHint(), BSON("a" << 1)); +} + +TEST(ResolvedViewTest, ExpandingAggRequestPreservesReadPreference) { + const ResolvedView resolvedView{backingNss, emptyPipeline}; + AggregationRequest aggRequest(viewNss, {}); + aggRequest.setUnwrappedReadPref(BSON("$readPreference" + << "nearest")); + + auto result = resolvedView.asExpandedViewAggregation(aggRequest); + ASSERT_BSONOBJ_EQ(result.getUnwrappedReadPref(), + BSON("$readPreference" + << "nearest")); +} + +TEST(ResolvedViewTest, ExpandingAggRequestPreservesReadConcern) { + const ResolvedView resolvedView{backingNss, emptyPipeline}; + AggregationRequest aggRequest(viewNss, {}); + aggRequest.setReadConcern(BSON("level" + << "linearizable")); + + auto result = resolvedView.asExpandedViewAggregation(aggRequest); + ASSERT_BSONOBJ_EQ(result.getReadConcern(), + BSON("level" + << "linearizable")); +} + +TEST(ResolvedViewTest, ExpandingAggRequestPreservesMaxTimeMS) { + const ResolvedView resolvedView{backingNss, emptyPipeline}; + AggregationRequest aggRequest(viewNss, {}); + aggRequest.setMaxTimeMS(100u); + + auto result = resolvedView.asExpandedViewAggregation(aggRequest); + ASSERT_EQ(result.getMaxTimeMS(), 100u); +} + TEST(ResolvedViewTest, ExpandingAggRequestPreservesCollation) { const ResolvedView resolvedView{backingNss, emptyPipeline}; AggregationRequest aggRequest(viewNss, {}); diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 010cbc55e63..bdb4be00869 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -60,12 +60,6 @@ namespace mongo { namespace { -// These fields are not part of the AggregationRequest since they are not handled by the aggregation -// subsystem, so we serialize them separately. -const std::initializer_list<StringData> kFieldsToPropagateToShards = { - "$queryOptions", "readConcern", QueryRequest::cmdOptionMaxTimeMS, -}; - // Given a document representing an aggregation command such as // // {aggregate: "myCollection", pipeline: [], ...}, @@ -73,12 +67,7 @@ const std::initializer_list<StringData> kFieldsToPropagateToShards = { // produces the corresponding explain command: // // {explain: {aggregate: "myCollection", pipline: [], ...}, verbosity: ...} -// -// The 'cmdObj' is the original user request, which may contain fields to propagate to the shards -// that are not handled by the aggregation subsystem itself (e.g. maxTimeMS). -Document wrapAggAsExplain(Document aggregateCommand, - ExplainOptions::Verbosity verbosity, - const BSONObj& cmdObj) { +Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { MutableDocument explainCommandBuilder; explainCommandBuilder["explain"] = Value(aggregateCommand); @@ -87,11 +76,6 @@ Document wrapAggAsExplain(Document aggregateCommand, explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); } - // Propagate options not specific to agg to the shards. - for (auto&& field : kFieldsToPropagateToShards) { - explainCommandBuilder[field] = Value(cmdObj[field]); - } - return explainCommandBuilder.freeze(); } @@ -208,13 +192,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. if (mergeCtx->explain) { - commandBuilder.reset(wrapAggAsExplain(commandBuilder.freeze(), *mergeCtx->explain, cmdObj)); - } else { - // Add things like $queryOptions which must be sent to the shards but are not part of the - // agg request. - for (auto&& field : kFieldsToPropagateToShards) { - commandBuilder[field] = Value(cmdObj[field]); - } + commandBuilder.reset(wrapAggAsExplain(commandBuilder.freeze(), *mergeCtx->explain)); } BSONObj shardedCommand = commandBuilder.freeze().toBson(); @@ -277,10 +255,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, mergeCmd["pipeline"] = Value(pipeline.getValue()->serialize()); mergeCmd["cursor"] = Value(cmdObj["cursor"]); - if (cmdObj.hasField("$queryOptions")) { - mergeCmd["$queryOptions"] = Value(cmdObj["$queryOptions"]); - } - if (cmdObj.hasField(QueryRequest::cmdOptionMaxTimeMS)) { mergeCmd[QueryRequest::cmdOptionMaxTimeMS] = Value(cmdObj[QueryRequest::cmdOptionMaxTimeMS]); @@ -493,7 +467,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // the shards. if (aggRequest.getExplain()) { auto explainCmdObj = - wrapAggAsExplain(aggRequest.serializeToCommandObj(), *aggRequest.getExplain(), cmdObj); + wrapAggAsExplain(aggRequest.serializeToCommandObj(), *aggRequest.getExplain()); cmdObj = explainCmdObj.toBson(); } |