diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-03-14 17:46:22 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-03-15 17:33:50 +0000 |
commit | 5ecf2c0a5bffa837c96ad20dea23a94c5165739a (patch) | |
tree | 254b1baa62ecd3092b8962778fcc0480c066de03 /src | |
parent | 8c639f958eb5edff4d52560e4c3dfe04c7a78f31 (diff) | |
download | mongo-5ecf2c0a5bffa837c96ad20dea23a94c5165739a.tar.gz |
SERVER-18094 Add 'localOps' parameter to $currentOp to show local mongoS operations
Diffstat (limited to 'src')
31 files changed, 502 insertions, 191 deletions
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index ea71baacde0..3eb90f98ff2 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -45,6 +45,7 @@ #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/util/log.h" +#include "mongo/util/net/sock.h" #include "mongo/util/stringutils.h" namespace mongo { @@ -224,6 +225,49 @@ CurOp* CurOp::get(const OperationContext& opCtx) { return _curopStack(opCtx).top(); } +void CurOp::reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + bool truncateOps, + BSONObjBuilder* infoBuilder) { + invariant(client); + OperationContext* clientOpCtx = client->getOperationContext(); + + const std::string hostName = getHostNameCachedAndPort(); + infoBuilder->append("host", hostName); + + client->reportState(*infoBuilder); + const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata(); + + if (clientMetadata) { + auto appName = clientMetadata.get().getApplicationName(); + if (!appName.empty()) { + infoBuilder->append("appName", appName); + } + + auto clientMetadataDocument = clientMetadata.get().getDocument(); + infoBuilder->append("clientMetadata", clientMetadataDocument); + } + + // Fill out the rest of the BSONObj with opCtx specific details. + infoBuilder->appendBool("active", static_cast<bool>(clientOpCtx)); + infoBuilder->append("currentOpTime", + opCtx->getServiceContext()->getPreciseClockSource()->now().toString()); + + if (clientOpCtx) { + infoBuilder->append("opid", clientOpCtx->getOpID()); + if (clientOpCtx->isKillPending()) { + infoBuilder->append("killPending", true); + } + + if (clientOpCtx->getLogicalSessionId()) { + BSONObjBuilder bob(infoBuilder->subobjStart("lsid")); + clientOpCtx->getLogicalSessionId()->serialize(&bob); + } + + CurOp::get(clientOpCtx)->reportState(infoBuilder, truncateOps); + } +} + CurOp::CurOp(OperationContext* opCtx) : CurOp(opCtx, &_curopStack(opCtx)) {} CurOp::CurOp(OperationContext* opCtx, CurOpStack* stack) : _stack(stack) { @@ -234,6 +278,26 @@ CurOp::CurOp(OperationContext* opCtx, CurOpStack* stack) : _stack(stack) { } } +CurOp::~CurOp() { + invariant(this == _stack->pop()); +} + +void CurOp::setGenericOpRequestDetails(OperationContext* opCtx, + const NamespaceString& nss, + const Command* command, + BSONObj cmdObj, + NetworkOp op) { + auto logicalOp = (command ? command->getLogicalOp() : networkOpToLogicalOp(op)); + + stdx::lock_guard<Client> clientLock(*opCtx->getClient()); + _isCommand = _debug.iscommand = (command != nullptr); + _logicalOp = _debug.logicalOp = logicalOp; + _networkOp = _debug.networkOp = op; + _opDescription = cmdObj; + _command = command; + _ns = nss.ns(); +} + ProgressMeter& CurOp::setMessage_inlock(const char* msg, std::string name, unsigned long long progressMeterTotal, @@ -252,10 +316,6 @@ ProgressMeter& CurOp::setMessage_inlock(const char* msg, return _progressMeter; } -CurOp::~CurOp() { - invariant(this == _stack->pop()); -} - void CurOp::setNS_inlock(StringData ns) { _ns = ns.toString(); } @@ -354,23 +414,7 @@ void CurOp::reportState(BSONObjBuilder* builder, bool truncateOps) { // is true, limit the size of each op to 1000 bytes. Otherwise, do not truncate. const boost::optional<size_t> maxQuerySize{truncateOps, 1000}; - if (!_command && _networkOp == dbQuery) { - // This is a legacy OP_QUERY. We upconvert the "query" field of the currentOp output to look - // similar to a find command. - // - // CurOp doesn't have access to the ntoreturn or ntoskip values. By setting them to zero, we - // will omit mention of them in the currentOp output. - const int ntoreturn = 0; - const int ntoskip = 0; - - appendAsObjOrString( - "command", - upconvertQueryEntry(_opDescription, NamespaceString(_ns), ntoreturn, ntoskip), - maxQuerySize, - builder); - } else { - appendAsObjOrString("command", _opDescription, maxQuerySize, builder); - } + appendAsObjOrString("command", _opDescription, maxQuerySize, builder); if (!_originatingCommand.isEmpty()) { appendAsObjOrString("originatingCommand", _originatingCommand, maxQuerySize, builder); @@ -436,21 +480,11 @@ string OpDebug::report(Client* client, } } - BSONObj query; - - // If necessary, upconvert legacy find operations so that their log lines resemble their find - // command counterpart. - if (!iscommand && networkOp == dbQuery) { - query = upconvertQueryEntry( - curop.opDescription(), NamespaceString(curop.getNS()), ntoreturn, ntoskip); - } else { - query = curop.opDescription(); - } - + auto query = curop.opDescription(); if (!query.isEmpty()) { s << " command: "; if (iscommand) { - Command* curCommand = curop.getCommand(); + const Command* curCommand = curop.getCommand(); if (curCommand) { mutablebson::Document cmdToLog(query, mutablebson::Document::kInPlaceDisabled); curCommand->redactForLogging(&cmdToLog); @@ -555,14 +589,7 @@ void OpDebug::append(const CurOp& curop, NamespaceString nss = NamespaceString(curop.getNS()); b.append("ns", nss.ns()); - if (!iscommand && networkOp == dbQuery) { - appendAsObjOrString("command", - upconvertQueryEntry(curop.opDescription(), nss, ntoreturn, ntoskip), - maxElementSize, - &b); - } else if (curop.haveOpDescription()) { - appendAsObjOrString("command", curop.opDescription(), maxElementSize, &b); - } + appendAsObjOrString("command", curop.opDescription(), maxElementSize, &b); auto originatingCommand = curop.originatingCommand(); if (!originatingCommand.isEmpty()) { diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index 046516aaed2..bdb30791060 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -158,11 +158,34 @@ public: static CurOp* get(const OperationContext& opCtx); /** + * Writes a report of the operation being executed by the given client to the supplied + * BSONObjBuilder, in a format suitable for display in currentOp. Does not include a lockInfo + * report, since this may be called in either a mongoD or mongoS context and the latter does not + * supply lock stats. The client must be locked before calling this method. + */ + static void reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + bool truncateOps, + BSONObjBuilder* infoBuilder); + + /** * Constructs a nested CurOp at the top of the given "opCtx"'s CurOp stack. */ explicit CurOp(OperationContext* opCtx); ~CurOp(); + /** + * Fills out CurOp and OpDebug with basic info common to all commands. We require the NetworkOp + * in order to distinguish which protocol delivered this request, e.g. OP_QUERY or OP_MSG. This + * is set early in the request processing backend and does not typically need to be called + * thereafter. Locks the client as needed to apply the specified settings. + */ + void setGenericOpRequestDetails(OperationContext* opCtx, + const NamespaceString& nss, + const Command* command, + BSONObj cmdObj, + NetworkOp op); + bool haveOpDescription() const { return !_opDescription.isEmpty(); } @@ -265,6 +288,13 @@ public: } /** + * Returns true if this CurOp represents a non-command OP_QUERY request. + */ + bool isLegacyQuery() const { + return _networkOp == NetworkOp::dbQuery && !isCommand(); + } + + /** * Returns true if the current operation is known to be a command. */ bool isCommand() const { @@ -369,10 +399,10 @@ public: _originatingCommand = commandObj.getOwned(); } - Command* getCommand() const { + const Command* getCommand() const { return _command; } - void setCommand_inlock(Command* command) { + void setCommand_inlock(const Command* command) { _command = command; } @@ -455,7 +485,7 @@ private: CurOpStack* _stack; CurOp* _parent{nullptr}; - Command* _command{nullptr}; + const Command* _command{nullptr}; // The time at which this CurOp instance was marked as started. long long _start{0}; diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index bcef4903325..0e761e36290 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -693,7 +693,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who // TODO: don't create nested CurOp for legacy writes. // Add Command pointer to the nested CurOp. auto& parentCurOp = *CurOp::get(opCtx); - Command* cmd = parentCurOp.getCommand(); + const Command* cmd = parentCurOp.getCommand(); CurOp curOp(opCtx); { stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -833,7 +833,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who // TODO: don't create nested CurOp for legacy writes. // Add Command pointer to the nested CurOp. auto& parentCurOp = *CurOp::get(opCtx); - Command* cmd = parentCurOp.getCommand(); + const Command* cmd = parentCurOp.getCommand(); CurOp curOp(opCtx); { stdx::lock_guard<Client> lk(*opCtx->getClient()); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 1019ee08d1c..5f25c2c6545 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -302,6 +302,7 @@ pipelineeEnv.Library( 'document_source_sort_by_count.cpp', 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', + 'mongo_process_common.cpp', 'pipeline.cpp', 'sequential_document_cache.cpp', 'tee_buffer.cpp', diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index c650f9ae0b9..f7c6dc2e171 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -37,6 +37,7 @@ namespace mongo { namespace { const StringData kAllUsersFieldName = "allUsers"_sd; const StringData kIdleConnectionsFieldName = "idleConnections"_sd; +const StringData kLocalOpsFieldName = "localOps"_sd; const StringData kTruncateOpsFieldName = "truncateOps"_sd; const StringData kOpIdFieldName = "opid"_sd; @@ -60,11 +61,14 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li << typeName(spec.type())); } - bool allUsers = false; + auto allUsers = UserMode::kExcludeOthers; + auto localOps = LocalOpsMode::kRemoteShardOps; // Check the spec for all fields named 'allUsers'. If any of them are 'true', we require // the 'inprog' privilege. This avoids the possibility that a spec with multiple - // allUsers fields might allow an unauthorized user to view all operations. + // allUsers fields might allow an unauthorized user to view all operations. We also check for + // the presence of a 'localOps' field, which instructs this $currentOp to list local mongoS + // operations rather than forwarding the request to the shards. for (auto&& elem : spec.embeddedObject()) { if (elem.fieldNameStringData() == "allUsers"_sd) { if (elem.type() != BSONType::Bool) { @@ -74,11 +78,23 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li << typeName(elem.type())); } - allUsers = allUsers || elem.boolean(); + if (elem.boolean()) { + allUsers = UserMode::kIncludeAll; + } + } else if (elem.fieldNameStringData() == "localOps") { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "The 'localOps' parameter of the $currentOp stage must be a " + "boolean value, but found: " + << typeName(elem.type()), + elem.type() == BSONType::Bool); + + if (elem.boolean()) { + localOps = LocalOpsMode::kLocalMongosOps; + } } } - return stdx::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers); + return stdx::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps); } @@ -163,6 +179,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson( ConnMode includeIdleConnections = ConnMode::kExcludeIdle; UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers; + LocalOpsMode showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps; TruncationMode truncateOps = TruncationMode::kNoTruncation; for (auto&& elem : spec.embeddedObject()) { @@ -175,7 +192,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson( << typeName(elem.type()), elem.type() == BSONType::Bool); includeIdleConnections = - (elem.Bool() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle); + (elem.boolean() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle); } else if (fieldName == kAllUsersFieldName) { uassert(ErrorCodes::FailedToParse, str::stream() << "The 'allUsers' parameter of the $currentOp stage must be a " @@ -183,7 +200,15 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson( << typeName(elem.type()), elem.type() == BSONType::Bool); includeOpsFromAllUsers = - (elem.Bool() ? UserMode::kIncludeAll : UserMode::kExcludeOthers); + (elem.boolean() ? UserMode::kIncludeAll : UserMode::kExcludeOthers); + } else if (fieldName == kLocalOpsFieldName) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "The 'localOps' parameter of the $currentOp stage must be " + "a boolean value, but found: " + << typeName(elem.type()), + elem.type() == BSONType::Bool); + showLocalOpsOnMongoS = + (elem.boolean() ? LocalOpsMode::kLocalMongosOps : LocalOpsMode::kRemoteShardOps); } else if (fieldName == kTruncateOpsFieldName) { uassert(ErrorCodes::FailedToParse, str::stream() << "The 'truncateOps' parameter of the $currentOp stage must be " @@ -191,7 +216,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson( << typeName(elem.type()), elem.type() == BSONType::Bool); truncateOps = - (elem.Bool() ? TruncationMode::kTruncateOps : TruncationMode::kNoTruncation); + (elem.boolean() ? TruncationMode::kTruncateOps : TruncationMode::kNoTruncation); } else { uasserted(ErrorCodes::FailedToParse, str::stream() << "Unrecognized option '" << fieldName @@ -199,24 +224,30 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson( } } - return intrusive_ptr<DocumentSourceCurrentOp>(new DocumentSourceCurrentOp( - pExpCtx, includeIdleConnections, includeOpsFromAllUsers, truncateOps)); + return new DocumentSourceCurrentOp( + pExpCtx, includeIdleConnections, includeOpsFromAllUsers, showLocalOpsOnMongoS, truncateOps); } intrusive_ptr<DocumentSourceCurrentOp> DocumentSourceCurrentOp::create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, ConnMode includeIdleConnections, UserMode includeOpsFromAllUsers, + LocalOpsMode showLocalOpsOnMongoS, TruncationMode truncateOps) { - return intrusive_ptr<DocumentSourceCurrentOp>(new DocumentSourceCurrentOp( - pExpCtx, includeIdleConnections, includeOpsFromAllUsers, truncateOps)); + return new DocumentSourceCurrentOp( + pExpCtx, includeIdleConnections, includeOpsFromAllUsers, showLocalOpsOnMongoS, truncateOps); } Value DocumentSourceCurrentOp::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { return Value(Document{ {getSourceName(), - Document{{kIdleConnectionsFieldName, (_includeIdleConnections == ConnMode::kIncludeIdle)}, - {kAllUsersFieldName, (_includeOpsFromAllUsers == UserMode::kIncludeAll)}, - {kTruncateOpsFieldName, (_truncateOps == TruncationMode::kTruncateOps)}}}}); + Document{{kIdleConnectionsFieldName, + _includeIdleConnections == ConnMode::kIncludeIdle ? Value(true) : Value()}, + {kAllUsersFieldName, + _includeOpsFromAllUsers == UserMode::kIncludeAll ? Value(true) : Value()}, + {kLocalOpsFieldName, + _showLocalOpsOnMongoS == LocalOpsMode::kLocalMongosOps ? Value(true) : Value()}, + {kTruncateOpsFieldName, + _truncateOps == TruncationMode::kTruncateOps ? Value(true) : Value()}}}}); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index cb180a99085..d655c0ed2bf 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -34,12 +34,18 @@ namespace mongo { class DocumentSourceCurrentOp final : public DocumentSource { public: + using TruncationMode = MongoProcessInterface::CurrentOpTruncateMode; + using ConnMode = MongoProcessInterface::CurrentOpConnectionsMode; + using LocalOpsMode = MongoProcessInterface::CurrentOpLocalOpsMode; + using UserMode = MongoProcessInterface::CurrentOpUserMode; + class LiteParsed final : public LiteParsedDocumentSource { public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec); - explicit LiteParsed(bool allUsers) : _allUsers(allUsers) {} + LiteParsed(UserMode allUsers, LocalOpsMode localOps) + : _allUsers(allUsers), _localOps(localOps) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); @@ -48,30 +54,39 @@ public: PrivilegeVector requiredPrivileges(bool isMongos) const final { PrivilegeVector privileges; - // In a sharded cluster, we always need the inprog privilege to run $currentOp. - if (isMongos || _allUsers) { + // In a sharded cluster, we always need the inprog privilege to run $currentOp on the + // shards. If we are only looking up local mongoS operations, we do not need inprog to + // view our own ops but *do* require it to view other users' ops. + if (_allUsers == UserMode::kIncludeAll || + (isMongos && _localOps == LocalOpsMode::kRemoteShardOps)) { privileges.push_back({ResourcePattern::forClusterResource(), ActionType::inprog}); } return privileges; } + bool allowedToForwardFromMongos() const final { + return _localOps == LocalOpsMode::kRemoteShardOps; + } + + bool allowedToPassthroughFromMongos() const final { + return _localOps == LocalOpsMode::kRemoteShardOps; + } + bool isInitialSource() const final { return true; } private: - const bool _allUsers; + const UserMode _allUsers; + const LocalOpsMode _localOps; }; - using TruncationMode = MongoProcessInterface::CurrentOpTruncateMode; - using ConnMode = MongoProcessInterface::CurrentOpConnectionsMode; - using UserMode = MongoProcessInterface::CurrentOpUserMode; - static boost::intrusive_ptr<DocumentSourceCurrentOp> create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, ConnMode includeIdleConnections = ConnMode::kExcludeIdle, UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers, + LocalOpsMode showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps, TruncationMode truncateOps = TruncationMode::kNoTruncation); GetNextResult getNext() final; @@ -81,7 +96,9 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const final { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kFirst, - HostTypeRequirement::kAnyShard, + (_showLocalOpsOnMongoS == LocalOpsMode::kLocalMongosOps + ? HostTypeRequirement::kLocalOnly + : HostTypeRequirement::kAnyShard), DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed); @@ -100,14 +117,17 @@ private: DocumentSourceCurrentOp(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, ConnMode includeIdleConnections = ConnMode::kExcludeIdle, UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers, + LocalOpsMode showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps, TruncationMode truncateOps = TruncationMode::kNoTruncation) : DocumentSource(pExpCtx), _includeIdleConnections(includeIdleConnections), _includeOpsFromAllUsers(includeOpsFromAllUsers), + _showLocalOpsOnMongoS(showLocalOpsOnMongoS), _truncateOps(truncateOps) {} ConnMode _includeIdleConnections = ConnMode::kExcludeIdle; UserMode _includeOpsFromAllUsers = UserMode::kExcludeOthers; + LocalOpsMode _showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps; TruncationMode _truncateOps = TruncationMode::kNoTruncation; std::string _shardName; diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp index f2acc179d75..127dd1f4ee8 100644 --- a/src/mongo/db/pipeline/document_source_current_op_test.cpp +++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp @@ -41,6 +41,7 @@ namespace mongo { namespace { +const auto kQueryPlanner = ExplainOptions::Verbosity::kQueryPlanner; const std::string kMockShardName = "testshard"; /** @@ -121,6 +122,13 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseAllUsersIfNotBoolean) { ErrorCodes::FailedToParse); } +TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseLocalOpsIfNotBoolean) { + const auto specObj = fromjson("{$currentOp:{localOps:1}}"); + ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), + AssertionException, + ErrorCodes::FailedToParse); +} + TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseTruncateOpsIfNotBoolean) { const auto specObj = fromjson("{$currentOp:{truncateOps:1}}"); ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()), @@ -136,38 +144,38 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfUnrecognisedParameterSpec } TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeTrueOptionalArguments) { - const auto specObj = - fromjson("{$currentOp:{idleConnections:true, allUsers:true, truncateOps:true}}"); + const auto specObj = fromjson( + "{$currentOp:{idleConnections:true, allUsers:true, localOps:true, truncateOps:true}}"); const auto parsed = DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()); const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get()); - const auto expectedOutput = - Document{{"$currentOp", - Document{{"idleConnections", true}, {"allUsers", true}, {"truncateOps", true}}}}; + const auto expectedOutput = Document{{"$currentOp", + Document{{"idleConnections", true}, + {"allUsers", true}, + {"localOps", true}, + {"truncateOps", true}}}}; ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput); } -TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeFalseOptionalArguments) { - const auto specObj = - fromjson("{$currentOp:{idleConnections:false, allUsers:false, truncateOps:false}}"); +TEST_F(DocumentSourceCurrentOpTest, ShouldParseButNotSerializeFalseOptionalArguments) { + const auto specObj = fromjson( + "{$currentOp:{idleConnections:false, allUsers:false, localOps:false, truncateOps:false}}"); const auto parsed = DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()); const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get()); - const auto expectedOutput = Document{ - {"$currentOp", - Document{{"idleConnections", false}, {"allUsers", false}, {"truncateOps", false}}}}; + const auto expectedOutput = Document{{"$currentOp", Document{}}}; ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput); } -TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDefaultValues) { +TEST_F(DocumentSourceCurrentOpTest, ShouldNotSerializeOmittedOptionalArguments) { const auto specObj = fromjson("{$currentOp:{}}"); const auto parsed = @@ -175,9 +183,7 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDef const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get()); - const auto expectedOutput = Document{ - {"$currentOp", - Document{{"idleConnections", false}, {"allUsers", false}, {"truncateOps", false}}}}; + const auto expectedOutput = Document{{"$currentOp", Document{}}}; ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput); } diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp new file mode 100644 index 00000000000..634f9c71231 --- /dev/null +++ b/src/mongo/db/pipeline/mongo_process_common.cpp @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/mongo_process_common.h" + +#include "mongo/db/auth/authorization_manager.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" + +namespace mongo { + +std::vector<BSONObj> MongoProcessCommon::getCurrentOps(OperationContext* opCtx, + CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode truncateMode) const { + AuthorizationSession* ctxAuth = AuthorizationSession::get(opCtx->getClient()); + + std::vector<BSONObj> ops; + + for (ServiceContext::LockedClientsCursor cursor(opCtx->getClient()->getServiceContext()); + Client* client = cursor.next();) { + invariant(client); + + stdx::lock_guard<Client> lk(*client); + + // If auth is disabled, ignore the allUsers parameter. + if (ctxAuth->getAuthorizationManager().isAuthEnabled() && + userMode == CurrentOpUserMode::kExcludeOthers && + !ctxAuth->isCoauthorizedWithClient(client)) { + continue; + } + + // Ignore inactive connections unless 'idleConnections' is true. + if (!client->getOperationContext() && connMode == CurrentOpConnectionsMode::kExcludeIdle) { + continue; + } + + // Delegate to the mongoD- or mongoS-specific implementation of _reportCurrentOpForClient. + ops.emplace_back(_reportCurrentOpForClient(opCtx, client, truncateMode)); + } + + return ops; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h new file mode 100644 index 00000000000..19596ade67a --- /dev/null +++ b/src/mongo/db/pipeline/mongo_process_common.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/pipeline/mongo_process_interface.h" + +namespace mongo { + +/** + * MongoProcessCommon provides base implementations of any MongoProcessInterface methods whose code + * is largely identical on mongoD and mongoS. + */ +class MongoProcessCommon : public MongoProcessInterface { +public: + virtual ~MongoProcessCommon() = default; + + std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, + CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode) const final; + +protected: + /** + * Returns a BSONObj representing a report of the operation which is currently being + * executed by the supplied client. This method is called by the getCurrentOps method of + * MongoProcessCommon to delegate to the mongoS- or mongoD- specific implementation. + */ + virtual BSONObj _reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index d32e8276c63..cdfae197c1f 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -62,6 +62,7 @@ public: enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle }; enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps }; + enum class CurrentOpLocalOpsMode { kLocalMongosOps, kRemoteShardOps }; struct MakePipelineOptions { MakePipelineOptions(){}; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index ae130fa7595..e460407f89e 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -42,6 +42,7 @@ #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/exec/fetch.h" @@ -732,83 +733,6 @@ Status PipelineD::MongoDInterface::attachCursorSourceToPipeline( return Status::OK(); } -std::vector<BSONObj> PipelineD::MongoDInterface::getCurrentOps( - OperationContext* opCtx, - CurrentOpConnectionsMode connMode, - CurrentOpUserMode userMode, - CurrentOpTruncateMode truncateMode) const { - AuthorizationSession* ctxAuth = AuthorizationSession::get(opCtx->getClient()); - - const std::string hostName = getHostNameCachedAndPort(); - - std::vector<BSONObj> ops; - - for (ServiceContext::LockedClientsCursor cursor(opCtx->getClient()->getServiceContext()); - Client* client = cursor.next();) { - invariant(client); - - stdx::lock_guard<Client> lk(*client); - - // If auth is disabled, ignore the allUsers parameter. - if (ctxAuth->getAuthorizationManager().isAuthEnabled() && - userMode == CurrentOpUserMode::kExcludeOthers && - !ctxAuth->isCoauthorizedWithClient(client)) { - continue; - } - - const OperationContext* clientOpCtx = client->getOperationContext(); - - if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) { - continue; - } - - BSONObjBuilder infoBuilder; - - infoBuilder.append("host", hostName); - - client->reportState(infoBuilder); - const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata(); - - if (clientMetadata) { - auto appName = clientMetadata.get().getApplicationName(); - if (!appName.empty()) { - infoBuilder.append("appName", appName); - } - - auto clientMetadataDocument = clientMetadata.get().getDocument(); - infoBuilder.append("clientMetadata", clientMetadataDocument); - } - - // Fill out the rest of the BSONObj with opCtx specific details. - infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx)); - infoBuilder.append("currentOpTime", - opCtx->getServiceContext()->getPreciseClockSource()->now().toString()); - - if (clientOpCtx) { - infoBuilder.append("opid", clientOpCtx->getOpID()); - if (clientOpCtx->isKillPending()) { - infoBuilder.append("killPending", true); - } - - if (clientOpCtx->getLogicalSessionId()) { - BSONObjBuilder bob(infoBuilder.subobjStart("lsid")); - clientOpCtx->getLogicalSessionId()->serialize(&bob); - } - - CurOp::get(clientOpCtx) - ->reportState(&infoBuilder, (truncateMode == CurrentOpTruncateMode::kTruncateOps)); - - Locker::LockerInfo lockerInfo; - clientOpCtx->lockState()->getLockerInfo(&lockerInfo); - fillLockerInfo(lockerInfo, infoBuilder); - } - - ops.emplace_back(infoBuilder.obj()); - } - - return ops; -} - std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) const { if (ShardingState::get(opCtx)->enabled()) { return ShardingState::get(opCtx)->getShardName(); @@ -890,6 +814,23 @@ boost::optional<Document> PipelineD::MongoDInterface::lookupSingleDocument( return lookedUpDocument; } +BSONObj PipelineD::MongoDInterface::_reportCurrentOpForClient( + OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { + BSONObjBuilder builder; + + CurOp::reportCurrentOpForClient( + opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder); + + // Append lock stats before returning. + if (auto clientOpCtx = client->getOperationContext()) { + Locker::LockerInfo lockerInfo; + clientOpCtx->lockState()->getLockerInfo(&lockerInfo); + fillLockerInfo(lockerInfo, builder); + } + + return builder.obj(); +} + std::unique_ptr<CollatorInterface> PipelineD::MongoDInterface::_getCollectionDefaultCollator( OperationContext* opCtx, StringData dbName, UUID collectionUUID) { auto it = _collatorCache.find(collectionUUID); diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 60ad684cc7b..630644413df 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -35,7 +35,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" -#include "mongo/db/pipeline/mongo_process_interface.h" +#include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/query/plan_executor.h" namespace mongo { @@ -61,7 +61,7 @@ struct DepsTracker; */ class PipelineD { public: - class MongoDInterface final : public MongoProcessInterface { + class MongoDInterface final : public MongoProcessCommon { public: MongoDInterface(OperationContext* opCtx); @@ -97,10 +97,6 @@ public: const MakePipelineOptions opts = MakePipelineOptions{}) final; Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; - std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, - CurrentOpConnectionsMode connMode, - CurrentOpUserMode userMode, - CurrentOpTruncateMode truncateMode) const final; std::string getShardName(OperationContext* opCtx) const final; std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx, const NamespaceString& nss, @@ -114,6 +110,11 @@ public: std::vector<GenericCursor> getCursors( const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + protected: + BSONObj _reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const final; + private: /** * Looks up the collection default collator for the collection given by 'collectionUUID'. A diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 8d754771d8a..fdd402b76bd 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -542,7 +542,8 @@ std::string runQuery(OperationContext* opCtx, invariant(!nss.isCommand()); // Set CurOp information. - beginQueryOp(opCtx, nss, q.query, q.ntoreturn, q.ntoskip); + const auto upconvertedQuery = upconvertQueryEntry(q.query, nss, q.ntoreturn, q.ntoskip); + beginQueryOp(opCtx, nss, upconvertedQuery, q.ntoreturn, q.ntoskip); // Parse the qm into a CanonicalQuery. const boost::intrusive_ptr<ExpressionContext> expCtx; @@ -703,7 +704,7 @@ std::string runQuery(OperationContext* opCtx, nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), opCtx->recoveryUnit()->getReadConcernLevel(), - upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)}); + upconvertedQuery}); ccId = pinnedCursor.getCursor()->cursorid(); LOG(5) << "caching executor with cursorid " << ccId << " after returning " << numResults diff --git a/src/mongo/rpc/metadata/client_metadata.cpp b/src/mongo/rpc/metadata/client_metadata.cpp index adcb3711b71..eebf1b6d9dc 100644 --- a/src/mongo/rpc/metadata/client_metadata.cpp +++ b/src/mongo/rpc/metadata/client_metadata.cpp @@ -306,9 +306,26 @@ void ClientMetadata::setMongoSMetadata(StringData hostAndPort, sub.append(kVersion, version); } - _document = builder.obj(); -} + auto document = builder.obj(); + + if (!_appName.empty()) { + // The _appName field points into the existing _document, which we are about to replace. + // We must redirect _appName to point into the new doc *before* replacing the old doc. We + // expect the 'application' metadata of the new document to be identical to the old. + auto appMetaData = document[kApplication]; + invariant(appMetaData.isABSONObj()); + + auto appNameEl = appMetaData[kName]; + invariant(appNameEl.type() == BSONType::String); + + auto appName = appNameEl.valueStringData(); + invariant(appName == _appName); + _appName = appName; + } + + _document = std::move(document); +} void ClientMetadata::serialize(StringData driverName, StringData driverVersion, diff --git a/src/mongo/rpc/metadata/client_metadata_test.cpp b/src/mongo/rpc/metadata/client_metadata_test.cpp index 7d3166127f4..30e96b33bf8 100644 --- a/src/mongo/rpc/metadata/client_metadata_test.cpp +++ b/src/mongo/rpc/metadata/client_metadata_test.cpp @@ -313,6 +313,8 @@ TEST(ClientMetadatTest, TestMongoSAppend) { ASSERT_EQUALS("g", swParseStatus.getValue().get().getApplicationName()); swParseStatus.getValue().get().setMongoSMetadata("h", "i", "j"); + ASSERT_EQUALS("g", swParseStatus.getValue().get().getApplicationName()); + auto doc = swParseStatus.getValue().get().getDocument(); constexpr auto kMongos = "mongos"_sd; diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.cpp index 337e6cab708..70e8bdc7048 100644 --- a/src/mongo/s/commands/cluster_getmore_cmd.cpp +++ b/src/mongo/s/commands/cluster_getmore_cmd.cpp @@ -50,6 +50,9 @@ class ClusterGetMoreCmd final : public BasicCommand { public: ClusterGetMoreCmd() : BasicCommand("getMore") {} + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const final { + return GetMoreRequest::parseNs(dbname, cmdObj).ns(); + } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 816c94ec087..03edb25f56c 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -47,6 +47,10 @@ public: "http://dochub.mongodb.org/core/aggregation for more details."; } + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const final { + return AggregationRequest::parseNs(dbname, cmdObj).ns(); + } + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kAlways; } diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 11130f2d4ec..f23ae411854 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -147,6 +147,19 @@ public: return true; } + LogicalOp getLogicalOp() const { + switch (_writeType) { + case BatchedCommandRequest::BatchType::BatchType_Insert: + return LogicalOp::opInsert; + case BatchedCommandRequest::BatchType::BatchType_Delete: + return LogicalOp::opDelete; + case BatchedCommandRequest::BatchType::BatchType_Update: + return LogicalOp::opUpdate; + } + + MONGO_UNREACHABLE; + } + Status checkAuthForRequest(OperationContext* opCtx, const OpMsgRequest& request) const final { Status status = auth::checkAuthForWriteCommand( AuthorizationSession::get(opCtx->getClient()), _writeType, request); diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index 791cb7fa2f7..7d631715cba 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -30,6 +30,7 @@ #include "mongo/s/commands/pipeline_s.h" +#include "mongo/db/curop.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/repl/read_concern_args.h" @@ -184,6 +185,16 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{}); } +BSONObj PipelineS::MongoSInterface::_reportCurrentOpForClient( + OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { + BSONObjBuilder builder; + + CurOp::reportCurrentOpForClient( + opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder); + + return builder.obj(); +} + std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors( const intrusive_ptr<ExpressionContext>& expCtx) const { invariant(hasGlobalServiceContext()); diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h index cdf72158e31..c0955d491d1 100644 --- a/src/mongo/s/commands/pipeline_s.h +++ b/src/mongo/s/commands/pipeline_s.h @@ -28,7 +28,7 @@ #pragma once -#include "mongo/db/pipeline/mongo_process_interface.h" +#include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/pipeline/pipeline.h" namespace mongo { @@ -43,7 +43,7 @@ public: * Class to provide access to mongos-specific implementations of methods required by some * document sources. */ - class MongoSInterface final : public MongoProcessInterface { + class MongoSInterface final : public MongoProcessCommon { public: MongoSInterface() = default; @@ -51,6 +51,16 @@ public: void setOperationContext(OperationContext* opCtx) final {} + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final; + + std::vector<GenericCursor> getCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + DBClientBase* directClient() final { MONGO_UNREACHABLE; } @@ -108,13 +118,6 @@ public: MONGO_UNREACHABLE; } - std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, - CurrentOpConnectionsMode connMode, - CurrentOpUserMode userMode, - CurrentOpTruncateMode truncateMode) const final { - MONGO_UNREACHABLE; - } - std::string getShardName(OperationContext* opCtx) const final { MONGO_UNREACHABLE; } @@ -132,15 +135,10 @@ public: MONGO_UNREACHABLE; } - boost::optional<Document> lookupSingleDocument( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) final; - - std::vector<GenericCursor> getCursors( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + protected: + BSONObj _reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const final; }; private: diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 8c7491772f1..d3ea2c636e0 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -41,6 +41,7 @@ #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/curop.h" #include "mongo/db/initialize_operation_session_info.h" #include "mongo/db/lasterror.h" #include "mongo/db/logical_clock.h" @@ -275,7 +276,10 @@ void execCommandClient(OperationContext* opCtx, } } -void runCommand(OperationContext* opCtx, const OpMsgRequest& request, BSONObjBuilder&& builder) { +void runCommand(OperationContext* opCtx, + const OpMsgRequest& request, + const NetworkOp opType, + BSONObjBuilder&& builder) { // Handle command option maxTimeMS first thing while processing the command so that the // subsequent code has the deadline available uassert(ErrorCodes::InvalidOptions, @@ -299,6 +303,15 @@ void runCommand(OperationContext* opCtx, const OpMsgRequest& request, BSONObjBui return; } + // Set the logical optype, command object and namespace as soon as we identify the command. If + // the command does not define a fully-qualified namespace, set CurOp to the generic command + // namespace db.$cmd. + auto ns = command->parseNs(request.getDatabase().toString(), request.body); + auto nss = (request.getDatabase() == ns ? NamespaceString(ns, "$cmd") : NamespaceString(ns)); + + // Fill out all currentOp details. + CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, opType); + initializeOperationSessionInfo(opCtx, request.body, command->requiresAuth(), true, true); auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); @@ -368,6 +381,12 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss const QueryMessage q(*dbm); + const auto upconvertedQuery = upconvertQueryEntry(q.query, nss, q.ntoreturn, q.ntoskip); + + // Set the upconverted query as the CurOp command object. + CurOp::get(opCtx)->setGenericOpRequestDetails( + opCtx, nss, nullptr, upconvertedQuery, dbm->msg().operation()); + Client* const client = opCtx->getClient(); AuthorizationSession* const authSession = AuthorizationSession::get(client); @@ -475,7 +494,7 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) { try { // Execute. LOG(3) << "Command begin db: " << db << " msg id: " << m.header().getId(); - runCommand(opCtx, request, reply->getInPlaceReplyBuilder(0)); + runCommand(opCtx, request, m.operation(), reply->getInPlaceReplyBuilder(0)); LOG(3) << "Command end db: " << db << " msg id: " << m.header().getId(); } catch (const DBException& ex) { LOG(1) << "Exception thrown while processing command on " << db @@ -549,6 +568,10 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss GetMoreRequest getMoreRequest(nss, cursorId, batchSize, boost::none, boost::none, boost::none); + // Set the upconverted getMore as the CurOp command object. + CurOp::get(opCtx)->setGenericOpRequestDetails( + opCtx, nss, nullptr, getMoreRequest.toBSON(), dbm->msg().operation()); + auto cursorResponse = ClusterFind::runGetMore(opCtx, getMoreRequest); if (cursorResponse == ErrorCodes::CursorNotFound) { return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0); @@ -644,6 +667,7 @@ void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) { MONGO_UNREACHABLE; } }(), + dbm->msg().operation(), BSONObjBuilder{bb}); // built object is ignored } diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 4ff2e432ae6..ec2cb77da8a 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -12,6 +12,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands', + '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/query/query_common', "cluster_client_cursor", "cluster_cursor_cleanup_job", diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index a3b680e9de3..ccc44f7a9a0 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -105,6 +105,11 @@ public: virtual bool isTailableAndAwaitData() const = 0; /** + * Returns the original command object which created this cursor. + */ + virtual BSONObj getOriginatingCommand() const = 0; + + /** * Returns the number of result documents returned so far by this cursor via the next() method. */ virtual long long getNumReturnedSoFar() const = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 10152522f59..375be3176c0 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -142,6 +142,10 @@ bool ClusterClientCursorImpl::isTailableAndAwaitData() const { return _params.tailableMode == TailableMode::kTailableAndAwaitData; } +BSONObj ClusterClientCursorImpl::getOriginatingCommand() const { + return _params.originatingCommandObj; +} + long long ClusterClientCursorImpl::getNumReturnedSoFar() const { return _numReturnedSoFar; } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index e2ffee6b869..877910387d5 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -103,6 +103,8 @@ public: bool isTailableAndAwaitData() const final; + BSONObj getOriginatingCommand() const final; + long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 1fa270bfd7e..94f8af9e414 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -64,6 +64,10 @@ StatusWith<ClusterQueryResult> ClusterClientCursorMock::next( return out.getValue(); } +BSONObj ClusterClientCursorMock::getOriginatingCommand() const { + return _originatingCommand; +} + long long ClusterClientCursorMock::getNumReturnedSoFar() const { return _numReturnedSoFar; } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 81fb52f15b4..607250bba46 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -67,6 +67,8 @@ public: bool isTailableAndAwaitData() const final; + BSONObj getOriginatingCommand() const final; + long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; @@ -96,6 +98,9 @@ private: std::queue<StatusWith<ClusterQueryResult>> _resultsQueue; stdx::function<void(void)> _killCallback; + // Originating command object. + BSONObj _originatingCommand; + // Number of returned documents. long long _numReturnedSoFar = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index d213b0ea73f..116634bcee8 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -88,6 +88,9 @@ struct ClusterClientCursorParams { // Namespace against which the cursors exist. NamespaceString nsString; + // The original command object which generated this cursor. Must either be empty or owned. + BSONObj originatingCommandObj; + // Per-remote node data. std::vector<RemoteCursor> remotes; diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 8d853eb3e57..2275ccbcb8f 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -142,6 +142,11 @@ void ClusterCursorManager::PinnedCursor::returnCursor(CursorState cursorState) { *this = PinnedCursor(); } +BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const { + invariant(_cursor); + return _cursor->getOriginatingCommand(); +} + CursorId ClusterCursorManager::PinnedCursor::getCursorId() const { return _cursorId; } diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index bb7bc857130..11f67ff533a 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -190,6 +190,11 @@ public: void returnCursor(CursorState cursorState); /** + * Returns the command object which originally created this cursor. + */ + BSONObj getOriginatingCommand() const; + + /** * Returns the cursor id for the underlying cursor, or zero if no cursor is owned. */ CursorId getCursorId() const; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index acab84e033d..71a3ef4f922 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -41,6 +41,7 @@ #include "mongo/client/read_preference.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/curop.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" @@ -191,6 +192,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // Construct the query and parameters. ClusterClientCursorParams params(query.nss(), readPref); + params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.limit = query.getQueryRequest().getLimit(); params.batchSize = query.getQueryRequest().getEffectiveBatchSize(); params.skip = query.getQueryRequest().getSkip(); @@ -402,6 +404,14 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, } invariant(request.cursorid == pinnedCursor.getValue().getCursorId()); + // Set the originatingCommand object and the cursorID in CurOp. + { + CurOp::get(opCtx)->debug().cursorid = request.cursorid; + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setOriginatingCommand_inlock( + pinnedCursor.getValue().getOriginatingCommand()); + } + // If the fail point is enabled, busy wait until it is disabled. while (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) { } |