summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorKevin Pulo <kevin.pulo@mongodb.com>2019-11-26 03:23:07 +0000
committerevergreen <evergreen@mongodb.com>2019-11-26 03:23:07 +0000
commit8b0f534a706005d366e200ee56af5c76217656b2 (patch)
tree4ff0d0d1ea68d4f1a17ce39c4ab8087cfa617434 /src/mongo/db
parent34e093782f53dec39ff89116c0c7128430c99bae (diff)
downloadmongo-8b0f534a706005d366e200ee56af5c76217656b2.tar.gz
SERVER-44470 Parse aggregation commands earlier, and rationalize aggregation readConcern handling
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands.h58
-rw-r--r--src/mongo/db/commands/count_cmd.cpp3
-rw-r--r--src/mongo/db/commands/distinct.cpp3
-rw-r--r--src/mongo/db/commands/explain_cmd.cpp2
-rw-r--r--src/mongo/db/commands/find_cmd.cpp3
-rw-r--r--src/mongo/db/commands/haystack.cpp3
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp88
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp16
-rw-r--r--src/mongo/db/commands/run_aggregate.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h16
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h4
-rw-r--r--src/mongo/db/pipeline/document_source_list_cached_and_active_users.h4
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h4
-rw-r--r--src/mongo/db/pipeline/document_source_plan_cache_stats.h4
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.h36
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.cpp52
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h10
-rw-r--r--src/mongo/db/read_concern_support_result.h86
-rw-r--r--src/mongo/db/service_entry_point_common.cpp63
19 files changed, 295 insertions, 172 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index ea62ac51fc0..2350a040b6d 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -45,6 +45,7 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/query/explain.h"
+#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/op_msg.h"
@@ -269,6 +270,13 @@ public:
virtual std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
const OpMsgRequest& request) = 0;
+ virtual std::unique_ptr<CommandInvocation> parseForExplain(
+ OperationContext* opCtx,
+ const OpMsgRequest& request,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+ return parse(opCtx, request);
+ }
+
/**
* Returns the command's name. This value never changes for the lifetime of the command.
*/
@@ -433,46 +441,6 @@ private:
};
/**
- * The result of checking an invocation's support for readConcern. There are two parts:
- * - Whether or not the invocation supports the given readConcern.
- * - Whether or not the invocation permits having the default readConcern applied to it.
- */
-struct ReadConcernSupportResult {
- /**
- * Whether this command invocation supports the requested readConcern level. This only
- * applies when running outside transactions because all commands that are allowed to run
- * in a transaction must support all the read concerns that can be used in a transaction.
- */
- enum class ReadConcern { kSupported, kNotSupported } readConcern;
-
- /**
- * Whether this command invocation supports applying the default readConcern to it.
- */
- enum class DefaultReadConcern { kPermitted, kNotPermitted } defaultReadConcern;
-
- /**
- * Construct with either the enum value or a bool, where true indicates
- * ReadConcern::kSupported or DefaultReadConcern::kPermitted (as appropriate).
- */
- ReadConcernSupportResult(ReadConcern supported, DefaultReadConcern defaultPermitted)
- : readConcern(supported), defaultReadConcern(defaultPermitted) {}
-
- ReadConcernSupportResult(bool supported, DefaultReadConcern defaultPermitted)
- : readConcern(supported ? ReadConcern::kSupported : ReadConcern::kNotSupported),
- defaultReadConcern(defaultPermitted) {}
-
- ReadConcernSupportResult(ReadConcern supported, bool defaultPermitted)
- : readConcern(supported),
- defaultReadConcern(defaultPermitted ? DefaultReadConcern::kPermitted
- : DefaultReadConcern::kNotPermitted) {}
-
- ReadConcernSupportResult(bool supported, bool defaultPermitted)
- : readConcern(supported ? ReadConcern::kSupported : ReadConcern::kNotSupported),
- defaultReadConcern(defaultPermitted ? DefaultReadConcern::kPermitted
- : DefaultReadConcern::kNotPermitted) {}
-};
-
-/**
* Represents a single invocation of a given command.
*/
class CommandInvocation {
@@ -516,8 +484,9 @@ public:
* Returns this invocation's support for readConcern.
*/
virtual ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
- return {level == repl::ReadConcernLevel::kLocalReadConcern,
- ReadConcernSupportResult::DefaultReadConcern::kNotPermitted};
+ return {{level != repl::ReadConcernLevel::kLocalReadConcern,
+ {ErrorCodes::InvalidOptions, "read concern not supported"}},
+ {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}};
}
/**
@@ -667,8 +636,9 @@ public:
virtual ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const {
- return {level == repl::ReadConcernLevel::kLocalReadConcern,
- ReadConcernSupportResult::DefaultReadConcern::kNotPermitted};
+ return {{level != repl::ReadConcernLevel::kLocalReadConcern,
+ {ErrorCodes::InvalidOptions, "read concern not supported"}},
+ {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}};
}
virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const {
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index da2033e7645..b31bb5822cc 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -93,8 +93,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const override {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
ReadWriteType getReadWriteType() const override {
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index ff93a8fcfae..4da2af80abf 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -87,8 +87,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const override {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
ReadWriteType getReadWriteType() const override {
diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp
index 78fa4d67217..2d3460d6a63 100644
--- a/src/mongo/db/commands/explain_cmd.cpp
+++ b/src/mongo/db/commands/explain_cmd.cpp
@@ -170,7 +170,7 @@ std::unique_ptr<CommandInvocation> CmdExplain::parse(OperationContext* opCtx,
explainedCommand);
auto innerRequest =
std::make_unique<OpMsgRequest>(OpMsgRequest::fromDBAndBody(dbname, explainedObj));
- auto innerInvocation = explainedCommand->parse(opCtx, *innerRequest);
+ auto innerInvocation = explainedCommand->parseForExplain(opCtx, *innerRequest, verbosity);
return std::make_unique<Invocation>(
this, request, std::move(verbosity), std::move(innerRequest), std::move(innerInvocation));
}
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 56e65aad432..bd24d034e29 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -178,8 +178,7 @@ public:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
bool canIgnorePrepareConflicts() const override {
diff --git a/src/mongo/db/commands/haystack.cpp b/src/mongo/db/commands/haystack.cpp
index a14a4fd3a29..3065e3e3f7c 100644
--- a/src/mongo/db/commands/haystack.cpp
+++ b/src/mongo/db/commands/haystack.cpp
@@ -76,8 +76,7 @@ public:
ReadConcernSupportResult supportsReadConcern(const std::string& dbName,
const BSONObj& cmdObj,
repl::ReadConcernLevel level) const final {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
ReadWriteType getReadWriteType() const {
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 85c220d091a..a70ae8a51d3 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -33,42 +33,62 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/run_aggregate.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
namespace mongo {
namespace {
-bool isMergePipeline(const std::vector<BSONObj>& pipeline) {
- if (pipeline.empty()) {
- return false;
- }
- return pipeline[0].hasField("$mergeCursors");
-}
-
class PipelineCommand final : public Command {
public:
PipelineCommand() : Command("aggregate") {}
+ /**
+ * It's not known until after parsing whether or not an aggregation command is an explain
+ * request, because it might include the `explain: true` field (ie. aggregation explains do not
+ * need to arrive via the `explain` command). Therefore even parsing of regular aggregation
+ * commands needs to be able to handle the explain case.
+ *
+ * As a result, aggregation command parsing is done in parseForExplain():
+ *
+ * - To parse a regular aggregation command, call parseForExplain() with `explainVerbosity` of
+ * boost::none.
+ *
+ * - To parse an aggregation command as the sub-command in an `explain` command, call
+ * parseForExplain() with `explainVerbosity` set to the desired verbosity.
+ */
std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
const OpMsgRequest& opMsgRequest) override {
- // TODO: Parsing to a Pipeline and/or AggregationRequest here.
-
- auto privileges =
- uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
- ->getPrivilegesForAggregate(
- AggregationRequest::parseNs(
- opMsgRequest.getDatabase().toString(), opMsgRequest.body),
- opMsgRequest.body,
- false));
- return std::make_unique<Invocation>(this, opMsgRequest, std::move(privileges));
+ return parseForExplain(opCtx, opMsgRequest, boost::none);
+ }
+
+ std::unique_ptr<CommandInvocation> parseForExplain(
+ OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) override {
+ const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON(
+ opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity));
+
+ auto privileges = uassertStatusOK(
+ AuthorizationSession::get(opCtx->getClient())
+ ->getPrivilegesForAggregate(
+ aggregationRequest.getNamespaceString(), opMsgRequest.body, false));
+
+ return std::make_unique<Invocation>(
+ this, opMsgRequest, std::move(aggregationRequest), std::move(privileges));
}
class Invocation final : public CommandInvocation {
public:
- Invocation(Command* cmd, const OpMsgRequest& request, PrivilegeVector privileges)
+ Invocation(Command* cmd,
+ const OpMsgRequest& request,
+ const AggregationRequest aggregationRequest,
+ PrivilegeVector privileges)
: CommandInvocation(cmd),
_request(request),
_dbName(request.getDatabase().toString()),
+ _aggregationRequest(std::move(aggregationRequest)),
+ _liteParsedPipeline(_aggregationRequest),
_privileges(std::move(privileges)) {}
private:
@@ -83,16 +103,10 @@ public:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const override {
- // Aggregations that are run directly against a collection allow any read concern.
- // Otherwise, if the aggregate is collectionless then the read concern must be 'local'
- // (e.g. $currentOp). The exception to this is a $changeStream on a whole database,
- // which is considered collectionless but must be read concern 'majority'. Further read
- // concern validation is done one the pipeline is parsed.
- return {level == repl::ReadConcernLevel::kLocalReadConcern ||
- level == repl::ReadConcernLevel::kMajorityReadConcern ||
- !AggregationRequest::parseNs(_dbName, _request.body)
- .isCollectionlessAggregateNS(),
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return _liteParsedPipeline.supportsReadConcern(
+ level,
+ _aggregationRequest.getExplain(),
+ serverGlobalParams.enableMajorityReadConcern);
}
bool allowsSpeculativeMajorityReads() const override {
@@ -106,29 +120,27 @@ public:
CommandHelpers::handleMarkKillOnClientDisconnect(
opCtx, !Pipeline::aggHasWriteStage(_request.body));
- const auto aggregationRequest = uassertStatusOK(
- AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none));
uassertStatusOK(runAggregate(opCtx,
- aggregationRequest.getNamespaceString(),
- aggregationRequest,
+ _aggregationRequest.getNamespaceString(),
+ _aggregationRequest,
+ _liteParsedPipeline,
_request.body,
_privileges,
reply));
}
NamespaceString ns() const override {
- return AggregationRequest::parseNs(_dbName, _request.body);
+ return _aggregationRequest.getNamespaceString();
}
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
- const auto aggregationRequest = uassertStatusOK(
- AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity));
uassertStatusOK(runAggregate(opCtx,
- aggregationRequest.getNamespaceString(),
- aggregationRequest,
+ _aggregationRequest.getNamespaceString(),
+ _aggregationRequest,
+ _liteParsedPipeline,
_request.body,
_privileges,
result));
@@ -143,6 +155,8 @@ public:
const OpMsgRequest& _request;
const std::string _dbName;
+ const AggregationRequest _aggregationRequest;
+ const LiteParsedPipeline _liteParsedPipeline;
const PrivilegeVector _privileges;
};
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index b7b87dd6018..ef7394df62a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -480,8 +480,18 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExe
} // namespace
Status runAggregate(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const AggregationRequest& request,
+ const BSONObj& cmdObj,
+ const PrivilegeVector& privileges,
+ rpc::ReplyBuilderInterface* result) {
+ return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result);
+}
+
+Status runAggregate(OperationContext* opCtx,
const NamespaceString& origNss,
const AggregationRequest& request,
+ const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
@@ -504,18 +514,12 @@ Status runAggregate(OperationContext* opCtx,
boost::intrusive_ptr<ExpressionContext> expCtx;
auto curOp = CurOp::get(opCtx);
{
- const LiteParsedPipeline liteParsedPipeline(request);
-
// If we are in a transaction, check whether the parsed pipeline supports
// being in a transaction.
if (opCtx->inMultiDocumentTransaction()) {
liteParsedPipeline.assertSupportsMultiDocumentTransaction(request.getExplain());
}
- // Check whether the parsed pipeline supports the given read concern.
- liteParsedPipeline.assertSupportsReadConcern(
- opCtx, request.getExplain(), serverGlobalParams.enableMajorityReadConcern);
-
const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
// If this is a collectionless aggregation, we won't create 'ctx' but will still need an
diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h
index 77d6bd7f9f9..059c48a599b 100644
--- a/src/mongo/db/commands/run_aggregate.h
+++ b/src/mongo/db/commands/run_aggregate.h
@@ -35,6 +35,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
namespace mongo {
@@ -53,6 +54,17 @@ namespace mongo {
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
const AggregationRequest& request,
+ const LiteParsedPipeline& liteParsedPipeline,
+ const BSONObj& cmdObj,
+ const PrivilegeVector& privileges,
+ rpc::ReplyBuilderInterface* result);
+
+/**
+ * Convenience version that internally constructs the LiteParsedPipeline.
+ */
+Status runAggregate(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const AggregationRequest& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result);
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index d3492b0c319..8f8ef6f6e79 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -80,15 +80,13 @@ public:
}
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- // Only "majority" is allowed for change streams.
- uassert(
- ErrorCodes::InvalidOptions,
- str::stream()
- << "$changeStream cannot run with a readConcern other than 'majority'. Current "
- << "readConcern: " << readConcern.toString(),
- !readConcern.hasLevel() ||
- readConcern.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ // Change streams require "majority" readConcern. If the client did not specify an
+ // explicit readConcern, change streams will internally upconvert the readConcern to
+ // majority (so clients can always send aggregations without readConcern). We therefore
+ // do not permit the cluster-wide default to be applied.
+ return onlySingleReadConcernSupported(
+ kStageName, repl::ReadConcernLevel::kMajorityReadConcern, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index bc66e0fef64..f2e6470f370 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -79,8 +79,8 @@ public:
return true;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
index 9406bf2e126..fabc4e0cfd7 100644
--- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
+++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
@@ -69,8 +69,8 @@ public:
return false;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index b4439a4323d..0b1738b4f53 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -81,8 +81,8 @@ public:
return false;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(DocumentSourceListLocalSessions::kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
index 5902896ee72..cdb582bed14 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
@@ -65,8 +65,8 @@ public:
return false;
}
- void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
- onlyReadConcernLocalSupported(DocumentSourcePlanCacheStats::kStageName, readConcern);
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return onlyReadConcernLocalSupported(kStageName, level);
}
void assertSupportsMultiDocumentTransaction() const {
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index 064e22389f3..82d9ce1b7f7 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -37,6 +37,7 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/stdx/unordered_set.h"
@@ -130,10 +131,11 @@ public:
}
/**
- * Verifies that this stage is allowed to run with the specified read concern. Throws a
- * UserException if not compatible.
+ * Verifies that this stage is allowed to run with the specified read concern level.
*/
- virtual void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {}
+ virtual ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const {
+ return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
+ }
/**
* Verifies that this stage is allowed to run in a multi-document transaction. Throws a
@@ -150,14 +152,26 @@ protected:
<< "multi-document transaction.");
}
- void onlyReadConcernLocalSupported(StringData stageName,
- const repl::ReadConcernArgs& readConcern) const {
- uassert(ErrorCodes::InvalidOptions,
- str::stream()
- << "Aggregation stage " << stageName
- << " cannot run with a readConcern other than 'local'. Current readConcern: "
- << readConcern.toString(),
- readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
+ ReadConcernSupportResult onlySingleReadConcernSupported(
+ StringData stageName,
+ repl::ReadConcernLevel supportedLevel,
+ repl::ReadConcernLevel candidateLevel) const {
+ return {{candidateLevel != supportedLevel,
+ {ErrorCodes::InvalidOptions,
+ str::stream() << "Aggregation stage " << stageName
+ << " cannot run with a readConcern other than '"
+ << repl::readConcernLevels::toString(supportedLevel)
+ << "'. Current readConcern: "
+ << repl::readConcernLevels::toString(candidateLevel)}},
+ {{ErrorCodes::InvalidOptions,
+ str::stream() << "Aggregation stage " << stageName
+ << " does not permit default readConcern to be applied."}}};
+ }
+
+ ReadConcernSupportResult onlyReadConcernLocalSupported(StringData stageName,
+ repl::ReadConcernLevel level) const {
+ return onlySingleReadConcernSupported(
+ stageName, repl::ReadConcernLevel::kLocalReadConcern, level);
}
};
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
index 6af52389e6a..954239fa928 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
@@ -35,30 +35,50 @@
namespace mongo {
-void LiteParsedPipeline::assertSupportsReadConcern(
- OperationContext* opCtx,
+ReadConcernSupportResult LiteParsedPipeline::supportsReadConcern(
+ repl::ReadConcernLevel level,
boost::optional<ExplainOptions::Verbosity> explain,
bool enableMajorityReadConcern) const {
- auto readConcern = repl::ReadConcernArgs::get(opCtx);
+ // Start by assuming that we will support both readConcern and cluster-wide default.
+ ReadConcernSupportResult result = ReadConcernSupportResult::allSupportedAndDefaultPermitted();
- // Reject non change stream aggregation queries that try to use "majority" read concern when
- // enableMajorityReadConcern=false.
+ // 1. Determine whether the given read concern must be rejected for any pipeline-global reasons.
if (!hasChangeStream() && !enableMajorityReadConcern &&
- (repl::ReadConcernArgs::get(opCtx).getLevel() ==
- repl::ReadConcernLevel::kMajorityReadConcern)) {
- uasserted(ErrorCodes::ReadConcernMajorityNotEnabled,
- "Only change stream aggregation queries support 'majority' read concern when "
- "enableMajorityReadConcern=false");
+ (level == repl::ReadConcernLevel::kMajorityReadConcern)) {
+ // Reject non change stream aggregation queries that try to use "majority" read concern when
+ // enableMajorityReadConcern=false.
+ result.readConcernSupport = {
+ ErrorCodes::ReadConcernMajorityNotEnabled,
+ "Only change stream aggregation queries support 'majority' read concern when "
+ "enableMajorityReadConcern=false"};
+ } else if (explain && level != repl::ReadConcernLevel::kLocalReadConcern) {
+ // Reject non-local read concern when the pipeline is being explained.
+ result.readConcernSupport = {
+ ErrorCodes::InvalidOptions,
+ str::stream() << "Explain for the aggregate command cannot run with a readConcern "
+ << "other than 'local'. Current readConcern level: "
+ << repl::readConcernLevels::toString(level)};
}
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "Explain for the aggregate command cannot run with a readConcern "
- << "other than 'local'. Current readConcern: " << readConcern.toString(),
- !explain || readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
+ // 2. Determine whether the default read concern must be denied for any pipeline-global reasons.
+ if (explain) {
+ result.defaultReadConcernPermit = {
+ ErrorCodes::InvalidOptions,
+ "Explain for the aggregate command does not permit default readConcern to be "
+ "applied."};
+ }
+ // 3. If either the specified or default readConcern have not already been rejected, determine
+ // whether the pipeline stages support them. If not, we record the first error we encounter.
for (auto&& spec : _stageSpecs) {
- spec->assertSupportsReadConcern(readConcern);
+ // If both result statuses are already not OK, stop checking further stages.
+ if (!result.readConcernSupport.isOK() && !result.defaultReadConcernPermit.isOK()) {
+ break;
+ }
+ result.merge(spec->supportsReadConcern(level));
}
+
+ return result;
}
void LiteParsedPipeline::assertSupportsMultiDocumentTransaction(
@@ -82,8 +102,6 @@ void LiteParsedPipeline::verifyIsSupported(
if (opCtx->inMultiDocumentTransaction()) {
assertSupportsMultiDocumentTransaction(explain);
}
- // Verify litePipe can be run at the given read concern.
- assertSupportsReadConcern(opCtx, explain, enableMajorityReadConcern);
// Verify that no involved namespace is sharded unless allowed by the pipeline.
for (const auto& nss : getInvolvedNamespaces()) {
uassert(28769,
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index 21e309dbaba..91404d25cbf 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -37,6 +37,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/read_concern_support_result.h"
namespace mongo {
@@ -122,12 +123,11 @@ public:
}
/**
- * Verifies that this pipeline is allowed to run with the specified read concern. This ensures
- * that each stage is compatible, and throws a UserException if not.
+ * Verifies that this pipeline is allowed to run with the specified read concern level.
*/
- void assertSupportsReadConcern(OperationContext* opCtx,
- boost::optional<ExplainOptions::Verbosity> explain,
- bool enableMajorityReadConcern) const;
+ ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ bool enableMajorityReadConcern) const;
/**
* Verifies that this pipeline is allowed to run in a multi-document transaction. This ensures
diff --git a/src/mongo/db/read_concern_support_result.h b/src/mongo/db/read_concern_support_result.h
new file mode 100644
index 00000000000..9bca706a15b
--- /dev/null
+++ b/src/mongo/db/read_concern_support_result.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+/**
+ * The result of checking a thing's readConcern support. There are two parts:
+ * - Whether or not the thing supports the given readConcern.
+ * - Whether or not the thing permits having the default readConcern applied.
+ *
+ * The thing is a command invocation, an aggregation pipeline, or an aggregation stage.
+ */
+struct ReadConcernSupportResult {
+ /**
+ * Convenience method to explicitly return a ReadConcernSupportResult which both supports the
+ * given read concern and permits the default cluster-wide read concern to be applied.
+ */
+ static ReadConcernSupportResult allSupportedAndDefaultPermitted() {
+ return {Status::OK(), Status::OK()};
+ }
+
+ /**
+ * Whether this thing supports the requested readConcern level (and if not, why not).
+ */
+ Status readConcernSupport;
+
+ /**
+ * Whether this thing permits the default readConcern to be applied (and if not, why not).
+ */
+ Status defaultReadConcernPermit;
+
+ /**
+ * Construct with the given Statuses, or default to Status::OK if omitted.
+ */
+ ReadConcernSupportResult(boost::optional<Status> readConcernStatus,
+ boost::optional<Status> defaultReadConcernStatus)
+ : readConcernSupport(readConcernStatus.value_or(Status::OK())),
+ defaultReadConcernPermit(defaultReadConcernStatus.value_or(Status::OK())) {}
+
+ /**
+ * Combine the contents of another ReadConcernSupportResult with this one. The outcome is that,
+ * for each of supported and permitted, this ReadConcernSupportResult will be non-OK if either
+ * it or the other one is non-OK. If both are non-OK then preference is given to this one's
+ * Status.
+ */
+ void merge(const ReadConcernSupportResult& other) {
+ if (readConcernSupport.isOK()) {
+ readConcernSupport = other.readConcernSupport;
+ }
+ if (defaultReadConcernPermit.isOK()) {
+ defaultReadConcernPermit = other.defaultReadConcernPermit;
+ }
+ }
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 4c7f25518c3..11d5ee56538 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -239,8 +239,7 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(OperationContext* opCtx,
}
auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
- if (readConcernSupport.defaultReadConcern ==
- ReadConcernSupportResult::DefaultReadConcern::kPermitted &&
+ if (readConcernSupport.defaultReadConcernPermit.isOK() &&
!opCtx->getClient()->isInDirectClient()) {
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
@@ -270,33 +269,45 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(OperationContext* opCtx,
}
}
- auto readConcernLevel = readConcernArgs.getLevel();
// Update the readConcernSupport, in case the default RC was applied.
- readConcernSupport = invocation->supportsReadConcern(readConcernLevel);
- if (startTransaction && readConcernLevel != repl::ReadConcernLevel::kSnapshotReadConcern &&
- readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern &&
- readConcernLevel != repl::ReadConcernLevel::kLocalReadConcern) {
- return Status(
- ErrorCodes::InvalidOptions,
- "The readConcern level must be either 'local' (default), 'majority' or 'snapshot' in "
- "order to run in a transaction");
- }
-
- if (startTransaction && readConcernArgs.getArgsOpTime()) {
- return Status(ErrorCodes::InvalidOptions,
- str::stream()
- << "The readConcern cannot specify '"
- << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction");
+ readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
+
+ // If we are starting a transaction, we only need to check whether the read concern is
+ // appropriate for running a transaction. There is no need to check whether the specific
+ // command supports the read concern, because all commands that are allowed to run in a
+ // transaction must support all applicable read concerns.
+ if (startTransaction) {
+ switch (readConcernArgs.getLevel()) {
+ case repl::ReadConcernLevel::kLocalReadConcern:
+ case repl::ReadConcernLevel::kMajorityReadConcern:
+ case repl::ReadConcernLevel::kSnapshotReadConcern:
+ // Acceptable readConcern for a transaction.
+ break;
+ default:
+ return {ErrorCodes::InvalidOptions,
+ "The readConcern level must be either 'local' (default), 'majority' or "
+ "'snapshot' in "
+ "order to run in a transaction"};
+ }
+ if (readConcernArgs.getArgsOpTime()) {
+ return {ErrorCodes::InvalidOptions,
+ str::stream() << "The readConcern cannot specify '"
+ << repl::ReadConcernArgs::kAfterOpTimeFieldName
+ << "' in a transaction"};
+ }
}
- // There is no need to check if the command supports the read concern while in a transaction
- // because all commands that are allowed to run in a transaction must support all the read
- // concerns that can be used with a transaction.
- if (!startTransaction &&
- readConcernSupport.readConcern == ReadConcernSupportResult::ReadConcern::kNotSupported) {
- return {ErrorCodes::InvalidOptions,
- str::stream() << "Command does not support read concern "
- << readConcernArgs.toString()};
+ // Otherwise, if there is a read concern present - either user-specified or from the default -
+ // then check whether the command supports it. If there is no explicit read concern level, then
+ // it is implicitly "local". There is no need to check whether this is supported, because all
+ // commands either support "local" or upconvert the absent readConcern to a stronger level that
+ // they do support; for instance, $changeStream upconverts to RC level "majority".
+ if (!startTransaction && readConcernArgs.hasLevel()) {
+ if (!readConcernSupport.readConcernSupport.isOK()) {
+ return readConcernSupport.readConcernSupport.withContext(
+ str::stream() << "Command " << invocation->definition()->getName()
+ << " does not support " << readConcernArgs.toString());
+ }
}
// If this command invocation asked for 'majority' read concern, supports blocking majority