summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml1
-rw-r--r--jstests/sharding/view_rewrite.js213
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp40
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h34
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp54
-rw-r--r--src/mongo/db/query/count_request.cpp81
-rw-r--r--src/mongo/db/query/count_request.h55
-rw-r--r--src/mongo/db/query/count_request_test.cpp123
-rw-r--r--src/mongo/db/query/parsed_distinct.cpp62
-rw-r--r--src/mongo/db/query/parsed_distinct.h1
-rw-r--r--src/mongo/db/query/parsed_distinct_test.cpp72
-rw-r--r--src/mongo/db/query/query_request.cpp14
-rw-r--r--src/mongo/db/query/query_request.h15
-rw-r--r--src/mongo/db/query/query_request_test.cpp16
-rw-r--r--src/mongo/db/views/resolved_view.cpp3
-rw-r--r--src/mongo/db/views/resolved_view_test.cpp42
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp32
17 files changed, 746 insertions, 112 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 44e324a0600..0f9e6d36c4d 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -16,6 +16,7 @@ selector:
- jstests/sharding/linearizable_read_concern.js
# Enable when 3.6 becomes last-stable.
- jstests/sharding/views.js
+ - jstests/sharding/view_rewrite.js
# New feature in v3.6 mongos
- jstests/sharding/logical_time_metadata.js
diff --git a/jstests/sharding/view_rewrite.js b/jstests/sharding/view_rewrite.js
new file mode 100644
index 00000000000..5e45a3905cc
--- /dev/null
+++ b/jstests/sharding/view_rewrite.js
@@ -0,0 +1,213 @@
+/**
+ * Tests that query options are not dropped by mongos when a query against a view is rewritten as an
+ * aggregation against the underlying collection.
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow.
+
+ const st = new ShardingTest({
+ name: "view_rewrite",
+ shards: 2,
+ other: {
+ rs0: {
+ nodes: [
+ {rsConfig: {priority: 1}},
+ {rsConfig: {priority: 0, tags: {"tag": "secondary"}}}
+ ]
+ },
+ rs1: {
+ nodes: [
+ {rsConfig: {priority: 1}},
+ {rsConfig: {priority: 0, tags: {"tag": "secondary"}}}
+ ]
+ },
+ enableBalancer: false
+ }
+ });
+
+ const mongos = st.s;
+ const config = mongos.getDB("config");
+ const mongosDB = mongos.getDB("view_rewrite");
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ const coll = mongosDB.getCollection("coll");
+
+ assert.commandWorked(config.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), "view_rewrite-rs0");
+ const rs0Secondary = st.rs0.getSecondary();
+ const rs1Primary = st.rs1.getPrimary();
+ const rs1Secondary = st.rs1.getSecondary();
+
+ for (let i = 0; i < 10; ++i) {
+ assert.writeOK(coll.insert({a: i}));
+ }
+
+ assert.commandWorked(mongosDB.createView("view", coll.getName(), []));
+ const view = mongosDB.getCollection("view");
+
+ //
+ // Confirms that queries run against views on mongos result in execution of a rewritten
+ // aggregation that contains all expected query options.
+ //
+ function confirmOptionsInProfiler(shardPrimary) {
+ assert.commandWorked(shardPrimary.setProfilingLevel(2));
+
+ // Aggregation
+ assert.commandWorked(mongosDB.runCommand({
+ aggregate: "view",
+ pipeline: [],
+ comment: "agg_rewrite",
+ maxTimeMS: 5 * 60 * 1000,
+ readConcern: {level: "linearizable"},
+ cursor: {}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardPrimary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "agg_rewrite",
+ "command.maxTimeMS": 5 * 60 * 1000,
+ "command.readConcern": {level: "linearizable"},
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ // Find
+ assert.commandWorked(mongosDB.runCommand({
+ find: "view",
+ comment: "find_rewrite",
+ maxTimeMS: 5 * 60 * 1000,
+ readConcern: {level: "linearizable"}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardPrimary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "find_rewrite",
+ "command.maxTimeMS": 5 * 60 * 1000,
+ "command.readConcern": {level: "linearizable"},
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ // Count
+ assert.commandWorked(mongosDB.runCommand({
+ count: "view",
+ comment: "count_rewrite",
+ maxTimeMS: 5 * 60 * 1000,
+ readConcern: {level: "linearizable"}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardPrimary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "count_rewrite",
+ "command.maxTimeMS": 5 * 60 * 1000,
+ "command.readConcern": {level: "linearizable"},
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ // Distinct
+ assert.commandWorked(mongosDB.runCommand({
+ distinct: "view",
+ key: "a",
+ comment: "distinct_rewrite",
+ maxTimeMS: 5 * 60 * 1000,
+ readConcern: {level: "linearizable"}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardPrimary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "distinct_rewrite",
+ "command.maxTimeMS": 5 * 60 * 1000,
+ "command.readConcern": {level: "linearizable"},
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ assert.commandWorked(shardPrimary.setProfilingLevel(0));
+ shardPrimary.system.profile.drop();
+ }
+
+ //
+ // Confirms that queries run against views on mongos are executed against a tagged secondary, as
+ // per readPreference setting.
+ //
+ function confirmReadPreference(shardSecondary) {
+ assert.commandWorked(shardSecondary.setProfilingLevel(2));
+
+ // Aggregation
+ assert.commandWorked(mongosDB.runCommand({
+ query: {aggregate: "view", pipeline: [], comment: "agg_readPref", cursor: {}},
+ $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardSecondary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "agg_readPref",
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ // Find
+ assert.commandWorked(mongosDB.runCommand({
+ query: {find: "view", comment: "find_readPref", maxTimeMS: 5 * 60 * 1000},
+ $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardSecondary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "find_readPref",
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ // Count
+ assert.commandWorked(mongosDB.runCommand({
+ query: {count: "view", comment: "count_readPref"},
+ $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardSecondary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "count_readPref",
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ // Distinct
+ assert.commandWorked(mongosDB.runCommand({
+ query: {distinct: "view", key: "a", comment: "distinct_readPref"},
+ $readPreference: {mode: "nearest", tags: [{tag: "secondary"}]}
+ }));
+
+ profilerHasSingleMatchingEntryOrThrow(shardSecondary, {
+ "ns": coll.getFullName(),
+ "command.aggregate": coll.getName(),
+ "command.comment": "distinct_readPref",
+ "command.pipeline.$mergeCursors": {"$exists": false}
+ });
+
+ assert.commandWorked(shardSecondary.setProfilingLevel(0));
+ }
+
+ //
+ // Test rewrite for queries run against an unsharded collection.
+ //
+ confirmOptionsInProfiler(st.rs0.getPrimary().getDB(mongosDB.getName()));
+ confirmReadPreference(st.rs0.getSecondary().getDB(mongosDB.getName()));
+
+ //
+ // Test rewrite for queries run against a sharded collection.
+ //
+ assert.commandWorked(coll.createIndex({a: 1}));
+ assert.commandWorked(config.adminCommand({shardCollection: coll.getFullName(), key: {a: 1}}));
+ assert.commandWorked(mongos.adminCommand({split: coll.getFullName(), middle: {a: 6}}));
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: coll.getFullName(), find: {a: 25}, to: "view_rewrite-rs1"}));
+ // Sharded tests are run against the non-primary shard for the "view_rewrite" db.
+ confirmOptionsInProfiler(st.rs1.getPrimary().getDB(mongosDB.getName()));
+ confirmReadPreference(st.rs1.getSecondary().getDB(mongosDB.getName()));
+
+ st.stop();
+})();
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 214c134837f..43adbb213ad 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -84,11 +84,7 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
AggregationRequest request(std::move(nss), std::move(pipeline));
const std::initializer_list<StringData> optionsParsedElseWhere = {
- QueryRequest::cmdOptionMaxTimeMS,
- WriteConcernOptions::kWriteConcernField,
- kPipelineName,
- kCommandName,
- repl::ReadConcernArgs::kReadConcernFieldName};
+ WriteConcernOptions::kWriteConcernField, kPipelineName, kCommandName};
bool hasCursorElem = false;
bool hasExplainElem = false;
@@ -97,6 +93,12 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
for (auto&& elem : cmdObj) {
auto fieldName = elem.fieldNameStringData();
+ // $queryOptions is the exception to our '$' filter. We expect this field to be validated
+ // elsewhere.
+ if (QueryRequest::kUnwrappedReadPrefField == fieldName) {
+ request.setUnwrappedReadPref(elem.embeddedObject());
+ }
+
// Ignore top-level fields prefixed with $. They are for the command processor, not us.
if (fieldName[0] == '$') {
continue;
@@ -125,6 +127,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
<< typeName(elem.type())};
}
request.setCollation(elem.embeddedObject().getOwned());
+ } else if (QueryRequest::cmdOptionMaxTimeMS == fieldName) {
+ auto maxTimeMs = QueryRequest::parseMaxTimeMS(elem);
+ if (!maxTimeMs.isOK()) {
+ return maxTimeMs.getStatus();
+ }
+
+ request.setMaxTimeMS(maxTimeMs.getValue());
+ } else if (repl::ReadConcernArgs::kReadConcernFieldName == fieldName) {
+ if (elem.type() != BSONType::Object) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << repl::ReadConcernArgs::kReadConcernFieldName
+ << " must be an object, not a "
+ << typeName(elem.type())};
+ }
+ request.setReadConcern(elem.embeddedObject().getOwned());
} else if (kHintName == fieldName) {
if (BSONType::Object == elem.type()) {
request.setHint(elem.embeddedObject());
@@ -198,7 +215,7 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
<< "' option is required, except for aggregation explain"};
}
- if (request.getExplain() && cmdObj[repl::ReadConcernArgs::kReadConcernFieldName]) {
+ if (request.getExplain() && !request.getReadConcern().isEmpty()) {
return {ErrorCodes::FailedToParse,
str::stream() << "Aggregation explain does not support the '"
<< repl::ReadConcernArgs::kReadConcernFieldName
@@ -232,7 +249,16 @@ Document AggregationRequest::serializeToCommandObj() const {
// Only serialize a hint if one was specified.
{kHintName, _hint.isEmpty() ? Value() : Value(_hint)},
// Only serialize a comment if one was specified.
- {kCommentName, _comment.empty() ? Value() : Value(_comment)}};
+ {kCommentName, _comment.empty() ? Value() : Value(_comment)},
+ // Only serialize readConcern if specified.
+ {repl::ReadConcernArgs::kReadConcernFieldName,
+ _readConcern.isEmpty() ? Value() : Value(_readConcern)},
+ // Only serialize the unwrapped read preference if specified.
+ {QueryRequest::kUnwrappedReadPrefField,
+ _unwrappedReadPref.isEmpty() ? Value() : Value(_unwrappedReadPref)},
+ // Only serialize maxTimeMs if specified.
+ {QueryRequest::cmdOptionMaxTimeMS,
+ _maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))}};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 935b98fed97..1f290f26a90 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -140,6 +140,18 @@ public:
return _explainMode;
}
+ unsigned int getMaxTimeMS() const {
+ return _maxTimeMS;
+ }
+
+ const BSONObj& getReadConcern() const {
+ return _readConcern;
+ }
+
+ const BSONObj& getUnwrappedReadPref() const {
+ return _unwrappedReadPref;
+ }
+
//
// Setters for optional fields.
//
@@ -180,6 +192,18 @@ public:
_bypassDocumentValidation = shouldBypassDocumentValidation;
}
+ void setMaxTimeMS(unsigned int maxTimeMS) {
+ _maxTimeMS = maxTimeMS;
+ }
+
+ void setReadConcern(BSONObj readConcern) {
+ _readConcern = readConcern.getOwned();
+ }
+
+ void setUnwrappedReadPref(BSONObj unwrappedReadPref) {
+ _unwrappedReadPref = unwrappedReadPref.getOwned();
+ }
+
private:
// Required fields.
const NamespaceString _nss;
@@ -203,11 +227,21 @@ private:
// The comment parameter attached to this aggregation.
std::string _comment;
+ BSONObj _readConcern;
+
+ // The unwrapped readPreference object, if one was given to us by the mongos command processor.
+ // This object will be empty when no readPreference is specified or if the request does not
+ // originate from mongos.
+ BSONObj _unwrappedReadPref;
+
// The explain mode to use, or boost::none if this is not a request for an aggregation explain.
boost::optional<ExplainOptions::Verbosity> _explainMode;
bool _allowDiskUse = false;
bool _fromRouter = false;
bool _bypassDocumentValidation = false;
+
+ // A user-specified maxTimeMS limit, or a value of '0' if not specified.
+ unsigned int _maxTimeMS = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index e148cef9b25..ba036c71bf4 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -38,6 +38,8 @@
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -54,12 +56,12 @@ const Document kDefaultCursorOptionDocument{
TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson(
- "{pipeline: [{$match: {a: 'abc'}}], explain: true, allowDiskUse: true, fromRouter: true, "
+ "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromRouter: true, "
"bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: {batchSize: 10}, "
- "hint: {a: 1}, comment: 'agg_comment'}");
+ "hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, "
+ "$queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment'}}");
auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
- ASSERT_TRUE(request.getExplain());
- ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner);
+ ASSERT_FALSE(request.getExplain());
ASSERT_TRUE(request.shouldAllowDiskUse());
ASSERT_TRUE(request.isFromRouter());
ASSERT_TRUE(request.shouldBypassDocumentValidation());
@@ -69,6 +71,21 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
ASSERT_BSONOBJ_EQ(request.getCollation(),
BSON("locale"
<< "en_US"));
+ ASSERT_EQ(request.getMaxTimeMS(), 100u);
+ ASSERT_BSONOBJ_EQ(request.getReadConcern(),
+ BSON("level"
+ << "linearizable"));
+ ASSERT_BSONOBJ_EQ(request.getUnwrappedReadPref(),
+ BSON("$readPreference"
+ << "nearest"));
+}
+
+TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}");
+ auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ ASSERT_TRUE(request.getExplain());
+ ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner);
}
TEST(AggregationRequestTest, ShouldParseExplicitExplainFalseWithCursorOption) {
@@ -123,6 +140,9 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau
request.setCollation(BSONObj());
request.setHint(BSONObj());
request.setComment("");
+ request.setMaxTimeMS(0u);
+ request.setUnwrappedReadPref(BSONObj());
+ request.setReadConcern(BSONObj());
auto expectedSerialization =
Document{{AggregationRequest::kCommandName, nss.coll()},
@@ -138,6 +158,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
request.setFromRouter(true);
request.setBypassDocumentValidation(true);
request.setBatchSize(10);
+ request.setMaxTimeMS(10u);
const auto hintObj = BSON("a" << 1);
request.setHint(hintObj);
const auto comment = std::string("agg_comment");
@@ -145,6 +166,12 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
const auto collationObj = BSON("locale"
<< "en_US");
request.setCollation(collationObj);
+ const auto readPrefObj = BSON("$readPreference"
+ << "nearest");
+ request.setUnwrappedReadPref(readPrefObj);
+ const auto readConcernObj = BSON("level"
+ << "linearizable");
+ request.setReadConcern(readConcernObj);
auto expectedSerialization =
Document{{AggregationRequest::kCommandName, nss.coll()},
@@ -156,7 +183,10 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
{AggregationRequest::kCursorName,
Value(Document({{AggregationRequest::kBatchSizeName, 10}}))},
{AggregationRequest::kHintName, hintObj},
- {AggregationRequest::kCommentName, comment}};
+ {AggregationRequest::kCommentName, comment},
+ {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj},
+ {QueryRequest::kUnwrappedReadPrefField, readPrefObj},
+ {QueryRequest::cmdOptionMaxTimeMS, 10}};
ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
}
@@ -352,19 +382,5 @@ TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) {
ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
-TEST(AggregationRequestTest, ShouldIgnoreMaxTimeMsOption) {
- NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, maxTimeMS: 'invalid'}");
- ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
-}
-
-TEST(AggregationRequestTest, ShouldIgnoreReadConcernOption) {
- NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, readConcern: 'invalid'}");
- ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/query/count_request.cpp b/src/mongo/db/query/count_request.cpp
index 3451cd7989d..2515cb4b382 100644
--- a/src/mongo/db/query/count_request.cpp
+++ b/src/mongo/db/query/count_request.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/query/count_request.h"
+#include "mongo/db/query/query_request.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -42,7 +43,9 @@ const char kSkipField[] = "skip";
const char kHintField[] = "hint";
const char kCollationField[] = "collation";
const char kExplainField[] = "explain";
-
+const char kCommentField[] = "comment";
+const char kMaxTimeMSField[] = "maxTimeMS";
+const char kReadConcernField[] = "readConcern";
} // namespace
CountRequest::CountRequest(NamespaceString nss, BSONObj query)
@@ -56,31 +59,6 @@ void CountRequest::setCollation(BSONObj collation) {
_collation = collation.getOwned();
}
-BSONObj CountRequest::toBSON() const {
- BSONObjBuilder builder;
-
- builder.append(kCmdName, _nss.ns());
- builder.append(kQueryField, _query);
-
- if (_limit) {
- builder.append(kLimitField, _limit.get());
- }
-
- if (_skip) {
- builder.append(kSkipField, _skip.get());
- }
-
- if (_hint) {
- builder.append(kHintField, _hint.get());
- }
-
- if (_collation) {
- builder.append(kCollationField, _collation.get());
- }
-
- return builder.obj();
-}
-
StatusWith<CountRequest> CountRequest::parseFromBSON(const std::string& dbname,
const BSONObj& cmdObj,
bool isExplain) {
@@ -121,8 +99,17 @@ StatusWith<CountRequest> CountRequest::parseFromBSON(const std::string& dbname,
return Status(ErrorCodes::BadValue, "skip value is not a valid number");
}
+ // maxTimeMS
+ if (cmdObj[kMaxTimeMSField].ok()) {
+ auto maxTimeMS = QueryRequest::parseMaxTimeMS(cmdObj[kMaxTimeMSField]);
+ if (!maxTimeMS.isOK()) {
+ return maxTimeMS.getStatus();
+ }
+ request.setMaxTimeMS(static_cast<unsigned int>(maxTimeMS.getValue()));
+ }
+
// Hint
- if (Object == cmdObj[kHintField].type()) {
+ if (BSONType::Object == cmdObj[kHintField].type()) {
request.setHint(cmdObj[kHintField].Obj());
} else if (String == cmdObj[kHintField].type()) {
const std::string hint = cmdObj.getStringField(kHintField);
@@ -130,12 +117,34 @@ StatusWith<CountRequest> CountRequest::parseFromBSON(const std::string& dbname,
}
// Collation
- if (Object == cmdObj[kCollationField].type()) {
+ if (BSONType::Object == cmdObj[kCollationField].type()) {
request.setCollation(cmdObj[kCollationField].Obj());
} else if (cmdObj[kCollationField].ok()) {
return Status(ErrorCodes::BadValue, "collation value is not a document");
}
+ // readConcern
+ if (BSONType::Object == cmdObj[kReadConcernField].type()) {
+ request.setReadConcern(cmdObj[kReadConcernField].Obj());
+ } else if (cmdObj[kReadConcernField].ok()) {
+ return Status(ErrorCodes::BadValue, "readConcern value is not a document");
+ }
+
+ // unwrappedReadPref
+ if (BSONType::Object == cmdObj[QueryRequest::kUnwrappedReadPrefField].type()) {
+ request.setUnwrappedReadPref(cmdObj[QueryRequest::kUnwrappedReadPrefField].Obj());
+ } else if (cmdObj[QueryRequest::kUnwrappedReadPrefField].ok()) {
+ return Status(ErrorCodes::BadValue, "readPreference value is not a document");
+ }
+
+ // Comment
+ if (BSONType::String == cmdObj[kCommentField].type()) {
+ request.setComment(cmdObj[kCommentField].valueStringData());
+ } else if (cmdObj[kCommentField].ok()) {
+ return Status(ErrorCodes::BadValue, "comment value is not a string");
+ }
+
+
// Explain
request.setExplain(isExplain);
@@ -179,6 +188,22 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const {
aggregationBuilder.append(kHintField, *_hint);
}
+ if (!_comment.empty()) {
+ aggregationBuilder.append(kCommentField, _comment);
+ }
+
+ if (_maxTimeMS > 0) {
+ aggregationBuilder.append(kMaxTimeMSField, _maxTimeMS);
+ }
+
+ if (!_readConcern.isEmpty()) {
+ aggregationBuilder.append(kReadConcernField, _readConcern);
+ }
+
+ if (!_unwrappedReadPref.isEmpty()) {
+ aggregationBuilder.append(QueryRequest::kUnwrappedReadPrefField, _unwrappedReadPref);
+ }
+
// The 'cursor' option is always specified so that aggregation uses the cursor interface.
aggregationBuilder.append("cursor", BSONObj());
diff --git a/src/mongo/db/query/count_request.h b/src/mongo/db/query/count_request.h
index 0fd466ab93c..e1b951cb7de 100644
--- a/src/mongo/db/query/count_request.h
+++ b/src/mongo/db/query/count_request.h
@@ -54,7 +54,7 @@ public:
return _nss;
}
- const BSONObj getQuery() const {
+ BSONObj getQuery() const {
return _query;
}
@@ -74,13 +74,13 @@ public:
_skip = skip;
}
- const BSONObj getHint() const {
+ BSONObj getHint() const {
return _hint.value_or(BSONObj());
}
void setHint(BSONObj hint);
- const BSONObj getCollation() const {
+ BSONObj getCollation() const {
return _collation.value_or(BSONObj());
}
@@ -94,11 +94,37 @@ public:
_explain = explain;
}
- /**
- * Constructs a BSON representation of this request, which can be used for sending it in
- * commands.
- */
- BSONObj toBSON() const;
+ const std::string& getComment() const {
+ return _comment;
+ }
+
+ void setComment(StringData comment) {
+ _comment = comment.toString();
+ }
+
+ unsigned int getMaxTimeMS() const {
+ return _maxTimeMS;
+ }
+
+ void setMaxTimeMS(unsigned int maxTimeMS) {
+ _maxTimeMS = maxTimeMS;
+ }
+
+ BSONObj getReadConcern() const {
+ return _readConcern;
+ }
+
+ void setReadConcern(BSONObj readConcern) {
+ _readConcern = readConcern.getOwned();
+ }
+
+ BSONObj getUnwrappedReadPref() const {
+ return _unwrappedReadPref;
+ }
+
+ void setUnwrappedReadPref(BSONObj unwrappedReadPref) {
+ _unwrappedReadPref = unwrappedReadPref.getOwned();
+ }
/**
* Converts this CountRequest into an aggregation.
@@ -133,6 +159,19 @@ private:
// Optional. The collation used to compare strings.
boost::optional<BSONObj> _collation;
+ BSONObj _readConcern;
+
+ // The unwrapped readPreference object, if one was given to us by the mongos command processor.
+ // This object will be empty when no readPreference is specified or if the request does not
+ // originate from mongos.
+ BSONObj _unwrappedReadPref;
+
+ // When non-empty, represents a user comment.
+ std::string _comment;
+
+ // A user-specified maxTimeMS limit, or a value of '0' if not specified.
+ unsigned int _maxTimeMS = 0;
+
// If true, generate an explain plan instead of the actual count.
bool _explain = false;
};
diff --git a/src/mongo/db/query/count_request_test.cpp b/src/mongo/db/query/count_request_test.cpp
index 8508c75ead6..509e55236ed 100644
--- a/src/mongo/db/query/count_request_test.cpp
+++ b/src/mongo/db/query/count_request_test.cpp
@@ -61,8 +61,12 @@ TEST(CountRequest, ParseDefaults) {
// Defaults
ASSERT_EQUALS(countRequest.getLimit(), 0);
ASSERT_EQUALS(countRequest.getSkip(), 0);
+ ASSERT_EQUALS(countRequest.getMaxTimeMS(), 0u);
ASSERT(countRequest.getHint().isEmpty());
ASSERT(countRequest.getCollation().isEmpty());
+ ASSERT(countRequest.getReadConcern().isEmpty());
+ ASSERT(countRequest.getUnwrappedReadPref().isEmpty());
+ ASSERT(countRequest.getComment().empty());
}
TEST(CountRequest, ParseComplete) {
@@ -81,7 +85,17 @@ TEST(CountRequest, ParseComplete) {
<< BSON("b" << 5)
<< "collation"
<< BSON("locale"
- << "en_US")),
+ << "en_US")
+ << "readConcern"
+ << BSON("level"
+ << "linearizable")
+ << "$queryOptions"
+ << BSON("$readPreference"
+ << "secondary")
+ << "comment"
+ << "aComment"
+ << "maxTimeMS"
+ << 10000),
isExplain);
ASSERT_OK(countRequestStatus.getStatus());
@@ -92,8 +106,13 @@ TEST(CountRequest, ParseComplete) {
ASSERT_BSONOBJ_EQ(countRequest.getQuery(), fromjson("{ a : { '$gte' : 11 } }"));
ASSERT_EQUALS(countRequest.getLimit(), 100);
ASSERT_EQUALS(countRequest.getSkip(), 1000);
+ ASSERT_EQUALS(countRequest.getMaxTimeMS(), 10000u);
+ ASSERT_EQUALS(countRequest.getComment(), "aComment");
ASSERT_BSONOBJ_EQ(countRequest.getHint(), fromjson("{ b : 5 }"));
ASSERT_BSONOBJ_EQ(countRequest.getCollation(), fromjson("{ locale : 'en_US' }"));
+ ASSERT_BSONOBJ_EQ(countRequest.getReadConcern(), fromjson("{ level: 'linearizable' }"));
+ ASSERT_BSONOBJ_EQ(countRequest.getUnwrappedReadPref(),
+ fromjson("{ $readPreference: 'secondary' }"));
}
TEST(CountRequest, ParseWithExplain) {
@@ -189,26 +208,6 @@ TEST(CountRequest, FailParseBadCollationValue) {
ASSERT_EQUALS(countRequestStatus.getStatus(), ErrorCodes::BadValue);
}
-TEST(CountRequest, ToBSON) {
- CountRequest countRequest(NamespaceString("TestDB.TestColl"), BSON("a" << BSON("$gte" << 11)));
- countRequest.setLimit(100);
- countRequest.setSkip(1000);
- countRequest.setHint(BSON("b" << 5));
- countRequest.setCollation(BSON("locale"
- << "en_US"));
-
- BSONObj actualObj = countRequest.toBSON();
- BSONObj expectedObj(
- fromjson("{ count : 'TestDB.TestColl',"
- " query : { a : { '$gte' : 11 } },"
- " limit : 100,"
- " skip : 1000,"
- " hint : { b : 5 },"
- " collation : { locale : 'en_US' } },"));
-
- ASSERT_BSONOBJ_EQ(actualObj, expectedObj);
-}
-
TEST(CountRequest, ConvertToAggregationWithHint) {
CountRequest countRequest(testns, BSONObj());
countRequest.setHint(BSON("x" << 1));
@@ -271,6 +270,88 @@ TEST(CountRequest, ConvertToAggregationWithQueryAndFilterAndLimit) {
SimpleBSONObjComparator::kInstance.makeEqualTo()));
}
+TEST(CountRequest, ConvertToAggregationWithMaxTimeMS) {
+ CountRequest countRequest(testns, BSONObj());
+ countRequest.setMaxTimeMS(100);
+ auto agg = countRequest.asAggregationCommand();
+ ASSERT_OK(agg);
+
+ auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ ASSERT_OK(ar.getStatus());
+ ASSERT_EQ(ar.getValue().getMaxTimeMS(), 100u);
+
+ std::vector<BSONObj> expectedPipeline{BSON("$count"
+ << "count")};
+ ASSERT(std::equal(expectedPipeline.begin(),
+ expectedPipeline.end(),
+ ar.getValue().getPipeline().begin(),
+ ar.getValue().getPipeline().end(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo()));
+}
+
+TEST(CountRequest, ConvertToAggregationWithQueryOptions) {
+ CountRequest countRequest(testns, BSONObj());
+ countRequest.setUnwrappedReadPref(BSON("readPreference"
+ << "secondary"));
+ auto agg = countRequest.asAggregationCommand();
+ ASSERT_OK(agg);
+
+ auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ ASSERT_OK(ar.getStatus());
+ ASSERT_BSONOBJ_EQ(ar.getValue().getUnwrappedReadPref(),
+ BSON("readPreference"
+ << "secondary"));
+
+ std::vector<BSONObj> expectedPipeline{BSON("$count"
+ << "count")};
+ ASSERT(std::equal(expectedPipeline.begin(),
+ expectedPipeline.end(),
+ ar.getValue().getPipeline().begin(),
+ ar.getValue().getPipeline().end(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo()));
+}
+
+TEST(CountRequest, ConvertToAggregationWithComment) {
+ CountRequest countRequest(testns, BSONObj());
+ countRequest.setComment("aComment");
+ auto agg = countRequest.asAggregationCommand();
+ ASSERT_OK(agg);
+
+ auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ ASSERT_OK(ar.getStatus());
+ ASSERT_EQ(ar.getValue().getComment(), "aComment");
+
+ std::vector<BSONObj> expectedPipeline{BSON("$count"
+ << "count")};
+ ASSERT(std::equal(expectedPipeline.begin(),
+ expectedPipeline.end(),
+ ar.getValue().getPipeline().begin(),
+ ar.getValue().getPipeline().end(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo()));
+}
+
+TEST(CountRequest, ConvertToAggregationWithReadConcern) {
+ CountRequest countRequest(testns, BSONObj());
+ countRequest.setReadConcern(BSON("level"
+ << "linearizable"));
+ auto agg = countRequest.asAggregationCommand();
+ ASSERT_OK(agg);
+
+ auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ ASSERT_OK(ar.getStatus());
+ ASSERT_BSONOBJ_EQ(ar.getValue().getReadConcern(),
+ BSON("level"
+ << "linearizable"));
+
+ std::vector<BSONObj> expectedPipeline{BSON("$count"
+ << "count")};
+ ASSERT(std::equal(expectedPipeline.begin(),
+ expectedPipeline.end(),
+ ar.getValue().getPipeline().begin(),
+ ar.getValue().getPipeline().end(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo()));
+}
+
TEST(CountRequest, ConvertToAggregationOmitsExplain) {
CountRequest countRequest(testns, BSONObj());
countRequest.setExplain(true);
diff --git a/src/mongo/db/query/parsed_distinct.cpp b/src/mongo/db/query/parsed_distinct.cpp
index 3c3a75c62cd..8b024533326 100644
--- a/src/mongo/db/query/parsed_distinct.cpp
+++ b/src/mongo/db/query/parsed_distinct.cpp
@@ -42,6 +42,7 @@ namespace mongo {
const char ParsedDistinct::kKeyField[] = "key";
const char ParsedDistinct::kQueryField[] = "query";
const char ParsedDistinct::kCollationField[] = "collation";
+const char ParsedDistinct::kCommentField[] = "comment";
StatusWith<BSONObj> ParsedDistinct::asAggregationCommand() const {
BSONObjBuilder aggregationBuilder;
@@ -88,6 +89,23 @@ StatusWith<BSONObj> ParsedDistinct::asAggregationCommand() const {
aggregationBuilder.append(kCollationField, qr.getCollation());
+ if (qr.getMaxTimeMS() > 0) {
+ aggregationBuilder.append(QueryRequest::cmdOptionMaxTimeMS, qr.getMaxTimeMS());
+ }
+
+ if (!qr.getReadConcern().isEmpty()) {
+ aggregationBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName,
+ qr.getReadConcern());
+ }
+
+ if (!qr.getUnwrappedReadPref().isEmpty()) {
+ aggregationBuilder.append(QueryRequest::kUnwrappedReadPrefField, qr.getUnwrappedReadPref());
+ }
+
+ if (!qr.getComment().empty()) {
+ aggregationBuilder.append(kCommentField, qr.getComment());
+ }
+
// Specify the 'cursor' option so that aggregation uses the cursor interface.
aggregationBuilder.append("cursor", BSONObj());
@@ -137,6 +155,50 @@ StatusWith<ParsedDistinct> ParsedDistinct::parse(OperationContext* opCtx,
qr->setCollation(collationElt.embeddedObject());
}
+ if (BSONElement readConcernElt = cmdObj[repl::ReadConcernArgs::kReadConcernFieldName]) {
+ if (readConcernElt.type() != BSONType::Object) {
+ return Status(ErrorCodes::TypeMismatch,
+ str::stream() << "\"" << repl::ReadConcernArgs::kReadConcernFieldName
+ << "\" had the wrong type. Expected "
+ << typeName(BSONType::Object)
+ << ", found "
+ << typeName(readConcernElt.type()));
+ }
+ qr->setReadConcern(readConcernElt.embeddedObject());
+ }
+
+ if (BSONElement commentElt = cmdObj[kCommentField]) {
+ if (commentElt.type() != BSONType::String) {
+ return Status(ErrorCodes::TypeMismatch,
+ str::stream() << "\"" << kCommentField
+ << "\" had the wrong type. Expected "
+ << typeName(BSONType::String)
+ << ", found "
+ << typeName(commentElt.type()));
+ }
+ qr->setComment(commentElt.str());
+ }
+
+ if (BSONElement queryOptionsElt = cmdObj[QueryRequest::kUnwrappedReadPrefField]) {
+ if (queryOptionsElt.type() != BSONType::Object) {
+ return Status(ErrorCodes::TypeMismatch,
+ str::stream() << "\"" << QueryRequest::kUnwrappedReadPrefField
+ << "\" had the wrong type. Expected "
+ << typeName(BSONType::Object)
+ << ", found "
+ << typeName(queryOptionsElt.type()));
+ }
+ qr->setUnwrappedReadPref(queryOptionsElt.embeddedObject());
+ }
+
+ if (BSONElement maxTimeMSElt = cmdObj[QueryRequest::cmdOptionMaxTimeMS]) {
+ auto maxTimeMS = QueryRequest::parseMaxTimeMS(maxTimeMSElt);
+ if (!maxTimeMS.isOK()) {
+ return maxTimeMS.getStatus();
+ }
+ qr->setMaxTimeMS(static_cast<unsigned int>(maxTimeMS.getValue()));
+ }
+
qr->setExplain(isExplain);
auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback);
diff --git a/src/mongo/db/query/parsed_distinct.h b/src/mongo/db/query/parsed_distinct.h
index eb1c4e2b0ab..2294b2e0650 100644
--- a/src/mongo/db/query/parsed_distinct.h
+++ b/src/mongo/db/query/parsed_distinct.h
@@ -49,6 +49,7 @@ public:
static const char kKeyField[];
static const char kQueryField[];
static const char kCollationField[];
+ static const char kCommentField[];
ParsedDistinct(std::unique_ptr<CanonicalQuery> query, const std::string key)
: _query(std::move(query)), _key(std::move(key)) {}
diff --git a/src/mongo/db/query/parsed_distinct_test.cpp b/src/mongo/db/query/parsed_distinct_test.cpp
index 3a4f6ccb42e..68554d2637c 100644
--- a/src/mongo/db/query/parsed_distinct_test.cpp
+++ b/src/mongo/db/query/parsed_distinct_test.cpp
@@ -64,6 +64,74 @@ TEST(ParsedDistinctTest, ConvertToAggregationNoQuery) {
ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT(ar.getValue().getReadConcern().isEmpty());
+ ASSERT(ar.getValue().getUnwrappedReadPref().isEmpty());
+ ASSERT(ar.getValue().getComment().empty());
+ ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 0u);
+
+ std::vector<BSONObj> expectedPipeline{
+ BSON("$unwind" << BSON("path"
+ << "$x"
+ << "preserveNullAndEmptyArrays"
+ << true)),
+ BSON("$group" << BSON("_id" << BSONNULL << "distinct" << BSON("$addToSet"
+ << "$x")))};
+ ASSERT(std::equal(expectedPipeline.begin(),
+ expectedPipeline.end(),
+ ar.getValue().getPipeline().begin(),
+ ar.getValue().getPipeline().end(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo()));
+}
+
+TEST(ParsedDistinctTest, ConvertToAggregationWithAllOptions) {
+ QueryTestServiceContext serviceContext;
+ auto uniqueTxn = serviceContext.makeOperationContext();
+ OperationContext* opCtx = uniqueTxn.get();
+
+ auto pd = ParsedDistinct::parse(opCtx,
+ testns,
+ BSON("distinct"
+ << "testcoll"
+ << "key"
+ << "x"
+ << "hint"
+ << BSON("b" << 5)
+ << "collation"
+ << BSON("locale"
+ << "en_US")
+ << "readConcern"
+ << BSON("level"
+ << "linearizable")
+ << "$queryOptions"
+ << BSON("readPreference"
+ << "secondary")
+ << "comment"
+ << "aComment"
+ << "maxTimeMS"
+ << 100),
+ ExtensionsCallbackDisallowExtensions(),
+ !isExplain);
+ ASSERT_OK(pd.getStatus());
+
+ auto agg = pd.getValue().asAggregationCommand();
+ ASSERT_OK(agg);
+
+ auto ar = AggregationRequest::parseFromBSON(testns, agg.getValue());
+ ASSERT_OK(ar.getStatus());
+ ASSERT(!ar.getValue().getExplain());
+ ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
+ ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
+ ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(),
+ BSON("locale"
+ << "en_US"));
+ ASSERT_BSONOBJ_EQ(ar.getValue().getReadConcern(),
+ BSON("level"
+ << "linearizable"));
+ ASSERT_BSONOBJ_EQ(ar.getValue().getUnwrappedReadPref(),
+ BSON("readPreference"
+ << "secondary"));
+ ASSERT_EQUALS(ar.getValue().getComment(), "aComment");
+ ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 100u);
std::vector<BSONObj> expectedPipeline{
BSON("$unwind" << BSON("path"
@@ -100,6 +168,10 @@ TEST(ParsedDistinctTest, ConvertToAggregationWithQuery) {
ASSERT_EQ(ar.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize);
ASSERT_EQ(ar.getValue().getNamespaceString(), testns);
ASSERT_BSONOBJ_EQ(ar.getValue().getCollation(), BSONObj());
+ ASSERT(ar.getValue().getReadConcern().isEmpty());
+ ASSERT(ar.getValue().getUnwrappedReadPref().isEmpty());
+ ASSERT(ar.getValue().getComment().empty());
+ ASSERT_EQUALS(ar.getValue().getMaxTimeMS(), 0u);
std::vector<BSONObj> expectedPipeline{
BSON("$match" << BSON("z" << 7)),
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp
index 04c4154f741..467f57f41f0 100644
--- a/src/mongo/db/query/query_request.cpp
+++ b/src/mongo/db/query/query_request.cpp
@@ -168,6 +168,14 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::makeFromFindCommand(Namespace
}
qr->_readConcern = el.Obj().getOwned();
+ } else if (str::equals(fieldName, QueryRequest::kUnwrappedReadPrefField.c_str())) {
+ // Read preference parsing is handled elsewhere, but we store a copy here.
+ Status status = checkFieldType(el, Object);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ qr->setUnwrappedReadPref(el.Obj());
} else if (str::equals(fieldName, kCollationField)) {
// Collation parsing is handled elsewhere, but we store a copy here.
Status status = checkFieldType(el, Object);
@@ -1063,6 +1071,12 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const {
if (!_comment.empty()) {
aggregationBuilder.append("comment", _comment);
}
+ if (!_readConcern.isEmpty()) {
+ aggregationBuilder.append("readConcern", _readConcern);
+ }
+ if (!_unwrappedReadPref.isEmpty()) {
+ aggregationBuilder.append(QueryRequest::kUnwrappedReadPrefField, _unwrappedReadPref);
+ }
return StatusWith<BSONObj>(aggregationBuilder.obj());
}
} // namespace mongo
diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h
index 376d5ece98c..a8ca956c43b 100644
--- a/src/mongo/db/query/query_request.h
+++ b/src/mongo/db/query/query_request.h
@@ -250,6 +250,14 @@ public:
_comment = comment;
}
+ const BSONObj& getUnwrappedReadPref() const {
+ return _unwrappedReadPref;
+ }
+
+ void setUnwrappedReadPref(BSONObj unwrappedReadPref) {
+ _unwrappedReadPref = unwrappedReadPref.getOwned();
+ }
+
int getMaxScan() const {
return _maxScan;
}
@@ -449,6 +457,11 @@ private:
// The collation is parsed elsewhere.
BSONObj _collation;
+ // The unwrapped readPreference object, if one was given to us by the mongos command processor.
+ // This object will be empty when no readPreference is specified or if the request does not
+ // originate from mongos.
+ BSONObj _unwrappedReadPref;
+
bool _wantMore = true;
// Must be either unset or positive. Negative skip is illegal and a skip of zero received from
@@ -473,6 +486,8 @@ private:
std::string _comment;
int _maxScan = 0;
+
+ // A user-specified maxTimeMS limit, or a value of '0' if not specified.
int _maxTimeMS = 0;
BSONObj _min;
diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp
index 0834cd6e4c8..f64a5429faf 100644
--- a/src/mongo/db/query/query_request_test.cpp
+++ b/src/mongo/db/query/query_request_test.cpp
@@ -473,6 +473,7 @@ TEST(QueryRequestTest, ParseFromCommandAllNonOptionFields) {
"projection: {c: 1},"
"hint: {d: 1},"
"readConcern: {e: 1},"
+ "$queryOptions: {$readPreference: 'secondary'},"
"collation: {f: 1},"
"limit: 3,"
"skip: 5,"
@@ -494,6 +495,9 @@ TEST(QueryRequestTest, ParseFromCommandAllNonOptionFields) {
ASSERT_EQUALS(0, expectedHint.woCompare(qr->getHint()));
BSONObj expectedReadConcern = BSON("e" << 1);
ASSERT_EQUALS(0, expectedReadConcern.woCompare(qr->getReadConcern()));
+ BSONObj expectedUnwrappedReadPref = BSON("$readPreference"
+ << "secondary");
+ ASSERT_EQUALS(0, expectedUnwrappedReadPref.woCompare(qr->getUnwrappedReadPref()));
BSONObj expectedCollation = BSON("f" << 1);
ASSERT_EQUALS(0, expectedCollation.woCompare(qr->getCollation()));
ASSERT_EQUALS(3, *qr->getLimit());
@@ -589,6 +593,7 @@ TEST(QueryRequestTest, ParseFromCommandSkipWrongType) {
ASSERT_NOT_OK(result.getStatus());
}
+
TEST(QueryRequestTest, ParseFromCommandLimitWrongType) {
BSONObj cmdObj = fromjson(
"{find: 'testns',"
@@ -624,6 +629,17 @@ TEST(QueryRequestTest, ParseFromCommandCommentWrongType) {
ASSERT_NOT_OK(result.getStatus());
}
+TEST(QueryRequestTest, ParseFromCommandUnwrappedReadPrefWrongType) {
+ BSONObj cmdObj = fromjson(
+ "{find: 'testns',"
+ "filter: {a: 1},"
+ "$queryOptions: 1}");
+ const NamespaceString nss("test.testns");
+ bool isExplain = false;
+ auto result = QueryRequest::makeFromFindCommand(nss, cmdObj, isExplain);
+ ASSERT_NOT_OK(result.getStatus());
+}
+
TEST(QueryRequestTest, ParseFromCommandMaxScanWrongType) {
BSONObj cmdObj = fromjson(
"{find: 'testns',"
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp
index 3376c05570d..fa0564113f8 100644
--- a/src/mongo/db/views/resolved_view.cpp
+++ b/src/mongo/db/views/resolved_view.cpp
@@ -85,6 +85,9 @@ AggregationRequest ResolvedView::asExpandedViewAggregation(
expandedRequest.setHint(request.getHint());
expandedRequest.setComment(request.getComment());
+ expandedRequest.setMaxTimeMS(request.getMaxTimeMS());
+ expandedRequest.setReadConcern(request.getReadConcern());
+ expandedRequest.setUnwrappedReadPref(request.getUnwrappedReadPref());
expandedRequest.setBypassDocumentValidation(request.shouldBypassDocumentValidation());
expandedRequest.setAllowDiskUse(request.shouldAllowDiskUse());
expandedRequest.setCollation(request.getCollation());
diff --git a/src/mongo/db/views/resolved_view_test.cpp b/src/mongo/db/views/resolved_view_test.cpp
index cd34a8e6cf4..05c47d3fd51 100644
--- a/src/mongo/db/views/resolved_view_test.cpp
+++ b/src/mongo/db/views/resolved_view_test.cpp
@@ -113,6 +113,48 @@ TEST(ResolvedViewTest, ExpandingAggRequestPreservesAllowDiskUse) {
ASSERT_TRUE(result.shouldAllowDiskUse());
}
+TEST(ResolvedViewTest, ExpandingAggRequestPreservesHint) {
+ const ResolvedView resolvedView{backingNss, emptyPipeline};
+ AggregationRequest aggRequest(viewNss, {});
+ aggRequest.setHint(BSON("a" << 1));
+
+ auto result = resolvedView.asExpandedViewAggregation(aggRequest);
+ ASSERT_BSONOBJ_EQ(result.getHint(), BSON("a" << 1));
+}
+
+TEST(ResolvedViewTest, ExpandingAggRequestPreservesReadPreference) {
+ const ResolvedView resolvedView{backingNss, emptyPipeline};
+ AggregationRequest aggRequest(viewNss, {});
+ aggRequest.setUnwrappedReadPref(BSON("$readPreference"
+ << "nearest"));
+
+ auto result = resolvedView.asExpandedViewAggregation(aggRequest);
+ ASSERT_BSONOBJ_EQ(result.getUnwrappedReadPref(),
+ BSON("$readPreference"
+ << "nearest"));
+}
+
+TEST(ResolvedViewTest, ExpandingAggRequestPreservesReadConcern) {
+ const ResolvedView resolvedView{backingNss, emptyPipeline};
+ AggregationRequest aggRequest(viewNss, {});
+ aggRequest.setReadConcern(BSON("level"
+ << "linearizable"));
+
+ auto result = resolvedView.asExpandedViewAggregation(aggRequest);
+ ASSERT_BSONOBJ_EQ(result.getReadConcern(),
+ BSON("level"
+ << "linearizable"));
+}
+
+TEST(ResolvedViewTest, ExpandingAggRequestPreservesMaxTimeMS) {
+ const ResolvedView resolvedView{backingNss, emptyPipeline};
+ AggregationRequest aggRequest(viewNss, {});
+ aggRequest.setMaxTimeMS(100u);
+
+ auto result = resolvedView.asExpandedViewAggregation(aggRequest);
+ ASSERT_EQ(result.getMaxTimeMS(), 100u);
+}
+
TEST(ResolvedViewTest, ExpandingAggRequestPreservesCollation) {
const ResolvedView resolvedView{backingNss, emptyPipeline};
AggregationRequest aggRequest(viewNss, {});
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 010cbc55e63..bdb4be00869 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -60,12 +60,6 @@
namespace mongo {
namespace {
-// These fields are not part of the AggregationRequest since they are not handled by the aggregation
-// subsystem, so we serialize them separately.
-const std::initializer_list<StringData> kFieldsToPropagateToShards = {
- "$queryOptions", "readConcern", QueryRequest::cmdOptionMaxTimeMS,
-};
-
// Given a document representing an aggregation command such as
//
// {aggregate: "myCollection", pipeline: [], ...},
@@ -73,12 +67,7 @@ const std::initializer_list<StringData> kFieldsToPropagateToShards = {
// produces the corresponding explain command:
//
// {explain: {aggregate: "myCollection", pipline: [], ...}, verbosity: ...}
-//
-// The 'cmdObj' is the original user request, which may contain fields to propagate to the shards
-// that are not handled by the aggregation subsystem itself (e.g. maxTimeMS).
-Document wrapAggAsExplain(Document aggregateCommand,
- ExplainOptions::Verbosity verbosity,
- const BSONObj& cmdObj) {
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
MutableDocument explainCommandBuilder;
explainCommandBuilder["explain"] = Value(aggregateCommand);
@@ -87,11 +76,6 @@ Document wrapAggAsExplain(Document aggregateCommand,
explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
}
- // Propagate options not specific to agg to the shards.
- for (auto&& field : kFieldsToPropagateToShards) {
- explainCommandBuilder[field] = Value(cmdObj[field]);
- }
-
return explainCommandBuilder.freeze();
}
@@ -208,13 +192,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (mergeCtx->explain) {
- commandBuilder.reset(wrapAggAsExplain(commandBuilder.freeze(), *mergeCtx->explain, cmdObj));
- } else {
- // Add things like $queryOptions which must be sent to the shards but are not part of the
- // agg request.
- for (auto&& field : kFieldsToPropagateToShards) {
- commandBuilder[field] = Value(cmdObj[field]);
- }
+ commandBuilder.reset(wrapAggAsExplain(commandBuilder.freeze(), *mergeCtx->explain));
}
BSONObj shardedCommand = commandBuilder.freeze().toBson();
@@ -277,10 +255,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
mergeCmd["pipeline"] = Value(pipeline.getValue()->serialize());
mergeCmd["cursor"] = Value(cmdObj["cursor"]);
- if (cmdObj.hasField("$queryOptions")) {
- mergeCmd["$queryOptions"] = Value(cmdObj["$queryOptions"]);
- }
-
if (cmdObj.hasField(QueryRequest::cmdOptionMaxTimeMS)) {
mergeCmd[QueryRequest::cmdOptionMaxTimeMS] =
Value(cmdObj[QueryRequest::cmdOptionMaxTimeMS]);
@@ -493,7 +467,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// the shards.
if (aggRequest.getExplain()) {
auto explainCmdObj =
- wrapAggAsExplain(aggRequest.serializeToCommandObj(), *aggRequest.getExplain(), cmdObj);
+ wrapAggAsExplain(aggRequest.serializeToCommandObj(), *aggRequest.getExplain());
cmdObj = explainCmdObj.toBson();
}