summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_current_op.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-05-25 20:36:12 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-05-26 08:05:22 +0100
commit58486605c09672b3bfce8608dca403a145413bba (patch)
tree62f44cbd43515b591e6af6347c0a75eb5c6ab5fb /src/mongo/db/pipeline/document_source_current_op.cpp
parent4b88bf79dc47cbdfd74a11605cc5ee1c61e379f8 (diff)
downloadmongo-58486605c09672b3bfce8608dca403a145413bba.tar.gz
SERVER-19318 Add $currentOp aggregation stage for mongoD
Diffstat (limited to 'src/mongo/db/pipeline/document_source_current_op.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp178
1 files changed, 178 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
new file mode 100644
index 00000000000..835650961c2
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -0,0 +1,178 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_current_op.h"
+
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/server_options.h"
+#include "mongo/util/net/sock.h"
+
+namespace mongo {
+
+namespace {
+const StringData kAllUsersFieldName = "allUsers"_sd;
+const StringData kIdleConnectionsFieldName = "idleConnections"_sd;
+
+const StringData kOpIdFieldName = "opid"_sd;
+const StringData kClientFieldName = "client"_sd;
+const StringData kMongosClientFieldName = "client_s"_sd;
+} // namespace
+
+using boost::intrusive_ptr;
+
+REGISTER_DOCUMENT_SOURCE(currentOp,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceCurrentOp::createFromBson);
+
+const char* DocumentSourceCurrentOp::getSourceName() const {
+ return "$currentOp";
+}
+
+DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const {
+ return InitialSourceType::kCollectionlessInitialSource;
+}
+
+DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
+ if (_ops.empty()) {
+ _ops = _mongod->getCurrentOps(_includeIdleConnections, _includeOpsFromAllUsers);
+
+ _opsIter = _ops.begin();
+
+ if (pExpCtx->inShard) {
+ _shardName = _mongod->getShardName(pExpCtx->opCtx);
+
+ uassert(40465,
+ "Aggregation request specified 'fromRouter' but unable to retrieve shard name "
+ "for $currentOp pipeline stage.",
+ !_shardName.empty());
+ }
+ }
+
+ if (_opsIter != _ops.end()) {
+ if (!pExpCtx->inShard) {
+ return Document(*_opsIter++);
+ }
+
+ // This $currentOp is running in a sharded context.
+ invariant(!_shardName.empty());
+
+ const BSONObj& op = *_opsIter++;
+ MutableDocument doc;
+
+ // For operations on a shard, we change the opid from the raw numeric form to
+ // 'shardname:opid'. We also change the fieldname 'client' to 'client_s' to indicate
+ // that the IP is that of the mongos which initiated this request.
+ for (auto&& elt : op) {
+ StringData fieldName = elt.fieldNameStringData();
+
+ if (fieldName == kOpIdFieldName) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "expected numeric opid for $currentOp response from '"
+ << _shardName
+ << "' but got: "
+ << typeName(elt.type()),
+ elt.isNumber());
+
+ std::string shardOpID = (str::stream() << _shardName << ":" << elt.numberInt());
+ doc.addField(kOpIdFieldName, Value(shardOpID));
+ } else if (fieldName == kClientFieldName) {
+ doc.addField(kMongosClientFieldName, Value(elt.str()));
+ } else {
+ doc.addField(fieldName, Value(elt));
+ }
+ }
+
+ return doc.freeze();
+ }
+
+ return GetNextResult::makeEOF();
+}
+
+intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
+ BSONElement spec, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$currentOp options must be specified in an object, but found: "
+ << typeName(spec.type()),
+ spec.type() == BSONType::Object);
+
+ const NamespaceString& nss = pExpCtx->ns;
+
+ uassert(ErrorCodes::InvalidNamespace,
+ "$currentOp must be run against the 'admin' database with {aggregate: 1}",
+ nss.db() == NamespaceString::kAdminDb && nss.isCollectionlessAggregateNS());
+
+ ConnMode includeIdleConnections = ConnMode::kExcludeIdle;
+ UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers;
+
+ for (auto&& elem : spec.embeddedObject()) {
+ const auto fieldName = elem.fieldNameStringData();
+
+ if (fieldName == kIdleConnectionsFieldName) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The 'idleConnections' parameter of the $currentOp stage must "
+ "be a boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+ includeIdleConnections =
+ (elem.Bool() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle);
+ } else if (fieldName == kAllUsersFieldName) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The 'allUsers' parameter of the $currentOp stage must be a "
+ "boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+ includeOpsFromAllUsers =
+ (elem.Bool() ? UserMode::kIncludeAll : UserMode::kExcludeOthers);
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "Unrecognized option '" << fieldName
+ << "' in $currentOp stage.");
+ }
+ }
+
+ return intrusive_ptr<DocumentSourceCurrentOp>(
+ new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers));
+}
+
+intrusive_ptr<DocumentSourceCurrentOp> DocumentSourceCurrentOp::create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ ConnMode includeIdleConnections,
+ UserMode includeOpsFromAllUsers) {
+ return intrusive_ptr<DocumentSourceCurrentOp>(
+ new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers));
+}
+
+Value DocumentSourceCurrentOp::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(Document{
+ {getSourceName(),
+ Document{{kIdleConnectionsFieldName, (_includeIdleConnections == ConnMode::kIncludeIdle)},
+ {kAllUsersFieldName, (_includeOpsFromAllUsers == UserMode::kIncludeAll)}}}});
+}
+} // namespace mongo