diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-06-16 11:10:59 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-06-16 22:57:40 +0100 |
commit | 69fd148aafff5bc9c33596b75560f05a6a260f7a (patch) | |
tree | cdfacbaafae55ea7741766516130eeba5a8da181 | |
parent | e5eb1981a68bfc3250b72ca14a9131e2749b4cf7 (diff) | |
download | mongo-69fd148aafff5bc9c33596b75560f05a6a260f7a.tar.gz |
SERVER-19318 Allow $currentOp aggregations to be run on mongoS
22 files changed, 782 insertions, 556 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml index beaa46b226f..78ed6fd8db5 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml @@ -11,6 +11,7 @@ selector: exclude_files: # Skip any tests that run with auth explicitly. - jstests/sharding/*[aA]uth*.js + - jstests/sharding/aggregation_currentop.js # SERVER-19318 # Skip these additional tests when running with auth enabled. - jstests/sharding/copydb_from_mongos.js # SERVER-13080 - jstests/sharding/parallel.js diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml index c6713f0a088..cca681230a4 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml @@ -11,6 +11,7 @@ selector: exclude_files: # Skip any tests that run with auth explicitly. - jstests/sharding/*[aA]uth*.js + - jstests/sharding/aggregation_currentop.js # SERVER-19318 # Skip these additional tests when running with auth enabled. - jstests/sharding/copydb_from_mongos.js # SERVER-13080 - jstests/sharding/parallel.js diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml index 30e0da2764a..4930e255dea 100644 --- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml +++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml @@ -10,6 +10,7 @@ selector: - jstests/sharding/localhostAuthBypass.js - jstests/sharding/mongos_rs_auth_shard_failure_tolerance.js - jstests/sharding/mrShardedOutputAuth.js + - jstests/sharding/aggregation_currentop.js # Count/write/aggregate/group commands against the config shard do not support retries yet - jstests/sharding/addshard1.js - jstests/sharding/addshard2.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index eff67db9902..9fb0e5d93a7 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -32,6 +32,9 @@ selector: - jstests/sharding/migration_ignore_interrupts_4.js - jstests/sharding/mongos_query_comment.js - jstests/sharding/sharded_profile.js + # SERVER-19318: added new $currentOp agg stage in 3.6 + - jstests/sharding/aggregation_currentop.js + - jstests/sharding/read_pref_cmd.js executor: config: diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js index 1751a33cb37..a71a8af6fca 100644 --- a/jstests/auth/lib/commands_lib.js +++ b/jstests/auth/lib/commands_lib.js @@ -640,7 +640,6 @@ var authCommandsLib = { { testname: "aggregate_currentOp_allUsers_true", command: {aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}, - skipSharded: true, testcases: [ { runOnDb: adminDbName, @@ -658,8 +657,8 @@ var authCommandsLib = { { testname: "aggregate_currentOp_allUsers_false", command: {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}, - skipSharded: true, - testcases: [{runOnDb: adminDbName, roles: roles_all}] + testcases: [{runOnDb: adminDbName, roles: roles_all}], + skipSharded: true }, { testname: "aggregate_lookup", diff --git a/jstests/noPassthrough/aggregation_currentop.js b/jstests/noPassthrough/aggregation_currentop.js deleted file mode 100644 index ed554402d4f..00000000000 --- a/jstests/noPassthrough/aggregation_currentop.js +++ /dev/null @@ -1,347 +0,0 @@ -/** - * Tests that the $currentOp aggregation stage behaves as expected. Specifically: - * - It must be the fist stage in the pipeline. - * - It can only be run on admin, and the "aggregate" field must be 1. - * - Only active connections are shown unless {idleConnections: true} is specified. - * - A user without the inprog privilege can see their own ops, but no-one else's. - * - A user with the inprog privilege can see all ops. - * - Non-local readConcerns are rejected. - * - Collation rules are respected. - * - * This test requires replica set configuration and user credentials to persist across a restart. - * @tags: [requires_persistence] - */ -(function() { - "use strict"; - - const key = "jstests/libs/key1"; - - // Create a new replica set for testing. We set the internalQueryExecYieldIterations parameter - // so that plan execution yields on every iteration. For some tests, we will temporarily set - // yields to hang the mongod so we can capture particular operations in the currentOp output. - const rst = new ReplSetTest({ - name: jsTestName(), - nodes: 3, - keyFile: key, - nodeOptions: {setParameter: {internalQueryExecYieldIterations: 1}} - }); - - const nodes = rst.nodeList(); - - rst.startSet(); - rst.initiate({ - _id: jsTestName(), - members: [ - {_id: 0, host: nodes[0], priority: 1}, - {_id: 1, host: nodes[1], priority: 0}, - {_id: 2, host: nodes[2], arbiterOnly: true} - ], - }); - - let primary = rst.getPrimary(); - - let testDB = primary.getDB(jsTestName()); - let adminDB = primary.getDB("admin"); - - // Create an admin user, one user with the inprog privilege, and one without. - assert.commandWorked(adminDB.runCommand({createUser: "admin", pwd: "pwd", roles: ["root"]})); - assert(adminDB.auth("admin", "pwd")); - - assert.commandWorked(adminDB.runCommand({ - createRole: "role_inprog", - roles: [], - privileges: [{resource: {cluster: true}, actions: ["inprog"]}] - })); - - assert.commandWorked(adminDB.runCommand( - {createUser: "user_inprog", pwd: "pwd", roles: ["role_inprog", "readAnyDatabase"]})); - - assert.commandWorked( - adminDB.runCommand({createUser: "user_no_inprog", pwd: "pwd", roles: ["readAnyDatabase"]})); - - // Create some dummy test data. - testDB.test.drop(); - - for (let i = 0; i < 5; i++) { - assert.writeOK(testDB.test.insert({_id: i, a: i})); - } - - // Functions to support running an operation in a parallel shell for testing allUsers behaviour. - function runInParallelShell({testfunc, username, password}) { - TestData.aggCurOpTest = testfunc; - TestData.aggCurOpUser = username; - TestData.aggCurOpPwd = password; - - assert.commandWorked( - adminDB.runCommand({configureFailPoint: "setYieldAllLocksHang", mode: "alwaysOn"})); - - testfunc = function() { - db.getSiblingDB("admin").auth(TestData.aggCurOpUser, TestData.aggCurOpPwd); - TestData.aggCurOpTest(); - db.getSiblingDB("admin").logout(); - }; - - return startParallelShell(testfunc, primary.port); - } - - function assertCurrentOpHasSingleMatchingEntry({currentOpAggFilter, curOpOpts}) { - curOpOpts = (curOpOpts || {allUsers: true}); - - let result = null; - - assert.soon( - function() { - result = adminDB.runCommand({ - aggregate: 1, - pipeline: [{$currentOp: curOpOpts}, {$match: currentOpAggFilter}], - cursor: {} - }); - - assert.commandWorked(result); - - return (result.cursor.firstBatch.length === 1); - }, - function() { - return "Failed to find operation in $currentOp output: " + tojson(result); - }); - } - - function waitForParallelShell(awaitShell) { - assert.commandWorked( - adminDB.runCommand({configureFailPoint: "setYieldAllLocksHang", mode: "off"})); - - awaitShell(); - } - - /** - * Restarts a replica set with additional parameters, and optionally re-authenticates. - */ - function restartReplSet(replSet, newOpts, user, pwd) { - const numNodes = replSet.nodeList().length; - - for (let n = 0; n < numNodes; n++) { - replSet.restart(n, newOpts); - } - - primary = replSet.getPrimary(); - replSet.awaitSecondaryNodes(); - - testDB = primary.getDB(jsTestName()); - adminDB = primary.getDB("admin"); - - if (user && pwd) { - adminDB.auth(user, pwd); - } - } - - // - // Authenticate as user_no_inprog. - // - assert(adminDB.logout()); - assert(adminDB.auth("user_no_inprog", "pwd")); - - // Test that $currentOp fails with {allUsers: true} for a user without the "inprog" privilege. - assert.commandFailedWithCode( - adminDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}), - ErrorCodes.Unauthorized); - - // Test that $currentOp succeeds with {allUsers: false} for a user without the "inprog" - // privilege. - assert.commandWorked(adminDB.runCommand( - {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}})); - - // Test that $currentOp fails when run as {aggregate: 1} on a database other than admin. - assert.commandFailedWithCode( - testDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}}), - ErrorCodes.InvalidNamespace); - - // Test that $currentOp fails when run on admin without {aggregate: 1}. - assert.commandFailedWithCode( - adminDB.runCommand({aggregate: "collname", pipeline: [{$currentOp: {}}], cursor: {}}), - ErrorCodes.InvalidNamespace); - - // Test that $currentOp accepts all numeric types. - const ones = [1, 1.0, NumberInt(1), NumberLong(1), NumberDecimal(1)]; - - for (let one of ones) { - assert.commandWorked( - adminDB.runCommand({aggregate: one, pipeline: [{$currentOp: {}}], cursor: {}})); - } - - // Test that {aggregate: 1} fails when the first stage in the pipeline is not $currentOp. - assert.commandFailedWithCode( - adminDB.runCommand({aggregate: 1, pipeline: [{$match: {}}], cursor: {}}), - ErrorCodes.InvalidNamespace); - - // Test that $currentOp fails when it is not the first stage in the pipeline. We use two - // $currentOp stages since any other stage in the initial position will trip the {aggregate: 1} - // namespace check. - assert.commandFailedWithCode( - adminDB.runCommand( - {aggregate: 1, pipeline: [{$currentOp: {}}, {$currentOp: {}}], cursor: {}}), - ErrorCodes.BadValue); - - // Test that $currentOp succeeds if local readConcern is specified. - assert.commandWorked(adminDB.runCommand( - {aggregate: 1, pipeline: [{$currentOp: {}}], readConcern: {level: "local"}, cursor: {}})); - - // Test that $currentOp fails if a non-local readConcern is specified. - assert.commandFailedWithCode(adminDB.runCommand({ - aggregate: 1, - pipeline: [{$currentOp: {}}], - readConcern: {level: "linearizable"}, - cursor: {} - }), - ErrorCodes.InvalidOptions); - - // Test that a user without the inprog privilege cannot see another user's operations. - // Temporarily log in as 'user_inprog' to validate that the op is present in $currentOp output. - assert(adminDB.logout()); - assert(adminDB.auth("user_inprog", "pwd")); - - let awaitShell = runInParallelShell({ - testfunc: function() { - assert.eq(db.getSiblingDB(jsTestName()) - .test.find({}) - .comment("agg_current_op_allusers_test") - .itcount(), - 5); - }, - username: "user_inprog", - password: "pwd" - }); - - assertCurrentOpHasSingleMatchingEntry({ - currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"}, - curOpOpts: {allUsers: true} - }); - - // Log back in as 'user_no_inprog' and validate that the user cannot see the op. - assert(adminDB.logout()); - assert(adminDB.auth("user_no_inprog", "pwd")); - - assert.eq(adminDB - .runCommand({ - aggregate: 1, - pipeline: [ - {$currentOp: {allUsers: false}}, - {$match: {"command.comment": "agg_current_op_allusers_test"}} - ], - cursor: {} - }) - .cursor.firstBatch.length, - 0); - - waitForParallelShell(awaitShell); - - // - // Authenticate as user_inprog. - // - assert(adminDB.logout()); - assert(adminDB.auth("user_inprog", "pwd")); - - // Test that $currentOp with {allUsers: true} succeeds for a user with the "inprog" - // privilege. - assert.commandWorked( - adminDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}})); - - // Test that {idleConnections: false} returns only active connections. - assert.eq(adminDB - .runCommand({ - aggregate: 1, - pipeline: [ - {$currentOp: {allUsers: true, idleConnections: false}}, - {$match: {"active": false}} - ], - cursor: {} - }) - .cursor.firstBatch.length, - 0); - - // Test that {idleConnections: true} returns inactive connections. - const idleConn = new Mongo(primary.host); - - assert.gte(adminDB - .runCommand({ - aggregate: 1, - pipeline: [ - {$currentOp: {allUsers: true, idleConnections: true}}, - {$match: {active: false}} - ], - cursor: {} - }) - .cursor.firstBatch.length, - 1); - - // Test that a user with the inprog privilege can see another user's operations with - // {allUsers: true} - awaitShell = runInParallelShell({ - testfunc: function() { - assert.eq(db.getSiblingDB(jsTestName()) - .test.find({}) - .comment("agg_current_op_allusers_test") - .itcount(), - 5); - }, - username: "user_no_inprog", - password: "pwd" - }); - - assertCurrentOpHasSingleMatchingEntry( - {currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"}}); - - waitForParallelShell(awaitShell); - - // Test that collation rules apply to matches on $currentOp output. - assert.eq( - adminDB - .runCommand({ - aggregate: 1, - pipeline: - [{$currentOp: {}}, {$match: {"command.comment": "AGG_currént_op_COLLATION"}}], - collation: {locale: "en_US", strength: 1}, // Case and diacritic insensitive. - comment: "agg_current_op_collation", - cursor: {} - }) - .cursor.firstBatch.length, - 1); - - // Test that $currentOp is explainable. - const explainPlan = assert.commandWorked(adminDB.runCommand({ - aggregate: 1, - pipeline: - [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}], - explain: true - })); - - const expectedStages = - [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}]; - - assert.eq(explainPlan.stages, expectedStages); - - // Test that the allUsers parameter is ignored when authentication is disabled. - restartReplSet(rst, {keyFile: null}); - - // Ensure that there is at least one other connection present. - const otherConn = new Mongo(primary.host); - - // Verify that $currentOp displays all operations when auth is disabled regardless of the - // allUsers parameter, by checking that the output is the same in both cases. We project static - // fields from each operation so that a thread which becomes active between the two aggregations - // is still comparable across the output of both. - let aggCmd = { - aggregate: 1, - pipeline: [ - {$currentOp: {allUsers: true, idleConnections: true}}, - {$project: {desc: 1, threadId: 1, connectionId: 1, appName: 1}}, - {$sort: {threadId: 1}} - ], - cursor: {} - }; - - const aggAllUsersTrue = assert.commandWorked(adminDB.runCommand(aggCmd)); - aggCmd.pipeline[0].$currentOp.allUsers = false; - const aggAllUsersFalse = assert.commandWorked(adminDB.runCommand(aggCmd)); - - assert.eq(aggAllUsersFalse.cursor.firstBatch, aggAllUsersTrue.cursor.firstBatch); -})(); diff --git a/jstests/sharding/aggregation_currentop.js b/jstests/sharding/aggregation_currentop.js new file mode 100644 index 00000000000..e848ce7beee --- /dev/null +++ b/jstests/sharding/aggregation_currentop.js @@ -0,0 +1,439 @@ +/** + * Tests that the $currentOp aggregation stage behaves as expected. Specifically: + * - It must be the fist stage in the pipeline. + * - It can only be run on admin, and the "aggregate" field must be 1. + * - Only active connections are shown unless {idleConnections: true} is specified. + * - A user without the inprog privilege can see their own ops, but no-one else's. + * - A user with the inprog privilege can see all ops. + * - Non-local readConcerns are rejected. + * - Collation rules are respected. + * + * This test requires replica set configuration and user credentials to persist across a restart. + * @tags: [requires_persistence] + */ +(function() { + "use strict"; + + const key = "jstests/libs/key1"; + + // Create a new sharded cluster for testing. We set the internalQueryExecYieldIterations + // parameter so that plan execution yields on every iteration. For some tests, we will + // temporarily set yields to hang the mongod so we can capture particular operations in the + // currentOp output. + const st = new ShardingTest({ + name: jsTestName(), + keyFile: key, + shards: 3, + rs: { + nodes: [ + {rsConfig: {priority: 1}}, + {rsConfig: {priority: 0}}, + {rsConfig: {arbiterOnly: true}} + ], + setParameter: {internalQueryExecYieldIterations: 1} + } + }); + + // Assign various elements of the cluster. We will use shard rs0 to test replica-set level + // $currentOp behaviour. + let shardConn = st.rs0.getPrimary(); + const mongosConn = st.s; + const shardRS = st.rs0; + + const clusterTestDB = mongosConn.getDB(jsTestName()); + const clusterAdminDB = mongosConn.getDB("admin"); + let shardAdminDB = shardConn.getDB("admin"); + + function createUsers(conn) { + let adminDB = conn.getDB("admin"); + + // Create an admin user, one user with the inprog privilege, and one without. + assert.commandWorked( + adminDB.runCommand({createUser: "admin", pwd: "pwd", roles: ["root"]})); + assert(adminDB.auth("admin", "pwd")); + + assert.commandWorked(adminDB.runCommand({ + createRole: "role_inprog", + roles: [], + privileges: [{resource: {cluster: true}, actions: ["inprog"]}] + })); + + assert.commandWorked(adminDB.runCommand( + {createUser: "user_inprog", pwd: "pwd", roles: ["role_inprog", "readAnyDatabase"]})); + + assert.commandWorked(adminDB.runCommand( + {createUser: "user_no_inprog", pwd: "pwd", roles: ["readAnyDatabase"]})); + } + + // Create necessary users at both cluster and shard-local level. + createUsers(shardConn); + createUsers(mongosConn); + + // Create a test database and some dummy data on rs0. + assert(clusterAdminDB.auth("admin", "pwd")); + + for (let i = 0; i < 5; i++) { + assert.writeOK(clusterTestDB.test.insert({_id: i, a: i})); + } + + st.ensurePrimaryShard(clusterTestDB.getName(), shardRS.name); + + // Run a command on the specified database and return a cursor over the result. + function cmdCursor(inputDB, cmd) { + return new DBCommandCursor(inputDB.getMongo(), + assert.commandWorked(inputDB.runCommand(cmd))); + } + + // Restarts a replica set with additional parameters, and optionally re-authenticates. + function restartReplSet(replSet, newOpts, user, pwd) { + const numNodes = replSet.nodeList().length; + + for (let n = 0; n < numNodes; n++) { + replSet.restart(n, newOpts); + } + + shardConn = replSet.getPrimary(); + replSet.awaitSecondaryNodes(); + + shardAdminDB = shardConn.getDB("admin"); + + if (user && pwd) { + shardAdminDB.auth(user, pwd); + } + } + + // Functions to support running an operation in a parallel shell for testing allUsers behaviour. + function runInParallelShell({conn, testfunc, username, password}) { + TestData.aggCurOpTest = testfunc; + TestData.aggCurOpUser = username; + TestData.aggCurOpPwd = password; + + assert.commandWorked(conn.getDB("admin").runCommand( + {configureFailPoint: "setYieldAllLocksHang", mode: "alwaysOn"})); + + testfunc = function() { + db.getSiblingDB("admin").auth(TestData.aggCurOpUser, TestData.aggCurOpPwd); + TestData.aggCurOpTest(); + db.getSiblingDB("admin").logout(); + }; + + return startParallelShell(testfunc, conn.port); + } + + function assertCurrentOpHasSingleMatchingEntry({conn, currentOpAggFilter, curOpOpts}) { + curOpOpts = (curOpOpts || {allUsers: true}); + + const connAdminDB = conn.getDB("admin"); + + assert.soon( + function() { + return cmdCursor(connAdminDB, { + aggregate: 1, + pipeline: [{$currentOp: curOpOpts}, {$match: currentOpAggFilter}], + cursor: {} + }).itcount() === 1; + }, + function() { + const curOps = cmdCursor( + connAdminDB, {aggregate: 1, pipeline: [{$currentOp: curOpOpts}], cursor: {}}); + + return "Failed to find operation " + tojson(currentOpAggFilter) + + " in $currentOp output: " + tojson(curOps.toArray()); + }); + } + + function waitForParallelShell(conn, awaitShell) { + assert.commandWorked(conn.getDB("admin").runCommand( + {configureFailPoint: "setYieldAllLocksHang", mode: "off"})); + + awaitShell(); + } + + // Runs a suite of tests for behaviour that is common to both the replica set and cluster + // levels. + function runCommonTests(conn) { + const testDB = conn.getDB(jsTestName()); + const adminDB = conn.getDB("admin"); + + const isMongos = (conn == mongosConn); + + // Test that an unauthenticated connection cannot run $currentOp even with {allUsers: + // false}. + assert(adminDB.logout()); + + assert.commandFailedWithCode( + adminDB.runCommand( + {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}), + ErrorCodes.Unauthorized); + + // + // Authenticate as user_no_inprog. + // + assert(adminDB.logout()); + assert(adminDB.auth("user_no_inprog", "pwd")); + + // Test that $currentOp fails with {allUsers: true} for a user without the "inprog" + // privilege. + assert.commandFailedWithCode( + adminDB.runCommand( + {aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}}), + ErrorCodes.Unauthorized); + + // + // Authenticate as user_inprog. + // + assert(adminDB.logout()); + assert(adminDB.auth("user_inprog", "pwd")); + + // Test that $currentOp fails when run as {aggregate: 1} on a database other than admin. + assert.commandFailedWithCode( + testDB.runCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}}), + ErrorCodes.InvalidNamespace); + + // Test that $currentOp fails when run on admin without {aggregate: 1}. + assert.commandFailedWithCode( + adminDB.runCommand({aggregate: "collname", pipeline: [{$currentOp: {}}], cursor: {}}), + ErrorCodes.InvalidNamespace); + + // Test that $currentOp accepts all numeric types. + const ones = [1, 1.0, NumberInt(1), NumberLong(1), NumberDecimal(1)]; + + for (let one of ones) { + assert.commandWorked( + adminDB.runCommand({aggregate: one, pipeline: [{$currentOp: {}}], cursor: {}})); + } + + // Test that {aggregate: 1} fails when the first stage in the pipeline is not $currentOp. + assert.commandFailedWithCode( + adminDB.runCommand({aggregate: 1, pipeline: [{$match: {}}], cursor: {}}), + ErrorCodes.InvalidNamespace); + + // Test that $currentOp fails when it is not the first stage in the pipeline. We use two + // $currentOp stages since any other stage in the initial position will trip the {aggregate: + // 1} namespace check. + assert.commandFailedWithCode( + adminDB.runCommand( + {aggregate: 1, pipeline: [{$currentOp: {}}, {$currentOp: {}}], cursor: {}}), + ErrorCodes.BadValue); + + // Test that $currentOp with {allUsers: true} succeeds for a user with the "inprog" + // privilege. + assert.commandWorked(adminDB.runCommand( + {aggregate: 1, pipeline: [{$currentOp: {allUsers: true}}], cursor: {}})); + + // Test that $currentOp succeeds if local readConcern is specified. + assert.commandWorked(adminDB.runCommand({ + aggregate: 1, + pipeline: [{$currentOp: {}}], + readConcern: {level: "local"}, + cursor: {} + })); + + // Test that $currentOp fails if a non-local readConcern is specified. + assert.commandFailedWithCode(adminDB.runCommand({ + aggregate: 1, + pipeline: [{$currentOp: {}}], + readConcern: {level: "linearizable"}, + cursor: {} + }), + ErrorCodes.InvalidOptions); + + // Test that {idleConnections: false} returns only active connections. + const idleConn = new Mongo(conn.host); + + assert.eq(cmdCursor(adminDB, { + aggregate: 1, + pipeline: [ + {$currentOp: {allUsers: true, idleConnections: false}}, + {$match: {"active": false}} + ], + cursor: {} + }).itcount(), + 0); + + // Test that {idleConnections: true} returns inactive connections. + assert.gte(cmdCursor(adminDB, { + aggregate: 1, + pipeline: [ + {$currentOp: {allUsers: true, idleConnections: true}}, + {$match: {active: false}} + ], + cursor: {} + }).itcount(), + 1); + + // Test that collation rules apply to matches on $currentOp output. + const matchField = (isMongos ? "originatingCommand.comment" : "command.comment"); + const numExpectedMatches = (isMongos ? 3 : 1); + + assert.eq( + cmdCursor(adminDB, { + aggregate: 1, + pipeline: [{$currentOp: {}}, {$match: {[matchField]: "AGG_currént_op_COLLATION"}}], + collation: {locale: "en_US", strength: 1}, // Case and diacritic insensitive. + comment: "agg_current_op_collation", + cursor: {} + }).itcount(), + numExpectedMatches); + + // Test that $currentOp is explainable. + const explainPlan = assert.commandWorked(adminDB.runCommand({ + aggregate: 1, + pipeline: + [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}], + explain: true + })); + + const expectedStages = + [{$currentOp: {idleConnections: true, allUsers: false}}, {$match: {desc: "test"}}]; + + if (isMongos) { + assert.eq(explainPlan.splitPipeline.shardsPart, expectedStages); + + for (let i = 0; i < 3; i++) { + let shardName = st["rs" + i].name; + assert.eq(explainPlan.shards[shardName].stages, expectedStages); + } + } else { + assert.eq(explainPlan.stages, expectedStages); + } + } + + runCommonTests(shardConn); + runCommonTests(mongosConn); + + // + // mongoS specific tests. + // + + // Test that a user without the inprog privilege cannot run cluster $currentOp via mongoS even + // if allUsers is false. + assert(clusterAdminDB.logout()); + assert(clusterAdminDB.auth("user_no_inprog", "pwd")); + + assert.commandFailedWithCode( + clusterAdminDB.runCommand( + {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}}), + ErrorCodes.Unauthorized); + + // Test that a $currentOp pipeline returns results from all shards. + assert(clusterAdminDB.logout()); + assert(clusterAdminDB.auth("user_inprog", "pwd")); + + assert.eq(cmdCursor(clusterAdminDB, { + aggregate: 1, + pipeline: [ + {$currentOp: {allUsers: true}}, + {$project: {opid: {$split: ["$opid", ":"]}}}, + {$group: {_id: {$arrayElemAt: ["$opid", 0]}}}, + {$sort: {_id: 1}} + ], + cursor: {} + }).toArray(), + [ + {_id: "aggregation_currentop-rs0"}, + {_id: "aggregation_currentop-rs1"}, + {_id: "aggregation_currentop-rs2"} + ]); + + // + // ReplSet specific tests. + // + + // Test that a user with the inprog privilege can see another user's operations with {allUsers: + // true} when run on a mongoD. + assert(shardAdminDB.logout()); + assert(shardAdminDB.auth("user_inprog", "pwd")); + + let awaitShell = runInParallelShell({ + testfunc: function() { + assert.eq(db.getSiblingDB(jsTestName()) + .test.find({}) + .comment("agg_current_op_allusers_test") + .itcount(), + 5); + }, + conn: shardConn, + username: "user_no_inprog", + password: "pwd" + }); + + assertCurrentOpHasSingleMatchingEntry( + {conn: shardConn, currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"}}); + + waitForParallelShell(shardConn, awaitShell); + + // Test that $currentOp succeeds with {allUsers: false} for a user without the "inprog" + // privilege when run on a mongoD. + assert(shardAdminDB.logout()); + assert(shardAdminDB.auth("user_no_inprog", "pwd")); + + assert.commandWorked(shardAdminDB.runCommand( + {aggregate: 1, pipeline: [{$currentOp: {allUsers: false}}], cursor: {}})); + + // Test that a user without the inprog privilege cannot see another user's operations. + // Temporarily log in as 'user_inprog' to validate that the op is present in $currentOp output. + assert(shardAdminDB.logout()); + assert(shardAdminDB.auth("user_inprog", "pwd")); + + awaitShell = runInParallelShell({ + testfunc: function() { + assert.eq(db.getSiblingDB(jsTestName()) + .test.find({}) + .comment("agg_current_op_allusers_test") + .itcount(), + 5); + }, + conn: shardConn, + username: "user_inprog", + password: "pwd" + }); + + assertCurrentOpHasSingleMatchingEntry({ + currentOpAggFilter: {"command.comment": "agg_current_op_allusers_test"}, + curOpOpts: {allUsers: true}, + conn: shardConn + }); + + // Log back in as 'user_no_inprog' and validate that the user cannot see the op. + assert(shardAdminDB.logout()); + assert(shardAdminDB.auth("user_no_inprog", "pwd")); + + assert.eq(cmdCursor(shardAdminDB, { + aggregate: 1, + pipeline: [ + {$currentOp: {allUsers: false}}, + {$match: {"command.comment": "agg_current_op_allusers_test"}} + ], + cursor: {} + }).itcount(), + 0); + + waitForParallelShell(shardConn, awaitShell); + + // Test that the allUsers parameter is ignored when authentication is disabled. + restartReplSet(shardRS, {shardsvr: null, keyFile: null}); + + // Ensure that there is at least one other connection present. + const otherConn = new Mongo(shardConn.host); + + // Verify that $currentOp displays all operations when auth is disabled regardless of the + // allUsers parameter, by checking that the output is the same in both cases. We project + // static fields from each operation so that a thread which becomes active between the two + // aggregations is still comparable across the output of both. + let aggCmd = { + aggregate: 1, + pipeline: [ + {$currentOp: {allUsers: true, idleConnections: true}}, + {$project: {desc: 1, threadId: 1, connectionId: 1, appName: 1}}, + {$sort: {threadId: 1}} + ], + cursor: {} + }; + + const aggAllUsersTrue = cmdCursor(shardAdminDB, aggCmd).toArray(); + aggCmd.pipeline[0].$currentOp.allUsers = false; + const aggAllUsersFalse = cmdCursor(shardAdminDB, aggCmd).toArray(); + + assert.eq(aggAllUsersFalse, aggAllUsersTrue); +})(); diff --git a/jstests/sharding/read_pref_cmd.js b/jstests/sharding/read_pref_cmd.js index 02ca8f25b63..a241ec58518 100644 --- a/jstests/sharding/read_pref_cmd.js +++ b/jstests/sharding/read_pref_cmd.js @@ -178,15 +178,12 @@ var testReadPreference = function(conn, hostList, isMongos, mode, tagSets, secEx formatProfileQuery({aggregate: 'mrIn'})); // Test $currentOp aggregation stage. - // TODO SERVER-19318: Remove check once the $currentOp stage is supported on mongos. - if (!isMongos) { - let curOpComment = 'agg_currentOp_' + ObjectId(); + let curOpComment = 'agg_currentOp_' + ObjectId(); - cmdTest({aggregate: 1, pipeline: [{$currentOp: {}}], comment: curOpComment, cursor: {}}, - true, - formatProfileQuery({comment: curOpComment}), - "admin"); - } + cmdTest({aggregate: 1, pipeline: [{$currentOp: {}}], comment: curOpComment, cursor: {}}, + true, + formatProfileQuery({comment: curOpComment}), + "admin"); }; /** diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp index cd2eac0d1cd..6b0410610d3 100644 --- a/src/mongo/db/auth/authorization_session.cpp +++ b/src/mongo/db/auth/authorization_session.cpp @@ -68,7 +68,8 @@ const std::string ADMIN_DBNAME = "admin"; Status checkAuthForCreateOrModifyView(AuthorizationSession* authzSession, const NamespaceString& viewNs, const NamespaceString& viewOnNs, - const BSONArray& viewPipeline) { + const BSONArray& viewPipeline, + bool isMongos) { // It's safe to allow a user to create or modify a view if they can't read it anyway. if (!authzSession->isAuthorizedForActionsOnNamespace(viewNs, ActionType::find)) { return Status::OK(); @@ -78,7 +79,7 @@ Status checkAuthForCreateOrModifyView(AuthorizationSession* authzSession, // view definition with an invalid specification like {$lookup: "blah"}, the authorization check // will succeed but the pipeline will fail to parse later in Command::run(). return authzSession->checkAuthForAggregate( - viewOnNs, BSON("aggregate" << viewOnNs.coll() << "pipeline" << viewPipeline)); + viewOnNs, BSON("aggregate" << viewOnNs.coll() << "pipeline" << viewPipeline), isMongos); } } // namespace @@ -254,7 +255,8 @@ void AuthorizationSession::_addPrivilegesForStage(const std::string& db, } Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns, - const BSONObj& cmdObj) { + const BSONObj& cmdObj, + bool isMongos) { std::string db(ns.db().toString()); uassert( 17138, mongoutils::str::stream() << "Invalid input namespace, " << ns.ns(), ns.isValid()); @@ -287,37 +289,48 @@ Status AuthorizationSession::checkAuthForAggregate(const NamespaceString& ns, // We treat the first stage in the pipeline specially, as some aggregation stages that are // valid initial sources have different auth requirements. BSONObj firstPipelineStage = pipeline.firstElement().embeddedObject(); - if (str::equals("$indexStats", firstPipelineStage.firstElementFieldName())) { + BSONElement firstStageSpec = firstPipelineStage.firstElement(); + if (str::equals("$indexStats", firstStageSpec.fieldName())) { Privilege::addPrivilegeToPrivilegeVector( &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::indexStats)); - } else if (str::equals("$collStats", firstPipelineStage.firstElementFieldName())) { + } else if (str::equals("$collStats", firstStageSpec.fieldName())) { Privilege::addPrivilegeToPrivilegeVector( &privileges, Privilege(ResourcePattern::forExactNamespace(ns), ActionType::collStats)); - } else if (str::equals("$currentOp", firstPipelineStage.firstElementFieldName())) { + } else if (str::equals("$currentOp", firstStageSpec.fieldName())) { // Need to check the value of allUsers; if true then inprog privilege is required. // {$currentOp: {idleConnections: <boolean|false>, allUsers: <boolean|false>}} - BSONElement spec = firstPipelineStage["$currentOp"]; - - if (spec.type() != BSONType::Object) { + if (firstStageSpec.type() != BSONType::Object) { return Status( ErrorCodes::TypeMismatch, str::stream() << "$currentOp options must be specified in an object, but found: " - << typeName(spec.type())); + << typeName(firstStageSpec.type())); } - auto allUsersElt = spec["allUsers"]; - - if (allUsersElt && allUsersElt.type() != BSONType::Bool) { - return Status(ErrorCodes::TypeMismatch, - str::stream() << "The 'allUsers' parameter of the $currentOp stage " - "must be a boolean value, but found: " - << typeName(allUsersElt.type())); + bool allUsers = false; + + // Check the spec for all fields named 'allUsers'. If any of them are 'true', we require + // the 'inprog' privilege. This avoids the possibility that a spec with multiple + // allUsers fields might allow an unauthorized user to view all operations. + for (auto&& elem : firstStageSpec.embeddedObject()) { + if (elem.fieldNameStringData() == "allUsers"_sd) { + if (elem.type() != BSONType::Bool) { + return Status(ErrorCodes::TypeMismatch, + str::stream() + << "The 'allUsers' parameter of the $currentOp stage " + "must be a boolean value, but found: " + << typeName(elem.type())); + } else if (elem.Bool()) { + allUsers = true; + break; + } + } } - if (allUsersElt && allUsersElt.boolean()) { + // In a sharded cluster, we always need the inprog privilege to run $currentOp. + if (isMongos || allUsers) { Privilege::addPrivilegeToPrivilegeVector( &privileges, Privilege(ResourcePattern::forClusterResource(), ActionType::inprog)); @@ -506,7 +519,9 @@ Status AuthorizationSession::checkAuthForKillCursors(const NamespaceString& ns, return Status::OK(); } -Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns, const BSONObj& cmdObj) { +Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns, + const BSONObj& cmdObj, + bool isMongos) { if (cmdObj["capped"].trueValue() && !isAuthorizedForActionsOnNamespace(ns, ActionType::convertToCapped)) { return Status(ErrorCodes::Unauthorized, "unauthorized"); @@ -528,7 +543,7 @@ Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns, const NamespaceString viewOnNs(ns.db(), cmdObj["viewOn"].checkAndGetStringData()); auto pipeline = cmdObj.hasField("pipeline") ? BSONArray(cmdObj["pipeline"].Obj()) : BSONArray(); - return checkAuthForCreateOrModifyView(this, ns, viewOnNs, pipeline); + return checkAuthForCreateOrModifyView(this, ns, viewOnNs, pipeline, isMongos); } // To create a regular collection, ActionType::createCollection or ActionType::insert are @@ -540,7 +555,9 @@ Status AuthorizationSession::checkAuthForCreate(const NamespaceString& ns, const return Status(ErrorCodes::Unauthorized, "unauthorized"); } -Status AuthorizationSession::checkAuthForCollMod(const NamespaceString& ns, const BSONObj& cmdObj) { +Status AuthorizationSession::checkAuthForCollMod(const NamespaceString& ns, + const BSONObj& cmdObj, + bool isMongos) { if (!isAuthorizedForActionsOnNamespace(ns, ActionType::collMod)) { return Status(ErrorCodes::Unauthorized, "unauthorized"); } @@ -559,7 +576,7 @@ Status AuthorizationSession::checkAuthForCollMod(const NamespaceString& ns, cons if (hasViewOn) { NamespaceString viewOnNs(ns.db(), cmdObj["viewOn"].checkAndGetStringData()); auto viewPipeline = BSONArray(cmdObj["pipeline"].Obj()); - return checkAuthForCreateOrModifyView(this, ns, viewOnNs, viewPipeline); + return checkAuthForCreateOrModifyView(this, ns, viewOnNs, viewPipeline, isMongos); } return Status::OK(); diff --git a/src/mongo/db/auth/authorization_session.h b/src/mongo/db/auth/authorization_session.h index f93aec27d6a..bd300f2d1f7 100644 --- a/src/mongo/db/auth/authorization_session.h +++ b/src/mongo/db/auth/authorization_session.h @@ -180,16 +180,16 @@ public: Status checkAuthForKillCursors(const NamespaceString& ns, long long cursorID); // Checks if this connection has the privileges necessary to run the aggregation pipeline - // specified in 'cmdObj' on the namespace 'ns'. - Status checkAuthForAggregate(const NamespaceString& ns, const BSONObj& cmdObj); + // specified in 'cmdObj' on the namespace 'ns' either directly on mongoD or via mongoS. + Status checkAuthForAggregate(const NamespaceString& ns, const BSONObj& cmdObj, bool isMongos); // Checks if this connection has the privileges necessary to create 'ns' with the options - // supplied in 'cmdObj'. - Status checkAuthForCreate(const NamespaceString& ns, const BSONObj& cmdObj); + // supplied in 'cmdObj' either directly on mongoD or via mongoS. + Status checkAuthForCreate(const NamespaceString& ns, const BSONObj& cmdObj, bool isMongos); // Checks if this connection has the privileges necessary to modify 'ns' with the options - // supplied in 'cmdObj'. - Status checkAuthForCollMod(const NamespaceString& ns, const BSONObj& cmdObj); + // supplied in 'cmdObj' either directly on mongoD or via mongoS. + Status checkAuthForCollMod(const NamespaceString& ns, const BSONObj& cmdObj, bool isMongos); // Checks if this connection has the privileges necessary to grant the given privilege // to a role. diff --git a/src/mongo/db/auth/authorization_session_test.cpp b/src/mongo/db/auth/authorization_session_test.cpp index 806bc3795e8..71b7f00ee51 100644 --- a/src/mongo/db/auth/authorization_session_test.cpp +++ b/src/mongo/db/auth/authorization_session_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context_noop.h" +#include "mongo/s/is_mongos.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/map_util.h" @@ -541,39 +542,40 @@ TEST_F(AuthorizationSessionTest, UseOldUserInfoInFaceOfConnectivityProblems) { TEST_F(AuthorizationSessionTest, CheckAuthForAggregateFailsIfPipelineIsNotAnArray) { BSONObj cmdObjIntPipeline = BSON("aggregate" << testFooNss.coll() << "pipeline" << 7); ASSERT_EQ(ErrorCodes::TypeMismatch, - authzSession->checkAuthForAggregate(testFooNss, cmdObjIntPipeline)); + authzSession->checkAuthForAggregate(testFooNss, cmdObjIntPipeline, false)); BSONObj cmdObjObjPipeline = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONObj()); ASSERT_EQ(ErrorCodes::TypeMismatch, - authzSession->checkAuthForAggregate(testFooNss, cmdObjObjPipeline)); + authzSession->checkAuthForAggregate(testFooNss, cmdObjObjPipeline, false)); BSONObj cmdObjNoPipeline = BSON("aggregate" << testFooNss.coll()); ASSERT_EQ(ErrorCodes::TypeMismatch, - authzSession->checkAuthForAggregate(testFooNss, cmdObjNoPipeline)); + authzSession->checkAuthForAggregate(testFooNss, cmdObjNoPipeline, false)); } TEST_F(AuthorizationSessionTest, CheckAuthForAggregateFailsIfPipelineFirstStageIsNotAnObject) { BSONObj cmdObjFirstStageInt = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSON_ARRAY(7)); ASSERT_EQ(ErrorCodes::TypeMismatch, - authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageInt)); + authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageInt, false)); BSONObj cmdObjFirstStageArray = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSON_ARRAY(BSONArray())); ASSERT_EQ(ErrorCodes::TypeMismatch, - authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageArray)); + authzSession->checkAuthForAggregate(testFooNss, cmdObjFirstStageArray, false)); } TEST_F(AuthorizationSessionTest, CannotAggregateEmptyPipelineWithoutFindAction) { BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray()); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CanAggregateEmptyPipelineWithFindAction) { authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << BSONArray()); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CannotAggregateWithoutFindActionIfFirstStageNotIndexOrCollStats) { @@ -583,7 +585,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateWithoutFindActionIfFirstStageNot BSONArray pipeline = BSON_ARRAY(BSON("$limit" << 1) << BSON("$collStats" << BSONObj()) << BSON("$indexStats" << BSONObj())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CanAggregateWithFindActionIfFirstStageNotIndexOrCollStats) { @@ -592,7 +595,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateWithFindActionIfFirstStageNotIndexO BSONArray pipeline = BSON_ARRAY(BSON("$limit" << 1) << BSON("$collStats" << BSONObj()) << BSON("$indexStats" << BSONObj())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CannotAggregateCollStatsWithoutCollStatsAction) { @@ -600,7 +603,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateCollStatsWithoutCollStatsAction) BSONArray pipeline = BSON_ARRAY(BSON("$collStats" << BSONObj())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CanAggregateCollStatsWithCollStatsAction) { @@ -608,7 +612,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateCollStatsWithCollStatsAction) { BSONArray pipeline = BSON_ARRAY(BSON("$collStats" << BSONObj())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CannotAggregateIndexStatsWithoutIndexStatsAction) { @@ -616,7 +620,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateIndexStatsWithoutIndexStatsActio BSONArray pipeline = BSON_ARRAY(BSON("$indexStats" << BSONObj())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) { @@ -624,38 +629,96 @@ TEST_F(AuthorizationSessionTest, CanAggregateIndexStatsWithIndexStatsAction) { BSONArray pipeline = BSON_ARRAY(BSON("$indexStats" << BSONObj())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } -TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprogAction) { +TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersFalseWithoutInprogActionOnMongoD) { authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } -TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticated) { +TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseWithoutInprogActionOnMongoS) { + authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); + + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, true)); +} + +TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticatedOnMongoD) { BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } -TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogAction) { +TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersFalseIfNotAuthenticatedOnMongoS) { + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, true)); +} + +TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogActionOnMongoD) { authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); +} + +TEST_F(AuthorizationSessionTest, CannotAggregateCurrentOpAllUsersTrueWithoutInprogActionOnMongoS) { + authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); + + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, true)); +} + +TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogActionOnMongoD) { + authzSession->assumePrivilegesForDB( + Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog})); + + BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } -TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogAction) { +TEST_F(AuthorizationSessionTest, CanAggregateCurrentOpAllUsersTrueWithInprogActionOnMongoS) { authzSession->assumePrivilegesForDB( Privilege(ResourcePattern::forClusterResource(), {ActionType::inprog})); BSONArray pipeline = BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << true))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, true)); +} + +TEST_F(AuthorizationSessionTest, CannotSpoofAllUsersTrueWithoutInprogActionOnMongoD) { + authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); + + BSONArray pipeline = + BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false << "allUsers" << true))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); +} + +TEST_F(AuthorizationSessionTest, CannotSpoofAllUsersTrueWithoutInprogActionOnMongoS) { + authzSession->assumePrivilegesForDB(Privilege(testFooCollResource, {ActionType::find})); + + BSONArray pipeline = + BSON_ARRAY(BSON("$currentOp" << BSON("allUsers" << false << "allUsers" << true))); + BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, true)); } TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotValid) { @@ -663,7 +726,7 @@ TEST_F(AuthorizationSessionTest, AddPrivilegesForStageFailsIfOutNamespaceIsNotVa << "")); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); ASSERT_THROWS_CODE( - authzSession->checkAuthForAggregate(testFooNss, cmdObj), UserException, 17139); + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false), UserException, 17139); } TEST_F(AuthorizationSessionTest, CannotAggregateOutWithoutInsertAndRemoveOnTargetNamespace) { @@ -672,17 +735,20 @@ TEST_F(AuthorizationSessionTest, CannotAggregateOutWithoutInsertAndRemoveOnTarge BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); // We have insert but not remove on the $out namespace. authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}), Privilege(testBarCollResource, {ActionType::insert})}); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); // We have remove but not insert on the $out namespace. authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}), Privilege(testBarCollResource, {ActionType::remove})}); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CanAggregateOutWithInsertAndRemoveOnTargetNamespace) { @@ -692,12 +758,13 @@ TEST_F(AuthorizationSessionTest, CanAggregateOutWithInsertAndRemoveOnTargetNames BSONArray pipeline = BSON_ARRAY(BSON("$out" << testBarNss.coll())); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); BSONObj cmdObjNoBypassDocumentValidation = BSON( "aggregate" << testFooNss.coll() << "pipeline" << pipeline << "bypassDocumentValidation" << false); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObjNoBypassDocumentValidation)); + ASSERT_OK( + authzSession->checkAuthForAggregate(testFooNss, cmdObjNoBypassDocumentValidation, false)); } TEST_F(AuthorizationSessionTest, @@ -710,7 +777,8 @@ TEST_F(AuthorizationSessionTest, BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "bypassDocumentValidation" << true); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, @@ -725,7 +793,7 @@ TEST_F(AuthorizationSessionTest, BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline << "bypassDocumentValidation" << true); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CannotAggregateLookupWithoutFindOnJoinedNamespace) { @@ -733,7 +801,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateLookupWithoutFindOnJoinedNamespa BSONArray pipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testBarNss.coll()))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CanAggregateLookupWithFindOnJoinedNamespace) { @@ -742,7 +811,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateLookupWithFindOnJoinedNamespace) { BSONArray pipeline = BSON_ARRAY(BSON("$lookup" << BSON("from" << testBarNss.coll()))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CannotAggregateGraphLookupWithoutFindOnJoinedNamespace) { @@ -750,7 +819,8 @@ TEST_F(AuthorizationSessionTest, CannotAggregateGraphLookupWithoutFindOnJoinedNa BSONArray pipeline = BSON_ARRAY(BSON("$graphLookup" << BSON("from" << testBarNss.coll()))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, CanAggregateGraphLookupWithFindOnJoinedNamespace) { @@ -759,7 +829,7 @@ TEST_F(AuthorizationSessionTest, CanAggregateGraphLookupWithFindOnJoinedNamespac BSONArray pipeline = BSON_ARRAY(BSON("$graphLookup" << BSON("from" << testBarNss.coll()))); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, @@ -771,17 +841,20 @@ TEST_F(AuthorizationSessionTest, BSON_ARRAY(fromjson("{$facet: {lookup: [{$lookup: {from: 'bar'}}], graphLookup: " "[{$graphLookup: {from: 'qux'}}]}}")); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); // We have find on the $lookup namespace but not on the $graphLookup namespace. authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}), Privilege(testBarCollResource, {ActionType::find})}); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); // We have find on the $graphLookup namespace but not on the $lookup namespace. authzSession->assumePrivilegesForDB({Privilege(testFooCollResource, {ActionType::find}), Privilege(testQuxCollResource, {ActionType::find})}); - ASSERT_EQ(ErrorCodes::Unauthorized, authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_EQ(ErrorCodes::Unauthorized, + authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, @@ -794,7 +867,7 @@ TEST_F(AuthorizationSessionTest, BSON_ARRAY(fromjson("{$facet: {lookup: [{$lookup: {from: 'bar'}}], graphLookup: " "[{$graphLookup: {from: 'qux'}}]}}")); BSONObj cmdObj = BSON("aggregate" << testFooNss.coll() << "pipeline" << pipeline); - ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj)); + ASSERT_OK(authzSession->checkAuthForAggregate(testFooNss, cmdObj, false)); } TEST_F(AuthorizationSessionTest, UnauthorizedSessionIsCoauthorizedWithEmptyUserSet) { diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index bb59ebe0ba3..a3b83b60383 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -528,7 +528,7 @@ public: const std::string& dbname, const BSONObj& cmdObj) { const NamespaceString nss(parseNs(dbname, cmdObj)); - return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj); + return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj, false); } virtual bool run(OperationContext* opCtx, @@ -1026,7 +1026,7 @@ public: const std::string& dbname, const BSONObj& cmdObj) { const NamespaceString nss(parseNs(dbname, cmdObj)); - return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj); + return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj, false); } bool run(OperationContext* opCtx, diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 3a7f1a452e1..031f2560284 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -77,7 +77,7 @@ public: const std::string& dbname, const BSONObj& cmdObj) override { const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj)); - return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj); + return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj, false); } bool run(OperationContext* opCtx, diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index d9bd51dcd53..72a860f1517 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -99,11 +99,19 @@ establishCursorsRetryOnStaleVersion(OperationContext* opCtx, auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - // Use the routing table to decide which shards to target, and build versioned requests for - // them. std::vector<std::pair<ShardId, BSONObj>> requests; - if (routingInfo.cm()) { - // The collection is sharded. Target based on the query and collation. + + if (nss.isCollectionlessAggregateNS()) { + // The pipeline begins with a stage which produces its own input and does not use an + // underlying collection. It should be run unversioned on all shards. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (auto& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + } else if (routingInfo.cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation, and build versioned requests for them. std::set<ShardId> shardIds; routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); for (auto& shardId : shardIds) { @@ -186,7 +194,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); } - if (!executionNsRoutingInfo.cm()) { + if (!executionNsRoutingInfo.cm() && !namespaces.executionNss.isCollectionlessAggregateNS()) { return aggPassthrough( opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), request, cmdObj, result); } @@ -196,7 +204,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, if (!request.getCollation().isEmpty()) { collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(request.getCollation())); - } else if (chunkMgr->getDefaultCollator()) { + } else if (chunkMgr && chunkMgr->getDefaultCollator()) { collation = chunkMgr->getDefaultCollator()->clone(); } @@ -215,7 +223,9 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // If the first $match stage is an exact match on the shard key (with a simple collation or no // string matching), we only have to send it to one shard, so send the command to that shard. - const bool singleShard = [&]() { + const bool singleShard = !namespaces.executionNss.isCollectionlessAggregateNS() && [&]() { + invariant(chunkMgr); + BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery(); BSONObj shardKeyMatches = uassertStatusOK( chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery)); @@ -271,12 +281,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, if (mergeCtx->explain) { auto shardResults = - uassertStatusOK(scatterGatherForNamespace(opCtx, - namespaces.executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - shardQuery, - request.getCollation())); + uassertStatusOK(scatterGather(opCtx, + namespaces.executionNss.db().toString(), + namespaces.executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + namespaces.executionNss.isCollectionlessAggregateNS() + ? ShardTargetingPolicy::BroadcastToAllShards + : ShardTargetingPolicy::UseRoutingTable, + shardQuery, + request.getCollation())); // This must be checked before we start modifying result. uassertAllShardsSupportExplain(shardResults); diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp index 5ccb9110708..b7311ac4dbe 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.cpp +++ b/src/mongo/s/commands/cluster_commands_helpers.cpp @@ -223,52 +223,68 @@ BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version) { StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather( OperationContext* opCtx, const std::string& dbName, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref) { - auto requests = buildRequestsForAllShards(opCtx, cmdObj); - return gatherResponses(opCtx, dbName, cmdObj, readPref, requests, nullptr /* viewDefinition */); -} - -StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherForNamespace( - OperationContext* opCtx, - const NamespaceString& nss, + const boost::optional<NamespaceString> nss, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, + const ShardTargetingPolicy targetPolicy, const boost::optional<BSONObj> query, const boost::optional<BSONObj> collation, const bool appendShardVersion, BSONObj* viewDefinition) { - int numAttempts = 0; - StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses( - (std::vector<AsyncRequestsSender::Response>())); - do { - // Get the routing table cache. - auto swRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); - if (!swRoutingInfo.isOK()) { - return swRoutingInfo.getStatus(); + + // If a NamespaceString is specified, it must match the dbName. + invariant(!nss || (nss.get().db() == dbName)); + + switch (targetPolicy) { + case ShardTargetingPolicy::BroadcastToAllShards: { + // Send unversioned commands to all shards. + auto requests = buildRequestsForAllShards(opCtx, cmdObj); + return gatherResponses(opCtx, dbName, cmdObj, readPref, requests, viewDefinition); } - auto routingInfo = swRoutingInfo.getValue(); - - // Use the routing table cache to decide which shards to target, and build the requests to - // send to them. - auto requests = buildRequestsForShardsForNamespace( - opCtx, routingInfo, cmdObj, query, collation, appendShardVersion); - - // Retrieve the responses from the shards. - swResponses = - gatherResponses(opCtx, nss.db().toString(), cmdObj, readPref, requests, viewDefinition); - ++numAttempts; - - // If any shard returned a stale shardVersion error, invalidate the routing table cache. - // This will cause the cache to be refreshed the next time it is accessed. - if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) { - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); - LOG(1) << "got stale shardVersion error " << swResponses.getStatus() - << " while dispatching " << redact(cmdObj) << " after " << numAttempts - << " dispatch attempts"; + + case ShardTargetingPolicy::UseRoutingTable: { + // We must have a valid NamespaceString. + invariant(nss && nss.get().isValid()); + + int numAttempts = 0; + StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses( + (std::vector<AsyncRequestsSender::Response>())); + + do { + // Get the routing table cache. + auto swRoutingInfo = + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, *nss); + if (!swRoutingInfo.isOK()) { + return swRoutingInfo.getStatus(); + } + auto routingInfo = swRoutingInfo.getValue(); + + // Use the routing table cache to decide which shards to target, and build the + // requests to send to them. + auto requests = buildRequestsForShardsForNamespace( + opCtx, routingInfo, cmdObj, query, collation, appendShardVersion); + + // Retrieve the responses from the shards. + swResponses = + gatherResponses(opCtx, dbName, cmdObj, readPref, requests, viewDefinition); + ++numAttempts; + + // If any shard returned a stale shardVersion error, invalidate the routing table + // cache. This will cause the cache to be refreshed the next time it is accessed. + if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) { + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); + LOG(1) << "got stale shardVersion error " << swResponses.getStatus() + << " while dispatching " << redact(cmdObj) << " after " << numAttempts + << " dispatch attempts"; + } + } while (numAttempts < kMaxNumStaleVersionRetries && !swResponses.getStatus().isOK()); + + return swResponses; } - } while (numAttempts < kMaxNumStaleVersionRetries && !swResponses.getStatus().isOK()); - return swResponses; + + default: + MONGO_UNREACHABLE; + } } bool appendRawResponses(OperationContext* opCtx, diff --git a/src/mongo/s/commands/cluster_commands_helpers.h b/src/mongo/s/commands/cluster_commands_helpers.h index c9865a5da33..184be2d6063 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.h +++ b/src/mongo/s/commands/cluster_commands_helpers.h @@ -47,6 +47,12 @@ class CachedDatabaseInfo; class OperationContext; class ShardId; +/* + * Allows callers of routing functions to specify a preferred targeting policy. See scatterGather + * for a usage example. + */ +enum class ShardTargetingPolicy { UseRoutingTable, BroadcastToAllShards }; + /** * This function appends the provided writeConcernError BSONElement to the sharded response. */ @@ -59,44 +65,39 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardID, BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version); /** - * Broadcasts 'cmdObj' to all shards and returns the responses as a vector. + * Generic function for dispatching commands to the cluster. * - * Returns a non-OK status if a failure occurs on *this* node during execution. - * Otherwise, returns success and a list of responses from shards (including errors from the shards - * or errors reaching the shards). - */ -StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather( - OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref); - -/** - * Uses the routing table cache to broadcast a command on a namespace. By default, attaches - * shardVersions to the outgoing requests to shards, and retargets and retries if it receives a - * stale shardVersion error from any shard. + * If 'targetPolicy' is ShardTargetingPolicy::BroadcastToAllShards, the command will be sent + * unversioned to all shards and run on database 'dbName'. The 'query', 'collation' and + * 'appendShardVersion' arguments, if supplied, will be ignored. * - * If 'query' is specified, only shards that own data for the namespace are targeted. Otherwise, - * all shards are targeted. + * If 'targetPolicy' is ShardTargetingPolicy::UseRoutingTable, the routing table cache will be used + * to determine which shards the command should be dispatched to. If the namespace specified by + * 'nss' is an unsharded collection, the command will be sent to the Primary shard for the database. + * If 'query' is specified, only shards that own data needed by the query are targeted; otherwise, + * all shards are targeted. By default, shardVersions are attached to the outgoing requests, and the + * function will re-target and retry if it receives a stale shardVersion error from any shard. * * Returns a non-OK status if a failure occurs on *this* node during execution or on seeing an error * from a shard that means the operation as a whole should fail, such as a exceeding retries for * stale shardVersion errors. + * + * If a shard returns an error saying that the request was on a view, the shard will also return a + * view definition. This will be stored in the BSONObj* viewDefinition argument, if non-null, so + * that the caller can re-run the operation as an aggregation. + * * Otherwise, returns success and a list of responses from shards (including errors from the shards * or errors reaching the shards). - * - * @appendShardVersion: if false, does not attach shardVersions to the outgoing requests. - * @viewDefinition: if a shard returns an error saying that the request was on a view, the shard - * will also return a view definition. The returned viewDefinition is stored in - * this parameter, so that the caller can re-run the operation as an aggregation. */ -StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherForNamespace( +StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather( OperationContext* opCtx, - const NamespaceString& nss, + const std::string& dbName, + const boost::optional<NamespaceString> nss, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, - const boost::optional<BSONObj> query, - const boost::optional<BSONObj> collation, + const ShardTargetingPolicy targetPolicy = ShardTargetingPolicy::UseRoutingTable, + const boost::optional<BSONObj> query = boost::none, + const boost::optional<BSONObj> collation = boost::none, const bool appendShardVersion = true, BSONObj* viewDefinition = nullptr); diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index f1d822bf700..e21dc7bad41 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -141,14 +141,16 @@ public: auto countCmdObj = countCmdBuilder.done(); BSONObj viewDefinition; - auto swShardResponses = scatterGatherForNamespace(opCtx, - nss, - countCmdObj, - ReadPreferenceSetting::get(opCtx), - filter, - collation, - true, // do shard versioning - &viewDefinition); + auto swShardResponses = scatterGather(opCtx, + dbname, + nss, + countCmdObj, + ReadPreferenceSetting::get(opCtx), + ShardTargetingPolicy::UseRoutingTable, + filter, + collation, + true, // do shard versioning + &viewDefinition); if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swShardResponses.getStatus()) { if (viewDefinition.isEmpty()) { @@ -269,14 +271,16 @@ public: Timer timer; BSONObj viewDefinition; - auto swShardResponses = scatterGatherForNamespace(opCtx, - nss, - explainCmd, - ReadPreferenceSetting::get(opCtx), - targetingQuery, - targetingCollation, - true, // do shard versioning - &viewDefinition); + auto swShardResponses = scatterGather(opCtx, + dbname, + nss, + explainCmd, + ReadPreferenceSetting::get(opCtx), + ShardTargetingPolicy::UseRoutingTable, + targetingQuery, + targetingCollation, + true, // do shard versioning + &viewDefinition); long long millisElapsed = timer.millis(); diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp index c6eda43af9a..2c71acb2c27 100644 --- a/src/mongo/s/commands/cluster_current_op.cpp +++ b/src/mongo/s/commands/cluster_current_op.cpp @@ -88,8 +88,10 @@ public: auto shardResponses = uassertStatusOK(scatterGather(opCtx, dbName, + boost::none, filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx))); + ReadPreferenceSetting::get(opCtx), + ShardTargetingPolicy::BroadcastToAllShards)); if (!appendRawResponses(opCtx, &errmsg, &output, shardResponses)) { return false; } diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp index 67955f1f0cb..bdf52f52a84 100644 --- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp @@ -71,8 +71,10 @@ public: auto shardResponses = uassertStatusOK(scatterGather(opCtx, dbName, + boost::none, filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx))); + ReadPreferenceSetting::get(opCtx), + ShardTargetingPolicy::BroadcastToAllShards)); if (!appendRawResponses(opCtx, &errmsg, &output, shardResponses)) { return false; } diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index e07042da85f..05fa2a8d493 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -62,8 +62,8 @@ public: Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) override { - const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); - return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj); + const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj)); + return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj, true); } bool run(OperationContext* opCtx, @@ -89,16 +89,13 @@ private: const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> verbosity, BSONObjBuilder* result) { - NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); - const auto aggregationRequest = - uassertStatusOK(AggregationRequest::parseFromBSON(nss, cmdObj, verbosity)); + uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity)); + + const auto& nss = aggregationRequest.getNamespaceString(); - return ClusterAggregate::runAggregate(opCtx, - ClusterAggregate::Namespaces{nss, std::move(nss)}, - aggregationRequest, - cmdObj, - result); + return ClusterAggregate::runAggregate( + opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, cmdObj, result); } } clusterPipelineCmd; diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index cf1accb4ccc..a3cadb1ddb7 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -224,13 +224,15 @@ protected: } auto shardResponses = - uassertStatusOK(scatterGatherForNamespace(opCtx, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - boost::none, // filter - boost::none, // collation - _appendShardVersion)); + uassertStatusOK(scatterGather(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + ShardTargetingPolicy::UseRoutingTable, + boost::none, // filter + boost::none, // collation + _appendShardVersion)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } @@ -327,7 +329,7 @@ public: const std::string& dbname, const BSONObj& cmdObj) { const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); - return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj); + return AuthorizationSession::get(client)->checkAuthForCollMod(nss, cmdObj, true); } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { @@ -421,7 +423,7 @@ public: const std::string& dbname, const BSONObj& cmdObj) override { const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); - return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj); + return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj, true); } bool supportsWriteConcern(const BSONObj& cmd) const override { @@ -1151,14 +1153,16 @@ public: Timer timer; BSONObj viewDefinition; - auto swShardResponses = scatterGatherForNamespace(opCtx, - nss, - explainCmd, - ReadPreferenceSetting::get(opCtx), - targetingQuery, - targetingCollation, - true, // do shard versioning - &viewDefinition); + auto swShardResponses = scatterGather(opCtx, + dbname, + nss, + explainCmd, + ReadPreferenceSetting::get(opCtx), + ShardTargetingPolicy::UseRoutingTable, + targetingQuery, + targetingCollation, + true, // do shard versioning + &viewDefinition); long long millisElapsed = timer.millis(); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 3a875e578a4..c6e69ee1bdd 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -698,14 +698,16 @@ Status Strategy::explainFind(OperationContext* opCtx, Timer timer; BSONObj viewDefinition; - auto swShardResponses = scatterGatherForNamespace(opCtx, - qr.nss(), - explainCmd, - readPref, - qr.getFilter(), - qr.getCollation(), - true, // do shard versioning - &viewDefinition); + auto swShardResponses = scatterGather(opCtx, + qr.nss().db().toString(), + qr.nss(), + explainCmd, + readPref, + ShardTargetingPolicy::UseRoutingTable, + qr.getFilter(), + qr.getCollation(), + true, // do shard versioning + &viewDefinition); long long millisElapsed = timer.millis(); |