diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-05-25 20:36:12 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-05-26 08:05:22 +0100 |
commit | 58486605c09672b3bfce8608dca403a145413bba (patch) | |
tree | 62f44cbd43515b591e6af6347c0a75eb5c6ab5fb /src | |
parent | 4b88bf79dc47cbdfd74a11605cc5ee1c61e379f8 (diff) | |
download | mongo-58486605c09672b3bfce8608dca403a145413bba.tar.gz |
SERVER-19318 Add $currentOp aggregation stage for mongoD
Diffstat (limited to 'src')
38 files changed, 965 insertions, 60 deletions
diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp index eb4de1b2083..cd2eac0d1cd 100644 --- a/src/mongo/db/auth/authorization_session.cpp +++ b/src/mongo/db/auth/authorization_session.cpp @@ -256,10 +256,15 @@ void AuthorizationSession::_addPrivilegesForStage(const std::string& db, Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns, const BSONObj& cmdObj) { std::string db(ns.db().toString()); - auto inputResource = ResourcePattern::forExactNamespace(ns); uassert( 17138, mongoutils::str::stream() << "Invalid input namespace, " << ns.ns(), ns.isValid()); + // If this connection does not need to be authenticated (for instance, if auth is disabled), + // return Status::OK() immediately. + if (_externalState->shouldIgnoreAuthChecks()) { + return Status::OK(); + } + PrivilegeVector privileges; BSONElement pipelineElem = cmdObj["pipeline"]; @@ -270,8 +275,8 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns, BSONObj pipeline = pipelineElem.embeddedObject(); if (pipeline.isEmpty()) { // The pipeline is empty, so we require only the find action. - Privilege::addPrivilegeToPrivilegeVector(&privileges, - Privilege(inputResource, ActionType::find)); + Privilege::addPrivilegeToPrivilegeVector( + &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::find)); } else { if (pipeline.firstElementType() != BSONType::Object) { // The pipeline contains something that's not an object. @@ -284,15 +289,48 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns, BSONObj firstPipelineStage = pipeline.firstElement().embeddedObject(); if (str::equals("$indexStats", firstPipelineStage.firstElementFieldName())) { Privilege::addPrivilegeToPrivilegeVector( - &privileges, Privilege(inputResource, ActionType::indexStats)); + &privileges, + Privilege(ResourcePattern::forExactNamespace(ns), ActionType::indexStats)); } else if (str::equals("$collStats", firstPipelineStage.firstElementFieldName())) { Privilege::addPrivilegeToPrivilegeVector( - &privileges, Privilege(inputResource, ActionType::collStats)); + &privileges, + Privilege(ResourcePattern::forExactNamespace(ns), ActionType::collStats)); + } else if (str::equals("$currentOp", firstPipelineStage.firstElementFieldName())) { + // Need to check the value of allUsers; if true then inprog privilege is required. + // {$currentOp: {idleConnections: <boolean|false>, allUsers: <boolean|false>}} + BSONElement spec = firstPipelineStage["$currentOp"]; + + if (spec.type() != BSONType::Object) { + return Status( + ErrorCodes::TypeMismatch, + str::stream() + << "$currentOp options must be specified in an object, but found: " + << typeName(spec.type())); + } + + auto allUsersElt = spec["allUsers"]; + + if (allUsersElt && allUsersElt.type() != BSONType::Bool) { + return Status(ErrorCodes::TypeMismatch, + str::stream() << "The 'allUsers' parameter of the $currentOp stage " + "must be a boolean value, but found: " + << typeName(allUsersElt.type())); + } + + if (allUsersElt && allUsersElt.boolean()) { + Privilege::addPrivilegeToPrivilegeVector( + &privileges, + Privilege(ResourcePattern::forClusterResource(), ActionType::inprog)); + } else if (!getAuthenticatedUserNames().more()) { + // This connection is not authenticated, so we should return an error even though + // there are no privilege requirements when allUsers is false. + return Status(ErrorCodes::Unauthorized, "unauthorized"); + } } else { // If no source requiring an alternative permission scheme is specified then default to // requiring find() privileges on the given namespace. - Privilege::addPrivilegeToPrivilegeVector(&privileges, - Privilege(inputResource, ActionType::find)); + Privilege::addPrivilegeToPrivilegeVector( + &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::find)); } // Add additional required privileges for each stage in the pipeline. diff --git a/src/mongo/db/auth/authorization_session_test.cpp b/src/mongo/db/auth/authorization_session_test.cpp index 1537f7dce73..806bc3795e8 100644 --- a/src/mongo/db/auth/authorization_session_test.cpp +++ b/src/mongo/db/auth/authorization_session_test.cpp @@ -627,6 +627,37 @@ TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) { ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); } +TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprogAction) { + authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); + + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); +} + +TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticated) { + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); +} + +TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogAction) { + authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); + + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); +} + +TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogAction) { + authzSession->assumePrivilegesForDB( + Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog})); + + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); +} + TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotValid) { BSONArray pipeline = BSON_ARRAY(BSON("$out" << "")); diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index fd0840fdddd..94c2f5d1a06 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -205,7 +205,9 @@ public: virtual bool maintenanceOk() const = 0; /** - * Returns true if this Command supports the readConcern argument. + * Returns true if this Command supports the readConcern argument. Takes the command object and + * the name of the database on which it was invoked as arguments, so that readConcern can be + * conditionally rejected based on the command's parameters and/or namespace. * * If the readConcern argument is sent to a command that returns false the command processor * will reject the command, returning an appropriate error message. For commands that support @@ -216,7 +218,7 @@ public: * the option to the shards as needed. We rely on the shards to fail the commands in the * cases where it isn't supported. */ - virtual bool supportsReadConcern() const = 0; + virtual bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const = 0; /** * Returns LogicalOp for this command. @@ -348,7 +350,7 @@ public: return true; /* assumed true prior to commit */ } - bool supportsReadConcern() const override { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const override { return false; } diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 0003b2bf806..bb836ace5dd 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -80,7 +80,7 @@ public: return false; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { return true; } diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index c1958e34f55..6643c952395 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -88,7 +88,7 @@ public: return false; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { return true; } diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 8c7759471ce..4a1fbd18510 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -94,7 +94,7 @@ public: return false; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { return true; } diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp index ce4f54a5790..ac62769c96d 100644 --- a/src/mongo/db/commands/geo_near_cmd.cpp +++ b/src/mongo/db/commands/geo_near_cmd.cpp @@ -74,7 +74,7 @@ public: bool slaveOverrideOk() const { return true; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { return true; } diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 59c2848313c..d14f77b249b 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -97,7 +97,7 @@ public: return false; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { // Uses the readConcern setting from whatever created the cursor. return false; } diff --git a/src/mongo/db/commands/group_cmd.cpp b/src/mongo/db/commands/group_cmd.cpp index 45cd9c999ce..c0cccd56f6a 100644 --- a/src/mongo/db/commands/group_cmd.cpp +++ b/src/mongo/db/commands/group_cmd.cpp @@ -80,7 +80,7 @@ private: return true; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { return true; } diff --git a/src/mongo/db/commands/haystack.cpp b/src/mongo/db/commands/haystack.cpp index b2b7f9977b6..84e9bda79e8 100644 --- a/src/mongo/db/commands/haystack.cpp +++ b/src/mongo/db/commands/haystack.cpp @@ -75,7 +75,7 @@ public: return true; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { return true; } diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index a81c9e3a4c5..de4ce3aa8f2 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -66,7 +66,7 @@ public: return true; } - bool supportsReadConcern() const final { + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { return true; } diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 79a2e76270d..2d9a3e9006d 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -66,8 +66,8 @@ public: return true; } - bool supportsReadConcern() const override { - return true; + bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const override { + return !AggregationRequest::parseNs(dbName, cmdObj).isCollectionlessAggregateNS(); } ReadWriteType getReadWriteType() const { @@ -77,7 +77,7 @@ public: Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) override { - const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); + const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj)); return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj); } @@ -104,10 +104,8 @@ private: const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> verbosity, BSONObjBuilder* result) { - const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); - const auto aggregationRequest = - uassertStatusOK(AggregationRequest::parseFromBSON(nss, cmdObj, verbosity)); + uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity)); // If the featureCompatibilityVersion is 3.2, we disallow collation from the user. However, // operations should still respect the collection default collation. The mongos attaches the @@ -122,7 +120,8 @@ private: ServerGlobalParams::FeatureCompatibility::Version::k32 || isMergePipeline(aggregationRequest.getPipeline())); - return runAggregate(opCtx, nss, aggregationRequest, cmdObj, *result); + return runAggregate( + opCtx, aggregationRequest.getNamespaceString(), aggregationRequest, cmdObj, *result); } } pipelineCmd; diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index d2bc6ac1545..0e195ee7870 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -77,6 +77,7 @@ const char kLogicalTimeKeysCollection[] = "admin.system.keys"; constexpr auto listCollectionsCursorCol = "$cmd.listCollections"_sd; constexpr auto listIndexesCursorNSPrefix = "$cmd.listIndexes."_sd; +constexpr auto collectionlessAggregateCursorCol = "$cmd.aggregate"_sd; constexpr auto dropPendingNSPrefix = "system.drop."_sd; } // namespace @@ -124,6 +125,10 @@ bool NamespaceString::isListIndexesCursorNS() const { coll().startsWith(listIndexesCursorNSPrefix); } +bool NamespaceString::isCollectionlessAggregateNS() const { + return coll() == collectionlessAggregateCursorCol; +} + NamespaceString NamespaceString::makeListCollectionsNSS(StringData dbName) { NamespaceString nss(dbName, listCollectionsCursorCol); dassert(nss.isValid()); @@ -138,6 +143,13 @@ NamespaceString NamespaceString::makeListIndexesNSS(StringData dbName, StringDat return nss; } +NamespaceString NamespaceString::makeCollectionlessAggregateNSS(StringData dbname) { + NamespaceString nss(dbname, collectionlessAggregateCursorCol); + dassert(nss.isValid()); + dassert(nss.isCollectionlessAggregateNS()); + return nss; +} + NamespaceString NamespaceString::getTargetNSForListIndexes() const { dassert(isListIndexesCursorNS()); return NamespaceString(db(), coll().substr(listIndexesCursorNSPrefix.size())); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index deaf6ba2111..f49054c57ef 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -96,6 +96,12 @@ public: NamespaceString(StringData dbName, StringData collectionName); /** + * Constructs the namespace '<dbName>.$cmd.aggregate', which we use as the namespace for + * aggregation commands with the format {aggregate: 1}. + */ + static NamespaceString makeCollectionlessAggregateNSS(StringData dbName); + + /** * Constructs a NamespaceString representing a listCollections namespace. The format for this * namespace is "<dbName>.$cmd.listCollections". */ @@ -213,6 +219,7 @@ public: return coll().startsWith("$cmd."_sd); } + bool isCollectionlessAggregateNS() const; bool isListCollectionsCursorNS() const; bool isListIndexesCursorNS() const; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index b9d2ee31c33..f8b58061d5c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -128,6 +128,7 @@ env.CppUnitTest( 'document_source_bucket_auto_test.cpp', 'document_source_bucket_test.cpp', 'document_source_count_test.cpp', + 'document_source_current_op_test.cpp', 'document_source_geo_near_test.cpp', 'document_source_group_test.cpp', 'document_source_limit_test.cpp', @@ -227,6 +228,7 @@ docSourceEnv.Library( 'document_source_bucket_auto.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', + 'document_source_current_op.cpp', 'document_source_geo_near.cpp', 'document_source_group.cpp', 'document_source_index_stats.cpp', diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h index 786f50a9b5a..ebdb5edcd27 100644 --- a/src/mongo/db/pipeline/aggregation_context_fixture.h +++ b/src/mongo/db/pipeline/aggregation_context_fixture.h @@ -46,10 +46,12 @@ namespace mongo { class AggregationContextFixture : public unittest::Test { public: AggregationContextFixture() + : AggregationContextFixture(NamespaceString("unittests.pipeline_test")) {} + + AggregationContextFixture(NamespaceString nss) : _queryServiceContext(stdx::make_unique<QueryTestServiceContext>()), _opCtx(_queryServiceContext->makeOperationContext()), - _expCtx(new ExpressionContextForTest( - _opCtx.get(), AggregationRequest(NamespaceString("unittests.pipeline_test"), {}))) {} + _expCtx(new ExpressionContextForTest(_opCtx.get(), AggregationRequest(nss, {}))) {} boost::intrusive_ptr<ExpressionContextForTest> getExpCtx() { return _expCtx.get(); diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index d2cdfddd0c5..844907bd89a 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -82,6 +82,13 @@ StatusWith<std::vector<BSONObj>> AggregationRequest::parsePipelineFromBSON( } StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( + const std::string& dbName, + const BSONObj& cmdObj, + boost::optional<ExplainOptions::Verbosity> explainVerbosity) { + return parseFromBSON(parseNs(dbName, cmdObj), cmdObj, explainVerbosity); +} + +StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( NamespaceString nss, const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> explainVerbosity) { @@ -232,10 +239,36 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( return request; } +NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) { + auto firstElement = cmdObj.firstElement(); + + if (firstElement.isNumber()) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "Invalid command format: the '" + << firstElement.fieldNameStringData() + << "' field must specify a collection name or 1", + firstElement.number() == 1); + return NamespaceString::makeCollectionlessAggregateNSS(dbname); + } else { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "collection name has invalid type: " + << typeName(firstElement.type()), + firstElement.type() == BSONType::String); + + const NamespaceString nss(dbname, firstElement.valueStringData()); + + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid namespace specified '" << nss.ns() << "'", + nss.isValid() && !nss.isCollectionlessAggregateNS()); + + return nss; + } +} + Document AggregationRequest::serializeToCommandObj() const { MutableDocument serialized; return Document{ - {kCommandName, _nss.coll()}, + {kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))}, {kPipelineName, _pipeline}, // Only serialize booleans if different than their default. {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()}, diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 4feaea931a8..11e06ec442c 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -81,6 +81,22 @@ public: boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none); /** + * Convenience overload which constructs the request's NamespaceString from the given database + * name and command object. + */ + static StatusWith<AggregationRequest> parseFromBSON( + const std::string& dbName, + const BSONObj& cmdObj, + boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none); + + /* + * The first field in 'cmdObj' must be a string representing a valid collection name, or the + * number 1. In the latter case, returns a reserved namespace that does not represent a user + * collection. See 'NamespaceString::makeCollectionlessAggregateNSS()'. + */ + static NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj); + + /** * Constructs an AggregationRequest over the given namespace with the given pipeline. All * options aside from the pipeline assume their default values. */ diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index cb68a8af79d..0b1330009da 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -203,6 +203,20 @@ TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) { ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } +TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAggregateOneNSS) { + NamespaceString nss = NamespaceString::makeCollectionlessAggregateNSS("a"); + AggregationRequest request(nss, {}); + + auto expectedSerialization = + Document{{AggregationRequest::kCommandName, 1}, + {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}, + {AggregationRequest::kCursorName, + Value(Document({{AggregationRequest::kBatchSizeName, + AggregationRequest::kDefaultBatchSize}}))}}; + + ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); +} + TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}}"); @@ -364,6 +378,52 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce .getStatus()); } +TEST(AggregationRequestTest, ParseNSShouldReturnAggregateOneNSIfAggregateFieldIsOne) { + const std::vector<std::string> ones{ + "1", "1.0", "NumberInt(1)", "NumberLong(1)", "NumberDecimal('1')"}; + + for (auto& one : ones) { + const BSONObj inputBSON = + fromjson(str::stream() << "{aggregate: " << one << ", pipeline: []}"); + ASSERT(AggregationRequest::parseNs("a", inputBSON).isCollectionlessAggregateNS()); + } +} + +TEST(AggregationRequestTest, ParseNSShouldRejectNumericNSIfAggregateFieldIsNotOne) { + const BSONObj inputBSON = fromjson("{aggregate: 2, pipeline: []}"); + ASSERT_THROWS_CODE( + AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::FailedToParse); +} + +TEST(AggregationRequestTest, ParseNSShouldRejectNonStringNonNumericNS) { + const BSONObj inputBSON = fromjson("{aggregate: {}, pipeline: []}"); + ASSERT_THROWS_CODE( + AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::TypeMismatch); +} + +TEST(AggregationRequestTest, ParseNSShouldRejectAggregateOneStringAsCollectionName) { + const BSONObj inputBSON = fromjson("{aggregate: '$cmd.aggregate', pipeline: []}"); + ASSERT_THROWS_CODE( + AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::InvalidNamespace); +} + +TEST(AggregationRequestTest, ParseNSShouldRejectInvalidCollectionName) { + const BSONObj inputBSON = fromjson("{aggregate: '', pipeline: []}"); + ASSERT_THROWS_CODE( + AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::InvalidNamespace); +} + +TEST(AggregationRequestTest, ParseFromBSONOverloadsShouldProduceIdenticalRequests) { + const BSONObj inputBSON = + fromjson("{aggregate: 'collection', pipeline: [{$match: {}}, {$project: {}}], cursor: {}}"); + NamespaceString nss("a.collection"); + + auto aggReqDBName = unittest::assertGet(AggregationRequest::parseFromBSON("a", inputBSON)); + auto aggReqNSS = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBSON)); + + ASSERT_DOCUMENT_EQ(aggReqDBName.serializeToCommandObj(), aggReqNSS.serializeToCommandObj()); +} + // // Ignore fields parsed elsewhere. // diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 7b724b555cd..fab65e25173 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -118,6 +118,16 @@ public: using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; + enum class InitialSourceType { + // Stage requires input from a preceding DocumentSource. + kNotInitialSource, + // Stage does not need an input source and should be the first stage in the pipeline. + kInitialSource, + // Similar to kInitialSource, but does not require an underlying collection to produce + // output. + kCollectionlessInitialSource + }; + /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a * (ReturnStatus, Document) pair, with the first entry being used to communicate information @@ -249,10 +259,28 @@ public: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const; /** - * Returns true if doesn't require an input source (most DocumentSources do). + * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an + * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce + * the input for the pipeline independent of an underlying collection. The latter are specified + * with {aggregate: 1}, e.g. $currentOp. */ - virtual bool isValidInitialSource() const { - return false; + virtual InitialSourceType getInitialSourceType() const { + return InitialSourceType::kNotInitialSource; + } + + /** + * Returns true if this stage does not require an input source. + */ + bool isInitialSource() const { + return getInitialSourceType() != InitialSourceType::kNotInitialSource; + } + + /** + * Returns true if this stage will produce the input for the pipeline independent of an + * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp. + */ + bool isCollectionlessInitialSource() const { + return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource; } /** @@ -525,6 +553,9 @@ public: // Wraps mongod-specific functions to allow linking into mongos. class MongodInterface { public: + enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle }; + enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; + virtual ~MongodInterface(){}; /** @@ -593,6 +624,20 @@ public: const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0; + /** + * Returns a vector of owned BSONObjs, each of which contains details of an in-progress + * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report + * operations for all authenticated users; otherwise, report only the current user's + * operations. + */ + virtual std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode) const = 0; + + /** + * Returns the name of the local shard if sharding is enabled, or an empty string. + */ + virtual std::string getShardName(OperationContext* opCtx) const = 0; + // Add new methods as needed. }; diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 56ebaca69e3..bdf2ee35b5c 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -122,8 +122,8 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { return {Document(builder.obj())}; } -bool DocumentSourceCollStats::isValidInitialSource() const { - return true; +DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const { + return InitialSourceType::kInitialSource; } Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 9e5aa2a4f3f..ad8673643e6 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -61,7 +61,7 @@ public: const char* getSourceName() const final; - bool isValidInitialSource() const final; + InitialSourceType getInitialSourceType() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp new file mode 100644 index 00000000000..835650961c2 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -0,0 +1,178 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_current_op.h" + +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/server_options.h" +#include "mongo/util/net/sock.h" + +namespace mongo { + +namespace { +const StringData kAllUsersFieldName = "allUsers"_sd; +const StringData kIdleConnectionsFieldName = "idleConnections"_sd; + +const StringData kOpIdFieldName = "opid"_sd; +const StringData kClientFieldName = "client"_sd; +const StringData kMongosClientFieldName = "client_s"_sd; +} // namespace + +using boost::intrusive_ptr; + +REGISTER_DOCUMENT_SOURCE(currentOp, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceCurrentOp::createFromBson); + +const char* DocumentSourceCurrentOp::getSourceName() const { + return "$currentOp"; +} + +DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const { + return InitialSourceType::kCollectionlessInitialSource; +} + +DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { + if (_ops.empty()) { + _ops = _mongod->getCurrentOps(_includeIdleConnections, _includeOpsFromAllUsers); + + _opsIter = _ops.begin(); + + if (pExpCtx->inShard) { + _shardName = _mongod->getShardName(pExpCtx->opCtx); + + uassert(40465, + "Aggregation request specified 'fromRouter' but unable to retrieve shard name " + "for $currentOp pipeline stage.", + !_shardName.empty()); + } + } + + if (_opsIter != _ops.end()) { + if (!pExpCtx->inShard) { + return Document(*_opsIter++); + } + + // This $currentOp is running in a sharded context. + invariant(!_shardName.empty()); + + const BSONObj& op = *_opsIter++; + MutableDocument doc; + + // For operations on a shard, we change the opid from the raw numeric form to + // 'shardname:opid'. We also change the fieldname 'client' to 'client_s' to indicate + // that the IP is that of the mongos which initiated this request. + for (auto&& elt : op) { + StringData fieldName = elt.fieldNameStringData(); + + if (fieldName == kOpIdFieldName) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "expected numeric opid for $currentOp response from '" + << _shardName + << "' but got: " + << typeName(elt.type()), + elt.isNumber()); + + std::string shardOpID = (str::stream() << _shardName << ":" << elt.numberInt()); + doc.addField(kOpIdFieldName, Value(shardOpID)); + } else if (fieldName == kClientFieldName) { + doc.addField(kMongosClientFieldName, Value(elt.str())); + } else { + doc.addField(fieldName, Value(elt)); + } + } + + return doc.freeze(); + } + + return GetNextResult::makeEOF(); +} + +intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson( + BSONElement spec, const intrusive_ptr<ExpressionContext>& pExpCtx) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "$currentOp options must be specified in an object, but found: " + << typeName(spec.type()), + spec.type() == BSONType::Object); + + const NamespaceString& nss = pExpCtx->ns; + + uassert(ErrorCodes::InvalidNamespace, + "$currentOp must be run against the 'admin' database with {aggregate: 1}", + nss.db() == NamespaceString::kAdminDb && nss.isCollectionlessAggregateNS()); + + ConnMode includeIdleConnections = ConnMode::kExcludeIdle; + UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers; + + for (auto&& elem : spec.embeddedObject()) { + const auto fieldName = elem.fieldNameStringData(); + + if (fieldName == kIdleConnectionsFieldName) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "The 'idleConnections' parameter of the $currentOp stage must " + "be a boolean value, but found: " + << typeName(elem.type()), + elem.type() == BSONType::Bool); + includeIdleConnections = + (elem.Bool() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle); + } else if (fieldName == kAllUsersFieldName) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "The 'allUsers' parameter of the $currentOp stage must be a " + "boolean value, but found: " + << typeName(elem.type()), + elem.type() == BSONType::Bool); + includeOpsFromAllUsers = + (elem.Bool() ? UserMode::kIncludeAll : UserMode::kExcludeOthers); + } else { + uasserted(ErrorCodes::FailedToParse, + str::stream() << "Unrecognized option '" << fieldName + << "' in $currentOp stage."); + } + } + + return intrusive_ptr<DocumentSourceCurrentOp>( + new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers)); +} + +intrusive_ptr<DocumentSourceCurrentOp> DocumentSourceCurrentOp::create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + ConnMode includeIdleConnections, + UserMode includeOpsFromAllUsers) { + return intrusive_ptr<DocumentSourceCurrentOp>( + new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers)); +} + +Value DocumentSourceCurrentOp::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { + return Value(Document{ + {getSourceName(), + Document{{kIdleConnectionsFieldName, (_includeIdleConnections == ConnMode::kIncludeIdle)}, + {kAllUsersFieldName, (_includeOpsFromAllUsers == UserMode::kIncludeAll)}}}}); +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h new file mode 100644 index 00000000000..b770cff75e4 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +class DocumentSourceCurrentOp final : public DocumentSourceNeedsMongod { +public: + using ConnMode = MongodInterface::CurrentOpConnectionsMode; + using UserMode = MongodInterface::CurrentOpUserMode; + + static boost::intrusive_ptr<DocumentSourceCurrentOp> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + ConnMode includeIdleConnections = ConnMode::kExcludeIdle, + UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers); + + GetNextResult getNext() final; + + const char* getSourceName() const final; + + InitialSourceType getInitialSourceType() const final; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + +private: + DocumentSourceCurrentOp(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + ConnMode includeIdleConnections = ConnMode::kExcludeIdle, + UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers) + : DocumentSourceNeedsMongod(pExpCtx), + _includeIdleConnections(includeIdleConnections), + _includeOpsFromAllUsers(includeOpsFromAllUsers) {} + + ConnMode _includeIdleConnections = ConnMode::kExcludeIdle; + UserMode _includeOpsFromAllUsers = UserMode::kExcludeOthers; + + std::string _shardName; + + std::vector<BSONObj> _ops; + std::vector<BSONObj>::iterator _opsIter; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp new file mode 100644 index 00000000000..e6fa1c8ba95 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp @@ -0,0 +1,235 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_current_op.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/stub_mongod_interface.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +namespace { + +const std::string kMockShardName = "testshard"; + +/** + * Subclass AggregationContextFixture to set the ExpressionContext's namespace to 'admin' with + * {aggregate: 1} by default, so that parsing tests other than those which validate the namespace do + * not need to explicitly set it. + */ +class DocumentSourceCurrentOpTest : public AggregationContextFixture { +public: + DocumentSourceCurrentOpTest() + : AggregationContextFixture(NamespaceString::makeCollectionlessAggregateNSS("admin")) {} +}; + +/** + * A MongodInterface used for testing which returns artificial currentOp entries. + */ +class MockMongodImplementation final : public StubMongodInterface { +public: + MockMongodImplementation(std::vector<BSONObj> ops, bool hasShardName = true) + : _ops(std::move(ops)), _hasShardName(hasShardName) {} + + MockMongodImplementation(bool hasShardName = true) : _hasShardName(hasShardName) {} + + std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode) const { + return _ops; + } + + std::string getShardName(OperationContext* opCtx) const { + if (_hasShardName) { + return kMockShardName; + } + + return std::string(); + } + +private: + std::vector<BSONObj> _ops; + bool _hasShardName; +}; + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfSpecIsNotObject) { + const auto specObj = fromjson("{$currentOp:1}"); + ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), + UserException, + ErrorCodes::FailedToParse); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfNotRunOnAdmin) { + const auto specObj = fromjson("{$currentOp:{}}"); + getExpCtx()->ns = NamespaceString::makeCollectionlessAggregateNSS("foo"); + ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), + UserException, + ErrorCodes::InvalidNamespace); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfNotRunWithAggregateOne) { + const auto specObj = fromjson("{$currentOp:{}}"); + getExpCtx()->ns = NamespaceString("admin.foo"); + ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), + UserException, + ErrorCodes::InvalidNamespace); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIdleConnectionsIfNotBoolean) { + const auto specObj = fromjson("{$currentOp:{idleConnections:1}}"); + ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), + UserException, + ErrorCodes::FailedToParse); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseAllUsersIfNotBoolean) { + const auto specObj = fromjson("{$currentOp:{allUsers:1}}"); + ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), + UserException, + ErrorCodes::FailedToParse); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfUnrecognisedParameterSpecified) { + const auto specObj = fromjson("{$currentOp:{foo:true}}"); + ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), + UserException, + ErrorCodes::FailedToParse); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeTrueOptionalArguments) { + const auto specObj = fromjson("{$currentOp:{idleConnections:true, allUsers:true}}"); + + const auto parsed = + DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()); + + const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get()); + + const auto expectedOutput = + Document{{"$currentOp", Document{{"idleConnections", true}, {"allUsers", true}}}}; + + ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeFalseOptionalArguments) { + const auto specObj = fromjson("{$currentOp:{idleConnections:false, allUsers:false}}"); + + const auto parsed = + DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()); + + const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get()); + + const auto expectedOutput = + Document{{"$currentOp", Document{{"idleConnections", false}, {"allUsers", false}}}}; + + ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDefaultValues) { + const auto specObj = fromjson("{$currentOp:{}}"); + + const auto parsed = + DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()); + + const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get()); + + const auto expectedOutput = + Document{{"$currentOp", Document{{"idleConnections", false}, {"allUsers", false}}}}; + + ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) { + const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); + const auto mongod = std::make_shared<MockMongodImplementation>(); + currentOp->injectMongodInterface(mongod); + + ASSERT(currentOp->getNext().isEOF()); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldModifyOpIDAndClientFieldNameInShardedContext) { + getExpCtx()->inShard = true; + + std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; + const auto mongod = std::make_shared<MockMongodImplementation>(ops); + + const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); + currentOp->injectMongodInterface(mongod); + + const auto expectedOutput = + Document{{"client_s", std::string("192.168.1.10:50844")}, + {"opid", std::string(str::stream() << kMockShardName << ":430")}}; + + ASSERT_DOCUMENT_EQ(currentOp->getNext().getDocument(), expectedOutput); +} + +TEST_F(DocumentSourceCurrentOpTest, + ShouldReturnOpIDAndClientFieldNameUnmodifiedWhenNotInShardedContext) { + getExpCtx()->inShard = false; + + std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; + const auto mongod = std::make_shared<MockMongodImplementation>(ops); + + const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); + currentOp->injectMongodInterface(mongod); + + const auto expectedOutput = + Document{{"client", std::string("192.168.1.10:50844")}, {"opid", 430}}; + + ASSERT_DOCUMENT_EQ(currentOp->getNext().getDocument(), expectedOutput); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) { + getExpCtx()->inShard = true; + + const auto mongod = std::make_shared<MockMongodImplementation>(false); + + const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); + currentOp->injectMongodInterface(mongod); + + ASSERT_THROWS_CODE(currentOp->getNext(), UserException, 40465); +} + +TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInShardedContext) { + getExpCtx()->inShard = true; + + std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")}; + const auto mongod = std::make_shared<MockMongodImplementation>(ops); + + const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); + currentOp->injectMongodInterface(mongod); + + ASSERT_THROWS_CODE(currentOp->getNext(), UserException, ErrorCodes::TypeMismatch); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 85e79b3cb2a..b3cdf802d4e 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -50,8 +50,8 @@ public: return _outputSorts; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - bool isValidInitialSource() const final { - return true; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } void detachFromOperationContext() final; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 99dcbd85827..c2248189a3f 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -278,7 +278,7 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson( // Disallow any stages that need to be the first stage in the pipeline. for (auto&& stage : pipeline->getSources()) { - if (stage->isValidInitialSource()) { + if (stage->isInitialSource()) { uasserted(40173, str::stream() << stage->getSourceName() << " is not allowed to be used within a $facet stage: " diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 54c22a306fc..da1f035374f 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -170,8 +170,8 @@ public: DocumentSourcePassthrough() : DocumentSourceMock({}) {} // We need this to be false so that it can be used in a $facet stage. - bool isValidInitialSource() const final { - return false; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kNotInitialSource; } DocumentSource::GetNextResult getNext() final { diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 03bcf304eee..5178a89c18a 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -46,8 +46,8 @@ public: */ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; - bool isValidInitialSource() const final { - return true; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; BSONObjSet getOutputSorts() final { diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 26beb7edb8b..e802e0d7016 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -44,8 +44,8 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - virtual bool isValidInitialSource() const final { - return true; + virtual InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index b32e1170573..ebe0a861ad6 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -52,8 +52,8 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - bool isValidInitialSource() const final { - return true; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index bdde16b711f..9236e55ad62 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -48,8 +48,8 @@ public: const char* getSourceName() const override; Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; - bool isValidInitialSource() const override { - return true; + InitialSourceType getInitialSourceType() const override { + return InitialSourceType::kInitialSource; } BSONObjSet getOutputSorts() override { return sorts; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 8c486e9ca4d..01171b86e6d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -42,6 +42,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_unwind.h" @@ -79,10 +80,11 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse( pipeline->_sources.end(), parsedSources.begin(), parsedSources.end()); } - auto status = pipeline->ensureAllStagesAreInLegalPositions(); + auto status = pipeline->validate(); if (!status.isOK()) { return status; } + pipeline->stitch(); return std::move(pipeline); } @@ -91,18 +93,47 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create( SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(stages, expCtx), Pipeline::Deleter(expCtx->opCtx)); - auto status = pipeline->ensureAllStagesAreInLegalPositions(); + auto status = pipeline->validate(); if (!status.isOK()) { return status; } + pipeline->stitch(); return std::move(pipeline); } -Status Pipeline::ensureAllStagesAreInLegalPositions() const { +Status Pipeline::validate() const { + // Verify that the specified namespace is valid for the initial stage of this pipeline. + const NamespaceString& nss = pCtx->ns; + + if (_sources.empty()) { + if (nss.isCollectionlessAggregateNS()) { + return {ErrorCodes::InvalidNamespace, + "{aggregate: 1} is not valid for an empty pipeline."}; + } + } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) { + // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, + // {aggregate: 1} is only valid for collectionless sources, and vice-versa. + const auto firstStage = _sources.front().get(); + + if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "{aggregate: 1} is not valid for '" + << firstStage->getSourceName() + << "'; a collection is required."}; + } + + if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "'" << firstStage->getSourceName() + << "' can only be run with {aggregate: 1}"}; + } + } + + // Verify that all stages of the pipeline are in legal positions. size_t i = 0; for (auto&& stage : _sources) { - if (stage->isValidInitialSource() && i != 0) { + if (stage->isInitialSource() && i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() << " is only valid as the first stage in a pipeline."}; diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index b32b5df76d5..8a7f4211490 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -253,10 +253,11 @@ private: void unstitch(); /** - * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage - * is present but is not the last stage in the pipeline. + * Returns a non-OK status if the pipeline fails any of a set of semantic checks. For example, + * if an $out stage is present then it must come last in the pipeline, while initial stages such + * as $indexStats must be at the start. */ - Status ensureAllStagesAreInLegalPositions() const; + Status validate() const; SourceContainer _sources; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index b26baeb7bb4..06777b9d162 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/client/dbclientinterface.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/document_validation.h" @@ -66,10 +67,12 @@ #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" +#include "mongo/db/stats/fill_locker_info.h" #include "mongo/db/stats/storage_stats.h" #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" +#include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/s/chunk_version.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -212,6 +215,78 @@ public: return pipeline; } + std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode) const { + AuthorizationSession* ctxAuth = AuthorizationSession::get(_ctx->opCtx->getClient()); + + std::vector<BSONObj> ops; + + for (ServiceContext::LockedClientsCursor cursor( + _ctx->opCtx->getClient()->getServiceContext()); + Client* client = cursor.next();) { + invariant(client); + + stdx::lock_guard<Client> lk(*client); + + // If auth is disabled, ignore the allUsers parameter. + if (ctxAuth->getAuthorizationManager().isAuthEnabled() && + userMode == CurrentOpUserMode::kExcludeOthers && + !ctxAuth->isCoauthorizedWithClient(client)) { + continue; + } + + const OperationContext* clientOpCtx = client->getOperationContext(); + + if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) { + continue; + } + + BSONObjBuilder infoBuilder; + + client->reportState(infoBuilder); + + const auto& clientMetadata = + ClientMetadataIsMasterState::get(client).getClientMetadata(); + + if (clientMetadata) { + auto appName = clientMetadata.get().getApplicationName(); + if (!appName.empty()) { + infoBuilder.append("appName", appName); + } + + auto clientMetadataDocument = clientMetadata.get().getDocument(); + infoBuilder.append("clientMetadata", clientMetadataDocument); + } + + // Fill out the rest of the BSONObj with opCtx specific details. + infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx)); + if (clientOpCtx) { + infoBuilder.append("opid", clientOpCtx->getOpID()); + if (clientOpCtx->isKillPending()) { + infoBuilder.append("killPending", true); + } + + CurOp::get(clientOpCtx)->reportState(&infoBuilder); + + Locker::LockerInfo lockerInfo; + clientOpCtx->lockState()->getLockerInfo(&lockerInfo); + fillLockerInfo(lockerInfo, infoBuilder); + } + + ops.emplace_back(infoBuilder.obj()); + } + + return ops; + } + + std::string getShardName(OperationContext* opCtx) const { + if (ShardingState::get(opCtx)->enabled()) { + return ShardingState::get(opCtx)->getShardName(); + } + + return std::string(); + } + private: intrusive_ptr<ExpressionContext> _ctx; DBDirectClient _client; @@ -355,7 +430,7 @@ void PipelineD::prepareCursorSource(Collection* collection, } if (!sources.empty()) { - if (sources.front()->isValidInitialSource()) { + if (sources.front()->isInitialSource()) { return; // don't need a cursor } diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index a0f50825b49..3a1aae261be 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1275,6 +1275,61 @@ TEST(PipelineInitialSource, MatchInitialQuery) { ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4)); } +namespace Namespaces { + +using PipelineInitialSourceNSTest = AggregationContextFixture; + +class DocumentSourceCollectionlessMock : public DocumentSourceMock { +public: + DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {} + + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kCollectionlessInitialSource; + } + + static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() { + return new DocumentSourceCollectionlessMock(); + } +}; + +TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidForEmptyPipeline) { + const std::vector<BSONObj> rawPipeline = {}; + auto ctx = getExpCtx(); + + ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a"); + + ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); +} + +TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) { + const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {}}")}; + auto ctx = getExpCtx(); + + ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a"); + + ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); +} + +TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidIfInitialStageIsCollectionless) { + auto collectionlessSource = DocumentSourceCollectionlessMock::create(); + auto ctx = getExpCtx(); + + ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a"); + + ASSERT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus()); +} + +TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollectionless) { + auto collectionlessSource = DocumentSourceCollectionlessMock::create(); + auto ctx = getExpCtx(); + + ctx->ns = NamespaceString("a.collection"); + + ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus()); +} + +} // namespace Namespaces + namespace Dependencies { using PipelineDependenciesTest = AggregationContextFixture; @@ -1300,8 +1355,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock { public: DocumentSourceDependencyDummy() : DocumentSourceMock({}) {} - bool isValidInitialSource() const final { - return false; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kNotInitialSource; } }; diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h index e21ccc4c822..e75e98e91f8 100644 --- a/src/mongo/db/pipeline/stub_mongod_interface.h +++ b/src/mongo/db/pipeline/stub_mongod_interface.h @@ -92,5 +92,14 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx) override { MONGO_UNREACHABLE; } + + std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode) const override { + MONGO_UNREACHABLE; + } + + std::string getShardName(OperationContext* opCtx) const override { + MONGO_UNREACHABLE; + } }; } // namespace mongo diff --git a/src/mongo/db/run_commands.cpp b/src/mongo/db/run_commands.cpp index 704549bf868..fca1cfb1cde 100644 --- a/src/mongo/db/run_commands.cpp +++ b/src/mongo/db/run_commands.cpp @@ -365,7 +365,7 @@ bool runCommandImpl(OperationContext* opCtx, const std::string db = request.getDatabase().toString(); BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve); - auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern()); + auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern(db, cmd)); if (!readConcernArgsStatus.isOK()) { auto result = @@ -433,8 +433,9 @@ bool runCommandImpl(OperationContext* opCtx, // When a linearizable read command is passed in, check to make sure we're reading // from the primary. - if (command->supportsReadConcern() && (readConcernArgsStatus.getValue().getLevel() == - repl::ReadConcernLevel::kLinearizableReadConcern) && + if (command->supportsReadConcern(db, cmd) && + (readConcernArgsStatus.getValue().getLevel() == + repl::ReadConcernLevel::kLinearizableReadConcern) && (request.getCommandName() != "getMore")) { auto linearizableReadStatus = waitForLinearizableReadConcern(opCtx); |