summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-03-14 17:46:22 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2018-03-15 17:33:50 +0000
commit5ecf2c0a5bffa837c96ad20dea23a94c5165739a (patch)
tree254b1baa62ecd3092b8962778fcc0480c066de03 /src
parent8c639f958eb5edff4d52560e4c3dfe04c7a78f31 (diff)
downloadmongo-5ecf2c0a5bffa837c96ad20dea23a94c5165739a.tar.gz
SERVER-18094 Add 'localOps' parameter to $currentOp to show local mongoS operations
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/curop.cpp109
-rw-r--r--src/mongo/db/curop.h36
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp4
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp59
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h38
-rw-r--r--src/mongo/db/pipeline/document_source_current_op_test.cpp36
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.cpp74
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.h62
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h1
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp95
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h13
-rw-r--r--src/mongo/db/query/find.cpp5
-rw-r--r--src/mongo/rpc/metadata/client_metadata.cpp21
-rw-r--r--src/mongo/rpc/metadata/client_metadata_test.cpp2
-rw-r--r--src/mongo/s/commands/cluster_getmore_cmd.cpp3
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp4
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp13
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp11
-rw-r--r--src/mongo/s/commands/pipeline_s.h34
-rw-r--r--src/mongo/s/commands/strategy.cpp28
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h5
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h5
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h3
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h5
-rw-r--r--src/mongo/s/query/cluster_find.cpp10
31 files changed, 502 insertions, 191 deletions
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index ea71baacde0..3eb90f98ff2 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -45,6 +45,7 @@
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/util/log.h"
+#include "mongo/util/net/sock.h"
#include "mongo/util/stringutils.h"
namespace mongo {
@@ -224,6 +225,49 @@ CurOp* CurOp::get(const OperationContext& opCtx) {
return _curopStack(opCtx).top();
}
+void CurOp::reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ bool truncateOps,
+ BSONObjBuilder* infoBuilder) {
+ invariant(client);
+ OperationContext* clientOpCtx = client->getOperationContext();
+
+ const std::string hostName = getHostNameCachedAndPort();
+ infoBuilder->append("host", hostName);
+
+ client->reportState(*infoBuilder);
+ const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata();
+
+ if (clientMetadata) {
+ auto appName = clientMetadata.get().getApplicationName();
+ if (!appName.empty()) {
+ infoBuilder->append("appName", appName);
+ }
+
+ auto clientMetadataDocument = clientMetadata.get().getDocument();
+ infoBuilder->append("clientMetadata", clientMetadataDocument);
+ }
+
+ // Fill out the rest of the BSONObj with opCtx specific details.
+ infoBuilder->appendBool("active", static_cast<bool>(clientOpCtx));
+ infoBuilder->append("currentOpTime",
+ opCtx->getServiceContext()->getPreciseClockSource()->now().toString());
+
+ if (clientOpCtx) {
+ infoBuilder->append("opid", clientOpCtx->getOpID());
+ if (clientOpCtx->isKillPending()) {
+ infoBuilder->append("killPending", true);
+ }
+
+ if (clientOpCtx->getLogicalSessionId()) {
+ BSONObjBuilder bob(infoBuilder->subobjStart("lsid"));
+ clientOpCtx->getLogicalSessionId()->serialize(&bob);
+ }
+
+ CurOp::get(clientOpCtx)->reportState(infoBuilder, truncateOps);
+ }
+}
+
CurOp::CurOp(OperationContext* opCtx) : CurOp(opCtx, &_curopStack(opCtx)) {}
CurOp::CurOp(OperationContext* opCtx, CurOpStack* stack) : _stack(stack) {
@@ -234,6 +278,26 @@ CurOp::CurOp(OperationContext* opCtx, CurOpStack* stack) : _stack(stack) {
}
}
+CurOp::~CurOp() {
+ invariant(this == _stack->pop());
+}
+
+void CurOp::setGenericOpRequestDetails(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const Command* command,
+ BSONObj cmdObj,
+ NetworkOp op) {
+ auto logicalOp = (command ? command->getLogicalOp() : networkOpToLogicalOp(op));
+
+ stdx::lock_guard<Client> clientLock(*opCtx->getClient());
+ _isCommand = _debug.iscommand = (command != nullptr);
+ _logicalOp = _debug.logicalOp = logicalOp;
+ _networkOp = _debug.networkOp = op;
+ _opDescription = cmdObj;
+ _command = command;
+ _ns = nss.ns();
+}
+
ProgressMeter& CurOp::setMessage_inlock(const char* msg,
std::string name,
unsigned long long progressMeterTotal,
@@ -252,10 +316,6 @@ ProgressMeter& CurOp::setMessage_inlock(const char* msg,
return _progressMeter;
}
-CurOp::~CurOp() {
- invariant(this == _stack->pop());
-}
-
void CurOp::setNS_inlock(StringData ns) {
_ns = ns.toString();
}
@@ -354,23 +414,7 @@ void CurOp::reportState(BSONObjBuilder* builder, bool truncateOps) {
// is true, limit the size of each op to 1000 bytes. Otherwise, do not truncate.
const boost::optional<size_t> maxQuerySize{truncateOps, 1000};
- if (!_command && _networkOp == dbQuery) {
- // This is a legacy OP_QUERY. We upconvert the "query" field of the currentOp output to look
- // similar to a find command.
- //
- // CurOp doesn't have access to the ntoreturn or ntoskip values. By setting them to zero, we
- // will omit mention of them in the currentOp output.
- const int ntoreturn = 0;
- const int ntoskip = 0;
-
- appendAsObjOrString(
- "command",
- upconvertQueryEntry(_opDescription, NamespaceString(_ns), ntoreturn, ntoskip),
- maxQuerySize,
- builder);
- } else {
- appendAsObjOrString("command", _opDescription, maxQuerySize, builder);
- }
+ appendAsObjOrString("command", _opDescription, maxQuerySize, builder);
if (!_originatingCommand.isEmpty()) {
appendAsObjOrString("originatingCommand", _originatingCommand, maxQuerySize, builder);
@@ -436,21 +480,11 @@ string OpDebug::report(Client* client,
}
}
- BSONObj query;
-
- // If necessary, upconvert legacy find operations so that their log lines resemble their find
- // command counterpart.
- if (!iscommand && networkOp == dbQuery) {
- query = upconvertQueryEntry(
- curop.opDescription(), NamespaceString(curop.getNS()), ntoreturn, ntoskip);
- } else {
- query = curop.opDescription();
- }
-
+ auto query = curop.opDescription();
if (!query.isEmpty()) {
s << " command: ";
if (iscommand) {
- Command* curCommand = curop.getCommand();
+ const Command* curCommand = curop.getCommand();
if (curCommand) {
mutablebson::Document cmdToLog(query, mutablebson::Document::kInPlaceDisabled);
curCommand->redactForLogging(&cmdToLog);
@@ -555,14 +589,7 @@ void OpDebug::append(const CurOp& curop,
NamespaceString nss = NamespaceString(curop.getNS());
b.append("ns", nss.ns());
- if (!iscommand && networkOp == dbQuery) {
- appendAsObjOrString("command",
- upconvertQueryEntry(curop.opDescription(), nss, ntoreturn, ntoskip),
- maxElementSize,
- &b);
- } else if (curop.haveOpDescription()) {
- appendAsObjOrString("command", curop.opDescription(), maxElementSize, &b);
- }
+ appendAsObjOrString("command", curop.opDescription(), maxElementSize, &b);
auto originatingCommand = curop.originatingCommand();
if (!originatingCommand.isEmpty()) {
diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h
index 046516aaed2..bdb30791060 100644
--- a/src/mongo/db/curop.h
+++ b/src/mongo/db/curop.h
@@ -158,11 +158,34 @@ public:
static CurOp* get(const OperationContext& opCtx);
/**
+ * Writes a report of the operation being executed by the given client to the supplied
+ * BSONObjBuilder, in a format suitable for display in currentOp. Does not include a lockInfo
+ * report, since this may be called in either a mongoD or mongoS context and the latter does not
+ * supply lock stats. The client must be locked before calling this method.
+ */
+ static void reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ bool truncateOps,
+ BSONObjBuilder* infoBuilder);
+
+ /**
* Constructs a nested CurOp at the top of the given "opCtx"'s CurOp stack.
*/
explicit CurOp(OperationContext* opCtx);
~CurOp();
+ /**
+ * Fills out CurOp and OpDebug with basic info common to all commands. We require the NetworkOp
+ * in order to distinguish which protocol delivered this request, e.g. OP_QUERY or OP_MSG. This
+ * is set early in the request processing backend and does not typically need to be called
+ * thereafter. Locks the client as needed to apply the specified settings.
+ */
+ void setGenericOpRequestDetails(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const Command* command,
+ BSONObj cmdObj,
+ NetworkOp op);
+
bool haveOpDescription() const {
return !_opDescription.isEmpty();
}
@@ -265,6 +288,13 @@ public:
}
/**
+ * Returns true if this CurOp represents a non-command OP_QUERY request.
+ */
+ bool isLegacyQuery() const {
+ return _networkOp == NetworkOp::dbQuery && !isCommand();
+ }
+
+ /**
* Returns true if the current operation is known to be a command.
*/
bool isCommand() const {
@@ -369,10 +399,10 @@ public:
_originatingCommand = commandObj.getOwned();
}
- Command* getCommand() const {
+ const Command* getCommand() const {
return _command;
}
- void setCommand_inlock(Command* command) {
+ void setCommand_inlock(const Command* command) {
_command = command;
}
@@ -455,7 +485,7 @@ private:
CurOpStack* _stack;
CurOp* _parent{nullptr};
- Command* _command{nullptr};
+ const Command* _command{nullptr};
// The time at which this CurOp instance was marked as started.
long long _start{0};
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index bcef4903325..0e761e36290 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -693,7 +693,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
// TODO: don't create nested CurOp for legacy writes.
// Add Command pointer to the nested CurOp.
auto& parentCurOp = *CurOp::get(opCtx);
- Command* cmd = parentCurOp.getCommand();
+ const Command* cmd = parentCurOp.getCommand();
CurOp curOp(opCtx);
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -833,7 +833,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
// TODO: don't create nested CurOp for legacy writes.
// Add Command pointer to the nested CurOp.
auto& parentCurOp = *CurOp::get(opCtx);
- Command* cmd = parentCurOp.getCommand();
+ const Command* cmd = parentCurOp.getCommand();
CurOp curOp(opCtx);
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 1019ee08d1c..5f25c2c6545 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -302,6 +302,7 @@ pipelineeEnv.Library(
'document_source_sort_by_count.cpp',
'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
+ 'mongo_process_common.cpp',
'pipeline.cpp',
'sequential_document_cache.cpp',
'tee_buffer.cpp',
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index c650f9ae0b9..f7c6dc2e171 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -37,6 +37,7 @@ namespace mongo {
namespace {
const StringData kAllUsersFieldName = "allUsers"_sd;
const StringData kIdleConnectionsFieldName = "idleConnections"_sd;
+const StringData kLocalOpsFieldName = "localOps"_sd;
const StringData kTruncateOpsFieldName = "truncateOps"_sd;
const StringData kOpIdFieldName = "opid"_sd;
@@ -60,11 +61,14 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li
<< typeName(spec.type()));
}
- bool allUsers = false;
+ auto allUsers = UserMode::kExcludeOthers;
+ auto localOps = LocalOpsMode::kRemoteShardOps;
// Check the spec for all fields named 'allUsers'. If any of them are 'true', we require
// the 'inprog' privilege. This avoids the possibility that a spec with multiple
- // allUsers fields might allow an unauthorized user to view all operations.
+ // allUsers fields might allow an unauthorized user to view all operations. We also check for
+ // the presence of a 'localOps' field, which instructs this $currentOp to list local mongoS
+ // operations rather than forwarding the request to the shards.
for (auto&& elem : spec.embeddedObject()) {
if (elem.fieldNameStringData() == "allUsers"_sd) {
if (elem.type() != BSONType::Bool) {
@@ -74,11 +78,23 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li
<< typeName(elem.type()));
}
- allUsers = allUsers || elem.boolean();
+ if (elem.boolean()) {
+ allUsers = UserMode::kIncludeAll;
+ }
+ } else if (elem.fieldNameStringData() == "localOps") {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "The 'localOps' parameter of the $currentOp stage must be a "
+ "boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+
+ if (elem.boolean()) {
+ localOps = LocalOpsMode::kLocalMongosOps;
+ }
}
}
- return stdx::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers);
+ return stdx::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps);
}
@@ -163,6 +179,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
ConnMode includeIdleConnections = ConnMode::kExcludeIdle;
UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers;
+ LocalOpsMode showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps;
TruncationMode truncateOps = TruncationMode::kNoTruncation;
for (auto&& elem : spec.embeddedObject()) {
@@ -175,7 +192,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
<< typeName(elem.type()),
elem.type() == BSONType::Bool);
includeIdleConnections =
- (elem.Bool() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle);
+ (elem.boolean() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle);
} else if (fieldName == kAllUsersFieldName) {
uassert(ErrorCodes::FailedToParse,
str::stream() << "The 'allUsers' parameter of the $currentOp stage must be a "
@@ -183,7 +200,15 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
<< typeName(elem.type()),
elem.type() == BSONType::Bool);
includeOpsFromAllUsers =
- (elem.Bool() ? UserMode::kIncludeAll : UserMode::kExcludeOthers);
+ (elem.boolean() ? UserMode::kIncludeAll : UserMode::kExcludeOthers);
+ } else if (fieldName == kLocalOpsFieldName) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The 'localOps' parameter of the $currentOp stage must be "
+ "a boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+ showLocalOpsOnMongoS =
+ (elem.boolean() ? LocalOpsMode::kLocalMongosOps : LocalOpsMode::kRemoteShardOps);
} else if (fieldName == kTruncateOpsFieldName) {
uassert(ErrorCodes::FailedToParse,
str::stream() << "The 'truncateOps' parameter of the $currentOp stage must be "
@@ -191,7 +216,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
<< typeName(elem.type()),
elem.type() == BSONType::Bool);
truncateOps =
- (elem.Bool() ? TruncationMode::kTruncateOps : TruncationMode::kNoTruncation);
+ (elem.boolean() ? TruncationMode::kTruncateOps : TruncationMode::kNoTruncation);
} else {
uasserted(ErrorCodes::FailedToParse,
str::stream() << "Unrecognized option '" << fieldName
@@ -199,24 +224,30 @@ intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
}
}
- return intrusive_ptr<DocumentSourceCurrentOp>(new DocumentSourceCurrentOp(
- pExpCtx, includeIdleConnections, includeOpsFromAllUsers, truncateOps));
+ return new DocumentSourceCurrentOp(
+ pExpCtx, includeIdleConnections, includeOpsFromAllUsers, showLocalOpsOnMongoS, truncateOps);
}
intrusive_ptr<DocumentSourceCurrentOp> DocumentSourceCurrentOp::create(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
ConnMode includeIdleConnections,
UserMode includeOpsFromAllUsers,
+ LocalOpsMode showLocalOpsOnMongoS,
TruncationMode truncateOps) {
- return intrusive_ptr<DocumentSourceCurrentOp>(new DocumentSourceCurrentOp(
- pExpCtx, includeIdleConnections, includeOpsFromAllUsers, truncateOps));
+ return new DocumentSourceCurrentOp(
+ pExpCtx, includeIdleConnections, includeOpsFromAllUsers, showLocalOpsOnMongoS, truncateOps);
}
Value DocumentSourceCurrentOp::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
return Value(Document{
{getSourceName(),
- Document{{kIdleConnectionsFieldName, (_includeIdleConnections == ConnMode::kIncludeIdle)},
- {kAllUsersFieldName, (_includeOpsFromAllUsers == UserMode::kIncludeAll)},
- {kTruncateOpsFieldName, (_truncateOps == TruncationMode::kTruncateOps)}}}});
+ Document{{kIdleConnectionsFieldName,
+ _includeIdleConnections == ConnMode::kIncludeIdle ? Value(true) : Value()},
+ {kAllUsersFieldName,
+ _includeOpsFromAllUsers == UserMode::kIncludeAll ? Value(true) : Value()},
+ {kLocalOpsFieldName,
+ _showLocalOpsOnMongoS == LocalOpsMode::kLocalMongosOps ? Value(true) : Value()},
+ {kTruncateOpsFieldName,
+ _truncateOps == TruncationMode::kTruncateOps ? Value(true) : Value()}}}});
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index cb180a99085..d655c0ed2bf 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -34,12 +34,18 @@ namespace mongo {
class DocumentSourceCurrentOp final : public DocumentSource {
public:
+ using TruncationMode = MongoProcessInterface::CurrentOpTruncateMode;
+ using ConnMode = MongoProcessInterface::CurrentOpConnectionsMode;
+ using LocalOpsMode = MongoProcessInterface::CurrentOpLocalOpsMode;
+ using UserMode = MongoProcessInterface::CurrentOpUserMode;
+
class LiteParsed final : public LiteParsedDocumentSource {
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec);
- explicit LiteParsed(bool allUsers) : _allUsers(allUsers) {}
+ LiteParsed(UserMode allUsers, LocalOpsMode localOps)
+ : _allUsers(allUsers), _localOps(localOps) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
@@ -48,30 +54,39 @@ public:
PrivilegeVector requiredPrivileges(bool isMongos) const final {
PrivilegeVector privileges;
- // In a sharded cluster, we always need the inprog privilege to run $currentOp.
- if (isMongos || _allUsers) {
+ // In a sharded cluster, we always need the inprog privilege to run $currentOp on the
+ // shards. If we are only looking up local mongoS operations, we do not need inprog to
+ // view our own ops but *do* require it to view other users' ops.
+ if (_allUsers == UserMode::kIncludeAll ||
+ (isMongos && _localOps == LocalOpsMode::kRemoteShardOps)) {
privileges.push_back({ResourcePattern::forClusterResource(), ActionType::inprog});
}
return privileges;
}
+ bool allowedToForwardFromMongos() const final {
+ return _localOps == LocalOpsMode::kRemoteShardOps;
+ }
+
+ bool allowedToPassthroughFromMongos() const final {
+ return _localOps == LocalOpsMode::kRemoteShardOps;
+ }
+
bool isInitialSource() const final {
return true;
}
private:
- const bool _allUsers;
+ const UserMode _allUsers;
+ const LocalOpsMode _localOps;
};
- using TruncationMode = MongoProcessInterface::CurrentOpTruncateMode;
- using ConnMode = MongoProcessInterface::CurrentOpConnectionsMode;
- using UserMode = MongoProcessInterface::CurrentOpUserMode;
-
static boost::intrusive_ptr<DocumentSourceCurrentOp> create(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
ConnMode includeIdleConnections = ConnMode::kExcludeIdle,
UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers,
+ LocalOpsMode showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps,
TruncationMode truncateOps = TruncationMode::kNoTruncation);
GetNextResult getNext() final;
@@ -81,7 +96,9 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kFirst,
- HostTypeRequirement::kAnyShard,
+ (_showLocalOpsOnMongoS == LocalOpsMode::kLocalMongosOps
+ ? HostTypeRequirement::kLocalOnly
+ : HostTypeRequirement::kAnyShard),
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed);
@@ -100,14 +117,17 @@ private:
DocumentSourceCurrentOp(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
ConnMode includeIdleConnections = ConnMode::kExcludeIdle,
UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers,
+ LocalOpsMode showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps,
TruncationMode truncateOps = TruncationMode::kNoTruncation)
: DocumentSource(pExpCtx),
_includeIdleConnections(includeIdleConnections),
_includeOpsFromAllUsers(includeOpsFromAllUsers),
+ _showLocalOpsOnMongoS(showLocalOpsOnMongoS),
_truncateOps(truncateOps) {}
ConnMode _includeIdleConnections = ConnMode::kExcludeIdle;
UserMode _includeOpsFromAllUsers = UserMode::kExcludeOthers;
+ LocalOpsMode _showLocalOpsOnMongoS = LocalOpsMode::kRemoteShardOps;
TruncationMode _truncateOps = TruncationMode::kNoTruncation;
std::string _shardName;
diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp
index f2acc179d75..127dd1f4ee8 100644
--- a/src/mongo/db/pipeline/document_source_current_op_test.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp
@@ -41,6 +41,7 @@ namespace mongo {
namespace {
+const auto kQueryPlanner = ExplainOptions::Verbosity::kQueryPlanner;
const std::string kMockShardName = "testshard";
/**
@@ -121,6 +122,13 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseAllUsersIfNotBoolean) {
ErrorCodes::FailedToParse);
}
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseLocalOpsIfNotBoolean) {
+ const auto specObj = fromjson("{$currentOp:{localOps:1}}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+}
+
TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseTruncateOpsIfNotBoolean) {
const auto specObj = fromjson("{$currentOp:{truncateOps:1}}");
ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
@@ -136,38 +144,38 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfUnrecognisedParameterSpec
}
TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeTrueOptionalArguments) {
- const auto specObj =
- fromjson("{$currentOp:{idleConnections:true, allUsers:true, truncateOps:true}}");
+ const auto specObj = fromjson(
+ "{$currentOp:{idleConnections:true, allUsers:true, localOps:true, truncateOps:true}}");
const auto parsed =
DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
- const auto expectedOutput =
- Document{{"$currentOp",
- Document{{"idleConnections", true}, {"allUsers", true}, {"truncateOps", true}}}};
+ const auto expectedOutput = Document{{"$currentOp",
+ Document{{"idleConnections", true},
+ {"allUsers", true},
+ {"localOps", true},
+ {"truncateOps", true}}}};
ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
}
-TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeFalseOptionalArguments) {
- const auto specObj =
- fromjson("{$currentOp:{idleConnections:false, allUsers:false, truncateOps:false}}");
+TEST_F(DocumentSourceCurrentOpTest, ShouldParseButNotSerializeFalseOptionalArguments) {
+ const auto specObj = fromjson(
+ "{$currentOp:{idleConnections:false, allUsers:false, localOps:false, truncateOps:false}}");
const auto parsed =
DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
- const auto expectedOutput = Document{
- {"$currentOp",
- Document{{"idleConnections", false}, {"allUsers", false}, {"truncateOps", false}}}};
+ const auto expectedOutput = Document{{"$currentOp", Document{}}};
ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
}
-TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDefaultValues) {
+TEST_F(DocumentSourceCurrentOpTest, ShouldNotSerializeOmittedOptionalArguments) {
const auto specObj = fromjson("{$currentOp:{}}");
const auto parsed =
@@ -175,9 +183,7 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDef
const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
- const auto expectedOutput = Document{
- {"$currentOp",
- Document{{"idleConnections", false}, {"allUsers", false}, {"truncateOps", false}}}};
+ const auto expectedOutput = Document{{"$currentOp", Document{}}};
ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
}
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp
new file mode 100644
index 00000000000..634f9c71231
--- /dev/null
+++ b/src/mongo/db/pipeline/mongo_process_common.cpp
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/mongo_process_common.h"
+
+#include "mongo/db/auth/authorization_manager.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+
+namespace mongo {
+
+std::vector<BSONObj> MongoProcessCommon::getCurrentOps(OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode truncateMode) const {
+ AuthorizationSession* ctxAuth = AuthorizationSession::get(opCtx->getClient());
+
+ std::vector<BSONObj> ops;
+
+ for (ServiceContext::LockedClientsCursor cursor(opCtx->getClient()->getServiceContext());
+ Client* client = cursor.next();) {
+ invariant(client);
+
+ stdx::lock_guard<Client> lk(*client);
+
+ // If auth is disabled, ignore the allUsers parameter.
+ if (ctxAuth->getAuthorizationManager().isAuthEnabled() &&
+ userMode == CurrentOpUserMode::kExcludeOthers &&
+ !ctxAuth->isCoauthorizedWithClient(client)) {
+ continue;
+ }
+
+ // Ignore inactive connections unless 'idleConnections' is true.
+ if (!client->getOperationContext() && connMode == CurrentOpConnectionsMode::kExcludeIdle) {
+ continue;
+ }
+
+ // Delegate to the mongoD- or mongoS-specific implementation of _reportCurrentOpForClient.
+ ops.emplace_back(_reportCurrentOpForClient(opCtx, client, truncateMode));
+ }
+
+ return ops;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
new file mode 100644
index 00000000000..19596ade67a
--- /dev/null
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/pipeline/mongo_process_interface.h"
+
+namespace mongo {
+
+/**
+ * MongoProcessCommon provides base implementations of any MongoProcessInterface methods whose code
+ * is largely identical on mongoD and mongoS.
+ */
+class MongoProcessCommon : public MongoProcessInterface {
+public:
+ virtual ~MongoProcessCommon() = default;
+
+ std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode) const final;
+
+protected:
+ /**
+ * Returns a BSONObj representing a report of the operation which is currently being
+ * executed by the supplied client. This method is called by the getCurrentOps method of
+ * MongoProcessCommon to delegate to the mongoS- or mongoD- specific implementation.
+ */
+ virtual BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ CurrentOpTruncateMode truncateOps) const = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index d32e8276c63..cdfae197c1f 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -62,6 +62,7 @@ public:
enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps };
+ enum class CurrentOpLocalOpsMode { kLocalMongosOps, kRemoteShardOps };
struct MakePipelineOptions {
MakePipelineOptions(){};
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ae130fa7595..e460407f89e 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/fetch.h"
@@ -732,83 +733,6 @@ Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
return Status::OK();
}
-std::vector<BSONObj> PipelineD::MongoDInterface::getCurrentOps(
- OperationContext* opCtx,
- CurrentOpConnectionsMode connMode,
- CurrentOpUserMode userMode,
- CurrentOpTruncateMode truncateMode) const {
- AuthorizationSession* ctxAuth = AuthorizationSession::get(opCtx->getClient());
-
- const std::string hostName = getHostNameCachedAndPort();
-
- std::vector<BSONObj> ops;
-
- for (ServiceContext::LockedClientsCursor cursor(opCtx->getClient()->getServiceContext());
- Client* client = cursor.next();) {
- invariant(client);
-
- stdx::lock_guard<Client> lk(*client);
-
- // If auth is disabled, ignore the allUsers parameter.
- if (ctxAuth->getAuthorizationManager().isAuthEnabled() &&
- userMode == CurrentOpUserMode::kExcludeOthers &&
- !ctxAuth->isCoauthorizedWithClient(client)) {
- continue;
- }
-
- const OperationContext* clientOpCtx = client->getOperationContext();
-
- if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) {
- continue;
- }
-
- BSONObjBuilder infoBuilder;
-
- infoBuilder.append("host", hostName);
-
- client->reportState(infoBuilder);
- const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata();
-
- if (clientMetadata) {
- auto appName = clientMetadata.get().getApplicationName();
- if (!appName.empty()) {
- infoBuilder.append("appName", appName);
- }
-
- auto clientMetadataDocument = clientMetadata.get().getDocument();
- infoBuilder.append("clientMetadata", clientMetadataDocument);
- }
-
- // Fill out the rest of the BSONObj with opCtx specific details.
- infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx));
- infoBuilder.append("currentOpTime",
- opCtx->getServiceContext()->getPreciseClockSource()->now().toString());
-
- if (clientOpCtx) {
- infoBuilder.append("opid", clientOpCtx->getOpID());
- if (clientOpCtx->isKillPending()) {
- infoBuilder.append("killPending", true);
- }
-
- if (clientOpCtx->getLogicalSessionId()) {
- BSONObjBuilder bob(infoBuilder.subobjStart("lsid"));
- clientOpCtx->getLogicalSessionId()->serialize(&bob);
- }
-
- CurOp::get(clientOpCtx)
- ->reportState(&infoBuilder, (truncateMode == CurrentOpTruncateMode::kTruncateOps));
-
- Locker::LockerInfo lockerInfo;
- clientOpCtx->lockState()->getLockerInfo(&lockerInfo);
- fillLockerInfo(lockerInfo, infoBuilder);
- }
-
- ops.emplace_back(infoBuilder.obj());
- }
-
- return ops;
-}
-
std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) const {
if (ShardingState::get(opCtx)->enabled()) {
return ShardingState::get(opCtx)->getShardName();
@@ -890,6 +814,23 @@ boost::optional<Document> PipelineD::MongoDInterface::lookupSingleDocument(
return lookedUpDocument;
}
+BSONObj PipelineD::MongoDInterface::_reportCurrentOpForClient(
+ OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const {
+ BSONObjBuilder builder;
+
+ CurOp::reportCurrentOpForClient(
+ opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder);
+
+ // Append lock stats before returning.
+ if (auto clientOpCtx = client->getOperationContext()) {
+ Locker::LockerInfo lockerInfo;
+ clientOpCtx->lockState()->getLockerInfo(&lockerInfo);
+ fillLockerInfo(lockerInfo, builder);
+ }
+
+ return builder.obj();
+}
+
std::unique_ptr<CollatorInterface> PipelineD::MongoDInterface::_getCollectionDefaultCollator(
OperationContext* opCtx, StringData dbName, UUID collectionUUID) {
auto it = _collatorCache.find(collectionUUID);
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 60ad684cc7b..630644413df 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -35,7 +35,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
-#include "mongo/db/pipeline/mongo_process_interface.h"
+#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/query/plan_executor.h"
namespace mongo {
@@ -61,7 +61,7 @@ struct DepsTracker;
*/
class PipelineD {
public:
- class MongoDInterface final : public MongoProcessInterface {
+ class MongoDInterface final : public MongoProcessCommon {
public:
MongoDInterface(OperationContext* opCtx);
@@ -97,10 +97,6 @@ public:
const MakePipelineOptions opts = MakePipelineOptions{}) final;
Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* pipeline) final;
- std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
- CurrentOpConnectionsMode connMode,
- CurrentOpUserMode userMode,
- CurrentOpTruncateMode truncateMode) const final;
std::string getShardName(OperationContext* opCtx) const final;
std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx,
const NamespaceString& nss,
@@ -114,6 +110,11 @@ public:
std::vector<GenericCursor> getCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+ protected:
+ BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ CurrentOpTruncateMode truncateOps) const final;
+
private:
/**
* Looks up the collection default collator for the collection given by 'collectionUUID'. A
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 8d754771d8a..fdd402b76bd 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -542,7 +542,8 @@ std::string runQuery(OperationContext* opCtx,
invariant(!nss.isCommand());
// Set CurOp information.
- beginQueryOp(opCtx, nss, q.query, q.ntoreturn, q.ntoskip);
+ const auto upconvertedQuery = upconvertQueryEntry(q.query, nss, q.ntoreturn, q.ntoskip);
+ beginQueryOp(opCtx, nss, upconvertedQuery, q.ntoreturn, q.ntoskip);
// Parse the qm into a CanonicalQuery.
const boost::intrusive_ptr<ExpressionContext> expCtx;
@@ -703,7 +704,7 @@ std::string runQuery(OperationContext* opCtx,
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
opCtx->recoveryUnit()->getReadConcernLevel(),
- upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)});
+ upconvertedQuery});
ccId = pinnedCursor.getCursor()->cursorid();
LOG(5) << "caching executor with cursorid " << ccId << " after returning " << numResults
diff --git a/src/mongo/rpc/metadata/client_metadata.cpp b/src/mongo/rpc/metadata/client_metadata.cpp
index adcb3711b71..eebf1b6d9dc 100644
--- a/src/mongo/rpc/metadata/client_metadata.cpp
+++ b/src/mongo/rpc/metadata/client_metadata.cpp
@@ -306,9 +306,26 @@ void ClientMetadata::setMongoSMetadata(StringData hostAndPort,
sub.append(kVersion, version);
}
- _document = builder.obj();
-}
+ auto document = builder.obj();
+
+ if (!_appName.empty()) {
+ // The _appName field points into the existing _document, which we are about to replace.
+ // We must redirect _appName to point into the new doc *before* replacing the old doc. We
+ // expect the 'application' metadata of the new document to be identical to the old.
+ auto appMetaData = document[kApplication];
+ invariant(appMetaData.isABSONObj());
+
+ auto appNameEl = appMetaData[kName];
+ invariant(appNameEl.type() == BSONType::String);
+
+ auto appName = appNameEl.valueStringData();
+ invariant(appName == _appName);
+ _appName = appName;
+ }
+
+ _document = std::move(document);
+}
void ClientMetadata::serialize(StringData driverName,
StringData driverVersion,
diff --git a/src/mongo/rpc/metadata/client_metadata_test.cpp b/src/mongo/rpc/metadata/client_metadata_test.cpp
index 7d3166127f4..30e96b33bf8 100644
--- a/src/mongo/rpc/metadata/client_metadata_test.cpp
+++ b/src/mongo/rpc/metadata/client_metadata_test.cpp
@@ -313,6 +313,8 @@ TEST(ClientMetadatTest, TestMongoSAppend) {
ASSERT_EQUALS("g", swParseStatus.getValue().get().getApplicationName());
swParseStatus.getValue().get().setMongoSMetadata("h", "i", "j");
+ ASSERT_EQUALS("g", swParseStatus.getValue().get().getApplicationName());
+
auto doc = swParseStatus.getValue().get().getDocument();
constexpr auto kMongos = "mongos"_sd;
diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.cpp
index 337e6cab708..70e8bdc7048 100644
--- a/src/mongo/s/commands/cluster_getmore_cmd.cpp
+++ b/src/mongo/s/commands/cluster_getmore_cmd.cpp
@@ -50,6 +50,9 @@ class ClusterGetMoreCmd final : public BasicCommand {
public:
ClusterGetMoreCmd() : BasicCommand("getMore") {}
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const final {
+ return GetMoreRequest::parseNs(dbname, cmdObj).ns();
+ }
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 816c94ec087..03edb25f56c 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -47,6 +47,10 @@ public:
"http://dochub.mongodb.org/core/aggregation for more details.";
}
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const final {
+ return AggregationRequest::parseNs(dbname, cmdObj).ns();
+ }
+
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kAlways;
}
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 11130f2d4ec..f23ae411854 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -147,6 +147,19 @@ public:
return true;
}
+ LogicalOp getLogicalOp() const {
+ switch (_writeType) {
+ case BatchedCommandRequest::BatchType::BatchType_Insert:
+ return LogicalOp::opInsert;
+ case BatchedCommandRequest::BatchType::BatchType_Delete:
+ return LogicalOp::opDelete;
+ case BatchedCommandRequest::BatchType::BatchType_Update:
+ return LogicalOp::opUpdate;
+ }
+
+ MONGO_UNREACHABLE;
+ }
+
Status checkAuthForRequest(OperationContext* opCtx, const OpMsgRequest& request) const final {
Status status = auth::checkAuthForWriteCommand(
AuthorizationSession::get(opCtx->getClient()), _writeType, request);
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index 791cb7fa2f7..7d631715cba 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -30,6 +30,7 @@
#include "mongo/s/commands/pipeline_s.h"
+#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/repl/read_concern_args.h"
@@ -184,6 +185,16 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{});
}
+BSONObj PipelineS::MongoSInterface::_reportCurrentOpForClient(
+ OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const {
+ BSONObjBuilder builder;
+
+ CurOp::reportCurrentOpForClient(
+ opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder);
+
+ return builder.obj();
+}
+
std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors(
const intrusive_ptr<ExpressionContext>& expCtx) const {
invariant(hasGlobalServiceContext());
diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h
index cdf72158e31..c0955d491d1 100644
--- a/src/mongo/s/commands/pipeline_s.h
+++ b/src/mongo/s/commands/pipeline_s.h
@@ -28,7 +28,7 @@
#pragma once
-#include "mongo/db/pipeline/mongo_process_interface.h"
+#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/pipeline/pipeline.h"
namespace mongo {
@@ -43,7 +43,7 @@ public:
* Class to provide access to mongos-specific implementations of methods required by some
* document sources.
*/
- class MongoSInterface final : public MongoProcessInterface {
+ class MongoSInterface final : public MongoProcessCommon {
public:
MongoSInterface() = default;
@@ -51,6 +51,16 @@ public:
void setOperationContext(OperationContext* opCtx) final {}
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final;
+
+ std::vector<GenericCursor> getCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+
DBClientBase* directClient() final {
MONGO_UNREACHABLE;
}
@@ -108,13 +118,6 @@ public:
MONGO_UNREACHABLE;
}
- std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
- CurrentOpConnectionsMode connMode,
- CurrentOpUserMode userMode,
- CurrentOpTruncateMode truncateMode) const final {
- MONGO_UNREACHABLE;
- }
-
std::string getShardName(OperationContext* opCtx) const final {
MONGO_UNREACHABLE;
}
@@ -132,15 +135,10 @@ public:
MONGO_UNREACHABLE;
}
- boost::optional<Document> lookupSingleDocument(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) final;
-
- std::vector<GenericCursor> getCursors(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+ protected:
+ BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ CurrentOpTruncateMode truncateOps) const final;
};
private:
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 8c7491772f1..d3ea2c636e0 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/curop.h"
#include "mongo/db/initialize_operation_session_info.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/logical_clock.h"
@@ -275,7 +276,10 @@ void execCommandClient(OperationContext* opCtx,
}
}
-void runCommand(OperationContext* opCtx, const OpMsgRequest& request, BSONObjBuilder&& builder) {
+void runCommand(OperationContext* opCtx,
+ const OpMsgRequest& request,
+ const NetworkOp opType,
+ BSONObjBuilder&& builder) {
// Handle command option maxTimeMS first thing while processing the command so that the
// subsequent code has the deadline available
uassert(ErrorCodes::InvalidOptions,
@@ -299,6 +303,15 @@ void runCommand(OperationContext* opCtx, const OpMsgRequest& request, BSONObjBui
return;
}
+ // Set the logical optype, command object and namespace as soon as we identify the command. If
+ // the command does not define a fully-qualified namespace, set CurOp to the generic command
+ // namespace db.$cmd.
+ auto ns = command->parseNs(request.getDatabase().toString(), request.body);
+ auto nss = (request.getDatabase() == ns ? NamespaceString(ns, "$cmd") : NamespaceString(ns));
+
+ // Fill out all currentOp details.
+ CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, opType);
+
initializeOperationSessionInfo(opCtx, request.body, command->requiresAuth(), true, true);
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
@@ -368,6 +381,12 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss
const QueryMessage q(*dbm);
+ const auto upconvertedQuery = upconvertQueryEntry(q.query, nss, q.ntoreturn, q.ntoskip);
+
+ // Set the upconverted query as the CurOp command object.
+ CurOp::get(opCtx)->setGenericOpRequestDetails(
+ opCtx, nss, nullptr, upconvertedQuery, dbm->msg().operation());
+
Client* const client = opCtx->getClient();
AuthorizationSession* const authSession = AuthorizationSession::get(client);
@@ -475,7 +494,7 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
try { // Execute.
LOG(3) << "Command begin db: " << db << " msg id: " << m.header().getId();
- runCommand(opCtx, request, reply->getInPlaceReplyBuilder(0));
+ runCommand(opCtx, request, m.operation(), reply->getInPlaceReplyBuilder(0));
LOG(3) << "Command end db: " << db << " msg id: " << m.header().getId();
} catch (const DBException& ex) {
LOG(1) << "Exception thrown while processing command on " << db
@@ -549,6 +568,10 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss
GetMoreRequest getMoreRequest(nss, cursorId, batchSize, boost::none, boost::none, boost::none);
+ // Set the upconverted getMore as the CurOp command object.
+ CurOp::get(opCtx)->setGenericOpRequestDetails(
+ opCtx, nss, nullptr, getMoreRequest.toBSON(), dbm->msg().operation());
+
auto cursorResponse = ClusterFind::runGetMore(opCtx, getMoreRequest);
if (cursorResponse == ErrorCodes::CursorNotFound) {
return replyToQuery(ResultFlag_CursorNotFound, nullptr, 0, 0);
@@ -644,6 +667,7 @@ void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
MONGO_UNREACHABLE;
}
}(),
+ dbm->msg().operation(),
BSONObjBuilder{bb}); // built object is ignored
}
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 4ff2e432ae6..ec2cb77da8a 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -12,6 +12,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands',
+ '$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/query/query_common',
"cluster_client_cursor",
"cluster_cursor_cleanup_job",
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index a3b680e9de3..ccc44f7a9a0 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -105,6 +105,11 @@ public:
virtual bool isTailableAndAwaitData() const = 0;
/**
+ * Returns the original command object which created this cursor.
+ */
+ virtual BSONObj getOriginatingCommand() const = 0;
+
+ /**
* Returns the number of result documents returned so far by this cursor via the next() method.
*/
virtual long long getNumReturnedSoFar() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 10152522f59..375be3176c0 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -142,6 +142,10 @@ bool ClusterClientCursorImpl::isTailableAndAwaitData() const {
return _params.tailableMode == TailableMode::kTailableAndAwaitData;
}
+BSONObj ClusterClientCursorImpl::getOriginatingCommand() const {
+ return _params.originatingCommandObj;
+}
+
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index e2ffee6b869..877910387d5 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -103,6 +103,8 @@ public:
bool isTailableAndAwaitData() const final;
+ BSONObj getOriginatingCommand() const final;
+
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 1fa270bfd7e..94f8af9e414 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -64,6 +64,10 @@ StatusWith<ClusterQueryResult> ClusterClientCursorMock::next(
return out.getValue();
}
+BSONObj ClusterClientCursorMock::getOriginatingCommand() const {
+ return _originatingCommand;
+}
+
long long ClusterClientCursorMock::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 81fb52f15b4..607250bba46 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -67,6 +67,8 @@ public:
bool isTailableAndAwaitData() const final;
+ BSONObj getOriginatingCommand() const final;
+
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
@@ -96,6 +98,9 @@ private:
std::queue<StatusWith<ClusterQueryResult>> _resultsQueue;
stdx::function<void(void)> _killCallback;
+ // Originating command object.
+ BSONObj _originatingCommand;
+
// Number of returned documents.
long long _numReturnedSoFar = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index d213b0ea73f..116634bcee8 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -88,6 +88,9 @@ struct ClusterClientCursorParams {
// Namespace against which the cursors exist.
NamespaceString nsString;
+ // The original command object which generated this cursor. Must either be empty or owned.
+ BSONObj originatingCommandObj;
+
// Per-remote node data.
std::vector<RemoteCursor> remotes;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 8d853eb3e57..2275ccbcb8f 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -142,6 +142,11 @@ void ClusterCursorManager::PinnedCursor::returnCursor(CursorState cursorState) {
*this = PinnedCursor();
}
+BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const {
+ invariant(_cursor);
+ return _cursor->getOriginatingCommand();
+}
+
CursorId ClusterCursorManager::PinnedCursor::getCursorId() const {
return _cursorId;
}
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index bb7bc857130..11f67ff533a 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -190,6 +190,11 @@ public:
void returnCursor(CursorState cursorState);
/**
+ * Returns the command object which originally created this cursor.
+ */
+ BSONObj getOriginatingCommand() const;
+
+ /**
* Returns the cursor id for the underlying cursor, or zero if no cursor is owned.
*/
CursorId getCursorId() const;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index acab84e033d..71a3ef4f922 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -41,6 +41,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/curop.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
@@ -191,6 +192,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// Construct the query and parameters.
ClusterClientCursorParams params(query.nss(), readPref);
+ params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.limit = query.getQueryRequest().getLimit();
params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
params.skip = query.getQueryRequest().getSkip();
@@ -402,6 +404,14 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
}
invariant(request.cursorid == pinnedCursor.getValue().getCursorId());
+ // Set the originatingCommand object and the cursorID in CurOp.
+ {
+ CurOp::get(opCtx)->debug().cursorid = request.cursorid;
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setOriginatingCommand_inlock(
+ pinnedCursor.getValue().getOriginatingCommand());
+ }
+
// If the fail point is enabled, busy wait until it is disabled.
while (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) {
}