summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKevin Pulo <kevin.pulo@mongodb.com>2019-11-26 03:23:07 +0000
committerevergreen <evergreen@mongodb.com>2019-11-26 03:23:07 +0000
commit8b0f534a706005d366e200ee56af5c76217656b2 (patch)
tree4ff0d0d1ea68d4f1a17ce39c4ab8087cfa617434 /src
parent34e093782f53dec39ff89116c0c7128430c99bae (diff)
downloadmongo-8b0f534a706005d366e200ee56af5c76217656b2.tar.gz
SERVER-44470 Parse aggregation commands earlier, and rationalize aggregation readConcern handling
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands.h58
-rw-r--r--src/mongo/db/commands/count_cmd.cpp3
-rw-r--r--src/mongo/db/commands/distinct.cpp3
-rw-r--r--src/mongo/db/commands/explain_cmd.cpp2
-rw-r--r--src/mongo/db/commands/find_cmd.cpp3
-rw-r--r--src/mongo/db/commands/haystack.cpp3
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp88
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp16
-rw-r--r--src/mongo/db/commands/run_aggregate.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h16
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h4
-rw-r--r--src/mongo/db/pipeline/document_source_list_cached_and_active_users.h4
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h4
-rw-r--r--src/mongo/db/pipeline/document_source_plan_cache_stats.h4
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.h36
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.cpp52
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h10
-rw-r--r--src/mongo/db/read_concern_support_result.h86
-rw-r--r--src/mongo/db/service_entry_point_common.cpp63
-rw-r--r--src/mongo/s/commands/cluster_count_cmd.cpp6
-rw-r--r--src/mongo/s/commands/cluster_current_op.cpp1
-rw-r--r--src/mongo/s/commands/cluster_distinct_cmd.cpp3
-rw-r--r--src/mongo/s/commands/cluster_explain_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp3
-rw-r--r--src/mongo/s/commands/cluster_find_cmd.cpp3
-rw-r--r--src/mongo/s/commands/cluster_killcursors_cmd.cpp3
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp72
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp3
-rw-r--r--src/mongo/s/commands/strategy.cpp53
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp36
-rw-r--r--src/mongo/s/query/cluster_aggregate.h12
-rw-r--r--src/mongo/s/query/cluster_aggregate_test.cpp1
32 files changed, 446 insertions, 219 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index ea62ac51fc0..2350a040b6d 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -45,6 +45,7 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/query/explain.h"
+#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/op_msg.h"
@@ -269,6 +270,13 @@ public:
virtual std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
const OpMsgRequest& request) = 0;
+ virtual std::unique_ptr<CommandInvocation> parseForExplain(
+ OperationContext* opCtx,
+ const OpMsgRequest& request,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+ return parse(opCtx, request);
+ }
+
/**
* Returns the command's name. This value never changes for the lifetime of the command.
*/
@@ -433,46 +441,6 @@ private:
};
/**
- * The result of checking an invocation's support for readConcern. There are two parts:
- * - Whether or not the invocation supports the given readConcern.
- * - Whether or not the invocation permits having the default readConcern applied to it.
- */
-struct ReadConcernSupportResult {
- /**
- * Whether this command invocation supports the requested readConcern level. This only
- * applies when running outside transactions because all commands that are allowed to run
- * in a transaction must support all the read concerns that can be used in a transaction.
- */
- enum class ReadConcern { kSupported, kNotSupported } readConcern;
-
- /**
- * Whether this command invocation supports applying the default readConcern to it.
- */
- enum class DefaultReadConcern { kPermitted, kNotPermitted } defaultReadConcern;
-
- /**
- * Construct with either the enum value or a bool, where true indicates
- * ReadConcern::kSupported or DefaultReadConcern::kPermitted (as appropriate).
- */
- ReadConcernSupportResult(ReadConcern supported, DefaultReadConcern defaultPermitted)
- : readConcern(supported), defaultReadConcern(defaultPermitted) {}
-
- ReadConcernSupportResult(bool supported, DefaultReadConcern defaultPermitted)
- : readConcern(supported ? ReadConcern::kSupported : ReadConcern::kNotSupported),
- defaultReadConcern(defaultPermitted) {}
-
- ReadConcernSupportResult(ReadConcern supported, bool defaultPermitted)
- : readConcern(supported),
- defaultReadConcern(defaultPermitted ? DefaultReadConcern::kPermitted
- : DefaultReadConcern::kNotPermitted) {}
-
- ReadConcernSupportResult(bool supported, bool defaultPermitted)
- : readConcern(supported ? ReadConcern::kSupported : ReadConcern::kNotSupported),
- defaultReadConcern(defaultPermitted ? DefaultReadConcern::kPermitted
- : DefaultReadConcern::kNotPermitted) {}
-};
-
-/**
* Represents a single invocation of a given command.
*/
class CommandInvocation {
@@ -516,8 +484,9 @@ public:
* Returns this invocation's support for readConcern.
*/
virtual ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
- return {level == repl::ReadConcernLevel::kLocalReadConcern,
- ReadConcernSupportResult::DefaultReadConcern::kNotPermitted};
+ return {{level != repl::ReadConcernLevel::kLocalReadConcern,
+ {ErrorCodes::InvalidOptions, "read concern not supported"}},
+ {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}};
}
/**
@@ -667,8 +636,9 @@ public:
virtual ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const {
- return {level == repl::ReadConcernLevel::kLocalReadConcern,
- ReadConcernSupportResult::DefaultReadConcern::kNotPermitted};
+ return {{level != repl::ReadConcernLevel::kLocalReadConcern,
+ {ErrorCodes::InvalidOptions, "read concern not supported"}},
+ {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}};
}
virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const {
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index da2033e7645..b31bb5822cc 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -93,8 +93,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const override {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
ReadWriteType getReadWriteType() const override {
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index ff93a8fcfae..4da2af80abf 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -87,8 +87,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const override {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
ReadWriteType getReadWriteType() const override {
diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp
index 78fa4d67217..2d3460d6a63 100644
--- a/src/mongo/db/commands/explain_cmd.cpp
+++ b/src/mongo/db/commands/explain_cmd.cpp
@@ -170,7 +170,7 @@ std::unique_ptr<CommandInvocation> CmdExplain::parse(OperationContext* opCtx,
explainedCommand);
auto innerRequest =
std::make_unique<OpMsgRequest>(OpMsgRequest::fromDBAndBody(dbname, explainedObj));
- auto innerInvocation = explainedCommand->parse(opCtx, *innerRequest);
+ auto innerInvocation = explainedCommand->parseForExplain(opCtx, *innerRequest, verbosity);
return std::make_unique<Invocation>(
this, request, std::move(verbosity), std::move(innerRequest), std::move(innerInvocation));
}
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 56e65aad432..bd24d034e29 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -178,8 +178,7 @@ public:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
bool canIgnorePrepareConflicts() const override {
diff --git a/src/mongo/db/commands/haystack.cpp b/src/mongo/db/commands/haystack.cpp
index a14a4fd3a29..3065e3e3f7c 100644
--- a/src/mongo/db/commands/haystack.cpp
+++ b/src/mongo/db/commands/haystack.cpp
@@ -76,8 +76,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
ReadWriteType getReadWriteType() const {
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 85c220d091a..a70ae8a51d3 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -33,42 +33,62 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/run_aggregate.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
namespace mongo {
namespace {
-bool isMergePipeline(const std::vector<BSONObj>& pipeline) {
- if (pipeline.empty()) {
- return false;
- }
- return pipeline[0].hasField("$mergeCursors");
-}
-
class PipelineCommand final : public Command {
public:
PipelineCommand() : Command("aggregate") {}
+ /**
+ * It's not known until after parsing whether or not an aggregation command is an explain
+ * request, because it might include the `explain: true` field (ie. aggregation explains do not
+ * need to arrive via the `explain` command). Therefore even parsing of regular aggregation
+ * commands needs to be able to handle the explain case.
+ *
+ * As a result, aggregation command parsing is done in parseForExplain():
+ *
+ * - To parse a regular aggregation command, call parseForExplain() with `explainVerbosity` of
+ * boost::none.
+ *
+ * - To parse an aggregation command as the sub-command in an `explain` command, call
+ * parseForExplain() with `explainVerbosity` set to the desired verbosity.
+ */
std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
const OpMsgRequest& opMsgRequest) override {
- // TODO: Parsing to a Pipeline and/or AggregationRequest here.
-
- auto privileges =
- uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
- ->getPrivilegesForAggregate(
- AggregationRequest::parseNs(
- opMsgRequest.getDatabase().toString(), opMsgRequest.body),
- opMsgRequest.body,
- false));
- return std::make_unique<Invocation>(this, opMsgRequest, std::move(privileges));
+ return parseForExplain(opCtx, opMsgRequest, boost::none);
+ }
+
+ std::unique_ptr<CommandInvocation> parseForExplain(
+ OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) override {
+ const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON(
+ opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity));
+
+ auto privileges = uassertStatusOK(
+ AuthorizationSession::get(opCtx->getClient())
+ ->getPrivilegesForAggregate(
+ aggregationRequest.getNamespaceString(), opMsgRequest.body, false));
+
+ return std::make_unique<Invocation>(
+ this, opMsgRequest, std::move(aggregationRequest), std::move(privileges));
}
class Invocation final : public CommandInvocation {
public:
- Invocation(Command* cmd, const OpMsgRequest& request, PrivilegeVector privileges)
+ Invocation(Command* cmd,
+ const OpMsgRequest& request,
+ const AggregationRequest aggregationRequest,
+ PrivilegeVector privileges)
: CommandInvocation(cmd),
_request(request),
_dbName(request.getDatabase().toString()),
+ _aggregationRequest(std::move(aggregationRequest)),
+ _liteParsedPipeline(_aggregationRequest),
_privileges(std::move(privileges)) {}
private:
@@ -83,16 +103,10 @@ public:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const override {
- // Aggregations that are run directly against a collection allow any read concern.
- // Otherwise, if the aggregate is collectionless then the read concern must be 'local'
- // (e.g. $currentOp). The exception to this is a $changeStream on a whole database,
- // which is considered collectionless but must be read concern 'majority'. Further read
- // concern validation is done one the pipeline is parsed.
- return {level == repl::ReadConcernLevel::kLocalReadConcern ||
- level == repl::ReadConcernLevel::kMajorityReadConcern ||
- !AggregationRequest::parseNs(_dbName, _request.body)
- .isCollectionlessAggregateNS(),
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return _liteParsedPipeline.supportsReadConcern(
+ level,
+ _aggregationRequest.getExplain(),
+ serverGlobalParams.enableMajorityReadConcern);
}
bool allowsSpeculativeMajorityReads() const override {
@@ -106,29 +120,27 @@ public:
CommandHelpers::handleMarkKillOnClientDisconnect(
opCtx, !Pipeline::aggHasWriteStage(_request.body));
- const auto aggregationRequest = uassertStatusOK(
- AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none));
uassertStatusOK(runAggregate(opCtx,
- aggregationRequest.getNamespaceString(),
- aggregationRequest,
+ _aggregationRequest.getNamespaceString(),
+ _aggregationRequest,
+ _liteParsedPipeline,
_request.body,
_privileges,
reply));
}
NamespaceString ns() const override {
- return AggregationRequest::parseNs(_dbName, _request.body);
+ return _aggregationRequest.getNamespaceString();
}
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
- const auto aggregationRequest = uassertStatusOK(
- AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity));
uassertStatusOK(runAggregate(opCtx,
- aggregationRequest.getNamespaceString(),
- aggregationRequest,
+ _aggregationRequest.getNamespaceString(),
+ _aggregationRequest,
+ _liteParsedPipeline,
_request.body,
_privileges,
result));
@@ -143,6 +155,8 @@ public:
const OpMsgRequest& _request;
const std::string _dbName;
+ const AggregationRequest _aggregationRequest;
+ const LiteParsedPipeline _liteParsedPipeline;
const PrivilegeVector _privileges;
};
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index b7b87dd6018..ef7394df62a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -480,8 +480,18 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExe
} // namespace
Status runAggregate(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const AggregationRequest& request,
+ const BSONObj& cmdObj,
+ const PrivilegeVector& privileges,
+ rpc::ReplyBuilderInterface* result) {
+ return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result);
+}
+
+Status runAggregate(OperationContext* opCtx,
const NamespaceString& origNss,
const AggregationRequest& request,
+ const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
@@ -504,18 +514,12 @@ Status runAggregate(OperationContext* opCtx,
boost::intrusive_ptr<ExpressionContext> expCtx;
auto curOp = CurOp::get(opCtx);
{
- const LiteParsedPipeline liteParsedPipeline(request);
-
// If we are in a transaction, check whether the parsed pipeline supports
// being in a transaction.
if (opCtx->inMultiDocumentTransaction()) {
liteParsedPipeline.assertSupportsMultiDocumentTransaction(request.getExplain());
}
- // Check whether the parsed pipeline supports the given read concern.
- liteParsedPipeline.assertSupportsReadConcern(
- opCtx, request.getExplain(), serverGlobalParams.enableMajorityReadConcern);
-
const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
// If this is a collectionless aggregation, we won't create 'ctx' but will still need an
diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h
index 77d6bd7f9f9..059c48a599b 100644
--- a/src/mongo/db/commands/run_aggregate.h
+++ b/src/mongo/db/commands/run_aggregate.h
@@ -35,6 +35,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
namespace mongo {
@@ -53,6 +54,17 @@ namespace mongo {
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
const AggregationRequest& request,
+ const LiteParsedPipeline& liteParsedPipeline,
+ const BSONObj& cmdObj,
+ const PrivilegeVector& privileges,
+ rpc::ReplyBuilderInterface* result);
+
+/**
+ * Convenience version that internally constructs the LiteParsedPipeline.
+ */
+Status runAggregate(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const AggregationRequest& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result);
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index d3492b0c319..8f8ef6f6e79 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -80,15 +80,13 @@ public:
}
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- // Only "majority" is allowed for change streams.
- uassert(
- ErrorCodes::InvalidOptions,
- str::stream()
- << "$changeStream cannot run with a readConcern other than 'majority'. Current "
- << "readConcern: " << readConcern.toString(),
- !readConcern.hasLevel() ||
- readConcern.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ // Change streams require "majority" readConcern. If the client did not specify an
+ // explicit readConcern, change streams will internally upconvert the readConcern to
+ // majority (so clients can always send aggregations without readConcern). We therefore
+ // do not permit the cluster-wide default to be applied.
+ return onlySingleReadConcernSupported(
+ kStageName, repl::ReadConcernLevel::kMajorityReadConcern, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index bc66e0fef64..f2e6470f370 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -79,8 +79,8 @@ public:
return true;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
index 9406bf2e126..fabc4e0cfd7 100644
--- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
+++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
@@ -69,8 +69,8 @@ public:
return false;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index b4439a4323d..0b1738b4f53 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -81,8 +81,8 @@ public:
return false;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(DocumentSourceListLocalSessions::kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
index 5902896ee72..cdb582bed14 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
@@ -65,8 +65,8 @@ public:
return false;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(DocumentSourcePlanCacheStats::kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index 064e22389f3..82d9ce1b7f7 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -37,6 +37,7 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/stdx/unordered_set.h"
@@ -130,10 +131,11 @@ public:
}
/**
- * Verifies that this stage is allowed to run with the specified read concern. Throws a
- * UserException if not compatible.
+ * Verifies that this stage is allowed to run with the specified read concern level.
*/
- virtual void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {}
+ virtual ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
+ }
/**
* Verifies that this stage is allowed to run in a multi-document transaction. Throws a
@@ -150,14 +152,26 @@ protected:
<< "multi-document transaction.");
}
- void onlyReadConcernLocalSupported(StringData stageName,
- const repl::ReadConcernArgs& readConcern) const {
- uassert(ErrorCodes::InvalidOptions,
- str::stream()
- << "Aggregation stage " << stageName
- << " cannot run with a readConcern other than 'local'. Current readConcern: "
- << readConcern.toString(),
- readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
+ ReadConcernSupportResult onlySingleReadConcernSupported(
+ StringData stageName,
+ repl::ReadConcernLevel supportedLevel,
+ repl::ReadConcernLevel candidateLevel) const {
+ return {{candidateLevel != supportedLevel,
+ {ErrorCodes::InvalidOptions,
+ str::stream() << "Aggregation stage " << stageName
+ << " cannot run with a readConcern other than '"
+ << repl::readConcernLevels::toString(supportedLevel)
+ << "'. Current readConcern: "
+ << repl::readConcernLevels::toString(candidateLevel)}},
+ {{ErrorCodes::InvalidOptions,
+ str::stream() << "Aggregation stage " << stageName
+ << " does not permit default readConcern to be applied."}}};
+ }
+
+ ReadConcernSupportResult onlyReadConcernLocalSupported(StringData stageName,
+ repl::ReadConcernLevel level) const {
+ return onlySingleReadConcernSupported(
+ stageName, repl::ReadConcernLevel::kLocalReadConcern, level);
}
};
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
index 6af52389e6a..954239fa928 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
@@ -35,30 +35,50 @@
namespace mongo {
-void LiteParsedPipeline::assertSupportsReadConcern(
- OperationContext* opCtx,
+ReadConcernSupportResult LiteParsedPipeline::supportsReadConcern(
+ repl::ReadConcernLevel level,
boost::optional<ExplainOptions::Verbosity> explain,
bool enableMajorityReadConcern) const {
- auto readConcern = repl::ReadConcernArgs::get(opCtx);
+ // Start by assuming that we will support both readConcern and cluster-wide default.
+ ReadConcernSupportResult result = ReadConcernSupportResult::allSupportedAndDefaultPermitted();
- // Reject non change stream aggregation queries that try to use "majority" read concern when
- // enableMajorityReadConcern=false.
+ // 1. Determine whether the given read concern must be rejected for any pipeline-global reasons.
if (!hasChangeStream() && !enableMajorityReadConcern &&
- (repl::ReadConcernArgs::get(opCtx).getLevel() ==
- repl::ReadConcernLevel::kMajorityReadConcern)) {
- uasserted(ErrorCodes::ReadConcernMajorityNotEnabled,
- "Only change stream aggregation queries support 'majority' read concern when "
- "enableMajorityReadConcern=false");
+ (level == repl::ReadConcernLevel::kMajorityReadConcern)) {
+ // Reject non change stream aggregation queries that try to use "majority" read concern when
+ // enableMajorityReadConcern=false.
+ result.readConcernSupport = {
+ ErrorCodes::ReadConcernMajorityNotEnabled,
+ "Only change stream aggregation queries support 'majority' read concern when "
+ "enableMajorityReadConcern=false"};
+ } else if (explain && level != repl::ReadConcernLevel::kLocalReadConcern) {
+ // Reject non-local read concern when the pipeline is being explained.
+ result.readConcernSupport = {
+ ErrorCodes::InvalidOptions,
+ str::stream() << "Explain for the aggregate command cannot run with a readConcern "
+ << "other than 'local'. Current readConcern level: "
+ << repl::readConcernLevels::toString(level)};
}
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "Explain for the aggregate command cannot run with a readConcern "
- << "other than 'local'. Current readConcern: " << readConcern.toString(),
- !explain || readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
+ // 2. Determine whether the default read concern must be denied for any pipeline-global reasons.
+ if (explain) {
+ result.defaultReadConcernPermit = {
+ ErrorCodes::InvalidOptions,
+ "Explain for the aggregate command does not permit default readConcern to be "
+ "applied."};
+ }
+ // 3. If either the specified or default readConcern have not already been rejected, determine
+ // whether the pipeline stages support them. If not, we record the first error we encounter.
for (auto&& spec : _stageSpecs) {
- spec->assertSupportsReadConcern(readConcern);
+ // If both result statuses are already not OK, stop checking further stages.
+ if (!result.readConcernSupport.isOK() && !result.defaultReadConcernPermit.isOK()) {
+ break;
+ }
+ result.merge(spec->supportsReadConcern(level));
}
+
+ return result;
}
void LiteParsedPipeline::assertSupportsMultiDocumentTransaction(
@@ -82,8 +102,6 @@ void LiteParsedPipeline::verifyIsSupported(
if (opCtx->inMultiDocumentTransaction()) {
assertSupportsMultiDocumentTransaction(explain);
}
- // Verify litePipe can be run at the given read concern.
- assertSupportsReadConcern(opCtx, explain, enableMajorityReadConcern);
// Verify that no involved namespace is sharded unless allowed by the pipeline.
for (const auto& nss : getInvolvedNamespaces()) {
uassert(28769,
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index 21e309dbaba..91404d25cbf 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -37,6 +37,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/read_concern_support_result.h"
namespace mongo {
@@ -122,12 +123,11 @@ public:
}
/**
- * Verifies that this pipeline is allowed to run with the specified read concern. This ensures
- * that each stage is compatible, and throws a UserException if not.
+ * Verifies that this pipeline is allowed to run with the specified read concern level.
*/
- void assertSupportsReadConcern(OperationContext* opCtx,
- boost::optional<ExplainOptions::Verbosity> explain,
- bool enableMajorityReadConcern) const;
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ bool enableMajorityReadConcern) const;
/**
* Verifies that this pipeline is allowed to run in a multi-document transaction. This ensures
diff --git a/src/mongo/db/read_concern_support_result.h b/src/mongo/db/read_concern_support_result.h
new file mode 100644
index 00000000000..9bca706a15b
--- /dev/null
+++ b/src/mongo/db/read_concern_support_result.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+/**
+ * The result of checking a thing's readConcern support. There are two parts:
+ * - Whether or not the thing supports the given readConcern.
+ * - Whether or not the thing permits having the default readConcern applied.
+ *
+ * The thing is a command invocation, an aggregation pipeline, or an aggregation stage.
+ */
+struct ReadConcernSupportResult {
+ /**
+ * Convenience method to explicitly return a ReadConcernSupportResult which both supports the
+ * given read concern and permits the default cluster-wide read concern to be applied.
+ */
+ static ReadConcernSupportResult allSupportedAndDefaultPermitted() {
+ return {Status::OK(), Status::OK()};
+ }
+
+ /**
+ * Whether this thing supports the requested readConcern level (and if not, why not).
+ */
+ Status readConcernSupport;
+
+ /**
+ * Whether this thing permits the default readConcern to be applied (and if not, why not).
+ */
+ Status defaultReadConcernPermit;
+
+ /**
+ * Construct with the given Statuses, or default to Status::OK if omitted.
+ */
+ ReadConcernSupportResult(boost::optional<Status> readConcernStatus,
+ boost::optional<Status> defaultReadConcernStatus)
+ : readConcernSupport(readConcernStatus.value_or(Status::OK())),
+ defaultReadConcernPermit(defaultReadConcernStatus.value_or(Status::OK())) {}
+
+ /**
+ * Combine the contents of another ReadConcernSupportResult with this one. The outcome is that,
+ * for each of supported and permitted, this ReadConcernSupportResult will be non-OK if either
+ * it or the other one is non-OK. If both are non-OK then preference is given to this one's
+ * Status.
+ */
+ void merge(const ReadConcernSupportResult& other) {
+ if (readConcernSupport.isOK()) {
+ readConcernSupport = other.readConcernSupport;
+ }
+ if (defaultReadConcernPermit.isOK()) {
+ defaultReadConcernPermit = other.defaultReadConcernPermit;
+ }
+ }
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 4c7f25518c3..11d5ee56538 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -239,8 +239,7 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(OperationContext* opCtx,
}
auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
- if (readConcernSupport.defaultReadConcern ==
- ReadConcernSupportResult::DefaultReadConcern::kPermitted &&
+ if (readConcernSupport.defaultReadConcernPermit.isOK() &&
!opCtx->getClient()->isInDirectClient()) {
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
@@ -270,33 +269,45 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(OperationContext* opCtx,
}
}
- auto readConcernLevel = readConcernArgs.getLevel();
// Update the readConcernSupport, in case the default RC was applied.
- readConcernSupport = invocation->supportsReadConcern(readConcernLevel);
- if (startTransaction && readConcernLevel != repl::ReadConcernLevel::kSnapshotReadConcern &&
- readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern &&
- readConcernLevel != repl::ReadConcernLevel::kLocalReadConcern) {
- return Status(
- ErrorCodes::InvalidOptions,
- "The readConcern level must be either 'local' (default), 'majority' or 'snapshot' in "
- "order to run in a transaction");
- }
-
- if (startTransaction && readConcernArgs.getArgsOpTime()) {
- return Status(ErrorCodes::InvalidOptions,
- str::stream()
- << "The readConcern cannot specify '"
- << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction");
+ readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
+
+ // If we are starting a transaction, we only need to check whether the read concern is
+ // appropriate for running a transaction. There is no need to check whether the specific
+ // command supports the read concern, because all commands that are allowed to run in a
+ // transaction must support all applicable read concerns.
+ if (startTransaction) {
+ switch (readConcernArgs.getLevel()) {
+ case repl::ReadConcernLevel::kLocalReadConcern:
+ case repl::ReadConcernLevel::kMajorityReadConcern:
+ case repl::ReadConcernLevel::kSnapshotReadConcern:
+ // Acceptable readConcern for a transaction.
+ break;
+ default:
+ return {ErrorCodes::InvalidOptions,
+ "The readConcern level must be either 'local' (default), 'majority' or "
+ "'snapshot' in "
+ "order to run in a transaction"};
+ }
+ if (readConcernArgs.getArgsOpTime()) {
+ return {ErrorCodes::InvalidOptions,
+ str::stream() << "The readConcern cannot specify '"
+ << repl::ReadConcernArgs::kAfterOpTimeFieldName
+ << "' in a transaction"};
+ }
}
- // There is no need to check if the command supports the read concern while in a transaction
- // because all commands that are allowed to run in a transaction must support all the read
- // concerns that can be used with a transaction.
- if (!startTransaction &&
- readConcernSupport.readConcern == ReadConcernSupportResult::ReadConcern::kNotSupported) {
- return {ErrorCodes::InvalidOptions,
- str::stream() << "Command does not support read concern "
- << readConcernArgs.toString()};
+ // Otherwise, if there is a read concern present - either user-specified or from the default -
+ // then check whether the command supports it. If there is no explicit read concern level, then
+ // it is implicitly "local". There is no need to check whether this is supported, because all
+ // commands either support "local" or upconvert the absent readConcern to a stronger level that
+ // they do support; for instance, $changeStream upconverts to RC level "majority".
+ if (!startTransaction && readConcernArgs.hasLevel()) {
+ if (!readConcernSupport.readConcernSupport.isOK()) {
+ return readConcernSupport.readConcernSupport.withContext(
+ str::stream() << "Command " << invocation->definition()->getName()
+ << " does not support " << readConcernArgs.toString());
+ }
}
// If this command invocation asked for 'majority' read concern, supports blocking majority
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index 67e486f9b3a..a3dd0039dfb 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -65,6 +65,12 @@ public:
return false;
}
+ ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
+ const BSONObj& cmdObj,
+ repl::ReadConcernLevel level) const override {
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
+ }
+
void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) const override {
diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp
index 5dc8f632f60..772f4c8ea0f 100644
--- a/src/mongo/s/commands/cluster_current_op.cpp
+++ b/src/mongo/s/commands/cluster_current_op.cpp
@@ -80,6 +80,7 @@ private:
opCtx,
ClusterAggregate::Namespaces{nss, nss},
request,
+ {request},
{Privilege(ResourcePattern::forClusterResource(), ActionType::inprog)},
&responseBuilder);
diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp
index 0009fc6f89a..f0493f9fe98 100644
--- a/src/mongo/s/commands/cluster_distinct_cmd.cpp
+++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp
@@ -75,8 +75,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
void addRequiredPrivileges(const std::string& dbname,
diff --git a/src/mongo/s/commands/cluster_explain_cmd.cpp b/src/mongo/s/commands/cluster_explain_cmd.cpp
index 3736947d113..3e2b3e16b38 100644
--- a/src/mongo/s/commands/cluster_explain_cmd.cpp
+++ b/src/mongo/s/commands/cluster_explain_cmd.cpp
@@ -189,7 +189,7 @@ std::unique_ptr<CommandInvocation> ClusterExplainCmd::parse(OperationContext* op
str::stream() << "Explain failed due to unknown command: " << cmdName,
explainedCommand);
auto innerRequest = std::make_unique<OpMsgRequest>(OpMsg{explainedObj});
- auto innerInvocation = explainedCommand->parse(opCtx, *innerRequest);
+ auto innerInvocation = explainedCommand->parseForExplain(opCtx, *innerRequest, verbosity);
return std::make_unique<Invocation>(
this, request, std::move(verbosity), std::move(innerRequest), std::move(innerInvocation));
}
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 7169f2be79a..84133e9c5b8 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -156,8 +156,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
void addRequiredPrivileges(const std::string& dbname,
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index 36d15ca6ce8..d6a14cd7428 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -113,8 +113,7 @@ public:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
NamespaceString ns() const override {
diff --git a/src/mongo/s/commands/cluster_killcursors_cmd.cpp b/src/mongo/s/commands/cluster_killcursors_cmd.cpp
index a90a24d3f53..2408f00fb0b 100644
--- a/src/mongo/s/commands/cluster_killcursors_cmd.cpp
+++ b/src/mongo/s/commands/cluster_killcursors_cmd.cpp
@@ -46,8 +46,7 @@ public:
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const final {
// killCursors must support read concerns in order to be run in transactions.
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
bool run(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 02a70417d72..3ca37e19909 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -34,6 +34,7 @@
#include "mongo/base/status.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/s/query/cluster_aggregate.h"
@@ -44,25 +45,52 @@ class ClusterPipelineCommand final : public Command {
public:
ClusterPipelineCommand() : Command("aggregate") {}
+ /**
+ * It's not known until after parsing whether or not an aggregation command is an explain
+ * request, because it might include the `explain: true` field (ie. aggregation explains do not
+ * need to arrive via the `explain` command). Therefore even parsing of regular aggregation
+ * commands needs to be able to handle the explain case.
+ *
+ * As a result, aggregation command parsing is done in parseForExplain():
+ *
+ * - To parse a regular aggregation command, call parseForExplain() with `explainVerbosity` of
+ * boost::none.
+ *
+ * - To parse an aggregation command as the sub-command in an `explain` command, call
+ * parseForExplain() with `explainVerbosity` set to the desired verbosity.
+ */
std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
const OpMsgRequest& opMsgRequest) override {
- auto privileges =
- uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
- ->getPrivilegesForAggregate(
- AggregationRequest::parseNs(
- opMsgRequest.getDatabase().toString(), opMsgRequest.body),
- opMsgRequest.body,
- true));
- // TODO: Parsing to a Pipeline and/or AggregationRequest here.
- return std::make_unique<Invocation>(this, opMsgRequest, std::move(privileges));
+ return parseForExplain(opCtx, opMsgRequest, boost::none);
+ }
+
+ std::unique_ptr<CommandInvocation> parseForExplain(
+ OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) override {
+ const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON(
+ opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity));
+
+ auto privileges = uassertStatusOK(
+ AuthorizationSession::get(opCtx->getClient())
+ ->getPrivilegesForAggregate(
+ aggregationRequest.getNamespaceString(), opMsgRequest.body, true));
+
+ return std::make_unique<Invocation>(
+ this, opMsgRequest, std::move(aggregationRequest), std::move(privileges));
}
class Invocation final : public CommandInvocation {
public:
- Invocation(Command* cmd, const OpMsgRequest& request, PrivilegeVector privileges)
+ Invocation(Command* cmd,
+ const OpMsgRequest& request,
+ const AggregationRequest aggregationRequest,
+ PrivilegeVector privileges)
: CommandInvocation(cmd),
_request(request),
_dbName(request.getDatabase().toString()),
+ _aggregationRequest(std::move(aggregationRequest)),
+ _liteParsedPipeline(LiteParsedPipeline(_aggregationRequest)),
_privileges(std::move(privileges)) {}
private:
@@ -71,31 +99,31 @@ public:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const override {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return _liteParsedPipeline.supportsReadConcern(
+ level,
+ _aggregationRequest.getExplain(),
+ serverGlobalParams.enableMajorityReadConcern);
}
void _runAggCommand(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
- boost::optional<ExplainOptions::Verbosity> verbosity,
BSONObjBuilder* result) {
- const auto aggregationRequest =
- uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity));
- const auto& nss = aggregationRequest.getNamespaceString();
+ const auto& nss = _aggregationRequest.getNamespaceString();
try {
uassertStatusOK(
ClusterAggregate::runAggregate(opCtx,
ClusterAggregate::Namespaces{nss, nss},
- aggregationRequest,
+ _aggregationRequest,
+ _liteParsedPipeline,
_privileges,
result));
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
// If the aggregation failed because the namespace is a view, re-run the command
// with the resolved view pipeline and namespace.
uassertStatusOK(ClusterAggregate::retryOnViewError(opCtx,
- aggregationRequest,
+ _aggregationRequest,
*ex.extraInfo<ResolvedView>(),
nss,
_privileges,
@@ -108,14 +136,14 @@ public:
opCtx, !Pipeline::aggHasWriteStage(_request.body));
auto bob = reply->getBodyBuilder();
- _runAggCommand(opCtx, _dbName, _request.body, boost::none, &bob);
+ _runAggCommand(opCtx, _dbName, _request.body, &bob);
}
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
auto bodyBuilder = result->getBodyBuilder();
- _runAggCommand(opCtx, _dbName, _request.body, verbosity, &bodyBuilder);
+ _runAggCommand(opCtx, _dbName, _request.body, &bodyBuilder);
}
void doCheckAuthorization(OperationContext* opCtx) const override {
@@ -126,11 +154,13 @@ public:
}
NamespaceString ns() const override {
- return AggregationRequest::parseNs(_dbName, _request.body);
+ return _aggregationRequest.getNamespaceString();
}
const OpMsgRequest& _request;
const std::string _dbName;
+ const AggregationRequest _aggregationRequest;
+ const LiteParsedPipeline _liteParsedPipeline;
const PrivilegeVector _privileges;
};
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 5732d1243d3..c018f4eaf56 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -549,8 +549,7 @@ private:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
void doCheckAuthorization(OperationContext* opCtx) const final {
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 2f734c11654..b0a591cadc5 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -394,9 +394,12 @@ void runCommand(OperationContext* opCtx,
!readConcernArgs.getArgsAtClusterTime());
}
+ auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
+
boost::optional<RouterOperationContextSession> routerSession;
try {
CommandHelpers::evaluateFailCommandFailPoint(opCtx, commandName, invocation->ns());
+ bool startTransaction = false;
if (osi.getAutocommit()) {
routerSession.emplace(opCtx);
@@ -419,6 +422,7 @@ void runCommand(OperationContext* opCtx,
return TransactionRouter::TransactionActions::kContinue;
})();
+ startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart);
txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
}
@@ -455,6 +459,55 @@ void runCommand(OperationContext* opCtx,
opCtx->setWriteConcern(wc);
}
+ // If we are starting a transaction, we only need to check whether the read concern is
+ // appropriate for running a transaction. There is no need to check whether the specific
+ // command supports the read concern, because all commands that are allowed to run in a
+ // transaction must support all applicable read concerns.
+ if (startTransaction) {
+ switch (readConcernArgs.getLevel()) {
+ case repl::ReadConcernLevel::kLocalReadConcern:
+ case repl::ReadConcernLevel::kMajorityReadConcern:
+ case repl::ReadConcernLevel::kSnapshotReadConcern:
+ // Acceptable readConcern for a transaction.
+ break;
+ default:
+ auto responseBuilder = replyBuilder->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(
+ responseBuilder,
+ {ErrorCodes::InvalidOptions,
+ "The readConcern level must be either 'local' (default), 'majority' or "
+ "'snapshot' in order to run in a transaction"});
+ return;
+ }
+ if (readConcernArgs.getArgsOpTime()) {
+ auto responseBuilder = replyBuilder->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(
+ responseBuilder,
+ {ErrorCodes::InvalidOptions,
+ str::stream()
+ << "The readConcern cannot specify '"
+ << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"});
+ return;
+ }
+ }
+
+ // Otherwise, if there is a read concern present - either user-specified or the default -
+ // then check whether the command supports it. If there is no explicit read concern level,
+ // then it is implicitly "local". There is no need to check whether this is supported,
+ // because all commands either support "local" or upconvert the absent readConcern to a
+ // stronger level that they do support; e.g. $changeStream upconverts to RC "majority".
+ if (!startTransaction && readConcernArgs.hasLevel()) {
+ if (!readConcernSupport.readConcernSupport.isOK()) {
+ auto responseBuilder = replyBuilder->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(
+ responseBuilder,
+ readConcernSupport.readConcernSupport.withContext(
+ str::stream() << "Command " << invocation->definition()->getName()
+ << " does not support " << readConcernArgs.toString()));
+ return;
+ }
+ }
+
for (int tries = 0;; ++tries) {
// Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
bool canRetry = tries < kMaxNumStaleVersionRetries - 1;
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 7ee03280a30..ad307c3bfe5 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -145,6 +145,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const AggregationRequest& request,
const PrivilegeVector& privileges,
BSONObjBuilder* result) {
+ return runAggregate(opCtx, namespaces, request, {request}, privileges, result);
+}
+
+Status ClusterAggregate::runAggregate(OperationContext* opCtx,
+ const Namespaces& namespaces,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& liteParsedPipeline,
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* result) {
uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec());
uassert(51143,
"Cannot specify runtime constants option to a mongos",
@@ -161,11 +170,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return resolvedNsRoutingInfo.cm().get();
};
- LiteParsedPipeline litePipe(request);
- litePipe.verifyIsSupported(
+ liteParsedPipeline.verifyIsSupported(
opCtx, isSharded, request.getExplain(), serverGlobalParams.enableMajorityReadConcern);
- auto hasChangeStream = litePipe.hasChangeStream();
- auto involvedNamespaces = litePipe.getInvolvedNamespaces();
+ auto hasChangeStream = liteParsedPipeline.hasChangeStream();
+ auto involvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
// If the routing table is valid, we obtain a reference to it. If the table is not valid, then
// either the database does not exist, or there are no shards in the cluster. In the latter
@@ -214,14 +222,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return pipeline;
};
- auto targeter =
- sharded_agg_helpers::AggregationTargeter::make(opCtx,
- namespaces.executionNss,
- pipelineBuilder,
- routingInfo,
- involvedNamespaces,
- hasChangeStream,
- litePipe.allowedToPassthroughFromMongos());
+ auto targeter = sharded_agg_helpers::AggregationTargeter::make(
+ opCtx,
+ namespaces.executionNss,
+ pipelineBuilder,
+ routingInfo,
+ involvedNamespaces,
+ hasChangeStream,
+ liteParsedPipeline.allowedToPassthroughFromMongos());
if (!expCtx) {
// When the AggregationTargeter chooses a "passthrough" policy, it does not call the
@@ -308,8 +316,8 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
nsStruct.requestedNss = requestedNss;
nsStruct.executionNss = resolvedView.getNamespace();
- auto status =
- ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, privileges, result);
+ auto status = ClusterAggregate::runAggregate(
+ opCtx, nsStruct, resolvedAggRequest, {resolvedAggRequest}, privileges, result);
// If the underlying namespace was changed to a view during retry, then re-run the aggregation
// on the new resolved namespace.
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index 9d3c0a90eba..4fb474d2f8f 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -37,6 +37,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/strategy.h"
@@ -45,7 +46,6 @@
namespace mongo {
-class LiteParsedPipeline;
class OperationContext;
class ShardId;
@@ -85,6 +85,16 @@ public:
static Status runAggregate(OperationContext* opCtx,
const Namespaces& namespaces,
const AggregationRequest& request,
+ const LiteParsedPipeline& liteParsedPipeline,
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* result);
+
+ /**
+ * Convenience version that internally constructs the LiteParsedPipeline.
+ */
+ static Status runAggregate(OperationContext* opCtx,
+ const Namespaces& namespaces,
+ const AggregationRequest& request,
const PrivilegeVector& privileges,
BSONObjBuilder* result);
diff --git a/src/mongo/s/query/cluster_aggregate_test.cpp b/src/mongo/s/query/cluster_aggregate_test.cpp
index 0b27b29042f..d48ba9f3d41 100644
--- a/src/mongo/s/query/cluster_aggregate_test.cpp
+++ b/src/mongo/s/query/cluster_aggregate_test.cpp
@@ -103,6 +103,7 @@ protected:
return ClusterAggregate::runAggregate(opCtx.get(),
ClusterAggregate::Namespaces{nss, nss},
request.getValue(),
+ {request.getValue()},
PrivilegeVector(),
&result);
}