summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-06-16 11:10:59 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-06-16 22:57:40 +0100
commit69fd148aafff5bc9c33596b75560f05a6a260f7a (patch)
treecdfacbaafae55ea7741766516130eeba5a8da181
parente5eb1981a68bfc3250b72ca14a9131e2749b4cf7 (diff)
downloadmongo-69fd148aafff5bc9c33596b75560f05a6a260f7a.tar.gz
SERVER-19318 Allow $currentOp aggregations to be run on mongoS
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_auth.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_auth_audit.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml3
-rw-r--r--jstests/auth/lib/commands_lib.js5
-rw-r--r--jstests/noPassthrough/aggregation_currentop.js347
-rw-r--r--jstests/sharding/aggregation_currentop.js439
-rw-r--r--jstests/sharding/read_pref_cmd.js13
-rw-r--r--src/mongo/db/auth/authorization_session.cpp61
-rw-r--r--src/mongo/db/auth/authorization_session.h12
-rw-r--r--src/mongo/db/auth/authorization_session_test.cpp147
-rw-r--r--src/mongo/db/commands/dbcommands.cpp4
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp2
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp40
-rw-r--r--src/mongo/s/commands/cluster_commands_helpers.cpp92
-rw-r--r--src/mongo/s/commands/cluster_commands_helpers.h53
-rw-r--r--src/mongo/s/commands/cluster_count_cmd.cpp36
-rw-r--r--src/mongo/s/commands/cluster_current_op.cpp4
-rw-r--r--src/mongo/s/commands/cluster_db_stats_cmd.cpp4
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp17
-rw-r--r--src/mongo/s/commands/commands_public.cpp38
-rw-r--r--src/mongo/s/commands/strategy.cpp18
22 files changed, 782 insertions, 556 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml
index beaa46b226f..78ed6fd8db5 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml
@@ -11,6 +11,7 @@ selector:
exclude_files:
# Skip any tests that run with auth explicitly.
- jstests/sharding/*[aA]uth*.js
+ - jstests/sharding/aggregation_currentop.js # SERVER-19318
# Skip these additional tests when running with auth enabled.
- jstests/sharding/copydb_from_mongos.js # SERVER-13080
- jstests/sharding/parallel.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
index c6713f0a088..cca681230a4 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
@@ -11,6 +11,7 @@ selector:
exclude_files:
# Skip any tests that run with auth explicitly.
- jstests/sharding/*[aA]uth*.js
+ - jstests/sharding/aggregation_currentop.js # SERVER-19318
# Skip these additional tests when running with auth enabled.
- jstests/sharding/copydb_from_mongos.js # SERVER-13080
- jstests/sharding/parallel.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index 30e0da2764a..4930e255dea 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -10,6 +10,7 @@ selector:
- jstests/sharding/localhostAuthBypass.js
- jstests/sharding/mongos_rs_auth_shard_failure_tolerance.js
- jstests/sharding/mrShardedOutputAuth.js
+ - jstests/sharding/aggregation_currentop.js
# Count/write/aggregate/group commands against the config shard do not support retries yet
- jstests/sharding/addshard1.js
- jstests/sharding/addshard2.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index eff67db9902..9fb0e5d93a7 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -32,6 +32,9 @@ selector:
- jstests/sharding/migration_ignore_interrupts_4.js
- jstests/sharding/mongos_query_comment.js
- jstests/sharding/sharded_profile.js
+ # SERVER-19318: added new $currentOp agg stage in 3.6
+ - jstests/sharding/aggregation_currentop.js
+ - jstests/sharding/read_pref_cmd.js
executor:
config:
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index 1751a33cb37..a71a8af6fca 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -640,7 +640,6 @@ var authCommandsLib = {
{
testname: "aggregate_currentOp_allUsers_true",
command: {aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}},
- skipSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -658,8 +657,8 @@ var authCommandsLib = {
{
testname: "aggregate_currentOp_allUsers_false",
command: {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}},
- skipSharded: true,
- testcases: [{runOnDb: adminDbName, roles: roles_all}]
+ testcases: [{runOnDb: adminDbName, roles: roles_all}],
+ skipSharded: true
},
{
testname: "aggregate_lookup",
diff --git a/jstests/noPassthrough/aggregation_currentop.js b/jstests/noPassthrough/aggregation_currentop.js
deleted file mode 100644
index ed554402d4f..00000000000
--- a/jstests/noPassthrough/aggregation_currentop.js
+++ /dev/null
@@ -1,347 +0,0 @@
-/**
- * Tests that the $currentOp aggregation stage behaves as expected. Specifically:
- * - It must be the fist stage in the pipeline.
- * - It can only be run on admin, and the "aggregate" field must be 1.
- * - Only active connections are shown unless {idleConnections: true} is specified.
- * - A user without the inprog privilege can see their own ops, but no-one else's.
- * - A user with the inprog privilege can see all ops.
- * - Non-local readConcerns are rejected.
- * - Collation rules are respected.
- *
- * This test requires replica set configuration and user credentials to persist across a restart.
- * @tags: [requires_persistence]
- */
-(function() {
- "use strict";
-
- const key = "jstests/libs/key1";
-
- // Create a new replica set for testing. We set the internalQueryExecYieldIterations parameter
- // so that plan execution yields on every iteration. For some tests, we will temporarily set
- // yields to hang the mongod so we can capture particular operations in the currentOp output.
- const rst = new ReplSetTest({
- name: jsTestName(),
- nodes: 3,
- keyFile: key,
- nodeOptions: {setParameter: {internalQueryExecYieldIterations: 1}}
- });
-
- const nodes = rst.nodeList();
-
- rst.startSet();
- rst.initiate({
- _id: jsTestName(),
- members: [
- {_id: 0, host: nodes[0], priority: 1},
- {_id: 1, host: nodes[1], priority: 0},
- {_id: 2, host: nodes[2], arbiterOnly: true}
- ],
- });
-
- let primary = rst.getPrimary();
-
- let testDB = primary.getDB(jsTestName());
- let adminDB = primary.getDB("admin");
-
- // Create an admin user, one user with the inprog privilege, and one without.
- assert.commandWorked(adminDB.runCommand({createUser: "admin", pwd: "pwd", roles: ["root"]}));
- assert(adminDB.auth("admin", "pwd"));
-
- assert.commandWorked(adminDB.runCommand({
- createRole: "role_inprog",
- roles: [],
- privileges: [{resource: {cluster: true}, actions: ["inprog"]}]
- }));
-
- assert.commandWorked(adminDB.runCommand(
- {createUser: "user_inprog", pwd: "pwd", roles: ["role_inprog", "readAnyDatabase"]}));
-
- assert.commandWorked(
- adminDB.runCommand({createUser: "user_no_inprog", pwd: "pwd", roles: ["readAnyDatabase"]}));
-
- // Create some dummy test data.
- testDB.test.drop();
-
- for (let i = 0; i < 5; i++) {
- assert.writeOK(testDB.test.insert({_id: i, a: i}));
- }
-
- // Functions to support running an operation in a parallel shell for testing allUsers behaviour.
- function runInParallelShell({testfunc, username, password}) {
- TestData.aggCurOpTest = testfunc;
- TestData.aggCurOpUser = username;
- TestData.aggCurOpPwd = password;
-
- assert.commandWorked(
- adminDB.runCommand({configureFailPoint: "setYieldAllLocksHang", mode: "alwaysOn"}));
-
- testfunc = function() {
- db.getSiblingDB("admin").auth(TestData.aggCurOpUser, TestData.aggCurOpPwd);
- TestData.aggCurOpTest();
- db.getSiblingDB("admin").logout();
- };
-
- return startParallelShell(testfunc, primary.port);
- }
-
- function assertCurrentOpHasSingleMatchingEntry({currentOpAggFilter, curOpOpts}) {
- curOpOpts = (curOpOpts || {allUsers: true});
-
- let result = null;
-
- assert.soon(
- function() {
- result = adminDB.runCommand({
- aggregate: 1,
- pipeline: [{$currentOp: curOpOpts}, {$match: currentOpAggFilter}],
- cursor: {}
- });
-
- assert.commandWorked(result);
-
- return (result.cursor.firstBatch.length === 1);
- },
- function() {
- return "Failed to find operation in $currentOp output: " + tojson(result);
- });
- }
-
- function waitForParallelShell(awaitShell) {
- assert.commandWorked(
- adminDB.runCommand({configureFailPoint: "setYieldAllLocksHang", mode: "off"}));
-
- awaitShell();
- }
-
- /**
- * Restarts a replica set with additional parameters, and optionally re-authenticates.
- */
- function restartReplSet(replSet, newOpts, user, pwd) {
- const numNodes = replSet.nodeList().length;
-
- for (let n = 0; n < numNodes; n++) {
- replSet.restart(n, newOpts);
- }
-
- primary = replSet.getPrimary();
- replSet.awaitSecondaryNodes();
-
- testDB = primary.getDB(jsTestName());
- adminDB = primary.getDB("admin");
-
- if (user && pwd) {
- adminDB.auth(user, pwd);
- }
- }
-
- //
- // Authenticate as user_no_inprog.
- //
- assert(adminDB.logout());
- assert(adminDB.auth("user_no_inprog", "pwd"));
-
- // Test that $currentOp fails with {allUsers: true} for a user without the "inprog" privilege.
- assert.commandFailedWithCode(
- adminDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}),
- ErrorCodes.Unauthorized);
-
- // Test that $currentOp succeeds with {allUsers: false} for a user without the "inprog"
- // privilege.
- assert.commandWorked(adminDB.runCommand(
- {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}));
-
- // Test that $currentOp fails when run as {aggregate: 1} on a database other than admin.
- assert.commandFailedWithCode(
- testDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}}),
- ErrorCodes.InvalidNamespace);
-
- // Test that $currentOp fails when run on admin without {aggregate: 1}.
- assert.commandFailedWithCode(
- adminDB.runCommand({aggregate: "collname", pipeline: [{$currentOp: {}}], cursor: {}}),
- ErrorCodes.InvalidNamespace);
-
- // Test that $currentOp accepts all numeric types.
- const ones = [1, 1.0, NumberInt(1), NumberLong(1), NumberDecimal(1)];
-
- for (let one of ones) {
- assert.commandWorked(
- adminDB.runCommand({aggregate: one, pipeline: [{$currentOp: {}}], cursor: {}}));
- }
-
- // Test that {aggregate: 1} fails when the first stage in the pipeline is not $currentOp.
- assert.commandFailedWithCode(
- adminDB.runCommand({aggregate: 1, pipeline: [{$match: {}}], cursor: {}}),
- ErrorCodes.InvalidNamespace);
-
- // Test that $currentOp fails when it is not the first stage in the pipeline. We use two
- // $currentOp stages since any other stage in the initial position will trip the {aggregate: 1}
- // namespace check.
- assert.commandFailedWithCode(
- adminDB.runCommand(
- {aggregate: 1, pipeline: [{$currentOp: {}}, {$currentOp: {}}], cursor: {}}),
- ErrorCodes.BadValue);
-
- // Test that $currentOp succeeds if local readConcern is specified.
- assert.commandWorked(adminDB.runCommand(
- {aggregate: 1, pipeline: [{$currentOp: {}}], readConcern: {level: "local"}, cursor: {}}));
-
- // Test that $currentOp fails if a non-local readConcern is specified.
- assert.commandFailedWithCode(adminDB.runCommand({
- aggregate: 1,
- pipeline: [{$currentOp: {}}],
- readConcern: {level: "linearizable"},
- cursor: {}
- }),
- ErrorCodes.InvalidOptions);
-
- // Test that a user without the inprog privilege cannot see another user's operations.
- // Temporarily log in as 'user_inprog' to validate that the op is present in $currentOp output.
- assert(adminDB.logout());
- assert(adminDB.auth("user_inprog", "pwd"));
-
- let awaitShell = runInParallelShell({
- testfunc: function() {
- assert.eq(db.getSiblingDB(jsTestName())
- .test.find({})
- .comment("agg_current_op_allusers_test")
- .itcount(),
- 5);
- },
- username: "user_inprog",
- password: "pwd"
- });
-
- assertCurrentOpHasSingleMatchingEntry({
- currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"},
- curOpOpts: {allUsers: true}
- });
-
- // Log back in as 'user_no_inprog' and validate that the user cannot see the op.
- assert(adminDB.logout());
- assert(adminDB.auth("user_no_inprog", "pwd"));
-
- assert.eq(adminDB
- .runCommand({
- aggregate: 1,
- pipeline: [
- {$currentOp: {allUsers: false}},
- {$match: {"command.comment": "agg_current_op_allusers_test"}}
- ],
- cursor: {}
- })
- .cursor.firstBatch.length,
- 0);
-
- waitForParallelShell(awaitShell);
-
- //
- // Authenticate as user_inprog.
- //
- assert(adminDB.logout());
- assert(adminDB.auth("user_inprog", "pwd"));
-
- // Test that $currentOp with {allUsers: true} succeeds for a user with the "inprog"
- // privilege.
- assert.commandWorked(
- adminDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}));
-
- // Test that {idleConnections: false} returns only active connections.
- assert.eq(adminDB
- .runCommand({
- aggregate: 1,
- pipeline: [
- {$currentOp: {allUsers: true, idleConnections: false}},
- {$match: {"active": false}}
- ],
- cursor: {}
- })
- .cursor.firstBatch.length,
- 0);
-
- // Test that {idleConnections: true} returns inactive connections.
- const idleConn = new Mongo(primary.host);
-
- assert.gte(adminDB
- .runCommand({
- aggregate: 1,
- pipeline: [
- {$currentOp: {allUsers: true, idleConnections: true}},
- {$match: {active: false}}
- ],
- cursor: {}
- })
- .cursor.firstBatch.length,
- 1);
-
- // Test that a user with the inprog privilege can see another user's operations with
- // {allUsers: true}
- awaitShell = runInParallelShell({
- testfunc: function() {
- assert.eq(db.getSiblingDB(jsTestName())
- .test.find({})
- .comment("agg_current_op_allusers_test")
- .itcount(),
- 5);
- },
- username: "user_no_inprog",
- password: "pwd"
- });
-
- assertCurrentOpHasSingleMatchingEntry(
- {currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"}});
-
- waitForParallelShell(awaitShell);
-
- // Test that collation rules apply to matches on $currentOp output.
- assert.eq(
- adminDB
- .runCommand({
- aggregate: 1,
- pipeline:
- [{$currentOp: {}}, {$match: {"command.comment": "AGG_currént_op_COLLATION"}}],
- collation: {locale: "en_US", strength: 1}, // Case and diacritic insensitive.
- comment: "agg_current_op_collation",
- cursor: {}
- })
- .cursor.firstBatch.length,
- 1);
-
- // Test that $currentOp is explainable.
- const explainPlan = assert.commandWorked(adminDB.runCommand({
- aggregate: 1,
- pipeline:
- [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}],
- explain: true
- }));
-
- const expectedStages =
- [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}];
-
- assert.eq(explainPlan.stages, expectedStages);
-
- // Test that the allUsers parameter is ignored when authentication is disabled.
- restartReplSet(rst, {keyFile: null});
-
- // Ensure that there is at least one other connection present.
- const otherConn = new Mongo(primary.host);
-
- // Verify that $currentOp displays all operations when auth is disabled regardless of the
- // allUsers parameter, by checking that the output is the same in both cases. We project static
- // fields from each operation so that a thread which becomes active between the two aggregations
- // is still comparable across the output of both.
- let aggCmd = {
- aggregate: 1,
- pipeline: [
- {$currentOp: {allUsers: true, idleConnections: true}},
- {$project: {desc: 1, threadId: 1, connectionId: 1, appName: 1}},
- {$sort: {threadId: 1}}
- ],
- cursor: {}
- };
-
- const aggAllUsersTrue = assert.commandWorked(adminDB.runCommand(aggCmd));
- aggCmd.pipeline[0].$currentOp.allUsers = false;
- const aggAllUsersFalse = assert.commandWorked(adminDB.runCommand(aggCmd));
-
- assert.eq(aggAllUsersFalse.cursor.firstBatch, aggAllUsersTrue.cursor.firstBatch);
-})();
diff --git a/jstests/sharding/aggregation_currentop.js b/jstests/sharding/aggregation_currentop.js
new file mode 100644
index 00000000000..e848ce7beee
--- /dev/null
+++ b/jstests/sharding/aggregation_currentop.js
@@ -0,0 +1,439 @@
+/**
+ * Tests that the $currentOp aggregation stage behaves as expected. Specifically:
+ * - It must be the fist stage in the pipeline.
+ * - It can only be run on admin, and the "aggregate" field must be 1.
+ * - Only active connections are shown unless {idleConnections: true} is specified.
+ * - A user without the inprog privilege can see their own ops, but no-one else's.
+ * - A user with the inprog privilege can see all ops.
+ * - Non-local readConcerns are rejected.
+ * - Collation rules are respected.
+ *
+ * This test requires replica set configuration and user credentials to persist across a restart.
+ * @tags: [requires_persistence]
+ */
+(function() {
+ "use strict";
+
+ const key = "jstests/libs/key1";
+
+ // Create a new sharded cluster for testing. We set the internalQueryExecYieldIterations
+ // parameter so that plan execution yields on every iteration. For some tests, we will
+ // temporarily set yields to hang the mongod so we can capture particular operations in the
+ // currentOp output.
+ const st = new ShardingTest({
+ name: jsTestName(),
+ keyFile: key,
+ shards: 3,
+ rs: {
+ nodes: [
+ {rsConfig: {priority: 1}},
+ {rsConfig: {priority: 0}},
+ {rsConfig: {arbiterOnly: true}}
+ ],
+ setParameter: {internalQueryExecYieldIterations: 1}
+ }
+ });
+
+ // Assign various elements of the cluster. We will use shard rs0 to test replica-set level
+ // $currentOp behaviour.
+ let shardConn = st.rs0.getPrimary();
+ const mongosConn = st.s;
+ const shardRS = st.rs0;
+
+ const clusterTestDB = mongosConn.getDB(jsTestName());
+ const clusterAdminDB = mongosConn.getDB("admin");
+ let shardAdminDB = shardConn.getDB("admin");
+
+ function createUsers(conn) {
+ let adminDB = conn.getDB("admin");
+
+ // Create an admin user, one user with the inprog privilege, and one without.
+ assert.commandWorked(
+ adminDB.runCommand({createUser: "admin", pwd: "pwd", roles: ["root"]}));
+ assert(adminDB.auth("admin", "pwd"));
+
+ assert.commandWorked(adminDB.runCommand({
+ createRole: "role_inprog",
+ roles: [],
+ privileges: [{resource: {cluster: true}, actions: ["inprog"]}]
+ }));
+
+ assert.commandWorked(adminDB.runCommand(
+ {createUser: "user_inprog", pwd: "pwd", roles: ["role_inprog", "readAnyDatabase"]}));
+
+ assert.commandWorked(adminDB.runCommand(
+ {createUser: "user_no_inprog", pwd: "pwd", roles: ["readAnyDatabase"]}));
+ }
+
+ // Create necessary users at both cluster and shard-local level.
+ createUsers(shardConn);
+ createUsers(mongosConn);
+
+ // Create a test database and some dummy data on rs0.
+ assert(clusterAdminDB.auth("admin", "pwd"));
+
+ for (let i = 0; i < 5; i++) {
+ assert.writeOK(clusterTestDB.test.insert({_id: i, a: i}));
+ }
+
+ st.ensurePrimaryShard(clusterTestDB.getName(), shardRS.name);
+
+ // Run a command on the specified database and return a cursor over the result.
+ function cmdCursor(inputDB, cmd) {
+ return new DBCommandCursor(inputDB.getMongo(),
+ assert.commandWorked(inputDB.runCommand(cmd)));
+ }
+
+ // Restarts a replica set with additional parameters, and optionally re-authenticates.
+ function restartReplSet(replSet, newOpts, user, pwd) {
+ const numNodes = replSet.nodeList().length;
+
+ for (let n = 0; n < numNodes; n++) {
+ replSet.restart(n, newOpts);
+ }
+
+ shardConn = replSet.getPrimary();
+ replSet.awaitSecondaryNodes();
+
+ shardAdminDB = shardConn.getDB("admin");
+
+ if (user && pwd) {
+ shardAdminDB.auth(user, pwd);
+ }
+ }
+
+ // Functions to support running an operation in a parallel shell for testing allUsers behaviour.
+ function runInParallelShell({conn, testfunc, username, password}) {
+ TestData.aggCurOpTest = testfunc;
+ TestData.aggCurOpUser = username;
+ TestData.aggCurOpPwd = password;
+
+ assert.commandWorked(conn.getDB("admin").runCommand(
+ {configureFailPoint: "setYieldAllLocksHang", mode: "alwaysOn"}));
+
+ testfunc = function() {
+ db.getSiblingDB("admin").auth(TestData.aggCurOpUser, TestData.aggCurOpPwd);
+ TestData.aggCurOpTest();
+ db.getSiblingDB("admin").logout();
+ };
+
+ return startParallelShell(testfunc, conn.port);
+ }
+
+ function assertCurrentOpHasSingleMatchingEntry({conn, currentOpAggFilter, curOpOpts}) {
+ curOpOpts = (curOpOpts || {allUsers: true});
+
+ const connAdminDB = conn.getDB("admin");
+
+ assert.soon(
+ function() {
+ return cmdCursor(connAdminDB, {
+ aggregate: 1,
+ pipeline: [{$currentOp: curOpOpts}, {$match: currentOpAggFilter}],
+ cursor: {}
+ }).itcount() === 1;
+ },
+ function() {
+ const curOps = cmdCursor(
+ connAdminDB, {aggregate: 1, pipeline: [{$currentOp: curOpOpts}], cursor: {}});
+
+ return "Failed to find operation " + tojson(currentOpAggFilter) +
+ " in $currentOp output: " + tojson(curOps.toArray());
+ });
+ }
+
+ function waitForParallelShell(conn, awaitShell) {
+ assert.commandWorked(conn.getDB("admin").runCommand(
+ {configureFailPoint: "setYieldAllLocksHang", mode: "off"}));
+
+ awaitShell();
+ }
+
+ // Runs a suite of tests for behaviour that is common to both the replica set and cluster
+ // levels.
+ function runCommonTests(conn) {
+ const testDB = conn.getDB(jsTestName());
+ const adminDB = conn.getDB("admin");
+
+ const isMongos = (conn == mongosConn);
+
+ // Test that an unauthenticated connection cannot run $currentOp even with {allUsers:
+ // false}.
+ assert(adminDB.logout());
+
+ assert.commandFailedWithCode(
+ adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}),
+ ErrorCodes.Unauthorized);
+
+ //
+ // Authenticate as user_no_inprog.
+ //
+ assert(adminDB.logout());
+ assert(adminDB.auth("user_no_inprog", "pwd"));
+
+ // Test that $currentOp fails with {allUsers: true} for a user without the "inprog"
+ // privilege.
+ assert.commandFailedWithCode(
+ adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}),
+ ErrorCodes.Unauthorized);
+
+ //
+ // Authenticate as user_inprog.
+ //
+ assert(adminDB.logout());
+ assert(adminDB.auth("user_inprog", "pwd"));
+
+ // Test that $currentOp fails when run as {aggregate: 1} on a database other than admin.
+ assert.commandFailedWithCode(
+ testDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}}),
+ ErrorCodes.InvalidNamespace);
+
+ // Test that $currentOp fails when run on admin without {aggregate: 1}.
+ assert.commandFailedWithCode(
+ adminDB.runCommand({aggregate: "collname", pipeline: [{$currentOp: {}}], cursor: {}}),
+ ErrorCodes.InvalidNamespace);
+
+ // Test that $currentOp accepts all numeric types.
+ const ones = [1, 1.0, NumberInt(1), NumberLong(1), NumberDecimal(1)];
+
+ for (let one of ones) {
+ assert.commandWorked(
+ adminDB.runCommand({aggregate: one, pipeline: [{$currentOp: {}}], cursor: {}}));
+ }
+
+ // Test that {aggregate: 1} fails when the first stage in the pipeline is not $currentOp.
+ assert.commandFailedWithCode(
+ adminDB.runCommand({aggregate: 1, pipeline: [{$match: {}}], cursor: {}}),
+ ErrorCodes.InvalidNamespace);
+
+ // Test that $currentOp fails when it is not the first stage in the pipeline. We use two
+ // $currentOp stages since any other stage in the initial position will trip the {aggregate:
+ // 1} namespace check.
+ assert.commandFailedWithCode(
+ adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {}}, {$currentOp: {}}], cursor: {}}),
+ ErrorCodes.BadValue);
+
+ // Test that $currentOp with {allUsers: true} succeeds for a user with the "inprog"
+ // privilege.
+ assert.commandWorked(adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}));
+
+ // Test that $currentOp succeeds if local readConcern is specified.
+ assert.commandWorked(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$currentOp: {}}],
+ readConcern: {level: "local"},
+ cursor: {}
+ }));
+
+ // Test that $currentOp fails if a non-local readConcern is specified.
+ assert.commandFailedWithCode(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$currentOp: {}}],
+ readConcern: {level: "linearizable"},
+ cursor: {}
+ }),
+ ErrorCodes.InvalidOptions);
+
+ // Test that {idleConnections: false} returns only active connections.
+ const idleConn = new Mongo(conn.host);
+
+ assert.eq(cmdCursor(adminDB, {
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: true, idleConnections: false}},
+ {$match: {"active": false}}
+ ],
+ cursor: {}
+ }).itcount(),
+ 0);
+
+ // Test that {idleConnections: true} returns inactive connections.
+ assert.gte(cmdCursor(adminDB, {
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: true, idleConnections: true}},
+ {$match: {active: false}}
+ ],
+ cursor: {}
+ }).itcount(),
+ 1);
+
+ // Test that collation rules apply to matches on $currentOp output.
+ const matchField = (isMongos ? "originatingCommand.comment" : "command.comment");
+ const numExpectedMatches = (isMongos ? 3 : 1);
+
+ assert.eq(
+ cmdCursor(adminDB, {
+ aggregate: 1,
+ pipeline: [{$currentOp: {}}, {$match: {[matchField]: "AGG_currént_op_COLLATION"}}],
+ collation: {locale: "en_US", strength: 1}, // Case and diacritic insensitive.
+ comment: "agg_current_op_collation",
+ cursor: {}
+ }).itcount(),
+ numExpectedMatches);
+
+ // Test that $currentOp is explainable.
+ const explainPlan = assert.commandWorked(adminDB.runCommand({
+ aggregate: 1,
+ pipeline:
+ [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}],
+ explain: true
+ }));
+
+ const expectedStages =
+ [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}];
+
+ if (isMongos) {
+ assert.eq(explainPlan.splitPipeline.shardsPart, expectedStages);
+
+ for (let i = 0; i < 3; i++) {
+ let shardName = st["rs" + i].name;
+ assert.eq(explainPlan.shards[shardName].stages, expectedStages);
+ }
+ } else {
+ assert.eq(explainPlan.stages, expectedStages);
+ }
+ }
+
+ runCommonTests(shardConn);
+ runCommonTests(mongosConn);
+
+ //
+ // mongoS specific tests.
+ //
+
+ // Test that a user without the inprog privilege cannot run cluster $currentOp via mongoS even
+ // if allUsers is false.
+ assert(clusterAdminDB.logout());
+ assert(clusterAdminDB.auth("user_no_inprog", "pwd"));
+
+ assert.commandFailedWithCode(
+ clusterAdminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}),
+ ErrorCodes.Unauthorized);
+
+ // Test that a $currentOp pipeline returns results from all shards.
+ assert(clusterAdminDB.logout());
+ assert(clusterAdminDB.auth("user_inprog", "pwd"));
+
+ assert.eq(cmdCursor(clusterAdminDB, {
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: true}},
+ {$project: {opid: {$split: ["$opid", ":"]}}},
+ {$group: {_id: {$arrayElemAt: ["$opid", 0]}}},
+ {$sort: {_id: 1}}
+ ],
+ cursor: {}
+ }).toArray(),
+ [
+ {_id: "aggregation_currentop-rs0"},
+ {_id: "aggregation_currentop-rs1"},
+ {_id: "aggregation_currentop-rs2"}
+ ]);
+
+ //
+ // ReplSet specific tests.
+ //
+
+ // Test that a user with the inprog privilege can see another user's operations with {allUsers:
+ // true} when run on a mongoD.
+ assert(shardAdminDB.logout());
+ assert(shardAdminDB.auth("user_inprog", "pwd"));
+
+ let awaitShell = runInParallelShell({
+ testfunc: function() {
+ assert.eq(db.getSiblingDB(jsTestName())
+ .test.find({})
+ .comment("agg_current_op_allusers_test")
+ .itcount(),
+ 5);
+ },
+ conn: shardConn,
+ username: "user_no_inprog",
+ password: "pwd"
+ });
+
+ assertCurrentOpHasSingleMatchingEntry(
+ {conn: shardConn, currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"}});
+
+ waitForParallelShell(shardConn, awaitShell);
+
+ // Test that $currentOp succeeds with {allUsers: false} for a user without the "inprog"
+ // privilege when run on a mongoD.
+ assert(shardAdminDB.logout());
+ assert(shardAdminDB.auth("user_no_inprog", "pwd"));
+
+ assert.commandWorked(shardAdminDB.runCommand(
+ {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}));
+
+ // Test that a user without the inprog privilege cannot see another user's operations.
+ // Temporarily log in as 'user_inprog' to validate that the op is present in $currentOp output.
+ assert(shardAdminDB.logout());
+ assert(shardAdminDB.auth("user_inprog", "pwd"));
+
+ awaitShell = runInParallelShell({
+ testfunc: function() {
+ assert.eq(db.getSiblingDB(jsTestName())
+ .test.find({})
+ .comment("agg_current_op_allusers_test")
+ .itcount(),
+ 5);
+ },
+ conn: shardConn,
+ username: "user_inprog",
+ password: "pwd"
+ });
+
+ assertCurrentOpHasSingleMatchingEntry({
+ currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"},
+ curOpOpts: {allUsers: true},
+ conn: shardConn
+ });
+
+ // Log back in as 'user_no_inprog' and validate that the user cannot see the op.
+ assert(shardAdminDB.logout());
+ assert(shardAdminDB.auth("user_no_inprog", "pwd"));
+
+ assert.eq(cmdCursor(shardAdminDB, {
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: false}},
+ {$match: {"command.comment": "agg_current_op_allusers_test"}}
+ ],
+ cursor: {}
+ }).itcount(),
+ 0);
+
+ waitForParallelShell(shardConn, awaitShell);
+
+ // Test that the allUsers parameter is ignored when authentication is disabled.
+ restartReplSet(shardRS, {shardsvr: null, keyFile: null});
+
+ // Ensure that there is at least one other connection present.
+ const otherConn = new Mongo(shardConn.host);
+
+ // Verify that $currentOp displays all operations when auth is disabled regardless of the
+ // allUsers parameter, by checking that the output is the same in both cases. We project
+ // static fields from each operation so that a thread which becomes active between the two
+ // aggregations is still comparable across the output of both.
+ let aggCmd = {
+ aggregate: 1,
+ pipeline: [
+ {$currentOp: {allUsers: true, idleConnections: true}},
+ {$project: {desc: 1, threadId: 1, connectionId: 1, appName: 1}},
+ {$sort: {threadId: 1}}
+ ],
+ cursor: {}
+ };
+
+ const aggAllUsersTrue = cmdCursor(shardAdminDB, aggCmd).toArray();
+ aggCmd.pipeline[0].$currentOp.allUsers = false;
+ const aggAllUsersFalse = cmdCursor(shardAdminDB, aggCmd).toArray();
+
+ assert.eq(aggAllUsersFalse, aggAllUsersTrue);
+})();
diff --git a/jstests/sharding/read_pref_cmd.js b/jstests/sharding/read_pref_cmd.js
index 02ca8f25b63..a241ec58518 100644
--- a/jstests/sharding/read_pref_cmd.js
+++ b/jstests/sharding/read_pref_cmd.js
@@ -178,15 +178,12 @@ var testReadPreference = function(conn, hostList, isMongos, mode, tagSets, secEx
formatProfileQuery({aggregate: 'mrIn'}));
// Test $currentOp aggregation stage.
- // TODO SERVER-19318: Remove check once the $currentOp stage is supported on mongos.
- if (!isMongos) {
- let curOpComment = 'agg_currentOp_' + ObjectId();
+ let curOpComment = 'agg_currentOp_' + ObjectId();
- cmdTest({aggregate: 1, pipeline: [{$currentOp: {}}], comment: curOpComment, cursor: {}},
- true,
- formatProfileQuery({comment: curOpComment}),
- "admin");
- }
+ cmdTest({aggregate: 1, pipeline: [{$currentOp: {}}], comment: curOpComment, cursor: {}},
+ true,
+ formatProfileQuery({comment: curOpComment}),
+ "admin");
};
/**
diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp
index cd2eac0d1cd..6b0410610d3 100644
--- a/src/mongo/db/auth/authorization_session.cpp
+++ b/src/mongo/db/auth/authorization_session.cpp
@@ -68,7 +68,8 @@ const std::string ADMIN_DBNAME = "admin";
Status checkAuthForCreateOrModifyView(AuthorizationSession* authzSession,
const NamespaceString& viewNs,
const NamespaceString& viewOnNs,
- const BSONArray& viewPipeline) {
+ const BSONArray& viewPipeline,
+ bool isMongos) {
// It's safe to allow a user to create or modify a view if they can't read it anyway.
if (!authzSession->isAuthorizedForActionsOnNamespace(viewNs, ActionType::find)) {
return Status::OK();
@@ -78,7 +79,7 @@ Status checkAuthForCreateOrModifyView(AuthorizationSession* authzSession,
// view definition with an invalid specification like {$lookup: "blah"}, the authorization check
// will succeed but the pipeline will fail to parse later in Command::run().
return authzSession->checkAuthForAggregate(
- viewOnNs, BSON("aggregate" << viewOnNs.coll() << "pipeline" << viewPipeline));
+ viewOnNs, BSON("aggregate" << viewOnNs.coll() << "pipeline" << viewPipeline), isMongos);
}
} // namespace
@@ -254,7 +255,8 @@ void AuthorizationSession::_addPrivilegesForStage(const std::string& db,
}
Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
- const BSONObj& cmdObj) {
+ const BSONObj& cmdObj,
+ bool isMongos) {
std::string db(ns.db().toString());
uassert(
17138, mongoutils::str::stream() << "Invalid input namespace, " << ns.ns(), ns.isValid());
@@ -287,37 +289,48 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns,
// We treat the first stage in the pipeline specially, as some aggregation stages that are
// valid initial sources have different auth requirements.
BSONObj firstPipelineStage = pipeline.firstElement().embeddedObject();
- if (str::equals("$indexStats", firstPipelineStage.firstElementFieldName())) {
+ BSONElement firstStageSpec = firstPipelineStage.firstElement();
+ if (str::equals("$indexStats", firstStageSpec.fieldName())) {
Privilege::addPrivilegeToPrivilegeVector(
&privileges,
Privilege(ResourcePattern::forExactNamespace(ns), ActionType::indexStats));
- } else if (str::equals("$collStats", firstPipelineStage.firstElementFieldName())) {
+ } else if (str::equals("$collStats", firstStageSpec.fieldName())) {
Privilege::addPrivilegeToPrivilegeVector(
&privileges,
Privilege(ResourcePattern::forExactNamespace(ns), ActionType::collStats));
- } else if (str::equals("$currentOp", firstPipelineStage.firstElementFieldName())) {
+ } else if (str::equals("$currentOp", firstStageSpec.fieldName())) {
// Need to check the value of allUsers; if true then inprog privilege is required.
// {$currentOp: {idleConnections: <boolean|false>, allUsers: <boolean|false>}}
- BSONElement spec = firstPipelineStage["$currentOp"];
-
- if (spec.type() != BSONType::Object) {
+ if (firstStageSpec.type() != BSONType::Object) {
return Status(
ErrorCodes::TypeMismatch,
str::stream()
<< "$currentOp options must be specified in an object, but found: "
- << typeName(spec.type()));
+ << typeName(firstStageSpec.type()));
}
- auto allUsersElt = spec["allUsers"];
-
- if (allUsersElt && allUsersElt.type() != BSONType::Bool) {
- return Status(ErrorCodes::TypeMismatch,
- str::stream() << "The 'allUsers' parameter of the $currentOp stage "
- "must be a boolean value, but found: "
- << typeName(allUsersElt.type()));
+ bool allUsers = false;
+
+ // Check the spec for all fields named 'allUsers'. If any of them are 'true', we require
+ // the 'inprog' privilege. This avoids the possibility that a spec with multiple
+ // allUsers fields might allow an unauthorized user to view all operations.
+ for (auto&& elem : firstStageSpec.embeddedObject()) {
+ if (elem.fieldNameStringData() == "allUsers"_sd) {
+ if (elem.type() != BSONType::Bool) {
+ return Status(ErrorCodes::TypeMismatch,
+ str::stream()
+ << "The 'allUsers' parameter of the $currentOp stage "
+ "must be a boolean value, but found: "
+ << typeName(elem.type()));
+ } else if (elem.Bool()) {
+ allUsers = true;
+ break;
+ }
+ }
}
- if (allUsersElt && allUsersElt.boolean()) {
+ // In a sharded cluster, we always need the inprog privilege to run $currentOp.
+ if (isMongos || allUsers) {
Privilege::addPrivilegeToPrivilegeVector(
&privileges,
Privilege(ResourcePattern::forClusterResource(), ActionType::inprog));
@@ -506,7 +519,9 @@ Status AuthorizationSession::checkAuthForKillCursors(const NamespaceString& ns,
return Status::OK();
}
-Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns, const BSONObj& cmdObj) {
+Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns,
+ const BSONObj& cmdObj,
+ bool isMongos) {
if (cmdObj["capped"].trueValue() &&
!isAuthorizedForActionsOnNamespace(ns, ActionType::convertToCapped)) {
return Status(ErrorCodes::Unauthorized, "unauthorized");
@@ -528,7 +543,7 @@ Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns, const
NamespaceString viewOnNs(ns.db(), cmdObj["viewOn"].checkAndGetStringData());
auto pipeline =
cmdObj.hasField("pipeline") ? BSONArray(cmdObj["pipeline"].Obj()) : BSONArray();
- return checkAuthForCreateOrModifyView(this, ns, viewOnNs, pipeline);
+ return checkAuthForCreateOrModifyView(this, ns, viewOnNs, pipeline, isMongos);
}
// To create a regular collection, ActionType::createCollection or ActionType::insert are
@@ -540,7 +555,9 @@ Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns, const
return Status(ErrorCodes::Unauthorized, "unauthorized");
}
-Status AuthorizationSession::checkAuthForCollMod(const NamespaceString& ns, const BSONObj& cmdObj) {
+Status AuthorizationSession::checkAuthForCollMod(const NamespaceString& ns,
+ const BSONObj& cmdObj,
+ bool isMongos) {
if (!isAuthorizedForActionsOnNamespace(ns, ActionType::collMod)) {
return Status(ErrorCodes::Unauthorized, "unauthorized");
}
@@ -559,7 +576,7 @@ Status AuthorizationSession::checkAuthForCollMod(const NamespaceString& ns, cons
if (hasViewOn) {
NamespaceString viewOnNs(ns.db(), cmdObj["viewOn"].checkAndGetStringData());
auto viewPipeline = BSONArray(cmdObj["pipeline"].Obj());
- return checkAuthForCreateOrModifyView(this, ns, viewOnNs, viewPipeline);
+ return checkAuthForCreateOrModifyView(this, ns, viewOnNs, viewPipeline, isMongos);
}
return Status::OK();
diff --git a/src/mongo/db/auth/authorization_session.h b/src/mongo/db/auth/authorization_session.h
index f93aec27d6a..bd300f2d1f7 100644
--- a/src/mongo/db/auth/authorization_session.h
+++ b/src/mongo/db/auth/authorization_session.h
@@ -180,16 +180,16 @@ public:
Status checkAuthForKillCursors(const NamespaceString& ns, long long cursorID);
// Checks if this connection has the privileges necessary to run the aggregation pipeline
- // specified in 'cmdObj' on the namespace 'ns'.
- Status checkAuthForAggregate(const NamespaceString& ns, const BSONObj& cmdObj);
+ // specified in 'cmdObj' on the namespace 'ns' either directly on mongoD or via mongoS.
+ Status checkAuthForAggregate(const NamespaceString& ns, const BSONObj& cmdObj, bool isMongos);
// Checks if this connection has the privileges necessary to create 'ns' with the options
- // supplied in 'cmdObj'.
- Status checkAuthForCreate(const NamespaceString& ns, const BSONObj& cmdObj);
+ // supplied in 'cmdObj' either directly on mongoD or via mongoS.
+ Status checkAuthForCreate(const NamespaceString& ns, const BSONObj& cmdObj, bool isMongos);
// Checks if this connection has the privileges necessary to modify 'ns' with the options
- // supplied in 'cmdObj'.
- Status checkAuthForCollMod(const NamespaceString& ns, const BSONObj& cmdObj);
+ // supplied in 'cmdObj' either directly on mongoD or via mongoS.
+ Status checkAuthForCollMod(const NamespaceString& ns, const BSONObj& cmdObj, bool isMongos);
// Checks if this connection has the privileges necessary to grant the given privilege
// to a role.
diff --git a/src/mongo/db/auth/authorization_session_test.cpp b/src/mongo/db/auth/authorization_session_test.cpp
index 806bc3795e8..71b7f00ee51 100644
--- a/src/mongo/db/auth/authorization_session_test.cpp
+++ b/src/mongo/db/auth/authorization_session_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context_noop.h"
+#include "mongo/s/is_mongos.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/map_util.h"
@@ -541,39 +542,40 @@ TEST_F(AuthorizationSessionTest, UseOldUserInfoInFaceOfConnectivityProblems) {
TEST_F(AuthorizationSessionTest, CheckAuthForAggregateFailsIfPipelineIsNotAnArray) {
BSONObj cmdObjIntPipeline = BSON("aggregate" << testFooNss.coll() << "pipeline" << 7);
ASSERT_EQ(ErrorCodes::TypeMismatch,
- authzSession->checkAuthForAggregate(testFooNss, cmdObjIntPipeline));
+ authzSession->checkAuthForAggregate(testFooNss, cmdObjIntPipeline, false));
BSONObj cmdObjObjPipeline = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONObj());
ASSERT_EQ(ErrorCodes::TypeMismatch,
- authzSession->checkAuthForAggregate(testFooNss, cmdObjObjPipeline));
+ authzSession->checkAuthForAggregate(testFooNss, cmdObjObjPipeline, false));
BSONObj cmdObjNoPipeline = BSON("aggregate" << testFooNss.coll());
ASSERT_EQ(ErrorCodes::TypeMismatch,
- authzSession->checkAuthForAggregate(testFooNss, cmdObjNoPipeline));
+ authzSession->checkAuthForAggregate(testFooNss, cmdObjNoPipeline, false));
}
TEST_F(AuthorizationSessionTest, CheckAuthForAggregateFailsIfPipelineFirstStageIsNotAnObject) {
BSONObj cmdObjFirstStageInt =
BSON("aggregate" << testFooNss.coll() << "pipeline" << BSON_ARRAY(7));
ASSERT_EQ(ErrorCodes::TypeMismatch,
- authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageInt));
+ authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageInt, false));
BSONObj cmdObjFirstStageArray =
BSON("aggregate" << testFooNss.coll() << "pipeline" << BSON_ARRAY(BSONArray()));
ASSERT_EQ(ErrorCodes::TypeMismatch,
- authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageArray));
+ authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageArray, false));
}
TEST_F(AuthorizationSessionTest, CannotAggregateEmptyPipelineWithoutFindAction) {
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray());
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CanAggregateEmptyPipelineWithFindAction) {
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray());
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CannotAggregateWithoutFindActionIfFirstStageNotIndexOrCollStats) {
@@ -583,7 +585,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateWithoutFindActionIfFirstStageNot
BSONArray pipeline = BSON_ARRAY(BSON("$limit" << 1) << BSON("$collStats" << BSONObj())
<< BSON("$indexStats" << BSONObj()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CanAggregateWithFindActionIfFirstStageNotIndexOrCollStats) {
@@ -592,7 +595,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateWithFindActionIfFirstStageNotIndexO
BSONArray pipeline = BSON_ARRAY(BSON("$limit" << 1) << BSON("$collStats" << BSONObj())
<< BSON("$indexStats" << BSONObj()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CannotAggregateCollStatsWithoutCollStatsAction) {
@@ -600,7 +603,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateCollStatsWithoutCollStatsAction)
BSONArray pipeline = BSON_ARRAY(BSON("$collStats" << BSONObj()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CanAggregateCollStatsWithCollStatsAction) {
@@ -608,7 +612,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateCollStatsWithCollStatsAction) {
BSONArray pipeline = BSON_ARRAY(BSON("$collStats" << BSONObj()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CannotAggregateIndexStatsWithoutIndexStatsAction) {
@@ -616,7 +620,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateIndexStatsWithoutIndexStatsActio
BSONArray pipeline = BSON_ARRAY(BSON("$indexStats" << BSONObj()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) {
@@ -624,38 +629,96 @@ TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) {
BSONArray pipeline = BSON_ARRAY(BSON("$indexStats" << BSONObj()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
-TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprogAction) {
+TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprogActionOnMongoD) {
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
-TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticated) {
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseWithoutInprogActionOnMongoS) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, true));
+}
+
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticatedOnMongoD) {
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
-TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogAction) {
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticatedOnMongoS) {
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, true));
+}
+
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogActionOnMongoD) {
authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
+}
+
+TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogActionOnMongoS) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, true));
+}
+
+TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogActionOnMongoD) {
+ authzSession->assumePrivilegesForDB(
+ Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog}));
+
+ BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
-TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogAction) {
+TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogActionOnMongoS) {
authzSession->assumePrivilegesForDB(
Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog}));
BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true)));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, true));
+}
+
+TEST_F(AuthorizationSessionTest, CannotSpoofAllUsersTrueWithoutInprogActionOnMongoD) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline =
+ BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false << "allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
+}
+
+TEST_F(AuthorizationSessionTest, CannotSpoofAllUsersTrueWithoutInprogActionOnMongoS) {
+ authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find}));
+
+ BSONArray pipeline =
+ BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false << "allUsers" << true)));
+ BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, true));
}
TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotValid) {
@@ -663,7 +726,7 @@ TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotVa
<< ""));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
ASSERT_THROWS_CODE(
- authzSession->checkAuthForAggregate(testFooNss, cmdObj), UserException, 17139);
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false), UserException, 17139);
}
TEST_F(AuthorizationSessionTest, CannotAggregateOutWithoutInsertAndRemoveOnTargetNamespace) {
@@ -672,17 +735,20 @@ TEST_F(AuthorizationSessionTest, CannotAggregateOutWithoutInsertAndRemoveOnTarge
BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
// We have insert but not remove on the $out namespace.
authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}),
Privilege(testBarCollResource, {ActionType::insert})});
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
// We have remove but not insert on the $out namespace.
authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}),
Privilege(testBarCollResource, {ActionType::remove})});
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CanAggregateOutWithInsertAndRemoveOnTargetNamespace) {
@@ -692,12 +758,13 @@ TEST_F(AuthorizationSessionTest, CanAggregateOutWithInsertAndRemoveOnTargetNames
BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll()));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
BSONObj cmdObjNoBypassDocumentValidation = BSON(
"aggregate" << testFooNss.coll() << "pipeline" << pipeline << "bypassDocumentValidation"
<< false);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObjNoBypassDocumentValidation));
+ ASSERT_OK(
+ authzSession->checkAuthForAggregate(testFooNss, cmdObjNoBypassDocumentValidation, false));
}
TEST_F(AuthorizationSessionTest,
@@ -710,7 +777,8 @@ TEST_F(AuthorizationSessionTest,
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline
<< "bypassDocumentValidation"
<< true);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest,
@@ -725,7 +793,7 @@ TEST_F(AuthorizationSessionTest,
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline
<< "bypassDocumentValidation"
<< true);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CannotAggregateLookupWithoutFindOnJoinedNamespace) {
@@ -733,7 +801,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateLookupWithoutFindOnJoinedNamespa
BSONArray pipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testBarNss.coll())));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CanAggregateLookupWithFindOnJoinedNamespace) {
@@ -742,7 +811,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateLookupWithFindOnJoinedNamespace) {
BSONArray pipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testBarNss.coll())));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CannotAggregateGraphLookupWithoutFindOnJoinedNamespace) {
@@ -750,7 +819,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateGraphLookupWithoutFindOnJoinedNa
BSONArray pipeline = BSON_ARRAY(BSON("$graphLookup" << BSON("from" << testBarNss.coll())));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, CanAggregateGraphLookupWithFindOnJoinedNamespace) {
@@ -759,7 +829,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateGraphLookupWithFindOnJoinedNamespac
BSONArray pipeline = BSON_ARRAY(BSON("$graphLookup" << BSON("from" << testBarNss.coll())));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest,
@@ -771,17 +841,20 @@ TEST_F(AuthorizationSessionTest,
BSON_ARRAY(fromjson("{$facet: {lookup: [{$lookup: {from: 'bar'}}], graphLookup: "
"[{$graphLookup: {from: 'qux'}}]}}"));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
// We have find on the $lookup namespace but not on the $graphLookup namespace.
authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}),
Privilege(testBarCollResource, {ActionType::find})});
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
// We have find on the $graphLookup namespace but not on the $lookup namespace.
authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}),
Privilege(testQuxCollResource, {ActionType::find})});
- ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_EQ(ErrorCodes::Unauthorized,
+ authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest,
@@ -794,7 +867,7 @@ TEST_F(AuthorizationSessionTest,
BSON_ARRAY(fromjson("{$facet: {lookup: [{$lookup: {from: 'bar'}}], graphLookup: "
"[{$graphLookup: {from: 'qux'}}]}}"));
BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline);
- ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj));
+ ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false));
}
TEST_F(AuthorizationSessionTest, UnauthorizedSessionIsCoauthorizedWithEmptyUserSet) {
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index bb59ebe0ba3..a3b83b60383 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -528,7 +528,7 @@ public:
const std::string& dbname,
const BSONObj& cmdObj) {
const NamespaceString nss(parseNs(dbname, cmdObj));
- return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj);
+ return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj, false);
}
virtual bool run(OperationContext* opCtx,
@@ -1026,7 +1026,7 @@ public:
const std::string& dbname,
const BSONObj& cmdObj) {
const NamespaceString nss(parseNs(dbname, cmdObj));
- return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj);
+ return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj, false);
}
bool run(OperationContext* opCtx,
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 3a7f1a452e1..031f2560284 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -77,7 +77,7 @@ public:
const std::string& dbname,
const BSONObj& cmdObj) override {
const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj));
- return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj);
+ return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj, false);
}
bool run(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index d9bd51dcd53..72a860f1517 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -99,11 +99,19 @@ establishCursorsRetryOnStaleVersion(OperationContext* opCtx,
auto routingInfo =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- // Use the routing table to decide which shards to target, and build versioned requests for
- // them.
std::vector<std::pair<ShardId, BSONObj>> requests;
- if (routingInfo.cm()) {
- // The collection is sharded. Target based on the query and collation.
+
+ if (nss.isCollectionlessAggregateNS()) {
+ // The pipeline begins with a stage which produces its own input and does not use an
+ // underlying collection. It should be run unversioned on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ for (auto& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ } else if (routingInfo.cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
std::set<ShardId> shardIds;
routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
for (auto& shardId : shardIds) {
@@ -186,7 +194,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
- if (!executionNsRoutingInfo.cm()) {
+ if (!executionNsRoutingInfo.cm() && !namespaces.executionNss.isCollectionlessAggregateNS()) {
return aggPassthrough(
opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), request, cmdObj, result);
}
@@ -196,7 +204,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
if (!request.getCollation().isEmpty()) {
collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(request.getCollation()));
- } else if (chunkMgr->getDefaultCollator()) {
+ } else if (chunkMgr && chunkMgr->getDefaultCollator()) {
collation = chunkMgr->getDefaultCollator()->clone();
}
@@ -215,7 +223,9 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// If the first $match stage is an exact match on the shard key (with a simple collation or no
// string matching), we only have to send it to one shard, so send the command to that shard.
- const bool singleShard = [&]() {
+ const bool singleShard = !namespaces.executionNss.isCollectionlessAggregateNS() && [&]() {
+ invariant(chunkMgr);
+
BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery();
BSONObj shardKeyMatches = uassertStatusOK(
chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery));
@@ -271,12 +281,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
if (mergeCtx->explain) {
auto shardResults =
- uassertStatusOK(scatterGatherForNamespace(opCtx,
- namespaces.executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- shardQuery,
- request.getCollation()));
+ uassertStatusOK(scatterGather(opCtx,
+ namespaces.executionNss.db().toString(),
+ namespaces.executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ namespaces.executionNss.isCollectionlessAggregateNS()
+ ? ShardTargetingPolicy::BroadcastToAllShards
+ : ShardTargetingPolicy::UseRoutingTable,
+ shardQuery,
+ request.getCollation()));
// This must be checked before we start modifying result.
uassertAllShardsSupportExplain(shardResults);
diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp
index 5ccb9110708..b7311ac4dbe 100644
--- a/src/mongo/s/commands/cluster_commands_helpers.cpp
+++ b/src/mongo/s/commands/cluster_commands_helpers.cpp
@@ -223,52 +223,68 @@ BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version) {
StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather(
OperationContext* opCtx,
const std::string& dbName,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref) {
- auto requests = buildRequestsForAllShards(opCtx, cmdObj);
- return gatherResponses(opCtx, dbName, cmdObj, readPref, requests, nullptr /* viewDefinition */);
-}
-
-StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherForNamespace(
- OperationContext* opCtx,
- const NamespaceString& nss,
+ const boost::optional<NamespaceString> nss,
const BSONObj& cmdObj,
const ReadPreferenceSetting& readPref,
+ const ShardTargetingPolicy targetPolicy,
const boost::optional<BSONObj> query,
const boost::optional<BSONObj> collation,
const bool appendShardVersion,
BSONObj* viewDefinition) {
- int numAttempts = 0;
- StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses(
- (std::vector<AsyncRequestsSender::Response>()));
- do {
- // Get the routing table cache.
- auto swRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
- if (!swRoutingInfo.isOK()) {
- return swRoutingInfo.getStatus();
+
+ // If a NamespaceString is specified, it must match the dbName.
+ invariant(!nss || (nss.get().db() == dbName));
+
+ switch (targetPolicy) {
+ case ShardTargetingPolicy::BroadcastToAllShards: {
+ // Send unversioned commands to all shards.
+ auto requests = buildRequestsForAllShards(opCtx, cmdObj);
+ return gatherResponses(opCtx, dbName, cmdObj, readPref, requests, viewDefinition);
}
- auto routingInfo = swRoutingInfo.getValue();
-
- // Use the routing table cache to decide which shards to target, and build the requests to
- // send to them.
- auto requests = buildRequestsForShardsForNamespace(
- opCtx, routingInfo, cmdObj, query, collation, appendShardVersion);
-
- // Retrieve the responses from the shards.
- swResponses =
- gatherResponses(opCtx, nss.db().toString(), cmdObj, readPref, requests, viewDefinition);
- ++numAttempts;
-
- // If any shard returned a stale shardVersion error, invalidate the routing table cache.
- // This will cause the cache to be refreshed the next time it is accessed.
- if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
- Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
- LOG(1) << "got stale shardVersion error " << swResponses.getStatus()
- << " while dispatching " << redact(cmdObj) << " after " << numAttempts
- << " dispatch attempts";
+
+ case ShardTargetingPolicy::UseRoutingTable: {
+ // We must have a valid NamespaceString.
+ invariant(nss && nss.get().isValid());
+
+ int numAttempts = 0;
+ StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses(
+ (std::vector<AsyncRequestsSender::Response>()));
+
+ do {
+ // Get the routing table cache.
+ auto swRoutingInfo =
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, *nss);
+ if (!swRoutingInfo.isOK()) {
+ return swRoutingInfo.getStatus();
+ }
+ auto routingInfo = swRoutingInfo.getValue();
+
+ // Use the routing table cache to decide which shards to target, and build the
+ // requests to send to them.
+ auto requests = buildRequestsForShardsForNamespace(
+ opCtx, routingInfo, cmdObj, query, collation, appendShardVersion);
+
+ // Retrieve the responses from the shards.
+ swResponses =
+ gatherResponses(opCtx, dbName, cmdObj, readPref, requests, viewDefinition);
+ ++numAttempts;
+
+ // If any shard returned a stale shardVersion error, invalidate the routing table
+ // cache. This will cause the cache to be refreshed the next time it is accessed.
+ if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
+ LOG(1) << "got stale shardVersion error " << swResponses.getStatus()
+ << " while dispatching " << redact(cmdObj) << " after " << numAttempts
+ << " dispatch attempts";
+ }
+ } while (numAttempts < kMaxNumStaleVersionRetries && !swResponses.getStatus().isOK());
+
+ return swResponses;
}
- } while (numAttempts < kMaxNumStaleVersionRetries && !swResponses.getStatus().isOK());
- return swResponses;
+
+ default:
+ MONGO_UNREACHABLE;
+ }
}
bool appendRawResponses(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/cluster_commands_helpers.h b/src/mongo/s/commands/cluster_commands_helpers.h
index c9865a5da33..184be2d6063 100644
--- a/src/mongo/s/commands/cluster_commands_helpers.h
+++ b/src/mongo/s/commands/cluster_commands_helpers.h
@@ -47,6 +47,12 @@ class CachedDatabaseInfo;
class OperationContext;
class ShardId;
+/*
+ * Allows callers of routing functions to specify a preferred targeting policy. See scatterGather
+ * for a usage example.
+ */
+enum class ShardTargetingPolicy { UseRoutingTable, BroadcastToAllShards };
+
/**
* This function appends the provided writeConcernError BSONElement to the sharded response.
*/
@@ -59,44 +65,39 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardID,
BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version);
/**
- * Broadcasts 'cmdObj' to all shards and returns the responses as a vector.
+ * Generic function for dispatching commands to the cluster.
*
- * Returns a non-OK status if a failure occurs on *this* node during execution.
- * Otherwise, returns success and a list of responses from shards (including errors from the shards
- * or errors reaching the shards).
- */
-StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather(
- OperationContext* opCtx,
- const std::string& dbName,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref);
-
-/**
- * Uses the routing table cache to broadcast a command on a namespace. By default, attaches
- * shardVersions to the outgoing requests to shards, and retargets and retries if it receives a
- * stale shardVersion error from any shard.
+ * If 'targetPolicy' is ShardTargetingPolicy::BroadcastToAllShards, the command will be sent
+ * unversioned to all shards and run on database 'dbName'. The 'query', 'collation' and
+ * 'appendShardVersion' arguments, if supplied, will be ignored.
*
- * If 'query' is specified, only shards that own data for the namespace are targeted. Otherwise,
- * all shards are targeted.
+ * If 'targetPolicy' is ShardTargetingPolicy::UseRoutingTable, the routing table cache will be used
+ * to determine which shards the command should be dispatched to. If the namespace specified by
+ * 'nss' is an unsharded collection, the command will be sent to the Primary shard for the database.
+ * If 'query' is specified, only shards that own data needed by the query are targeted; otherwise,
+ * all shards are targeted. By default, shardVersions are attached to the outgoing requests, and the
+ * function will re-target and retry if it receives a stale shardVersion error from any shard.
*
* Returns a non-OK status if a failure occurs on *this* node during execution or on seeing an error
* from a shard that means the operation as a whole should fail, such as a exceeding retries for
* stale shardVersion errors.
+ *
+ * If a shard returns an error saying that the request was on a view, the shard will also return a
+ * view definition. This will be stored in the BSONObj* viewDefinition argument, if non-null, so
+ * that the caller can re-run the operation as an aggregation.
+ *
* Otherwise, returns success and a list of responses from shards (including errors from the shards
* or errors reaching the shards).
- *
- * @appendShardVersion: if false, does not attach shardVersions to the outgoing requests.
- * @viewDefinition: if a shard returns an error saying that the request was on a view, the shard
- * will also return a view definition. The returned viewDefinition is stored in
- * this parameter, so that the caller can re-run the operation as an aggregation.
*/
-StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherForNamespace(
+StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather(
OperationContext* opCtx,
- const NamespaceString& nss,
+ const std::string& dbName,
+ const boost::optional<NamespaceString> nss,
const BSONObj& cmdObj,
const ReadPreferenceSetting& readPref,
- const boost::optional<BSONObj> query,
- const boost::optional<BSONObj> collation,
+ const ShardTargetingPolicy targetPolicy = ShardTargetingPolicy::UseRoutingTable,
+ const boost::optional<BSONObj> query = boost::none,
+ const boost::optional<BSONObj> collation = boost::none,
const bool appendShardVersion = true,
BSONObj* viewDefinition = nullptr);
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index f1d822bf700..e21dc7bad41 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -141,14 +141,16 @@ public:
auto countCmdObj = countCmdBuilder.done();
BSONObj viewDefinition;
- auto swShardResponses = scatterGatherForNamespace(opCtx,
- nss,
- countCmdObj,
- ReadPreferenceSetting::get(opCtx),
- filter,
- collation,
- true, // do shard versioning
- &viewDefinition);
+ auto swShardResponses = scatterGather(opCtx,
+ dbname,
+ nss,
+ countCmdObj,
+ ReadPreferenceSetting::get(opCtx),
+ ShardTargetingPolicy::UseRoutingTable,
+ filter,
+ collation,
+ true, // do shard versioning
+ &viewDefinition);
if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swShardResponses.getStatus()) {
if (viewDefinition.isEmpty()) {
@@ -269,14 +271,16 @@ public:
Timer timer;
BSONObj viewDefinition;
- auto swShardResponses = scatterGatherForNamespace(opCtx,
- nss,
- explainCmd,
- ReadPreferenceSetting::get(opCtx),
- targetingQuery,
- targetingCollation,
- true, // do shard versioning
- &viewDefinition);
+ auto swShardResponses = scatterGather(opCtx,
+ dbname,
+ nss,
+ explainCmd,
+ ReadPreferenceSetting::get(opCtx),
+ ShardTargetingPolicy::UseRoutingTable,
+ targetingQuery,
+ targetingCollation,
+ true, // do shard versioning
+ &viewDefinition);
long long millisElapsed = timer.millis();
diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp
index c6eda43af9a..2c71acb2c27 100644
--- a/src/mongo/s/commands/cluster_current_op.cpp
+++ b/src/mongo/s/commands/cluster_current_op.cpp
@@ -88,8 +88,10 @@ public:
auto shardResponses =
uassertStatusOK(scatterGather(opCtx,
dbName,
+ boost::none,
filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx)));
+ ReadPreferenceSetting::get(opCtx),
+ ShardTargetingPolicy::BroadcastToAllShards));
if (!appendRawResponses(opCtx, &errmsg, &output, shardResponses)) {
return false;
}
diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
index 67955f1f0cb..bdf52f52a84 100644
--- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp
+++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
@@ -71,8 +71,10 @@ public:
auto shardResponses =
uassertStatusOK(scatterGather(opCtx,
dbName,
+ boost::none,
filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx)));
+ ReadPreferenceSetting::get(opCtx),
+ ShardTargetingPolicy::BroadcastToAllShards));
if (!appendRawResponses(opCtx, &errmsg, &output, shardResponses)) {
return false;
}
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index e07042da85f..05fa2a8d493 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -62,8 +62,8 @@ public:
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) override {
- const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
- return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj);
+ const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj));
+ return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj, true);
}
bool run(OperationContext* opCtx,
@@ -89,16 +89,13 @@ private:
const BSONObj& cmdObj,
boost::optional<ExplainOptions::Verbosity> verbosity,
BSONObjBuilder* result) {
- NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
-
const auto aggregationRequest =
- uassertStatusOK(AggregationRequest::parseFromBSON(nss, cmdObj, verbosity));
+ uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity));
+
+ const auto& nss = aggregationRequest.getNamespaceString();
- return ClusterAggregate::runAggregate(opCtx,
- ClusterAggregate::Namespaces{nss, std::move(nss)},
- aggregationRequest,
- cmdObj,
- result);
+ return ClusterAggregate::runAggregate(
+ opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, cmdObj, result);
}
} clusterPipelineCmd;
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index cf1accb4ccc..a3cadb1ddb7 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -224,13 +224,15 @@ protected:
}
auto shardResponses =
- uassertStatusOK(scatterGatherForNamespace(opCtx,
- nss,
- filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx),
- boost::none, // filter
- boost::none, // collation
- _appendShardVersion));
+ uassertStatusOK(scatterGather(opCtx,
+ dbName,
+ nss,
+ filterCommandRequestForPassthrough(cmdObj),
+ ReadPreferenceSetting::get(opCtx),
+ ShardTargetingPolicy::UseRoutingTable,
+ boost::none, // filter
+ boost::none, // collation
+ _appendShardVersion));
return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses));
}
@@ -327,7 +329,7 @@ public:
const std::string& dbname,
const BSONObj& cmdObj) {
const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
- return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj);
+ return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj, true);
}
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
@@ -421,7 +423,7 @@ public:
const std::string& dbname,
const BSONObj& cmdObj) override {
const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
- return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj);
+ return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj, true);
}
bool supportsWriteConcern(const BSONObj& cmd) const override {
@@ -1151,14 +1153,16 @@ public:
Timer timer;
BSONObj viewDefinition;
- auto swShardResponses = scatterGatherForNamespace(opCtx,
- nss,
- explainCmd,
- ReadPreferenceSetting::get(opCtx),
- targetingQuery,
- targetingCollation,
- true, // do shard versioning
- &viewDefinition);
+ auto swShardResponses = scatterGather(opCtx,
+ dbname,
+ nss,
+ explainCmd,
+ ReadPreferenceSetting::get(opCtx),
+ ShardTargetingPolicy::UseRoutingTable,
+ targetingQuery,
+ targetingCollation,
+ true, // do shard versioning
+ &viewDefinition);
long long millisElapsed = timer.millis();
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 3a875e578a4..c6e69ee1bdd 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -698,14 +698,16 @@ Status Strategy::explainFind(OperationContext* opCtx,
Timer timer;
BSONObj viewDefinition;
- auto swShardResponses = scatterGatherForNamespace(opCtx,
- qr.nss(),
- explainCmd,
- readPref,
- qr.getFilter(),
- qr.getCollation(),
- true, // do shard versioning
- &viewDefinition);
+ auto swShardResponses = scatterGather(opCtx,
+ qr.nss().db().toString(),
+ qr.nss(),
+ explainCmd,
+ readPref,
+ ShardTargetingPolicy::UseRoutingTable,
+ qr.getFilter(),
+ qr.getCollation(),
+ true, // do shard versioning
+ &viewDefinition);
long long millisElapsed = timer.millis();