summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-05-25 20:36:12 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-05-26 08:05:22 +0100
commit58486605c09672b3bfce8608dca403a145413bba (patch)
tree62f44cbd43515b591e6af6347c0a75eb5c6ab5fb /src
parent4b88bf79dc47cbdfd74a11605cc5ee1c61e379f8 (diff)
downloadmongo-58486605c09672b3bfce8608dca403a145413bba.tar.gz
SERVER-19318 Add $currentOp aggregation stage for mongoD
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/auth/authorization_session.cpp52
-rw-r--r--src/mongo/db/auth/authorization_session_test.cpp31
-rw-r--r--src/mongo/db/commands.h8
-rw-r--r--src/mongo/db/commands/count_cmd.cpp2
-rw-r--r--src/mongo/db/commands/distinct.cpp2
-rw-r--r--src/mongo/db/commands/find_cmd.cpp2
-rw-r--r--src/mongo/db/commands/geo_near_cmd.cpp2
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp2
-rw-r--r--src/mongo/db/commands/group_cmd.cpp2
-rw-r--r--src/mongo/db/commands/haystack.cpp2
-rw-r--r--src/mongo/db/commands/parallel_collection_scan.cpp2
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp13
-rw-r--r--src/mongo/db/namespace_string.cpp12
-rw-r--r--src/mongo/db/namespace_string.h7
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/aggregation_context_fixture.h6
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp35
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h16
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp60
-rw-r--r--src/mongo/db/pipeline/document_source.h51
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h2
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp178
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h73
-rw-r--r--src/mongo/db/pipeline/document_source_current_op_test.cpp235
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h4
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h4
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h4
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h4
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp39
-rw-r--r--src/mongo/db/pipeline/pipeline.h7
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp77
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp59
-rw-r--r--src/mongo/db/pipeline/stub_mongod_interface.h9
-rw-r--r--src/mongo/db/run_commands.cpp7
38 files changed, 965 insertions, 60 deletions
diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp
index eb4de1b2083..cd2eac0d1cd 100644
--- a/src/mongo/db/auth/authorization_session.cpp
+++ b/src/mongo/db/auth/authorization_session.cpp
@@ -256,10 +256,15 @@ void AuthorizationSession::_addPrivilegesForStage(const std::string& db,
Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
const BSONObj& cmdObj) {
std::string db(ns.db().toString());
- auto inputResource = ResourcePattern::forExactNamespace(ns);
uassert(
17138, mongoutils::str::stream() << "Invalid input namespace, " << ns.ns(), ns.isValid());
+ // If this connection does not need to be authenticated (for instance, if auth is disabled),
+ // return Status::OK() immediately.
+ if (_externalState->shouldIgnoreAuthChecks()) {
+ return Status::OK();
+ }
+
PrivilegeVector privileges;
BSONElement pipelineElem = cmdObj["pipeline"];
@@ -270,8 +275,8 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
BSONObj pipeline = pipelineElem.embeddedObject();
if (pipeline.isEmpty()) {
// The pipeline is empty, so we require only the find action.
- Privilege::addPrivilegeToPrivilegeVector(&privileges,
- Privilege(inputResource, ActionType::find));
+ Privilege::addPrivilegeToPrivilegeVector(
+ &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::find));
} else {
if (pipeline.firstElementType() != BSONType::Object) {
// The pipeline contains something that's not an object.
@@ -284,15 +289,48 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
BSONObj firstPipelineStage = pipeline.firstElement().embeddedObject();
if (str::equals("$indexStats", firstPipelineStage.firstElementFieldName())) {
Privilege::addPrivilegeToPrivilegeVector(
- &privileges, Privilege(inputResource, ActionType::indexStats));
+ &privileges,
+ Privilege(ResourcePattern::forExactNamespace(ns), ActionType::indexStats));
} else if (str::equals("$collStats", firstPipelineStage.firstElementFieldName())) {
Privilege::addPrivilegeToPrivilegeVector(
- &privileges, Privilege(inputResource, ActionType::collStats));
+ &privileges,
+ Privilege(ResourcePattern::forExactNamespace(ns), ActionType::collStats));
+ } else if (str::equals("$currentOp", firstPipelineStage.firstElementFieldName())) {
+ // Need to check the value of allUsers; if true then inprog privilege is required.
+ // {$currentOp: {idleConnections: <boolean|false>, allUsers: <boolean|false>}}
+ BSONElement spec = firstPipelineStage["$currentOp"];
+
+ if (spec.type() != BSONType::Object) {
+ return Status(
+ ErrorCodes::TypeMismatch,
+ str::stream()
+ << "$currentOp options must be specified in an object, but found: "
+ << typeName(spec.type()));
+ }
+
+ auto allUsersElt = spec["allUsers"];
+
+ if (allUsersElt && allUsersElt.type() != BSONType::Bool) {
+ return Status(ErrorCodes::TypeMismatch,
+ str::stream() << "The 'allUsers' parameter of the $currentOp stage "
+ "must be a boolean value, but found: "
+ << typeName(allUsersElt.type()));
+ }
+
+ if (allUsersElt && allUsersElt.boolean()) {
+ Privilege::addPrivilegeToPrivilegeVector(
+ &privileges,
+ Privilege(ResourcePattern::forClusterResource(), ActionType::inprog));
+ } else if (!getAuthenticatedUserNames().more()) {
+ // This connection is not authenticated, so we should return an error even though
+ // there are no privilege requirements when allUsers is false.
+ return Status(ErrorCodes::Unauthorized, "unauthorized");
+ }
} else {
// If no source requiring an alternative permission scheme is specified then default to
// requiring find() privileges on the given namespace.
- Privilege::addPrivilegeToPrivilegeVector(&privileges,
- Privilege(inputResource, ActionType::find));
+ Privilege::addPrivilegeToPrivilegeVector(
+ &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::find));
}
// Add additional required privileges for each stage in the pipeline.
diff --git a/src/mongo/db/auth/authorization_session_test.cpp b/src/mongo/db/auth/authorization_session_test.cpp
index 1537f7dce73..806bc3795e8 100644
--- a/src/mongo/db/auth/authorization_session_test.cpp
+++ b/src/mongo/db/auth/authorization_session_test.cpp
@@ -627,6 +627,37 @@ TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) {
ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
}
+TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprogAction) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticated) {
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogAction) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
+TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogAction) {
+ authzSession->assumePrivilegesForDB(
+ Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotValid) {
BSONArray pipeline = BSON_ARRAY(BSON("$out"
<< ""));
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index fd0840fdddd..94c2f5d1a06 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -205,7 +205,9 @@ public:
virtual bool maintenanceOk() const = 0;
/**
- * Returns true if this Command supports the readConcern argument.
+ * Returns true if this Command supports the readConcern argument. Takes the command object and
+ * the name of the database on which it was invoked as arguments, so that readConcern can be
+ * conditionally rejected based on the command's parameters and/or namespace.
*
* If the readConcern argument is sent to a command that returns false the command processor
* will reject the command, returning an appropriate error message. For commands that support
@@ -216,7 +218,7 @@ public:
* the option to the shards as needed. We rely on the shards to fail the commands in the
* cases where it isn't supported.
*/
- virtual bool supportsReadConcern() const = 0;
+ virtual bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const = 0;
/**
* Returns LogicalOp for this command.
@@ -348,7 +350,7 @@ public:
return true; /* assumed true prior to commit */
}
- bool supportsReadConcern() const override {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const override {
return false;
}
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 0003b2bf806..bb836ace5dd 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -80,7 +80,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index c1958e34f55..6643c952395 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -88,7 +88,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 8c7759471ce..4a1fbd18510 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -94,7 +94,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp
index ce4f54a5790..ac62769c96d 100644
--- a/src/mongo/db/commands/geo_near_cmd.cpp
+++ b/src/mongo/db/commands/geo_near_cmd.cpp
@@ -74,7 +74,7 @@ public:
bool slaveOverrideOk() const {
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 59c2848313c..d14f77b249b 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -97,7 +97,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
// Uses the readConcern setting from whatever created the cursor.
return false;
}
diff --git a/src/mongo/db/commands/group_cmd.cpp b/src/mongo/db/commands/group_cmd.cpp
index 45cd9c999ce..c0cccd56f6a 100644
--- a/src/mongo/db/commands/group_cmd.cpp
+++ b/src/mongo/db/commands/group_cmd.cpp
@@ -80,7 +80,7 @@ private:
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/haystack.cpp b/src/mongo/db/commands/haystack.cpp
index b2b7f9977b6..84e9bda79e8 100644
--- a/src/mongo/db/commands/haystack.cpp
+++ b/src/mongo/db/commands/haystack.cpp
@@ -75,7 +75,7 @@ public:
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index a81c9e3a4c5..de4ce3aa8f2 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -66,7 +66,7 @@ public:
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 79a2e76270d..2d9a3e9006d 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -66,8 +66,8 @@ public:
return true;
}
- bool supportsReadConcern() const override {
- return true;
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const override {
+ return !AggregationRequest::parseNs(dbName, cmdObj).isCollectionlessAggregateNS();
}
ReadWriteType getReadWriteType() const {
@@ -77,7 +77,7 @@ public:
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) override {
- const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
+ const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj));
return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj);
}
@@ -104,10 +104,8 @@ private:
const BSONObj& cmdObj,
boost::optional<ExplainOptions::Verbosity> verbosity,
BSONObjBuilder* result) {
- const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
-
const auto aggregationRequest =
- uassertStatusOK(AggregationRequest::parseFromBSON(nss, cmdObj, verbosity));
+ uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity));
// If the featureCompatibilityVersion is 3.2, we disallow collation from the user. However,
// operations should still respect the collection default collation. The mongos attaches the
@@ -122,7 +120,8 @@ private:
ServerGlobalParams::FeatureCompatibility::Version::k32 ||
isMergePipeline(aggregationRequest.getPipeline()));
- return runAggregate(opCtx, nss, aggregationRequest, cmdObj, *result);
+ return runAggregate(
+ opCtx, aggregationRequest.getNamespaceString(), aggregationRequest, cmdObj, *result);
}
} pipelineCmd;
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index d2bc6ac1545..0e195ee7870 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -77,6 +77,7 @@ const char kLogicalTimeKeysCollection[] = "admin.system.keys";
constexpr auto listCollectionsCursorCol = "$cmd.listCollections"_sd;
constexpr auto listIndexesCursorNSPrefix = "$cmd.listIndexes."_sd;
+constexpr auto collectionlessAggregateCursorCol = "$cmd.aggregate"_sd;
constexpr auto dropPendingNSPrefix = "system.drop."_sd;
} // namespace
@@ -124,6 +125,10 @@ bool NamespaceString::isListIndexesCursorNS() const {
coll().startsWith(listIndexesCursorNSPrefix);
}
+bool NamespaceString::isCollectionlessAggregateNS() const {
+ return coll() == collectionlessAggregateCursorCol;
+}
+
NamespaceString NamespaceString::makeListCollectionsNSS(StringData dbName) {
NamespaceString nss(dbName, listCollectionsCursorCol);
dassert(nss.isValid());
@@ -138,6 +143,13 @@ NamespaceString NamespaceString::makeListIndexesNSS(StringData dbName, StringDat
return nss;
}
+NamespaceString NamespaceString::makeCollectionlessAggregateNSS(StringData dbname) {
+ NamespaceString nss(dbname, collectionlessAggregateCursorCol);
+ dassert(nss.isValid());
+ dassert(nss.isCollectionlessAggregateNS());
+ return nss;
+}
+
NamespaceString NamespaceString::getTargetNSForListIndexes() const {
dassert(isListIndexesCursorNS());
return NamespaceString(db(), coll().substr(listIndexesCursorNSPrefix.size()));
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index deaf6ba2111..f49054c57ef 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -96,6 +96,12 @@ public:
NamespaceString(StringData dbName, StringData collectionName);
/**
+ * Constructs the namespace '<dbName>.$cmd.aggregate', which we use as the namespace for
+ * aggregation commands with the format {aggregate: 1}.
+ */
+ static NamespaceString makeCollectionlessAggregateNSS(StringData dbName);
+
+ /**
* Constructs a NamespaceString representing a listCollections namespace. The format for this
* namespace is "<dbName>.$cmd.listCollections".
*/
@@ -213,6 +219,7 @@ public:
return coll().startsWith("$cmd."_sd);
}
+ bool isCollectionlessAggregateNS() const;
bool isListCollectionsCursorNS() const;
bool isListIndexesCursorNS() const;
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index b9d2ee31c33..f8b58061d5c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -128,6 +128,7 @@ env.CppUnitTest(
'document_source_bucket_auto_test.cpp',
'document_source_bucket_test.cpp',
'document_source_count_test.cpp',
+ 'document_source_current_op_test.cpp',
'document_source_geo_near_test.cpp',
'document_source_group_test.cpp',
'document_source_limit_test.cpp',
@@ -227,6 +228,7 @@ docSourceEnv.Library(
'document_source_bucket_auto.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
+ 'document_source_current_op.cpp',
'document_source_geo_near.cpp',
'document_source_group.cpp',
'document_source_index_stats.cpp',
diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h
index 786f50a9b5a..ebdb5edcd27 100644
--- a/src/mongo/db/pipeline/aggregation_context_fixture.h
+++ b/src/mongo/db/pipeline/aggregation_context_fixture.h
@@ -46,10 +46,12 @@ namespace mongo {
class AggregationContextFixture : public unittest::Test {
public:
AggregationContextFixture()
+ : AggregationContextFixture(NamespaceString("unittests.pipeline_test")) {}
+
+ AggregationContextFixture(NamespaceString nss)
: _queryServiceContext(stdx::make_unique<QueryTestServiceContext>()),
_opCtx(_queryServiceContext->makeOperationContext()),
- _expCtx(new ExpressionContextForTest(
- _opCtx.get(), AggregationRequest(NamespaceString("unittests.pipeline_test"), {}))) {}
+ _expCtx(new ExpressionContextForTest(_opCtx.get(), AggregationRequest(nss, {}))) {}
boost::intrusive_ptr<ExpressionContextForTest> getExpCtx() {
return _expCtx.get();
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index d2cdfddd0c5..844907bd89a 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -82,6 +82,13 @@ StatusWith<std::vector<BSONObj>> AggregationRequest::parsePipelineFromBSON(
}
StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+ return parseFromBSON(parseNs(dbName, cmdObj), cmdObj, explainVerbosity);
+}
+
+StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
NamespaceString nss,
const BSONObj& cmdObj,
boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
@@ -232,10 +239,36 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
return request;
}
+NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) {
+ auto firstElement = cmdObj.firstElement();
+
+ if (firstElement.isNumber()) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Invalid command format: the '"
+ << firstElement.fieldNameStringData()
+ << "' field must specify a collection name or 1",
+ firstElement.number() == 1);
+ return NamespaceString::makeCollectionlessAggregateNSS(dbname);
+ } else {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "collection name has invalid type: "
+ << typeName(firstElement.type()),
+ firstElement.type() == BSONType::String);
+
+ const NamespaceString nss(dbname, firstElement.valueStringData());
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
+ nss.isValid() && !nss.isCollectionlessAggregateNS());
+
+ return nss;
+ }
+}
+
Document AggregationRequest::serializeToCommandObj() const {
MutableDocument serialized;
return Document{
- {kCommandName, _nss.coll()},
+ {kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))},
{kPipelineName, _pipeline},
// Only serialize booleans if different than their default.
{kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 4feaea931a8..11e06ec442c 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -81,6 +81,22 @@ public:
boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
/**
+ * Convenience overload which constructs the request's NamespaceString from the given database
+ * name and command object.
+ */
+ static StatusWith<AggregationRequest> parseFromBSON(
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
+
+ /*
+ * The first field in 'cmdObj' must be a string representing a valid collection name, or the
+ * number 1. In the latter case, returns a reserved namespace that does not represent a user
+ * collection. See 'NamespaceString::makeCollectionlessAggregateNSS()'.
+ */
+ static NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj);
+
+ /**
* Constructs an AggregationRequest over the given namespace with the given pipeline. All
* options aside from the pipeline assume their default values.
*/
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index cb68a8af79d..0b1330009da 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -203,6 +203,20 @@ TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) {
ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
}
+TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAggregateOneNSS) {
+ NamespaceString nss = NamespaceString::makeCollectionlessAggregateNSS("a");
+ AggregationRequest request(nss, {});
+
+ auto expectedSerialization =
+ Document{{AggregationRequest::kCommandName, 1},
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})},
+ {AggregationRequest::kCursorName,
+ Value(Document({{AggregationRequest::kBatchSizeName,
+ AggregationRequest::kDefaultBatchSize}}))}};
+
+ ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
+}
+
TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}}");
@@ -364,6 +378,52 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce
.getStatus());
}
+TEST(AggregationRequestTest, ParseNSShouldReturnAggregateOneNSIfAggregateFieldIsOne) {
+ const std::vector<std::string> ones{
+ "1", "1.0", "NumberInt(1)", "NumberLong(1)", "NumberDecimal('1')"};
+
+ for (auto& one : ones) {
+ const BSONObj inputBSON =
+ fromjson(str::stream() << "{aggregate: " << one << ", pipeline: []}");
+ ASSERT(AggregationRequest::parseNs("a", inputBSON).isCollectionlessAggregateNS());
+ }
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectNumericNSIfAggregateFieldIsNotOne) {
+ const BSONObj inputBSON = fromjson("{aggregate: 2, pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::FailedToParse);
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectNonStringNonNumericNS) {
+ const BSONObj inputBSON = fromjson("{aggregate: {}, pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::TypeMismatch);
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectAggregateOneStringAsCollectionName) {
+ const BSONObj inputBSON = fromjson("{aggregate: '$cmd.aggregate', pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::InvalidNamespace);
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectInvalidCollectionName) {
+ const BSONObj inputBSON = fromjson("{aggregate: '', pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::InvalidNamespace);
+}
+
+TEST(AggregationRequestTest, ParseFromBSONOverloadsShouldProduceIdenticalRequests) {
+ const BSONObj inputBSON =
+ fromjson("{aggregate: 'collection', pipeline: [{$match: {}}, {$project: {}}], cursor: {}}");
+ NamespaceString nss("a.collection");
+
+ auto aggReqDBName = unittest::assertGet(AggregationRequest::parseFromBSON("a", inputBSON));
+ auto aggReqNSS = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBSON));
+
+ ASSERT_DOCUMENT_EQ(aggReqDBName.serializeToCommandObj(), aggReqNSS.serializeToCommandObj());
+}
+
//
// Ignore fields parsed elsewhere.
//
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 7b724b555cd..fab65e25173 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -118,6 +118,16 @@ public:
using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
+ enum class InitialSourceType {
+ // Stage requires input from a preceding DocumentSource.
+ kNotInitialSource,
+ // Stage does not need an input source and should be the first stage in the pipeline.
+ kInitialSource,
+ // Similar to kInitialSource, but does not require an underlying collection to produce
+ // output.
+ kCollectionlessInitialSource
+ };
+
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a
* (ReturnStatus, Document) pair, with the first entry being used to communicate information
@@ -249,10 +259,28 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const;
/**
- * Returns true if doesn't require an input source (most DocumentSources do).
+ * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an
+ * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce
+ * the input for the pipeline independent of an underlying collection. The latter are specified
+ * with {aggregate: 1}, e.g. $currentOp.
*/
- virtual bool isValidInitialSource() const {
- return false;
+ virtual InitialSourceType getInitialSourceType() const {
+ return InitialSourceType::kNotInitialSource;
+ }
+
+ /**
+ * Returns true if this stage does not require an input source.
+ */
+ bool isInitialSource() const {
+ return getInitialSourceType() != InitialSourceType::kNotInitialSource;
+ }
+
+ /**
+ * Returns true if this stage will produce the input for the pipeline independent of an
+ * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp.
+ */
+ bool isCollectionlessInitialSource() const {
+ return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource;
}
/**
@@ -525,6 +553,9 @@ public:
// Wraps mongod-specific functions to allow linking into mongos.
class MongodInterface {
public:
+ enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
+ enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
+
virtual ~MongodInterface(){};
/**
@@ -593,6 +624,20 @@ public:
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0;
+ /**
+ * Returns a vector of owned BSONObjs, each of which contains details of an in-progress
+ * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report
+ * operations for all authenticated users; otherwise, report only the current user's
+ * operations.
+ */
+ virtual std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const = 0;
+
+ /**
+ * Returns the name of the local shard if sharding is enabled, or an empty string.
+ */
+ virtual std::string getShardName(OperationContext* opCtx) const = 0;
+
// Add new methods as needed.
};
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 56ebaca69e3..bdf2ee35b5c 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -122,8 +122,8 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
return {Document(builder.obj())};
}
-bool DocumentSourceCollStats::isValidInitialSource() const {
- return true;
+DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const {
+ return InitialSourceType::kInitialSource;
}
Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 9e5aa2a4f3f..ad8673643e6 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -61,7 +61,7 @@ public:
const char* getSourceName() const final;
- bool isValidInitialSource() const final;
+ InitialSourceType getInitialSourceType() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
new file mode 100644
index 00000000000..835650961c2
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -0,0 +1,178 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_current_op.h"
+
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/server_options.h"
+#include "mongo/util/net/sock.h"
+
+namespace mongo {
+
+namespace {
+const StringData kAllUsersFieldName = "allUsers"_sd;
+const StringData kIdleConnectionsFieldName = "idleConnections"_sd;
+
+const StringData kOpIdFieldName = "opid"_sd;
+const StringData kClientFieldName = "client"_sd;
+const StringData kMongosClientFieldName = "client_s"_sd;
+} // namespace
+
+using boost::intrusive_ptr;
+
+REGISTER_DOCUMENT_SOURCE(currentOp,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceCurrentOp::createFromBson);
+
+const char* DocumentSourceCurrentOp::getSourceName() const {
+ return "$currentOp";
+}
+
+DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const {
+ return InitialSourceType::kCollectionlessInitialSource;
+}
+
+DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
+ if (_ops.empty()) {
+ _ops = _mongod->getCurrentOps(_includeIdleConnections, _includeOpsFromAllUsers);
+
+ _opsIter = _ops.begin();
+
+ if (pExpCtx->inShard) {
+ _shardName = _mongod->getShardName(pExpCtx->opCtx);
+
+ uassert(40465,
+ "Aggregation request specified 'fromRouter' but unable to retrieve shard name "
+ "for $currentOp pipeline stage.",
+ !_shardName.empty());
+ }
+ }
+
+ if (_opsIter != _ops.end()) {
+ if (!pExpCtx->inShard) {
+ return Document(*_opsIter++);
+ }
+
+ // This $currentOp is running in a sharded context.
+ invariant(!_shardName.empty());
+
+ const BSONObj& op = *_opsIter++;
+ MutableDocument doc;
+
+ // For operations on a shard, we change the opid from the raw numeric form to
+ // 'shardname:opid'. We also change the fieldname 'client' to 'client_s' to indicate
+ // that the IP is that of the mongos which initiated this request.
+ for (auto&& elt : op) {
+ StringData fieldName = elt.fieldNameStringData();
+
+ if (fieldName == kOpIdFieldName) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "expected numeric opid for $currentOp response from '"
+ << _shardName
+ << "' but got: "
+ << typeName(elt.type()),
+ elt.isNumber());
+
+ std::string shardOpID = (str::stream() << _shardName << ":" << elt.numberInt());
+ doc.addField(kOpIdFieldName, Value(shardOpID));
+ } else if (fieldName == kClientFieldName) {
+ doc.addField(kMongosClientFieldName, Value(elt.str()));
+ } else {
+ doc.addField(fieldName, Value(elt));
+ }
+ }
+
+ return doc.freeze();
+ }
+
+ return GetNextResult::makeEOF();
+}
+
+intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
+ BSONElement spec, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$currentOp options must be specified in an object, but found: "
+ << typeName(spec.type()),
+ spec.type() == BSONType::Object);
+
+ const NamespaceString& nss = pExpCtx->ns;
+
+ uassert(ErrorCodes::InvalidNamespace,
+ "$currentOp must be run against the 'admin' database with {aggregate: 1}",
+ nss.db() == NamespaceString::kAdminDb && nss.isCollectionlessAggregateNS());
+
+ ConnMode includeIdleConnections = ConnMode::kExcludeIdle;
+ UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers;
+
+ for (auto&& elem : spec.embeddedObject()) {
+ const auto fieldName = elem.fieldNameStringData();
+
+ if (fieldName == kIdleConnectionsFieldName) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The 'idleConnections' parameter of the $currentOp stage must "
+ "be a boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+ includeIdleConnections =
+ (elem.Bool() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle);
+ } else if (fieldName == kAllUsersFieldName) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The 'allUsers' parameter of the $currentOp stage must be a "
+ "boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+ includeOpsFromAllUsers =
+ (elem.Bool() ? UserMode::kIncludeAll : UserMode::kExcludeOthers);
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "Unrecognized option '" << fieldName
+ << "' in $currentOp stage.");
+ }
+ }
+
+ return intrusive_ptr<DocumentSourceCurrentOp>(
+ new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers));
+}
+
+intrusive_ptr<DocumentSourceCurrentOp> DocumentSourceCurrentOp::create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ ConnMode includeIdleConnections,
+ UserMode includeOpsFromAllUsers) {
+ return intrusive_ptr<DocumentSourceCurrentOp>(
+ new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers));
+}
+
+Value DocumentSourceCurrentOp::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(Document{
+ {getSourceName(),
+ Document{{kIdleConnectionsFieldName, (_includeIdleConnections == ConnMode::kIncludeIdle)},
+ {kAllUsersFieldName, (_includeOpsFromAllUsers == UserMode::kIncludeAll)}}}});
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
new file mode 100644
index 00000000000..b770cff75e4
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+class DocumentSourceCurrentOp final : public DocumentSourceNeedsMongod {
+public:
+ using ConnMode = MongodInterface::CurrentOpConnectionsMode;
+ using UserMode = MongodInterface::CurrentOpUserMode;
+
+ static boost::intrusive_ptr<DocumentSourceCurrentOp> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ ConnMode includeIdleConnections = ConnMode::kExcludeIdle,
+ UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers);
+
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final;
+
+ InitialSourceType getInitialSourceType() const final;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+private:
+ DocumentSourceCurrentOp(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ ConnMode includeIdleConnections = ConnMode::kExcludeIdle,
+ UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers)
+ : DocumentSourceNeedsMongod(pExpCtx),
+ _includeIdleConnections(includeIdleConnections),
+ _includeOpsFromAllUsers(includeOpsFromAllUsers) {}
+
+ ConnMode _includeIdleConnections = ConnMode::kExcludeIdle;
+ UserMode _includeOpsFromAllUsers = UserMode::kExcludeOthers;
+
+ std::string _shardName;
+
+ std::vector<BSONObj> _ops;
+ std::vector<BSONObj>::iterator _opsIter;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp
new file mode 100644
index 00000000000..e6fa1c8ba95
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp
@@ -0,0 +1,235 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_current_op.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/stub_mongod_interface.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+namespace {
+
+const std::string kMockShardName = "testshard";
+
+/**
+ * Subclass AggregationContextFixture to set the ExpressionContext's namespace to 'admin' with
+ * {aggregate: 1} by default, so that parsing tests other than those which validate the namespace do
+ * not need to explicitly set it.
+ */
+class DocumentSourceCurrentOpTest : public AggregationContextFixture {
+public:
+ DocumentSourceCurrentOpTest()
+ : AggregationContextFixture(NamespaceString::makeCollectionlessAggregateNSS("admin")) {}
+};
+
+/**
+ * A MongodInterface used for testing which returns artificial currentOp entries.
+ */
+class MockMongodImplementation final : public StubMongodInterface {
+public:
+ MockMongodImplementation(std::vector<BSONObj> ops, bool hasShardName = true)
+ : _ops(std::move(ops)), _hasShardName(hasShardName) {}
+
+ MockMongodImplementation(bool hasShardName = true) : _hasShardName(hasShardName) {}
+
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const {
+ return _ops;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const {
+ if (_hasShardName) {
+ return kMockShardName;
+ }
+
+ return std::string();
+ }
+
+private:
+ std::vector<BSONObj> _ops;
+ bool _hasShardName;
+};
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfSpecIsNotObject) {
+ const auto specObj = fromjson("{$currentOp:1}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfNotRunOnAdmin) {
+ const auto specObj = fromjson("{$currentOp:{}}");
+ getExpCtx()->ns = NamespaceString::makeCollectionlessAggregateNSS("foo");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::InvalidNamespace);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfNotRunWithAggregateOne) {
+ const auto specObj = fromjson("{$currentOp:{}}");
+ getExpCtx()->ns = NamespaceString("admin.foo");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::InvalidNamespace);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIdleConnectionsIfNotBoolean) {
+ const auto specObj = fromjson("{$currentOp:{idleConnections:1}}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseAllUsersIfNotBoolean) {
+ const auto specObj = fromjson("{$currentOp:{allUsers:1}}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfUnrecognisedParameterSpecified) {
+ const auto specObj = fromjson("{$currentOp:{foo:true}}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeTrueOptionalArguments) {
+ const auto specObj = fromjson("{$currentOp:{idleConnections:true, allUsers:true}}");
+
+ const auto parsed =
+ DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
+
+ const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
+
+ const auto expectedOutput =
+ Document{{"$currentOp", Document{{"idleConnections", true}, {"allUsers", true}}}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeFalseOptionalArguments) {
+ const auto specObj = fromjson("{$currentOp:{idleConnections:false, allUsers:false}}");
+
+ const auto parsed =
+ DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
+
+ const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
+
+ const auto expectedOutput =
+ Document{{"$currentOp", Document{{"idleConnections", false}, {"allUsers", false}}}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDefaultValues) {
+ const auto specObj = fromjson("{$currentOp:{}}");
+
+ const auto parsed =
+ DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
+
+ const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
+
+ const auto expectedOutput =
+ Document{{"$currentOp", Document{{"idleConnections", false}, {"allUsers", false}}}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) {
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ const auto mongod = std::make_shared<MockMongodImplementation>();
+ currentOp->injectMongodInterface(mongod);
+
+ ASSERT(currentOp->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldModifyOpIDAndClientFieldNameInShardedContext) {
+ getExpCtx()->inShard = true;
+
+ std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
+ const auto mongod = std::make_shared<MockMongodImplementation>(ops);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ const auto expectedOutput =
+ Document{{"client_s", std::string("192.168.1.10:50844")},
+ {"opid", std::string(str::stream() << kMockShardName << ":430")}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->getNext().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest,
+ ShouldReturnOpIDAndClientFieldNameUnmodifiedWhenNotInShardedContext) {
+ getExpCtx()->inShard = false;
+
+ std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
+ const auto mongod = std::make_shared<MockMongodImplementation>(ops);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ const auto expectedOutput =
+ Document{{"client", std::string("192.168.1.10:50844")}, {"opid", 430}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->getNext().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) {
+ getExpCtx()->inShard = true;
+
+ const auto mongod = std::make_shared<MockMongodImplementation>(false);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ ASSERT_THROWS_CODE(currentOp->getNext(), UserException, 40465);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInShardedContext) {
+ getExpCtx()->inShard = true;
+
+ std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")};
+ const auto mongod = std::make_shared<MockMongodImplementation>(ops);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ ASSERT_THROWS_CODE(currentOp->getNext(), UserException, ErrorCodes::TypeMismatch);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 85e79b3cb2a..b3cdf802d4e 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -50,8 +50,8 @@ public:
return _outputSorts;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- bool isValidInitialSource() const final {
- return true;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
void detachFromOperationContext() final;
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 99dcbd85827..c2248189a3f 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -278,7 +278,7 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
// Disallow any stages that need to be the first stage in the pipeline.
for (auto&& stage : pipeline->getSources()) {
- if (stage->isValidInitialSource()) {
+ if (stage->isInitialSource()) {
uasserted(40173,
str::stream() << stage->getSourceName()
<< " is not allowed to be used within a $facet stage: "
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 54c22a306fc..da1f035374f 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -170,8 +170,8 @@ public:
DocumentSourcePassthrough() : DocumentSourceMock({}) {}
// We need this to be false so that it can be used in a $facet stage.
- bool isValidInitialSource() const final {
- return false;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kNotInitialSource;
}
DocumentSource::GetNextResult getNext() final {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 03bcf304eee..5178a89c18a 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -46,8 +46,8 @@ public:
*/
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
- bool isValidInitialSource() const final {
- return true;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
BSONObjSet getOutputSorts() final {
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 26beb7edb8b..e802e0d7016 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -44,8 +44,8 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- virtual bool isValidInitialSource() const final {
- return true;
+ virtual InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index b32e1170573..ebe0a861ad6 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -52,8 +52,8 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- bool isValidInitialSource() const final {
- return true;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index bdde16b711f..9236e55ad62 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -48,8 +48,8 @@ public:
const char* getSourceName() const override;
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
- bool isValidInitialSource() const override {
- return true;
+ InitialSourceType getInitialSourceType() const override {
+ return InitialSourceType::kInitialSource;
}
BSONObjSet getOutputSorts() override {
return sorts;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 8c486e9ca4d..01171b86e6d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_unwind.h"
@@ -79,10 +80,11 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse(
pipeline->_sources.end(), parsedSources.begin(), parsedSources.end());
}
- auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ auto status = pipeline->validate();
if (!status.isOK()) {
return status;
}
+
pipeline->stitch();
return std::move(pipeline);
}
@@ -91,18 +93,47 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create(
SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(stages, expCtx),
Pipeline::Deleter(expCtx->opCtx));
- auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ auto status = pipeline->validate();
if (!status.isOK()) {
return status;
}
+
pipeline->stitch();
return std::move(pipeline);
}
-Status Pipeline::ensureAllStagesAreInLegalPositions() const {
+Status Pipeline::validate() const {
+ // Verify that the specified namespace is valid for the initial stage of this pipeline.
+ const NamespaceString& nss = pCtx->ns;
+
+ if (_sources.empty()) {
+ if (nss.isCollectionlessAggregateNS()) {
+ return {ErrorCodes::InvalidNamespace,
+ "{aggregate: 1} is not valid for an empty pipeline."};
+ }
+ } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
+ // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
+ // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
+ const auto firstStage = _sources.front().get();
+
+ if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "{aggregate: 1} is not valid for '"
+ << firstStage->getSourceName()
+ << "'; a collection is required."};
+ }
+
+ if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "'" << firstStage->getSourceName()
+ << "' can only be run with {aggregate: 1}"};
+ }
+ }
+
+ // Verify that all stages of the pipeline are in legal positions.
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->isValidInitialSource() && i != 0) {
+ if (stage->isInitialSource() && i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline."};
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index b32b5df76d5..8a7f4211490 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -253,10 +253,11 @@ private:
void unstitch();
/**
- * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage
- * is present but is not the last stage in the pipeline.
+ * Returns a non-OK status if the pipeline fails any of a set of semantic checks. For example,
+ * if an $out stage is present then it must come last in the pipeline, while initial stages such
+ * as $indexStats must be at the start.
*/
- Status ensureAllStagesAreInLegalPositions() const;
+ Status validate() const;
SourceContainer _sources;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index b26baeb7bb4..06777b9d162 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -34,6 +34,7 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
@@ -66,10 +67,12 @@
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
+#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/s/chunk_version.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -212,6 +215,78 @@ public:
return pipeline;
}
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const {
+ AuthorizationSession* ctxAuth = AuthorizationSession::get(_ctx->opCtx->getClient());
+
+ std::vector<BSONObj> ops;
+
+ for (ServiceContext::LockedClientsCursor cursor(
+ _ctx->opCtx->getClient()->getServiceContext());
+ Client* client = cursor.next();) {
+ invariant(client);
+
+ stdx::lock_guard<Client> lk(*client);
+
+ // If auth is disabled, ignore the allUsers parameter.
+ if (ctxAuth->getAuthorizationManager().isAuthEnabled() &&
+ userMode == CurrentOpUserMode::kExcludeOthers &&
+ !ctxAuth->isCoauthorizedWithClient(client)) {
+ continue;
+ }
+
+ const OperationContext* clientOpCtx = client->getOperationContext();
+
+ if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) {
+ continue;
+ }
+
+ BSONObjBuilder infoBuilder;
+
+ client->reportState(infoBuilder);
+
+ const auto& clientMetadata =
+ ClientMetadataIsMasterState::get(client).getClientMetadata();
+
+ if (clientMetadata) {
+ auto appName = clientMetadata.get().getApplicationName();
+ if (!appName.empty()) {
+ infoBuilder.append("appName", appName);
+ }
+
+ auto clientMetadataDocument = clientMetadata.get().getDocument();
+ infoBuilder.append("clientMetadata", clientMetadataDocument);
+ }
+
+ // Fill out the rest of the BSONObj with opCtx specific details.
+ infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx));
+ if (clientOpCtx) {
+ infoBuilder.append("opid", clientOpCtx->getOpID());
+ if (clientOpCtx->isKillPending()) {
+ infoBuilder.append("killPending", true);
+ }
+
+ CurOp::get(clientOpCtx)->reportState(&infoBuilder);
+
+ Locker::LockerInfo lockerInfo;
+ clientOpCtx->lockState()->getLockerInfo(&lockerInfo);
+ fillLockerInfo(lockerInfo, infoBuilder);
+ }
+
+ ops.emplace_back(infoBuilder.obj());
+ }
+
+ return ops;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const {
+ if (ShardingState::get(opCtx)->enabled()) {
+ return ShardingState::get(opCtx)->getShardName();
+ }
+
+ return std::string();
+ }
+
private:
intrusive_ptr<ExpressionContext> _ctx;
DBDirectClient _client;
@@ -355,7 +430,7 @@ void PipelineD::prepareCursorSource(Collection* collection,
}
if (!sources.empty()) {
- if (sources.front()->isValidInitialSource()) {
+ if (sources.front()->isInitialSource()) {
return; // don't need a cursor
}
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index a0f50825b49..3a1aae261be 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1275,6 +1275,61 @@ TEST(PipelineInitialSource, MatchInitialQuery) {
ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4));
}
+namespace Namespaces {
+
+using PipelineInitialSourceNSTest = AggregationContextFixture;
+
+class DocumentSourceCollectionlessMock : public DocumentSourceMock {
+public:
+ DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {}
+
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kCollectionlessInitialSource;
+ }
+
+ static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() {
+ return new DocumentSourceCollectionlessMock();
+ }
+};
+
+TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidForEmptyPipeline) {
+ const std::vector<BSONObj> rawPipeline = {};
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
+
+ ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+}
+
+TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) {
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {}}")};
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
+
+ ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+}
+
+TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidIfInitialStageIsCollectionless) {
+ auto collectionlessSource = DocumentSourceCollectionlessMock::create();
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
+
+ ASSERT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
+}
+
+TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollectionless) {
+ auto collectionlessSource = DocumentSourceCollectionlessMock::create();
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString("a.collection");
+
+ ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
+}
+
+} // namespace Namespaces
+
namespace Dependencies {
using PipelineDependenciesTest = AggregationContextFixture;
@@ -1300,8 +1355,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock {
public:
DocumentSourceDependencyDummy() : DocumentSourceMock({}) {}
- bool isValidInitialSource() const final {
- return false;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kNotInitialSource;
}
};
diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h
index e21ccc4c822..e75e98e91f8 100644
--- a/src/mongo/db/pipeline/stub_mongod_interface.h
+++ b/src/mongo/db/pipeline/stub_mongod_interface.h
@@ -92,5 +92,14 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx) override {
MONGO_UNREACHABLE;
}
+
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const override {
+ MONGO_UNREACHABLE;
+ }
};
} // namespace mongo
diff --git a/src/mongo/db/run_commands.cpp b/src/mongo/db/run_commands.cpp
index 704549bf868..fca1cfb1cde 100644
--- a/src/mongo/db/run_commands.cpp
+++ b/src/mongo/db/run_commands.cpp
@@ -365,7 +365,7 @@ bool runCommandImpl(OperationContext* opCtx,
const std::string db = request.getDatabase().toString();
BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve);
- auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern());
+ auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern(db, cmd));
if (!readConcernArgsStatus.isOK()) {
auto result =
@@ -433,8 +433,9 @@ bool runCommandImpl(OperationContext* opCtx,
// When a linearizable read command is passed in, check to make sure we're reading
// from the primary.
- if (command->supportsReadConcern() && (readConcernArgsStatus.getValue().getLevel() ==
- repl::ReadConcernLevel::kLinearizableReadConcern) &&
+ if (command->supportsReadConcern(db, cmd) &&
+ (readConcernArgsStatus.getValue().getLevel() ==
+ repl::ReadConcernLevel::kLinearizableReadConcern) &&
(request.getCommandName() != "getMore")) {
auto linearizableReadStatus = waitForLinearizableReadConcern(opCtx);