summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/auth/lib/commands_lib.js24
-rw-r--r--jstests/noPassthrough/aggregation_currentop.js344
-rw-r--r--jstests/noPassthrough/currentop_query.js39
-rw-r--r--jstests/sharding/read_pref_cmd.js33
-rw-r--r--src/mongo/db/auth/authorization_session.cpp52
-rw-r--r--src/mongo/db/auth/authorization_session_test.cpp31
-rw-r--r--src/mongo/db/commands.h8
-rw-r--r--src/mongo/db/commands/count_cmd.cpp2
-rw-r--r--src/mongo/db/commands/distinct.cpp2
-rw-r--r--src/mongo/db/commands/find_cmd.cpp2
-rw-r--r--src/mongo/db/commands/geo_near_cmd.cpp2
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp2
-rw-r--r--src/mongo/db/commands/group_cmd.cpp2
-rw-r--r--src/mongo/db/commands/haystack.cpp2
-rw-r--r--src/mongo/db/commands/parallel_collection_scan.cpp2
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp13
-rw-r--r--src/mongo/db/namespace_string.cpp12
-rw-r--r--src/mongo/db/namespace_string.h7
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/aggregation_context_fixture.h6
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp35
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h16
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp60
-rw-r--r--src/mongo/db/pipeline/document_source.h51
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h2
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp178
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h73
-rw-r--r--src/mongo/db/pipeline/document_source_current_op_test.cpp235
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h4
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h4
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h4
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h4
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp39
-rw-r--r--src/mongo/db/pipeline/pipeline.h7
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp77
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp59
-rw-r--r--src/mongo/db/pipeline/stub_mongod_interface.h9
-rw-r--r--src/mongo/db/run_commands.cpp7
42 files changed, 1388 insertions, 77 deletions
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index 5ac0a0c717f..1751a33cb37 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -638,6 +638,30 @@ var authCommandsLib = {
]
},
{
+ testname: "aggregate_currentOp_allUsers_true",
+ command: {aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}},
+ skipSharded: true,
+ testcases: [
+ {
+ runOnDb: adminDbName,
+ roles: roles_monitoring,
+ privileges: [{resource: {cluster: true}, actions: ["inprog"]}]
+ },
+ {
+ runOnDb: firstDbName,
+ roles: roles_monitoring,
+ privileges: [{resource: {cluster: true}, actions: ["inprog"]}],
+ expectFail: true
+ }
+ ]
+ },
+ {
+ testname: "aggregate_currentOp_allUsers_false",
+ command: {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}},
+ skipSharded: true,
+ testcases: [{runOnDb: adminDbName, roles: roles_all}]
+ },
+ {
testname: "aggregate_lookup",
command: {
aggregate: "foo",
diff --git a/jstests/noPassthrough/aggregation_currentop.js b/jstests/noPassthrough/aggregation_currentop.js
new file mode 100644
index 00000000000..2cb167f092b
--- /dev/null
+++ b/jstests/noPassthrough/aggregation_currentop.js
@@ -0,0 +1,344 @@
+/**
+ * Tests that the $currentOp aggregation stage behaves as expected. Specifically:
+ * - It must be the first stage in the pipeline.
+ * - It can only be run on admin, and the "aggregate" field must be 1.
+ * - Only active connections are shown unless {idleConnections: true} is specified.
+ * - A user without the inprog privilege can see their own ops, but no-one else's.
+ * - A user with the inprog privilege can see all ops.
+ * - Non-local readConcerns are rejected.
+ * - Collation rules are respected.
+ */
+(function() {
+ "use strict";
+
+ const key = "jstests/libs/key1";
+
+ // Create a new replica set for testing. We set the internalQueryExecYieldIterations parameter
+ // so that plan execution yields on every iteration. For some tests, we will temporarily set
+ // yields to hang the mongod so we can capture particular operations in the currentOp output.
+ const rst = new ReplSetTest({
+ name: jsTestName(),
+ nodes: 3,
+ keyFile: key,
+ nodeOptions: {setParameter: {internalQueryExecYieldIterations: 1}}
+ });
+
+ const nodes = rst.nodeList();
+
+ rst.startSet();
+ rst.initiate({
+ _id: jsTestName(),
+ members: [
+ {_id: 0, host: nodes[0], priority: 1},
+ {_id: 1, host: nodes[1], priority: 0},
+ {_id: 2, host: nodes[2], arbiterOnly: true}
+ ],
+ });
+
+ let primary = rst.getPrimary();
+
+ let testDB = primary.getDB(jsTestName());
+ let adminDB = primary.getDB("admin");
+
+ // Create an admin user, one user with the inprog privilege, and one without.
+ assert.commandWorked(adminDB.runCommand({createUser: "admin", pwd: "pwd", roles: ["root"]}));
+ assert(adminDB.auth("admin", "pwd"));
+
+ assert.commandWorked(adminDB.runCommand({
+ createRole: "role_inprog",
+ roles: [],
+ privileges: [{resource: {cluster: true}, actions: ["inprog"]}]
+ }));
+
+ assert.commandWorked(adminDB.runCommand(
+ {createUser: "user_inprog", pwd: "pwd", roles: ["role_inprog", "readAnyDatabase"]}));
+
+ assert.commandWorked(
+ adminDB.runCommand({createUser: "user_no_inprog", pwd: "pwd", roles: ["readAnyDatabase"]}));
+
+ // Create some dummy test data.
+ testDB.test.drop();
+
+ for (let i = 0; i < 5; i++) {
+ assert.writeOK(testDB.test.insert({_id: i, a: i}));
+ }
+
+ // Functions to support running an operation in a parallel shell for testing allUsers behaviour.
+ function runInParallelShell({testfunc, username, password}) {
+ TestData.aggCurOpTest = testfunc;
+ TestData.aggCurOpUser = username;
+ TestData.aggCurOpPwd = password;
+
+ assert.commandWorked(
+ adminDB.runCommand({configureFailPoint: "setYieldAllLocksHang", mode: "alwaysOn"}));
+
+ testfunc = function() {
+ db.getSiblingDB("admin").auth(TestData.aggCurOpUser, TestData.aggCurOpPwd);
+ TestData.aggCurOpTest();
+ db.getSiblingDB("admin").logout();
+ };
+
+ return startParallelShell(testfunc, primary.port);
+ }
+
+ function assertCurrentOpHasSingleMatchingEntry({currentOpAggFilter, curOpOpts}) {
+ curOpOpts = (curOpOpts || {allUsers: true});
+
+ let result = null;
+
+ assert.soon(
+ function() {
+ result = adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$currentOp: curOpOpts}, {$match: currentOpAggFilter}],
+ cursor: {}
+ });
+
+ assert.commandWorked(result);
+
+ return (result.cursor.firstBatch.length === 1);
+ },
+ function() {
+ return "Failed to find operation in $currentOp output: " + tojson(result);
+ });
+ }
+
+ function waitForParallelShell(awaitShell) {
+ assert.commandWorked(
+ adminDB.runCommand({configureFailPoint: "setYieldAllLocksHang", mode: "off"}));
+
+ awaitShell();
+ }
+
+ /**
+ * Restarts a replica set with additional parameters, and optionally re-authenticates.
+ */
+ function restartReplSet(replSet, newOpts, user, pwd) {
+ const numNodes = replSet.nodeList().length;
+
+ for (let n = 0; n < numNodes; n++) {
+ replSet.restart(n, newOpts);
+ }
+
+ primary = replSet.getPrimary();
+ replSet.awaitSecondaryNodes();
+
+ testDB = primary.getDB(jsTestName());
+ adminDB = primary.getDB("admin");
+
+ if (user && pwd) {
+ adminDB.auth(user, pwd);
+ }
+ }
+
+ //
+ // Authenticate as user_no_inprog.
+ //
+ assert(adminDB.logout());
+ assert(adminDB.auth("user_no_inprog", "pwd"));
+
+ // Test that $currentOp fails with {allUsers: true} for a user without the "inprog" privilege.
+ assert.commandFailedWithCode(
+ adminDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}),
+ ErrorCodes.Unauthorized);
+
+ // Test that $currentOp succeeds with {allUsers: false} for a user without the "inprog"
+ // privilege.
+ assert.commandWorked(adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}));
+
+ // Test that $currentOp fails when run as {aggregate: 1} on a database other than admin.
+ assert.commandFailedWithCode(
+ testDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}}),
+ ErrorCodes.InvalidNamespace);
+
+ // Test that $currentOp fails when run on admin without {aggregate: 1}.
+ assert.commandFailedWithCode(
+ adminDB.runCommand({aggregate: "collname", pipeline: [{$currentOp: {}}], cursor: {}}),
+ ErrorCodes.InvalidNamespace);
+
+ // Test that $currentOp accepts all numeric types.
+ const ones = [1, 1.0, NumberInt(1), NumberLong(1), NumberDecimal(1)];
+
+ for (let one of ones) {
+ assert.commandWorked(
+ adminDB.runCommand({aggregate: one, pipeline: [{$currentOp: {}}], cursor: {}}));
+ }
+
+ // Test that {aggregate: 1} fails when the first stage in the pipeline is not $currentOp.
+ assert.commandFailedWithCode(
+ adminDB.runCommand({aggregate: 1, pipeline: [{$match: {}}], cursor: {}}),
+ ErrorCodes.InvalidNamespace);
+
+ // Test that $currentOp fails when it is not the first stage in the pipeline. We use two
+ // $currentOp stages since any other stage in the initial position will trip the {aggregate: 1}
+ // namespace check.
+ assert.commandFailedWithCode(
+ adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {}}, {$currentOp: {}}], cursor: {}}),
+ ErrorCodes.BadValue);
+
+ // Test that $currentOp succeeds if local readConcern is specified.
+ assert.commandWorked(adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {}}], readConcern: {level: "local"}, cursor: {}}));
+
+ // Test that $currentOp fails if a non-local readConcern is specified.
+ assert.commandFailedWithCode(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$currentOp: {}}],
+ readConcern: {level: "linearizable"},
+ cursor: {}
+ }),
+ ErrorCodes.InvalidOptions);
+
+ // Test that a user without the inprog privilege cannot see another user's operations.
+ // Temporarily log in as 'user_inprog' to validate that the op is present in $currentOp output.
+ assert(adminDB.logout());
+ assert(adminDB.auth("user_inprog", "pwd"));
+
+ let awaitShell = runInParallelShell({
+ testfunc: function() {
+ assert.eq(db.getSiblingDB(jsTestName())
+ .test.find({})
+ .comment("agg_current_op_allusers_test")
+ .itcount(),
+ 5);
+ },
+ username: "user_inprog",
+ password: "pwd"
+ });
+
+ assertCurrentOpHasSingleMatchingEntry({
+ currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"},
+ curOpOpts: {allUsers: true}
+ });
+
+ // Log back in as 'user_no_inprog' and validate that the user cannot see the op.
+ assert(adminDB.logout());
+ assert(adminDB.auth("user_no_inprog", "pwd"));
+
+ assert.eq(adminDB
+ .runCommand({
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: false}},
+ {$match: {"command.comment": "agg_current_op_allusers_test"}}
+ ],
+ cursor: {}
+ })
+ .cursor.firstBatch.length,
+ 0);
+
+ waitForParallelShell(awaitShell);
+
+ //
+ // Authenticate as user_inprog.
+ //
+ assert(adminDB.logout());
+ assert(adminDB.auth("user_inprog", "pwd"));
+
+ // Test that $currentOp with {allUsers: true} succeeds for a user with the "inprog"
+ // privilege.
+ assert.commandWorked(
+ adminDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}));
+
+ // Test that {idleConnections: false} returns only active connections.
+ assert.eq(adminDB
+ .runCommand({
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: true, idleConnections: false}},
+ {$match: {"active": false}}
+ ],
+ cursor: {}
+ })
+ .cursor.firstBatch.length,
+ 0);
+
+ // Test that {idleConnections: true} returns inactive connections.
+ const idleConn = new Mongo(primary.host);
+
+ assert.gte(adminDB
+ .runCommand({
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: true, idleConnections: true}},
+ {$match: {active: false}}
+ ],
+ cursor: {}
+ })
+ .cursor.firstBatch.length,
+ 1);
+
+ // Test that a user with the inprog privilege can see another user's operations with
+ // {allUsers: true}
+ awaitShell = runInParallelShell({
+ testfunc: function() {
+ assert.eq(db.getSiblingDB(jsTestName())
+ .test.find({})
+ .comment("agg_current_op_allusers_test")
+ .itcount(),
+ 5);
+ },
+ username: "user_no_inprog",
+ password: "pwd"
+ });
+
+ assertCurrentOpHasSingleMatchingEntry(
+ {currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"}});
+
+ waitForParallelShell(awaitShell);
+
+ // Test that collation rules apply to matches on $currentOp output.
+ assert.eq(
+ adminDB
+ .runCommand({
+ aggregate: 1,
+ pipeline:
+ [{$currentOp: {}}, {$match: {"command.comment": "AGG_currént_op_COLLATION"}}],
+ collation: {locale: "en_US", strength: 1}, // Case and diacritic insensitive.
+ comment: "agg_current_op_collation",
+ cursor: {}
+ })
+ .cursor.firstBatch.length,
+ 1);
+
+ // Test that $currentOp is explainable.
+ const explainPlan = assert.commandWorked(adminDB.runCommand({
+ aggregate: 1,
+ pipeline:
+ [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}],
+ explain: true
+ }));
+
+ const expectedStages =
+ [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}];
+
+ assert.eq(explainPlan.stages, expectedStages);
+
+ // Test that the allUsers parameter is ignored when authentication is disabled.
+ restartReplSet(rst, {keyFile: null});
+
+ // Ensure that there is at least one other connection present.
+ const otherConn = new Mongo(primary.host);
+
+ // Verify that $currentOp displays all operations when auth is disabled regardless of the
+ // allUsers parameter, by checking that the output is the same in both cases. We project static
+ // fields from each operation so that a thread which becomes active between the two aggregations
+ // is still comparable across the output of both.
+ let aggCmd = {
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: true, idleConnections: true}},
+ {$project: {desc: 1, threadId: 1, connectionId: 1, appName: 1}},
+ {$sort: {threadId: 1}}
+ ],
+ cursor: {}
+ };
+
+ const aggAllUsersTrue = assert.commandWorked(adminDB.runCommand(aggCmd));
+ aggCmd.pipeline[0].$currentOp.allUsers = false;
+ const aggAllUsersFalse = assert.commandWorked(adminDB.runCommand(aggCmd));
+
+ assert.eq(aggAllUsersFalse.cursor.firstBatch, aggAllUsersTrue.cursor.firstBatch);
+})();
diff --git a/jstests/noPassthrough/currentop_query.js b/jstests/noPassthrough/currentop_query.js
index 950ac48d812..142371848b8 100644
--- a/jstests/noPassthrough/currentop_query.js
+++ b/jstests/noPassthrough/currentop_query.js
@@ -6,12 +6,14 @@
"use strict";
/**
- * @param {Object} params - Configuration options for the test.
- * @param {string} params.readMode - The read mode to use for the parallel shell. This allows
+ * @param {string} readMode - The read mode to use for the parallel shell. This allows
* testing currentOp() output for both OP_QUERY and OP_GET_MORE queries, as well as "find" and
* "getMore" commands.
+ * @param {function} currentOp - Function which takes a database object and a filter, and
+ * returns an array of matching current operations. This allows us to test output for both the
+ * currentOp command and the $currentOp aggregation stage.
*/
- function runTest(params) {
+ function runTest({readMode, currentOp}) {
var conn = MongoRunner.runMongod({smallfiles: "", nojournal: ""});
assert.neq(null, conn, "mongod was unable to start up");
@@ -44,7 +46,7 @@
{configureFailPoint: "setYieldAllLocksHang", mode: "alwaysOn"}));
// Set shell read mode for the parallel shell test.
- TestData.shellReadMode = params.readMode;
+ TestData.shellReadMode = readMode;
TestData.currentOpTest = testObj.test;
testObj.test = function() {
db.getMongo().forceReadMode(TestData.shellReadMode);
@@ -67,7 +69,7 @@
testObj.currentOpFilter.op = testObj.operation;
}
- var result = testDB.currentOp(testObj.currentOpFilter);
+ var result = currentOp(testDB, testObj.currentOpFilter);
assert.commandWorked(result);
if (result.inprog.length === 1) {
@@ -83,7 +85,7 @@
},
function() {
return "Failed to find operation from " + tojson(testObj) +
- " in currentOp() output: " + tojson(testDB.currentOp());
+ " in currentOp() output: " + tojson(currentOp(testDB, {}));
});
// Allow the query to complete.
@@ -246,7 +248,7 @@
//
// Confirm currentOp contains collation for find command.
//
- if (params.readMode === "commands") {
+ if (readMode === "commands") {
confirmCurrentOpContents({
test: function() {
assert.eq(db.currentop_query.find({a: 1})
@@ -324,7 +326,7 @@
// Confirm that currentOp displays upconverted getMore and originatingCommand in the case of
// a legacy query.
//
- if (params.readMode === "legacy") {
+ if (readMode === "legacy") {
let filter = {
"command.getMore": {$gt: 0},
"command.collection": "currentop_query",
@@ -454,6 +456,23 @@
delete TestData.queryFilter;
}
- runTest({readMode: "commands"});
- runTest({readMode: "legacy"});
+ function currentOpCommand(inputDB, filter) {
+ return inputDB.currentOp(filter);
+ }
+
+ function currentOpAgg(inputDB, filter) {
+ let adminDB = inputDB.getSiblingDB("admin");
+ let cmdRes = adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {}}, {$match: filter}], cursor: {}});
+
+ assert.commandWorked(cmdRes);
+
+ return {inprog: new DBCommandCursor(inputDB.getMongo(), cmdRes, 5).toArray(), ok: 1};
+ }
+
+ runTest({readMode: "commands", currentOp: currentOpCommand});
+ runTest({readMode: "legacy", currentOp: currentOpCommand});
+
+ runTest({readMode: "commands", currentOp: currentOpAgg});
+ runTest({readMode: "legacy", currentOp: currentOpAgg});
})();
diff --git a/jstests/sharding/read_pref_cmd.js b/jstests/sharding/read_pref_cmd.js
index e8c06f48694..02ca8f25b63 100644
--- a/jstests/sharding/read_pref_cmd.js
+++ b/jstests/sharding/read_pref_cmd.js
@@ -10,9 +10,11 @@ var setUp = function() {
configDB.adminCommand({enableSharding: 'test'});
configDB.adminCommand({shardCollection: 'test.user', key: {x: 1}});
- // Each time we drop the 'test' DB we have to re-enable profiling
+ // Each time we drop the 'test' DB we have to re-enable profiling. Enable profiling on 'admin'
+ // to test the $currentOp aggregation stage.
st.rs0.nodes.forEach(function(node) {
node.getDB('test').setProfilingLevel(2);
+ node.getDB('admin').setProfilingLevel(2);
});
};
@@ -38,6 +40,7 @@ var tearDown = function() {
*/
var testReadPreference = function(conn, hostList, isMongos, mode, tagSets, secExpected) {
var testDB = conn.getDB('test');
+ var adminDB = conn.getDB('admin');
conn.setSlaveOk(false); // purely rely on readPref
jsTest.log('Testing mode: ' + mode + ', tag sets: ' + tojson(tagSets));
conn.setReadPref(mode, tagSets);
@@ -50,12 +53,17 @@ var testReadPreference = function(conn, hostList, isMongos, mode, tagSets, secEx
* @param secOk true if command should be routed to a secondary.
* @param profileQuery the query to perform agains the profile collection to
* look for the cmd just sent.
+ * @param dbName the name of the database against which to run the command,
+ * and to which the 'system.profile' entry for this command is written.
*/
- var cmdTest = function(cmdObj, secOk, profileQuery) {
+ var cmdTest = function(cmdObj, secOk, profileQuery, dbName = "test") {
jsTest.log('about to do: ' + tojson(cmdObj));
+
+ let runCmdDB = conn.getDB(dbName);
+
// use runReadCommand so that the cmdObj is modified with the readPreference
// set on the connection.
- var cmdResult = testDB.runReadCommand(cmdObj);
+ var cmdResult = runCmdDB.runReadCommand(cmdObj);
jsTest.log('cmd result: ' + tojson(cmdResult));
assert(cmdResult.ok);
@@ -64,18 +72,18 @@ var testReadPreference = function(conn, hostList, isMongos, mode, tagSets, secEx
Object.extend(query, profileQuery);
hostList.forEach(function(node) {
- var testDB = node.getDB('test');
- var result = testDB.system.profile.findOne(query);
+ var profileDB = node.getDB(dbName);
+ var result = profileDB.system.profile.findOne(query);
if (result != null) {
if (secOk && secExpected) {
// The command obeys read prefs and we expect to run
// commands on secondaries with this mode and tag sets
- assert(testDB.adminCommand({isMaster: 1}).secondary);
+ assert(profileDB.adminCommand({isMaster: 1}).secondary);
} else {
// The command does not obey read prefs, or we expect to run
// commands on primary with this mode or tag sets
- assert(testDB.adminCommand({isMaster: 1}).ismaster);
+ assert(profileDB.adminCommand({isMaster: 1}).ismaster);
}
testedAtLeastOnce = true;
@@ -168,6 +176,17 @@ var testReadPreference = function(conn, hostList, isMongos, mode, tagSets, secEx
cmdTest({aggregate: 'mrIn', pipeline: [{$project: {x: 1}}], cursor: {}},
true,
formatProfileQuery({aggregate: 'mrIn'}));
+
+ // Test $currentOp aggregation stage.
+ // TODO SERVER-19318: Remove check once the $currentOp stage is supported on mongos.
+ if (!isMongos) {
+ let curOpComment = 'agg_currentOp_' + ObjectId();
+
+ cmdTest({aggregate: 1, pipeline: [{$currentOp: {}}], comment: curOpComment, cursor: {}},
+ true,
+ formatProfileQuery({comment: curOpComment}),
+ "admin");
+ }
};
/**
diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp
index eb4de1b2083..cd2eac0d1cd 100644
--- a/src/mongo/db/auth/authorization_session.cpp
+++ b/src/mongo/db/auth/authorization_session.cpp
@@ -256,10 +256,15 @@ void AuthorizationSession::_addPrivilegesForStage(const std::string& db,
Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
const BSONObj& cmdObj) {
std::string db(ns.db().toString());
- auto inputResource = ResourcePattern::forExactNamespace(ns);
uassert(
17138, mongoutils::str::stream() << "Invalid input namespace, " << ns.ns(), ns.isValid());
+ // If this connection does not need to be authenticated (for instance, if auth is disabled),
+ // return Status::OK() immediately.
+ if (_externalState->shouldIgnoreAuthChecks()) {
+ return Status::OK();
+ }
+
PrivilegeVector privileges;
BSONElement pipelineElem = cmdObj["pipeline"];
@@ -270,8 +275,8 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
BSONObj pipeline = pipelineElem.embeddedObject();
if (pipeline.isEmpty()) {
// The pipeline is empty, so we require only the find action.
- Privilege::addPrivilegeToPrivilegeVector(&privileges,
- Privilege(inputResource, ActionType::find));
+ Privilege::addPrivilegeToPrivilegeVector(
+ &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::find));
} else {
if (pipeline.firstElementType() != BSONType::Object) {
// The pipeline contains something that's not an object.
@@ -284,15 +289,48 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
BSONObj firstPipelineStage = pipeline.firstElement().embeddedObject();
if (str::equals("$indexStats", firstPipelineStage.firstElementFieldName())) {
Privilege::addPrivilegeToPrivilegeVector(
- &privileges, Privilege(inputResource, ActionType::indexStats));
+ &privileges,
+ Privilege(ResourcePattern::forExactNamespace(ns), ActionType::indexStats));
} else if (str::equals("$collStats", firstPipelineStage.firstElementFieldName())) {
Privilege::addPrivilegeToPrivilegeVector(
- &privileges, Privilege(inputResource, ActionType::collStats));
+ &privileges,
+ Privilege(ResourcePattern::forExactNamespace(ns), ActionType::collStats));
+ } else if (str::equals("$currentOp", firstPipelineStage.firstElementFieldName())) {
+ // Need to check the value of allUsers; if true then inprog privilege is required.
+ // {$currentOp: {idleConnections: <boolean|false>, allUsers: <boolean|false>}}
+ BSONElement spec = firstPipelineStage["$currentOp"];
+
+ if (spec.type() != BSONType::Object) {
+ return Status(
+ ErrorCodes::TypeMismatch,
+ str::stream()
+ << "$currentOp options must be specified in an object, but found: "
+ << typeName(spec.type()));
+ }
+
+ auto allUsersElt = spec["allUsers"];
+
+ if (allUsersElt && allUsersElt.type() != BSONType::Bool) {
+ return Status(ErrorCodes::TypeMismatch,
+ str::stream() << "The 'allUsers' parameter of the $currentOp stage "
+ "must be a boolean value, but found: "
+ << typeName(allUsersElt.type()));
+ }
+
+ if (allUsersElt && allUsersElt.boolean()) {
+ Privilege::addPrivilegeToPrivilegeVector(
+ &privileges,
+ Privilege(ResourcePattern::forClusterResource(), ActionType::inprog));
+ } else if (!getAuthenticatedUserNames().more()) {
+ // This connection is not authenticated, so we should return an error even though
+ // there are no privilege requirements when allUsers is false.
+ return Status(ErrorCodes::Unauthorized, "unauthorized");
+ }
} else {
// If no source requiring an alternative permission scheme is specified then default to
// requiring find() privileges on the given namespace.
- Privilege::addPrivilegeToPrivilegeVector(&privileges,
- Privilege(inputResource, ActionType::find));
+ Privilege::addPrivilegeToPrivilegeVector(
+ &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::find));
}
// Add additional required privileges for each stage in the pipeline.
diff --git a/src/mongo/db/auth/authorization_session_test.cpp b/src/mongo/db/auth/authorization_session_test.cpp
index 1537f7dce73..806bc3795e8 100644
--- a/src/mongo/db/auth/authorization_session_test.cpp
+++ b/src/mongo/db/auth/authorization_session_test.cpp
@@ -627,6 +627,37 @@ TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) {
ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
}
+TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprogAction) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticated) {
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogAction) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
+TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogAction) {
+ authzSession->assumePrivilegesForDB(
+ Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+}
+
TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotValid) {
BSONArray pipeline = BSON_ARRAY(BSON("$out"
<< ""));
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index fd0840fdddd..94c2f5d1a06 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -205,7 +205,9 @@ public:
virtual bool maintenanceOk() const = 0;
/**
- * Returns true if this Command supports the readConcern argument.
+ * Returns true if this Command supports the readConcern argument. Takes the command object and
+ * the name of the database on which it was invoked as arguments, so that readConcern can be
+ * conditionally rejected based on the command's parameters and/or namespace.
*
* If the readConcern argument is sent to a command that returns false the command processor
* will reject the command, returning an appropriate error message. For commands that support
@@ -216,7 +218,7 @@ public:
* the option to the shards as needed. We rely on the shards to fail the commands in the
* cases where it isn't supported.
*/
- virtual bool supportsReadConcern() const = 0;
+ virtual bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const = 0;
/**
* Returns LogicalOp for this command.
@@ -348,7 +350,7 @@ public:
return true; /* assumed true prior to commit */
}
- bool supportsReadConcern() const override {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const override {
return false;
}
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 0003b2bf806..bb836ace5dd 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -80,7 +80,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index c1958e34f55..6643c952395 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -88,7 +88,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 8c7759471ce..4a1fbd18510 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -94,7 +94,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp
index ce4f54a5790..ac62769c96d 100644
--- a/src/mongo/db/commands/geo_near_cmd.cpp
+++ b/src/mongo/db/commands/geo_near_cmd.cpp
@@ -74,7 +74,7 @@ public:
bool slaveOverrideOk() const {
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 59c2848313c..d14f77b249b 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -97,7 +97,7 @@ public:
return false;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
// Uses the readConcern setting from whatever created the cursor.
return false;
}
diff --git a/src/mongo/db/commands/group_cmd.cpp b/src/mongo/db/commands/group_cmd.cpp
index 45cd9c999ce..c0cccd56f6a 100644
--- a/src/mongo/db/commands/group_cmd.cpp
+++ b/src/mongo/db/commands/group_cmd.cpp
@@ -80,7 +80,7 @@ private:
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/haystack.cpp b/src/mongo/db/commands/haystack.cpp
index b2b7f9977b6..84e9bda79e8 100644
--- a/src/mongo/db/commands/haystack.cpp
+++ b/src/mongo/db/commands/haystack.cpp
@@ -75,7 +75,7 @@ public:
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index a81c9e3a4c5..de4ce3aa8f2 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -66,7 +66,7 @@ public:
return true;
}
- bool supportsReadConcern() const final {
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
return true;
}
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 79a2e76270d..2d9a3e9006d 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -66,8 +66,8 @@ public:
return true;
}
- bool supportsReadConcern() const override {
- return true;
+ bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj) const override {
+ return !AggregationRequest::parseNs(dbName, cmdObj).isCollectionlessAggregateNS();
}
ReadWriteType getReadWriteType() const {
@@ -77,7 +77,7 @@ public:
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) override {
- const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
+ const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj));
return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj);
}
@@ -104,10 +104,8 @@ private:
const BSONObj& cmdObj,
boost::optional<ExplainOptions::Verbosity> verbosity,
BSONObjBuilder* result) {
- const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
-
const auto aggregationRequest =
- uassertStatusOK(AggregationRequest::parseFromBSON(nss, cmdObj, verbosity));
+ uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity));
// If the featureCompatibilityVersion is 3.2, we disallow collation from the user. However,
// operations should still respect the collection default collation. The mongos attaches the
@@ -122,7 +120,8 @@ private:
ServerGlobalParams::FeatureCompatibility::Version::k32 ||
isMergePipeline(aggregationRequest.getPipeline()));
- return runAggregate(opCtx, nss, aggregationRequest, cmdObj, *result);
+ return runAggregate(
+ opCtx, aggregationRequest.getNamespaceString(), aggregationRequest, cmdObj, *result);
}
} pipelineCmd;
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index d2bc6ac1545..0e195ee7870 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -77,6 +77,7 @@ const char kLogicalTimeKeysCollection[] = "admin.system.keys";
constexpr auto listCollectionsCursorCol = "$cmd.listCollections"_sd;
constexpr auto listIndexesCursorNSPrefix = "$cmd.listIndexes."_sd;
+constexpr auto collectionlessAggregateCursorCol = "$cmd.aggregate"_sd;
constexpr auto dropPendingNSPrefix = "system.drop."_sd;
} // namespace
@@ -124,6 +125,10 @@ bool NamespaceString::isListIndexesCursorNS() const {
coll().startsWith(listIndexesCursorNSPrefix);
}
+bool NamespaceString::isCollectionlessAggregateNS() const {
+ return coll() == collectionlessAggregateCursorCol;
+}
+
NamespaceString NamespaceString::makeListCollectionsNSS(StringData dbName) {
NamespaceString nss(dbName, listCollectionsCursorCol);
dassert(nss.isValid());
@@ -138,6 +143,13 @@ NamespaceString NamespaceString::makeListIndexesNSS(StringData dbName, StringDat
return nss;
}
+NamespaceString NamespaceString::makeCollectionlessAggregateNSS(StringData dbname) {
+ NamespaceString nss(dbname, collectionlessAggregateCursorCol);
+ dassert(nss.isValid());
+ dassert(nss.isCollectionlessAggregateNS());
+ return nss;
+}
+
NamespaceString NamespaceString::getTargetNSForListIndexes() const {
dassert(isListIndexesCursorNS());
return NamespaceString(db(), coll().substr(listIndexesCursorNSPrefix.size()));
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index deaf6ba2111..f49054c57ef 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -96,6 +96,12 @@ public:
NamespaceString(StringData dbName, StringData collectionName);
/**
+ * Constructs the namespace '<dbName>.$cmd.aggregate', which we use as the namespace for
+ * aggregation commands with the format {aggregate: 1}.
+ */
+ static NamespaceString makeCollectionlessAggregateNSS(StringData dbName);
+
+ /**
* Constructs a NamespaceString representing a listCollections namespace. The format for this
* namespace is "<dbName>.$cmd.listCollections".
*/
@@ -213,6 +219,7 @@ public:
return coll().startsWith("$cmd."_sd);
}
+ bool isCollectionlessAggregateNS() const;
bool isListCollectionsCursorNS() const;
bool isListIndexesCursorNS() const;
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index b9d2ee31c33..f8b58061d5c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -128,6 +128,7 @@ env.CppUnitTest(
'document_source_bucket_auto_test.cpp',
'document_source_bucket_test.cpp',
'document_source_count_test.cpp',
+ 'document_source_current_op_test.cpp',
'document_source_geo_near_test.cpp',
'document_source_group_test.cpp',
'document_source_limit_test.cpp',
@@ -227,6 +228,7 @@ docSourceEnv.Library(
'document_source_bucket_auto.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
+ 'document_source_current_op.cpp',
'document_source_geo_near.cpp',
'document_source_group.cpp',
'document_source_index_stats.cpp',
diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h
index 786f50a9b5a..ebdb5edcd27 100644
--- a/src/mongo/db/pipeline/aggregation_context_fixture.h
+++ b/src/mongo/db/pipeline/aggregation_context_fixture.h
@@ -46,10 +46,12 @@ namespace mongo {
class AggregationContextFixture : public unittest::Test {
public:
AggregationContextFixture()
+ : AggregationContextFixture(NamespaceString("unittests.pipeline_test")) {}
+
+ AggregationContextFixture(NamespaceString nss)
: _queryServiceContext(stdx::make_unique<QueryTestServiceContext>()),
_opCtx(_queryServiceContext->makeOperationContext()),
- _expCtx(new ExpressionContextForTest(
- _opCtx.get(), AggregationRequest(NamespaceString("unittests.pipeline_test"), {}))) {}
+ _expCtx(new ExpressionContextForTest(_opCtx.get(), AggregationRequest(nss, {}))) {}
boost::intrusive_ptr<ExpressionContextForTest> getExpCtx() {
return _expCtx.get();
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index d2cdfddd0c5..844907bd89a 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -82,6 +82,13 @@ StatusWith<std::vector<BSONObj>> AggregationRequest::parsePipelineFromBSON(
}
StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+ return parseFromBSON(parseNs(dbName, cmdObj), cmdObj, explainVerbosity);
+}
+
+StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
NamespaceString nss,
const BSONObj& cmdObj,
boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
@@ -232,10 +239,36 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
return request;
}
+NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) {
+ auto firstElement = cmdObj.firstElement();
+
+ if (firstElement.isNumber()) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Invalid command format: the '"
+ << firstElement.fieldNameStringData()
+ << "' field must specify a collection name or 1",
+ firstElement.number() == 1);
+ return NamespaceString::makeCollectionlessAggregateNSS(dbname);
+ } else {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "collection name has invalid type: "
+ << typeName(firstElement.type()),
+ firstElement.type() == BSONType::String);
+
+ const NamespaceString nss(dbname, firstElement.valueStringData());
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
+ nss.isValid() && !nss.isCollectionlessAggregateNS());
+
+ return nss;
+ }
+}
+
Document AggregationRequest::serializeToCommandObj() const {
MutableDocument serialized;
return Document{
- {kCommandName, _nss.coll()},
+ {kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))},
{kPipelineName, _pipeline},
// Only serialize booleans if different than their default.
{kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 4feaea931a8..11e06ec442c 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -81,6 +81,22 @@ public:
boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
/**
+ * Convenience overload which constructs the request's NamespaceString from the given database
+ * name and command object.
+ */
+ static StatusWith<AggregationRequest> parseFromBSON(
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity = boost::none);
+
+    /**
+ * The first field in 'cmdObj' must be a string representing a valid collection name, or the
+ * number 1. In the latter case, returns a reserved namespace that does not represent a user
+ * collection. See 'NamespaceString::makeCollectionlessAggregateNSS()'.
+ */
+ static NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj);
+
+ /**
* Constructs an AggregationRequest over the given namespace with the given pipeline. All
* options aside from the pipeline assume their default values.
*/
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index cb68a8af79d..0b1330009da 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -203,6 +203,20 @@ TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) {
ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
}
+TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAggregateOneNSS) {
+ NamespaceString nss = NamespaceString::makeCollectionlessAggregateNSS("a");
+ AggregationRequest request(nss, {});
+
+ auto expectedSerialization =
+ Document{{AggregationRequest::kCommandName, 1},
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})},
+ {AggregationRequest::kCursorName,
+ Value(Document({{AggregationRequest::kBatchSizeName,
+ AggregationRequest::kDefaultBatchSize}}))}};
+
+ ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization);
+}
+
TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}}");
@@ -364,6 +378,52 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce
.getStatus());
}
+TEST(AggregationRequestTest, ParseNSShouldReturnAggregateOneNSIfAggregateFieldIsOne) {
+ const std::vector<std::string> ones{
+ "1", "1.0", "NumberInt(1)", "NumberLong(1)", "NumberDecimal('1')"};
+
+ for (auto& one : ones) {
+ const BSONObj inputBSON =
+ fromjson(str::stream() << "{aggregate: " << one << ", pipeline: []}");
+ ASSERT(AggregationRequest::parseNs("a", inputBSON).isCollectionlessAggregateNS());
+ }
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectNumericNSIfAggregateFieldIsNotOne) {
+ const BSONObj inputBSON = fromjson("{aggregate: 2, pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::FailedToParse);
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectNonStringNonNumericNS) {
+ const BSONObj inputBSON = fromjson("{aggregate: {}, pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::TypeMismatch);
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectAggregateOneStringAsCollectionName) {
+ const BSONObj inputBSON = fromjson("{aggregate: '$cmd.aggregate', pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::InvalidNamespace);
+}
+
+TEST(AggregationRequestTest, ParseNSShouldRejectInvalidCollectionName) {
+ const BSONObj inputBSON = fromjson("{aggregate: '', pipeline: []}");
+ ASSERT_THROWS_CODE(
+ AggregationRequest::parseNs("a", inputBSON), UserException, ErrorCodes::InvalidNamespace);
+}
+
+TEST(AggregationRequestTest, ParseFromBSONOverloadsShouldProduceIdenticalRequests) {
+ const BSONObj inputBSON =
+ fromjson("{aggregate: 'collection', pipeline: [{$match: {}}, {$project: {}}], cursor: {}}");
+ NamespaceString nss("a.collection");
+
+ auto aggReqDBName = unittest::assertGet(AggregationRequest::parseFromBSON("a", inputBSON));
+ auto aggReqNSS = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBSON));
+
+ ASSERT_DOCUMENT_EQ(aggReqDBName.serializeToCommandObj(), aggReqNSS.serializeToCommandObj());
+}
+
//
// Ignore fields parsed elsewhere.
//
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 7b724b555cd..fab65e25173 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -118,6 +118,16 @@ public:
using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
+ enum class InitialSourceType {
+ // Stage requires input from a preceding DocumentSource.
+ kNotInitialSource,
+ // Stage does not need an input source and should be the first stage in the pipeline.
+ kInitialSource,
+ // Similar to kInitialSource, but does not require an underlying collection to produce
+ // output.
+ kCollectionlessInitialSource
+ };
+
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a
* (ReturnStatus, Document) pair, with the first entry being used to communicate information
@@ -249,10 +259,28 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const;
/**
- * Returns true if doesn't require an input source (most DocumentSources do).
+ * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an
+ * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce
+ * the input for the pipeline independent of an underlying collection. The latter are specified
+ * with {aggregate: 1}, e.g. $currentOp.
*/
- virtual bool isValidInitialSource() const {
- return false;
+ virtual InitialSourceType getInitialSourceType() const {
+ return InitialSourceType::kNotInitialSource;
+ }
+
+ /**
+ * Returns true if this stage does not require an input source.
+ */
+ bool isInitialSource() const {
+ return getInitialSourceType() != InitialSourceType::kNotInitialSource;
+ }
+
+ /**
+ * Returns true if this stage will produce the input for the pipeline independent of an
+ * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp.
+ */
+ bool isCollectionlessInitialSource() const {
+ return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource;
}
/**
@@ -525,6 +553,9 @@ public:
// Wraps mongod-specific functions to allow linking into mongos.
class MongodInterface {
public:
+ enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
+ enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
+
virtual ~MongodInterface(){};
/**
@@ -593,6 +624,20 @@ public:
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0;
+ /**
+ * Returns a vector of owned BSONObjs, each of which contains details of an in-progress
+     * operation or, optionally, an idle connection. If userMode is kIncludeAll, report
+ * operations for all authenticated users; otherwise, report only the current user's
+ * operations.
+ */
+ virtual std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const = 0;
+
+ /**
+ * Returns the name of the local shard if sharding is enabled, or an empty string.
+ */
+ virtual std::string getShardName(OperationContext* opCtx) const = 0;
+
// Add new methods as needed.
};
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 56ebaca69e3..bdf2ee35b5c 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -122,8 +122,8 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
return {Document(builder.obj())};
}
-bool DocumentSourceCollStats::isValidInitialSource() const {
- return true;
+DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const {
+ return InitialSourceType::kInitialSource;
}
Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 9e5aa2a4f3f..ad8673643e6 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -61,7 +61,7 @@ public:
const char* getSourceName() const final;
- bool isValidInitialSource() const final;
+ InitialSourceType getInitialSourceType() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
new file mode 100644
index 00000000000..835650961c2
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -0,0 +1,178 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_current_op.h"
+
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/server_options.h"
+#include "mongo/util/net/sock.h"
+
+namespace mongo {
+
+namespace {
+const StringData kAllUsersFieldName = "allUsers"_sd;
+const StringData kIdleConnectionsFieldName = "idleConnections"_sd;
+
+const StringData kOpIdFieldName = "opid"_sd;
+const StringData kClientFieldName = "client"_sd;
+const StringData kMongosClientFieldName = "client_s"_sd;
+} // namespace
+
+using boost::intrusive_ptr;
+
+REGISTER_DOCUMENT_SOURCE(currentOp,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceCurrentOp::createFromBson);
+
+const char* DocumentSourceCurrentOp::getSourceName() const {
+ return "$currentOp";
+}
+
+DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const {
+ return InitialSourceType::kCollectionlessInitialSource;
+}
+
+DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
+ if (_ops.empty()) {
+ _ops = _mongod->getCurrentOps(_includeIdleConnections, _includeOpsFromAllUsers);
+
+ _opsIter = _ops.begin();
+
+ if (pExpCtx->inShard) {
+ _shardName = _mongod->getShardName(pExpCtx->opCtx);
+
+ uassert(40465,
+ "Aggregation request specified 'fromRouter' but unable to retrieve shard name "
+ "for $currentOp pipeline stage.",
+ !_shardName.empty());
+ }
+ }
+
+ if (_opsIter != _ops.end()) {
+ if (!pExpCtx->inShard) {
+ return Document(*_opsIter++);
+ }
+
+ // This $currentOp is running in a sharded context.
+ invariant(!_shardName.empty());
+
+ const BSONObj& op = *_opsIter++;
+ MutableDocument doc;
+
+ // For operations on a shard, we change the opid from the raw numeric form to
+ // 'shardname:opid'. We also change the fieldname 'client' to 'client_s' to indicate
+ // that the IP is that of the mongos which initiated this request.
+ for (auto&& elt : op) {
+ StringData fieldName = elt.fieldNameStringData();
+
+ if (fieldName == kOpIdFieldName) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "expected numeric opid for $currentOp response from '"
+ << _shardName
+ << "' but got: "
+ << typeName(elt.type()),
+ elt.isNumber());
+
+ std::string shardOpID = (str::stream() << _shardName << ":" << elt.numberInt());
+ doc.addField(kOpIdFieldName, Value(shardOpID));
+ } else if (fieldName == kClientFieldName) {
+ doc.addField(kMongosClientFieldName, Value(elt.str()));
+ } else {
+ doc.addField(fieldName, Value(elt));
+ }
+ }
+
+ return doc.freeze();
+ }
+
+ return GetNextResult::makeEOF();
+}
+
+intrusive_ptr<DocumentSource> DocumentSourceCurrentOp::createFromBson(
+ BSONElement spec, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$currentOp options must be specified in an object, but found: "
+ << typeName(spec.type()),
+ spec.type() == BSONType::Object);
+
+ const NamespaceString& nss = pExpCtx->ns;
+
+ uassert(ErrorCodes::InvalidNamespace,
+ "$currentOp must be run against the 'admin' database with {aggregate: 1}",
+ nss.db() == NamespaceString::kAdminDb && nss.isCollectionlessAggregateNS());
+
+ ConnMode includeIdleConnections = ConnMode::kExcludeIdle;
+ UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers;
+
+ for (auto&& elem : spec.embeddedObject()) {
+ const auto fieldName = elem.fieldNameStringData();
+
+ if (fieldName == kIdleConnectionsFieldName) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The 'idleConnections' parameter of the $currentOp stage must "
+ "be a boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+ includeIdleConnections =
+ (elem.Bool() ? ConnMode::kIncludeIdle : ConnMode::kExcludeIdle);
+ } else if (fieldName == kAllUsersFieldName) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The 'allUsers' parameter of the $currentOp stage must be a "
+ "boolean value, but found: "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Bool);
+ includeOpsFromAllUsers =
+ (elem.Bool() ? UserMode::kIncludeAll : UserMode::kExcludeOthers);
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "Unrecognized option '" << fieldName
+ << "' in $currentOp stage.");
+ }
+ }
+
+ return intrusive_ptr<DocumentSourceCurrentOp>(
+ new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers));
+}
+
+intrusive_ptr<DocumentSourceCurrentOp> DocumentSourceCurrentOp::create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ ConnMode includeIdleConnections,
+ UserMode includeOpsFromAllUsers) {
+ return intrusive_ptr<DocumentSourceCurrentOp>(
+ new DocumentSourceCurrentOp(pExpCtx, includeIdleConnections, includeOpsFromAllUsers));
+}
+
+Value DocumentSourceCurrentOp::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(Document{
+ {getSourceName(),
+ Document{{kIdleConnectionsFieldName, (_includeIdleConnections == ConnMode::kIncludeIdle)},
+ {kAllUsersFieldName, (_includeOpsFromAllUsers == UserMode::kIncludeAll)}}}});
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
new file mode 100644
index 00000000000..b770cff75e4
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+class DocumentSourceCurrentOp final : public DocumentSourceNeedsMongod {
+public:
+ using ConnMode = MongodInterface::CurrentOpConnectionsMode;
+ using UserMode = MongodInterface::CurrentOpUserMode;
+
+ static boost::intrusive_ptr<DocumentSourceCurrentOp> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ ConnMode includeIdleConnections = ConnMode::kExcludeIdle,
+ UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers);
+
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final;
+
+ InitialSourceType getInitialSourceType() const final;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+private:
+ DocumentSourceCurrentOp(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ ConnMode includeIdleConnections = ConnMode::kExcludeIdle,
+ UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers)
+ : DocumentSourceNeedsMongod(pExpCtx),
+ _includeIdleConnections(includeIdleConnections),
+ _includeOpsFromAllUsers(includeOpsFromAllUsers) {}
+
+ ConnMode _includeIdleConnections = ConnMode::kExcludeIdle;
+ UserMode _includeOpsFromAllUsers = UserMode::kExcludeOthers;
+
+ std::string _shardName;
+
+ std::vector<BSONObj> _ops;
+ std::vector<BSONObj>::iterator _opsIter;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp
new file mode 100644
index 00000000000..e6fa1c8ba95
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp
@@ -0,0 +1,235 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_current_op.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/stub_mongod_interface.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+namespace {
+
+const std::string kMockShardName = "testshard";
+
+/**
+ * Subclass AggregationContextFixture to set the ExpressionContext's namespace to 'admin' with
+ * {aggregate: 1} by default, so that parsing tests other than those which validate the namespace do
+ * not need to explicitly set it.
+ */
+class DocumentSourceCurrentOpTest : public AggregationContextFixture {
+public:
+ DocumentSourceCurrentOpTest()
+ : AggregationContextFixture(NamespaceString::makeCollectionlessAggregateNSS("admin")) {}
+};
+
+/**
+ * A MongodInterface used for testing which returns artificial currentOp entries.
+ */
+class MockMongodImplementation final : public StubMongodInterface {
+public:
+ MockMongodImplementation(std::vector<BSONObj> ops, bool hasShardName = true)
+ : _ops(std::move(ops)), _hasShardName(hasShardName) {}
+
+ MockMongodImplementation(bool hasShardName = true) : _hasShardName(hasShardName) {}
+
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const {
+ return _ops;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const {
+ if (_hasShardName) {
+ return kMockShardName;
+ }
+
+ return std::string();
+ }
+
+private:
+ std::vector<BSONObj> _ops;
+ bool _hasShardName;
+};
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfSpecIsNotObject) {
+ const auto specObj = fromjson("{$currentOp:1}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfNotRunOnAdmin) {
+ const auto specObj = fromjson("{$currentOp:{}}");
+ getExpCtx()->ns = NamespaceString::makeCollectionlessAggregateNSS("foo");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::InvalidNamespace);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfNotRunWithAggregateOne) {
+ const auto specObj = fromjson("{$currentOp:{}}");
+ getExpCtx()->ns = NamespaceString("admin.foo");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::InvalidNamespace);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIdleConnectionsIfNotBoolean) {
+ const auto specObj = fromjson("{$currentOp:{idleConnections:1}}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseAllUsersIfNotBoolean) {
+ const auto specObj = fromjson("{$currentOp:{allUsers:1}}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailToParseIfUnrecognisedParameterSpecified) {
+ const auto specObj = fromjson("{$currentOp:{foo:true}}");
+ ASSERT_THROWS_CODE(DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx()),
+ UserException,
+ ErrorCodes::FailedToParse);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeTrueOptionalArguments) {
+ const auto specObj = fromjson("{$currentOp:{idleConnections:true, allUsers:true}}");
+
+ const auto parsed =
+ DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
+
+ const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
+
+ const auto expectedOutput =
+ Document{{"$currentOp", Document{{"idleConnections", true}, {"allUsers", true}}}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldParseAndSerializeFalseOptionalArguments) {
+ const auto specObj = fromjson("{$currentOp:{idleConnections:false, allUsers:false}}");
+
+ const auto parsed =
+ DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
+
+ const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
+
+ const auto expectedOutput =
+ Document{{"$currentOp", Document{{"idleConnections", false}, {"allUsers", false}}}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDefaultValues) {
+ const auto specObj = fromjson("{$currentOp:{}}");
+
+ const auto parsed =
+ DocumentSourceCurrentOp::createFromBson(specObj.firstElement(), getExpCtx());
+
+ const auto currentOp = static_cast<DocumentSourceCurrentOp*>(parsed.get());
+
+ const auto expectedOutput =
+ Document{{"$currentOp", Document{{"idleConnections", false}, {"allUsers", false}}}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->serialize().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) {
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ const auto mongod = std::make_shared<MockMongodImplementation>();
+ currentOp->injectMongodInterface(mongod);
+
+ ASSERT(currentOp->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldModifyOpIDAndClientFieldNameInShardedContext) {
+ getExpCtx()->inShard = true;
+
+ std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
+ const auto mongod = std::make_shared<MockMongodImplementation>(ops);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ const auto expectedOutput =
+ Document{{"client_s", std::string("192.168.1.10:50844")},
+ {"opid", std::string(str::stream() << kMockShardName << ":430")}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->getNext().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest,
+ ShouldReturnOpIDAndClientFieldNameUnmodifiedWhenNotInShardedContext) {
+ getExpCtx()->inShard = false;
+
+ std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
+ const auto mongod = std::make_shared<MockMongodImplementation>(ops);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ const auto expectedOutput =
+ Document{{"client", std::string("192.168.1.10:50844")}, {"opid", 430}};
+
+ ASSERT_DOCUMENT_EQ(currentOp->getNext().getDocument(), expectedOutput);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) {
+ getExpCtx()->inShard = true;
+
+ const auto mongod = std::make_shared<MockMongodImplementation>(false);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ ASSERT_THROWS_CODE(currentOp->getNext(), UserException, 40465);
+}
+
+TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInShardedContext) {
+ getExpCtx()->inShard = true;
+
+ std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")};
+ const auto mongod = std::make_shared<MockMongodImplementation>(ops);
+
+ const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
+ currentOp->injectMongodInterface(mongod);
+
+ ASSERT_THROWS_CODE(currentOp->getNext(), UserException, ErrorCodes::TypeMismatch);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 85e79b3cb2a..b3cdf802d4e 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -50,8 +50,8 @@ public:
return _outputSorts;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- bool isValidInitialSource() const final {
- return true;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
void detachFromOperationContext() final;
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 99dcbd85827..c2248189a3f 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -278,7 +278,7 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
// Disallow any stages that need to be the first stage in the pipeline.
for (auto&& stage : pipeline->getSources()) {
- if (stage->isValidInitialSource()) {
+ if (stage->isInitialSource()) {
uasserted(40173,
str::stream() << stage->getSourceName()
<< " is not allowed to be used within a $facet stage: "
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 54c22a306fc..da1f035374f 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -170,8 +170,8 @@ public:
DocumentSourcePassthrough() : DocumentSourceMock({}) {}
// We need this to be false so that it can be used in a $facet stage.
- bool isValidInitialSource() const final {
- return false;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kNotInitialSource;
}
DocumentSource::GetNextResult getNext() final {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 03bcf304eee..5178a89c18a 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -46,8 +46,8 @@ public:
*/
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
- bool isValidInitialSource() const final {
- return true;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
BSONObjSet getOutputSorts() final {
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 26beb7edb8b..e802e0d7016 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -44,8 +44,8 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- virtual bool isValidInitialSource() const final {
- return true;
+ virtual InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index b32e1170573..ebe0a861ad6 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -52,8 +52,8 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- bool isValidInitialSource() const final {
- return true;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index bdde16b711f..9236e55ad62 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -48,8 +48,8 @@ public:
const char* getSourceName() const override;
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
- bool isValidInitialSource() const override {
- return true;
+ InitialSourceType getInitialSourceType() const override {
+ return InitialSourceType::kInitialSource;
}
BSONObjSet getOutputSorts() override {
return sorts;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 8c486e9ca4d..01171b86e6d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_unwind.h"
@@ -79,10 +80,11 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse(
pipeline->_sources.end(), parsedSources.begin(), parsedSources.end());
}
- auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ auto status = pipeline->validate();
if (!status.isOK()) {
return status;
}
+
pipeline->stitch();
return std::move(pipeline);
}
@@ -91,18 +93,47 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create(
SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(stages, expCtx),
Pipeline::Deleter(expCtx->opCtx));
- auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ auto status = pipeline->validate();
if (!status.isOK()) {
return status;
}
+
pipeline->stitch();
return std::move(pipeline);
}
-Status Pipeline::ensureAllStagesAreInLegalPositions() const {
+Status Pipeline::validate() const {
+ // Verify that the specified namespace is valid for the initial stage of this pipeline.
+ const NamespaceString& nss = pCtx->ns;
+
+ if (_sources.empty()) {
+ if (nss.isCollectionlessAggregateNS()) {
+ return {ErrorCodes::InvalidNamespace,
+ "{aggregate: 1} is not valid for an empty pipeline."};
+ }
+ } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
+ // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
+ // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
+ const auto firstStage = _sources.front().get();
+
+ if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "{aggregate: 1} is not valid for '"
+ << firstStage->getSourceName()
+ << "'; a collection is required."};
+ }
+
+ if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "'" << firstStage->getSourceName()
+ << "' can only be run with {aggregate: 1}"};
+ }
+ }
+
+ // Verify that all stages of the pipeline are in legal positions.
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->isValidInitialSource() && i != 0) {
+ if (stage->isInitialSource() && i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline."};
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index b32b5df76d5..8a7f4211490 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -253,10 +253,11 @@ private:
void unstitch();
/**
- * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage
- * is present but is not the last stage in the pipeline.
+ * Returns a non-OK status if the pipeline fails any of a set of semantic checks. For example,
+ * if an $out stage is present then it must come last in the pipeline, while initial stages such
+ * as $indexStats must be at the start.
*/
- Status ensureAllStagesAreInLegalPositions() const;
+ Status validate() const;
SourceContainer _sources;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index b26baeb7bb4..06777b9d162 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -34,6 +34,7 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
@@ -66,10 +67,12 @@
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
+#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/s/chunk_version.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -212,6 +215,78 @@ public:
return pipeline;
}
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const {
+ AuthorizationSession* ctxAuth = AuthorizationSession::get(_ctx->opCtx->getClient());
+
+ std::vector<BSONObj> ops;
+
+ for (ServiceContext::LockedClientsCursor cursor(
+ _ctx->opCtx->getClient()->getServiceContext());
+ Client* client = cursor.next();) {
+ invariant(client);
+
+ stdx::lock_guard<Client> lk(*client);
+
+ // If auth is disabled, ignore the allUsers parameter.
+ if (ctxAuth->getAuthorizationManager().isAuthEnabled() &&
+ userMode == CurrentOpUserMode::kExcludeOthers &&
+ !ctxAuth->isCoauthorizedWithClient(client)) {
+ continue;
+ }
+
+ const OperationContext* clientOpCtx = client->getOperationContext();
+
+ if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) {
+ continue;
+ }
+
+ BSONObjBuilder infoBuilder;
+
+ client->reportState(infoBuilder);
+
+ const auto& clientMetadata =
+ ClientMetadataIsMasterState::get(client).getClientMetadata();
+
+ if (clientMetadata) {
+ auto appName = clientMetadata.get().getApplicationName();
+ if (!appName.empty()) {
+ infoBuilder.append("appName", appName);
+ }
+
+ auto clientMetadataDocument = clientMetadata.get().getDocument();
+ infoBuilder.append("clientMetadata", clientMetadataDocument);
+ }
+
+ // Fill out the rest of the BSONObj with opCtx specific details.
+ infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx));
+ if (clientOpCtx) {
+ infoBuilder.append("opid", clientOpCtx->getOpID());
+ if (clientOpCtx->isKillPending()) {
+ infoBuilder.append("killPending", true);
+ }
+
+ CurOp::get(clientOpCtx)->reportState(&infoBuilder);
+
+ Locker::LockerInfo lockerInfo;
+ clientOpCtx->lockState()->getLockerInfo(&lockerInfo);
+ fillLockerInfo(lockerInfo, infoBuilder);
+ }
+
+ ops.emplace_back(infoBuilder.obj());
+ }
+
+ return ops;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const {
+ if (ShardingState::get(opCtx)->enabled()) {
+ return ShardingState::get(opCtx)->getShardName();
+ }
+
+ return std::string();
+ }
+
private:
intrusive_ptr<ExpressionContext> _ctx;
DBDirectClient _client;
@@ -355,7 +430,7 @@ void PipelineD::prepareCursorSource(Collection* collection,
}
if (!sources.empty()) {
- if (sources.front()->isValidInitialSource()) {
+ if (sources.front()->isInitialSource()) {
return; // don't need a cursor
}
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index a0f50825b49..3a1aae261be 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1275,6 +1275,61 @@ TEST(PipelineInitialSource, MatchInitialQuery) {
ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4));
}
+namespace Namespaces {
+
+using PipelineInitialSourceNSTest = AggregationContextFixture;
+
+class DocumentSourceCollectionlessMock : public DocumentSourceMock {
+public:
+ DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {}
+
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kCollectionlessInitialSource;
+ }
+
+ static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() {
+ return new DocumentSourceCollectionlessMock();
+ }
+};
+
+TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidForEmptyPipeline) {
+ const std::vector<BSONObj> rawPipeline = {};
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
+
+ ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+}
+
+TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) {
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {}}")};
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
+
+ ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+}
+
+TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidIfInitialStageIsCollectionless) {
+ auto collectionlessSource = DocumentSourceCollectionlessMock::create();
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
+
+ ASSERT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
+}
+
+TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollectionless) {
+ auto collectionlessSource = DocumentSourceCollectionlessMock::create();
+ auto ctx = getExpCtx();
+
+ ctx->ns = NamespaceString("a.collection");
+
+ ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
+}
+
+} // namespace Namespaces
+
namespace Dependencies {
using PipelineDependenciesTest = AggregationContextFixture;
@@ -1300,8 +1355,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock {
public:
DocumentSourceDependencyDummy() : DocumentSourceMock({}) {}
- bool isValidInitialSource() const final {
- return false;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kNotInitialSource;
}
};
diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h
index e21ccc4c822..e75e98e91f8 100644
--- a/src/mongo/db/pipeline/stub_mongod_interface.h
+++ b/src/mongo/db/pipeline/stub_mongod_interface.h
@@ -92,5 +92,14 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx) override {
MONGO_UNREACHABLE;
}
+
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const override {
+ MONGO_UNREACHABLE;
+ }
};
} // namespace mongo
diff --git a/src/mongo/db/run_commands.cpp b/src/mongo/db/run_commands.cpp
index 704549bf868..fca1cfb1cde 100644
--- a/src/mongo/db/run_commands.cpp
+++ b/src/mongo/db/run_commands.cpp
@@ -365,7 +365,7 @@ bool runCommandImpl(OperationContext* opCtx,
const std::string db = request.getDatabase().toString();
BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve);
- auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern());
+ auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern(db, cmd));
if (!readConcernArgsStatus.isOK()) {
auto result =
@@ -433,8 +433,9 @@ bool runCommandImpl(OperationContext* opCtx,
// When a linearizable read command is passed in, check to make sure we're reading
// from the primary.
- if (command->supportsReadConcern() && (readConcernArgsStatus.getValue().getLevel() ==
- repl::ReadConcernLevel::kLinearizableReadConcern) &&
+ if (command->supportsReadConcern(db, cmd) &&
+ (readConcernArgsStatus.getValue().getLevel() ==
+ repl::ReadConcernLevel::kLinearizableReadConcern) &&
(request.getCommandName() != "getMore")) {
auto linearizableReadStatus = waitForLinearizableReadConcern(opCtx);