diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2022-03-31 17:08:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-31 18:54:46 +0000 |
commit | 75cbc1477e90fc4ac145a718a5b646724e483104 (patch) | |
tree | c1fa877ad1af94b693d6a93ef4faa30943f36436 | |
parent | 176f398bd3cf651f349660bb82285eaf46252650 (diff) | |
download | mongo-75cbc1477e90fc4ac145a718a5b646724e483104.tar.gz |
SERVER-63495 Link cluster aggregate and getMore into mongod
22 files changed, 433 insertions, 33 deletions
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js index 743366749a5..d42234eef7f 100644 --- a/jstests/auth/lib/commands_lib.js +++ b/jstests/auth/lib/commands_lib.js @@ -1035,7 +1035,19 @@ var authCommandsLib = { }, ] }, - + { + testname: "clusterAggregate", + command: {clusterAggregate: "foo", pipeline: [], cursor: {}}, + skipSharded: true, + testcases: [ + { + runOnDb: firstDbName, + roles: {__system: 1}, + privileges: [{resource: {cluster: true}, actions: ["internal"]}], + expectFail: true, + }, + ] + }, { testname: "aggregate_readonly", command: {aggregate: "foo", pipeline: [], cursor: {}}, @@ -4251,6 +4263,19 @@ var authCommandsLib = { ] }, { + testname: "clusterGetMore", + command: {clusterGetMore: NumberLong(1), collection: "foo"}, + skipSharded: true, + testcases: [ + { + runOnDb: firstDbName, + roles: {__system: 1}, + privileges: [{resource: {cluster: true}, actions: ["internal"]}], + expectFail: true, + }, + ] + }, + { testname: "getMoreWithTerm", command: {getMore: NumberLong("1"), collection: "foo", term: NumberLong(1)}, testcases: [{ diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index aa28998a1f9..0fe746c8047 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -232,9 +232,11 @@ let viewsCommandTests = { expectFailure: true, }, clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"}, + clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"}, clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"}, clusterDelete: {skip: "already tested by 'delete' tests on mongos"}, clusterFind: {skip: "already tested by 'find' tests on mongos"}, + clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"}, clusterInsert: {skip: "already tested by 'insert' tests on mongos"}, clusterUpdate: {skip: "already tested by 'update' tests on mongos"}, collMod: {command: {collMod: "view", viewOn: "other", pipeline: []}}, diff --git a/jstests/noPassthrough/cluster_commands_require_cluster_node.js b/jstests/noPassthrough/cluster_commands_require_cluster_node.js index fe146211efe..05a4e80685f 100644 --- a/jstests/noPassthrough/cluster_commands_require_cluster_node.js +++ b/jstests/noPassthrough/cluster_commands_require_cluster_node.js @@ -16,9 +16,14 @@ const kCollName = "bar"; // error was different than when a command is rejected for not having sharding enabled. const clusterCommandsCases = [ {cmd: {clusterAbortTransaction: 1}, expectedErr: ErrorCodes.InvalidOptions}, + {cmd: {clusterAggregate: kCollName, pipeline: [{$match: {}}], cursor: {}}}, {cmd: {clusterCommitTransaction: 1}, expectedErr: ErrorCodes.InvalidOptions}, {cmd: {clusterDelete: kCollName, deletes: [{q: {}, limit: 1}]}}, {cmd: {clusterFind: kCollName}}, + { + cmd: {clusterGetMore: NumberLong(1), collection: kCollName}, + expectedErr: ErrorCodes.CursorNotFound + }, {cmd: {clusterInsert: kCollName, documents: [{x: 1}]}}, {cmd: {clusterUpdate: kCollName, updates: [{q: {doesNotExist: 1}, u: {x: 1}}]}}, ]; diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index 5c09d2361b6..6a71fb16347 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -121,9 +121,11 @@ const allCommands = { clearLog: {skip: isNotAUserDataRead}, cloneCollectionAsCapped: {skip: isPrimaryOnly}, clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"}, + clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"}, clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"}, clusterDelete: {skip: "already tested by 'delete' tests on mongos"}, clusterFind: {skip: "already tested by 'find' tests on mongos"}, + clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"}, clusterInsert: {skip: "already tested by 'insert' tests on mongos"}, clusterUpdate: {skip: "already tested by 'update' tests on mongos"}, collMod: {skip: isPrimaryOnly}, diff --git a/jstests/sharding/libs/last_lts_mongod_commands.js b/jstests/sharding/libs/last_lts_mongod_commands.js index 224c14644ab..f289ba2720d 100644 --- a/jstests/sharding/libs/last_lts_mongod_commands.js +++ b/jstests/sharding/libs/last_lts_mongod_commands.js @@ -13,9 +13,11 @@ const commandsRemovedFromMongodSinceLastLTS = [ // test defined without always existing on the mongod being used. const commandsAddedToMongodSinceLastLTS = [ "clusterAbortTransaction", + "clusterAggregate", "clusterCommitTransaction", "clusterDelete", "clusterFind", + "clusterGetMore", "clusterInsert", "clusterUpdate", "rotateCertificates", diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js index a59f7bbb1bc..7642ce34e25 100644 --- a/jstests/sharding/read_write_concern_defaults_application.js +++ b/jstests/sharding/read_write_concern_defaults_application.js @@ -244,9 +244,11 @@ let testCases = { checkWriteConcern: true, }, clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"}, + clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"}, clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"}, clusterDelete: {skip: "already tested by 'delete' tests on mongos"}, clusterFind: {skip: "already tested by 'find' tests on mongos"}, + clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"}, clusterInsert: {skip: "already tested by 'insert' tests on mongos"}, clusterUpdate: {skip: "already tested by 'update' tests on mongos"}, collMod: { diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js index 3ee63252205..4835739324b 100644 --- a/jstests/sharding/safe_secondary_reads_drop_recreate.js +++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js @@ -124,9 +124,11 @@ let testCases = { clone: {skip: "primary only"}, cloneCollectionAsCapped: {skip: "primary only"}, clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"}, + clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"}, clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"}, clusterDelete: {skip: "already tested by 'delete' tests on mongos"}, clusterFind: {skip: "already tested by 'find' tests on mongos"}, + clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"}, clusterInsert: {skip: "already tested by 'insert' tests on mongos"}, clusterUpdate: {skip: "already tested by 'update' tests on mongos"}, collMod: {skip: "primary only"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js index fdaeacfed34..ee81222de94 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js @@ -140,9 +140,11 @@ let testCases = { clone: {skip: "primary only"}, cloneCollectionAsCapped: {skip: "primary only"}, clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"}, + clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"}, clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"}, clusterDelete: {skip: "already tested by 'delete' tests on mongos"}, clusterFind: {skip: "already tested by 'find' tests on mongos"}, + clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"}, clusterInsert: {skip: "already tested by 'insert' tests on mongos"}, clusterUpdate: {skip: "already tested by 'update' tests on mongos"}, commitReshardCollection: {skip: "primary only"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js index c212b21c39e..23eac4fe2c2 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js @@ -127,9 +127,11 @@ let testCases = { clone: {skip: "primary only"}, cloneCollectionAsCapped: {skip: "primary only"}, clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"}, + clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"}, clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"}, clusterDelete: {skip: "already tested by 'delete' tests on mongos"}, clusterFind: {skip: "already tested by 'find' tests on mongos"}, + clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"}, clusterInsert: {skip: "already tested by 'insert' tests on mongos"}, clusterUpdate: {skip: "already tested by 'update' tests on mongos"}, collMod: {skip: "primary only"}, diff --git a/jstests/sharding/transaction_api_distributed_from_shard.js b/jstests/sharding/transaction_api_distributed_from_shard.js index e44fca1850e..d4b42290ade 100644 --- a/jstests/sharding/transaction_api_distributed_from_shard.js +++ b/jstests/sharding/transaction_api_distributed_from_shard.js @@ -26,6 +26,7 @@ function runTestSuccess() { }, {dbName: kDbName, command: {delete: kCollName, deletes: [{q: {_id: 3}, limit: 1}]}}, {dbName: kDbName, command: {find: kCollName, singleBatch: true}}, + {dbName: kDbName, command: {aggregate: kCollName, pipeline: [{$match: {}}], cursor: {}}}, ]; // Insert initial data. @@ -44,6 +45,10 @@ function runTestSuccess() { assert.eq(res.responses[3], {n: 1, ok: 1}, tojson(res)); assert.sameMembers( res.responses[4].cursor.firstBatch, [{_id: 1}, {_id: 2, updated: true}], tojson(res)); + assert.eq(res.responses[4].cursor.id, 0, tojson(res)); + assert.sameMembers( + res.responses[5].cursor.firstBatch, [{_id: 1}, {_id: 2, updated: true}], tojson(res)); + assert.eq(res.responses[5].cursor.id, 0, tojson(res)); // The written documents should be visible outside the transaction. assert.sameMembers(st.s.getCollection(kNs).find().toArray(), @@ -84,12 +89,59 @@ function runTestFailure() { assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */)); } +function runTestGetMore() { + // Insert initial data. + const startVal = -50; + const numDocs = 100; + + let bulk = st.s.getCollection(kNs).initializeUnorderedBulkOp(); + for (let i = startVal; i < startVal + numDocs; i++) { + bulk.insert({_id: i}); + } + assert.commandWorked(bulk.execute()); + + const commands = [ + // Use a batch size < number of documents so the API must use getMores to exhaust the + // cursor. + {dbName: kDbName, command: {find: kCollName, batchSize: 17}, exhaustCursor: true}, + ]; + + const commandMetricsBefore = shard0Primary.getDB(kDbName).serverStatus().metrics.commands; + + const res = assert.commandWorked( + shard0Primary.adminCommand({testInternalTransactions: 1, commandInfos: commands})); + assert.eq(res.responses.length, 1, tojson(res)); + + // The response from an exhausted cursor is an array of BSON objects, so we don't assert the + // command worked. + assert.eq(res.responses[0].docs.length, numDocs, tojson(res)); + for (let i = 0; i < numDocs; ++i) { + assert.eq(res.responses[0].docs[i]._id, startVal + i, tojson(res.responses[0].docs[i])); + } + + // Verify getMores were used by checking serverStatus metrics. + const commandMetricsAfter = shard0Primary.getDB(kDbName).serverStatus().metrics.commands; + + assert.gt(commandMetricsAfter.clusterFind.total, commandMetricsBefore.clusterFind.total); + if (!commandMetricsBefore.clusterGetMore) { + // The unsharded case runs before any cluster getMores are run. + assert.gt(commandMetricsAfter.clusterGetMore.total, 0); + } else { + assert.gt(commandMetricsAfter.clusterGetMore.total, + commandMetricsBefore.clusterGetMore.total); + } + + // Clean up. + assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */)); +} + // // Unsharded collection case. // runTestSuccess(); runTestFailure(); +runTestGetMore(); // // Sharded collection case. @@ -105,6 +157,7 @@ assert.commandWorked(st.s.adminCommand({moveChunk: kNs, find: {x: 0}, to: st.sha runTestSuccess(); runTestFailure(); +runTestGetMore(); st.stop(); })(); diff --git a/src/mongo/db/cluster_transaction_api.cpp b/src/mongo/db/cluster_transaction_api.cpp index 770d5bad816..befd2e09313 100644 --- a/src/mongo/db/cluster_transaction_api.cpp +++ b/src/mongo/db/cluster_transaction_api.cpp @@ -43,11 +43,13 @@ namespace { StringMap<std::string> clusterCommandTranslations = { {"abortTransaction", "clusterAbortTransaction"}, + {"aggregate", "clusterAggregate"}, {"commitTransaction", "clusterCommitTransaction"}, {"delete", "clusterDelete"}, + {"find", "clusterFind"}, + {"getMore", "clusterGetMore"}, {"insert", "clusterInsert"}, - {"update", "clusterUpdate"}, - {"find", "clusterFind"}}; + {"update", "clusterUpdate"}}; BSONObj replaceCommandNameWithClusterCommandName(BSONObj cmdObj) { auto cmdName = cmdObj.firstElement().fieldNameStringData(); diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index d1df81deb92..579576989e5 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -121,9 +121,11 @@ bool checkAuthorizationImplPreParse(OperationContext* opCtx, const StringMap<int> txnCmdAllowlist = {{"abortTransaction", 1}, {"aggregate", 1}, {"clusterAbortTransaction", 1}, + {"clusterAggregate", 1}, {"clusterCommitTransaction", 1}, {"clusterDelete", 1}, {"clusterFind", 1}, + {"clusterGetMore", 1}, {"clusterInsert", 1}, {"clusterUpdate", 1}, {"commitTransaction", 1}, diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index bc2977f8f1c..38cae8e2be5 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -314,6 +314,8 @@ env.Library( 'cluster_abort_transaction_cmd_d.cpp', 'cluster_commit_transaction_cmd_d.cpp', 'cluster_find_cmd_d.cpp', + 'cluster_getmore_cmd_d.cpp', + 'cluster_pipeline_cmd_d.cpp', 'cluster_write_cmd_d.cpp', 'collmod_coordinator.cpp', 'collmod_coordinator_document.idl', diff --git a/src/mongo/db/s/cluster_getmore_cmd_d.cpp b/src/mongo/db/s/cluster_getmore_cmd_d.cpp new file mode 100644 index 00000000000..634ede87443 --- /dev/null +++ b/src/mongo/db/s/cluster_getmore_cmd_d.cpp @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/sharding_state.h" +#include "mongo/s/commands/cluster_getmore_cmd.h" +#include "mongo/s/grid.h" + +namespace mongo { +namespace { + +/** + * Implements the cluster getMore command on mongod. + */ +struct ClusterGetMoreCmdD { + static constexpr StringData kName = "clusterGetMore"_sd; + + static const std::set<std::string>& getApiVersions() { + return kNoApiVersions; + } + + static void doCheckAuthorization(OperationContext* opCtx, + const NamespaceString& nss, + long long cursorID, + bool hasTerm) { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + + static void checkCanRunHere(OperationContext* opCtx) { + Grid::get(opCtx)->assertShardingIsInitialized(); + + // A cluster command on the config server may attempt to use a ShardLocal to target itself, + // which triggers an invariant, so only shard servers can run this. + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); + } +}; +ClusterGetMoreCmdBase<ClusterGetMoreCmdD> clusterGetMoreCmdD; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/cluster_pipeline_cmd_d.cpp b/src/mongo/db/s/cluster_pipeline_cmd_d.cpp new file mode 100644 index 00000000000..cd78fa52e18 --- /dev/null +++ b/src/mongo/db/s/cluster_pipeline_cmd_d.cpp @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/aggregate_command_gen.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/s/commands/cluster_pipeline_cmd.h" +#include "mongo/s/grid.h" + +namespace mongo { +namespace { + +/** + * Implements the cluster aggregate command on mongod. + */ +struct ClusterPipelineCommandD { + static constexpr StringData kName = "clusterAggregate"_sd; + + static const std::set<std::string>& getApiVersions() { + return kNoApiVersions; + } + + static void doCheckAuthorization(OperationContext* opCtx, const PrivilegeVector& privileges) { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + + static void checkCanRunHere(OperationContext* opCtx) { + Grid::get(opCtx)->assertShardingIsInitialized(); + + // A cluster command on the config server may attempt to use a ShardLocal to target itself, + // which triggers an invariant, so only shard servers can run this. + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); + } + + static AggregateCommandRequest parseAggregationRequest( + const OpMsgRequest& opMsgRequest, + boost::optional<ExplainOptions::Verbosity> explainVerbosity, + bool apiStrict) { + // Replace clusterAggregate in the request body because the parser doesn't recognize it. + auto modifiedRequestBody = + opMsgRequest.body.replaceFieldNames(BSON(AggregateCommandRequest::kCommandName << 1)); + return aggregation_request_helper::parseFromBSON(opMsgRequest.getDatabase().toString(), + modifiedRequestBody, + explainVerbosity, + apiStrict); + } +}; +ClusterPipelineCommandBase<ClusterPipelineCommandD> clusterPipelineCmdD; + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index df97cd30619..b9f96eb6a29 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -61,7 +61,7 @@ env.Library( 'cluster_ftdc_commands.cpp', 'cluster_get_last_error_cmd.cpp', 'cluster_get_shard_version_cmd.cpp', - 'cluster_getmore_cmd.cpp', + 'cluster_getmore_cmd_s.cpp', 'cluster_index_filter_cmd.cpp', 'cluster_is_db_grid_cmd.cpp', 'cluster_hello_cmd.cpp', @@ -75,7 +75,7 @@ env.Library( 'cluster_map_reduce_cmd.cpp', 'cluster_multicast_cmd.cpp', 'cluster_netstat_cmd.cpp', - 'cluster_pipeline_cmd.cpp', + 'cluster_pipeline_cmd_s.cpp', 'cluster_plan_cache_clear_cmd.cpp', 'cluster_profile_cmd.cpp', 'cluster_refine_collection_shard_key_cmd.cpp', diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.h index 7b7c9a8898b..45441cbdeb7 100644 --- a/src/mongo/s/commands/cluster_getmore_cmd.cpp +++ b/src/mongo/s/commands/cluster_getmore_cmd.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2022-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -27,7 +27,7 @@ * it in the license file. */ -#include "mongo/platform/basic.h" +#pragma once #include "mongo/db/api_parameters.h" #include "mongo/db/auth/authorization_checks.h" @@ -56,12 +56,13 @@ static const ReadConcernSupportResult kSupportsReadConcernResult{ * corresponding to the cursor id passed from the application. In order to generate these results, * may issue getMore commands to remote nodes in one or more shards. */ -class ClusterGetMoreCmd final : public Command { +template <typename Impl> +class ClusterGetMoreCmdBase final : public Command { public: - ClusterGetMoreCmd() : Command("getMore") {} + ClusterGetMoreCmdBase() : Command(Impl::kName) {} const std::set<std::string>& apiVersions() const { - return kApiVersions1; + return Impl::getApiVersions(); } std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, @@ -73,7 +74,7 @@ public: public: Invocation(Command* cmd, const OpMsgRequest& request) : CommandInvocation(cmd), - _cmd(GetMoreCommandRequest::parse({"getMore"}, request.body)) {} + _cmd(GetMoreCommandRequest::parse({Impl::kName}, request.body)) {} private: NamespaceString ns() const override { @@ -90,15 +91,16 @@ public: } void doCheckAuthorization(OperationContext* opCtx) const override { - uassertStatusOK(auth::checkAuthForGetMore(AuthorizationSession::get(opCtx->getClient()), - ns(), - _cmd.getCommandParameter(), - _cmd.getTerm().is_initialized())); + Impl::doCheckAuthorization( + opCtx, ns(), _cmd.getCommandParameter(), _cmd.getTerm().is_initialized()); } void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { // Counted as a getMore, not as a command. globalOpCounters.gotGetMore(); + + Impl::checkCanRunHere(opCtx); + auto bob = reply->getBodyBuilder(); auto response = uassertStatusOK(ClusterFind::runGetMore(opCtx, _cmd)); response.addToBSON(CursorResponse::ResponseType::SubsequentResponse, &bob); @@ -141,7 +143,7 @@ public: LogicalOp getLogicalOp() const override { return LogicalOp::opGetMore; } -} cmdGetMoreCluster; +}; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_getmore_cmd_s.cpp b/src/mongo/s/commands/cluster_getmore_cmd_s.cpp new file mode 100644 index 00000000000..63a04722d93 --- /dev/null +++ b/src/mongo/s/commands/cluster_getmore_cmd_s.cpp @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/commands/cluster_getmore_cmd.h" + +namespace mongo { +namespace { + +/** + * Implements the cluster getMore command on mongos. + */ +struct ClusterGetMoreCmdS { + static constexpr StringData kName = "getMore"_sd; + + static const std::set<std::string>& getApiVersions() { + return kApiVersions1; + } + + static void doCheckAuthorization(OperationContext* opCtx, + const NamespaceString& nss, + long long cursorID, + bool hasTerm) { + uassertStatusOK(auth::checkAuthForGetMore( + AuthorizationSession::get(opCtx->getClient()), nss, cursorID, hasTerm)); + } + + static void checkCanRunHere(OperationContext* opCtx) { + // Can always run on a mongos. + } +}; +ClusterGetMoreCmdBase<ClusterGetMoreCmdS> clusterGetMoreCmdS; + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.h index aec0641e905..788a974803d 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2022-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -27,9 +27,7 @@ * it in the license file. */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand - -#include "mongo/platform/basic.h" +#pragma once #include "mongo/base/status.h" #include "mongo/db/auth/authorization_checks.h" @@ -43,12 +41,13 @@ namespace mongo { namespace { -class ClusterPipelineCommand final : public Command { +template <typename Impl> +class ClusterPipelineCommandBase final : public Command { public: - ClusterPipelineCommand() : Command("aggregate") {} + ClusterPipelineCommandBase() : Command(Impl::kName) {} const std::set<std::string>& apiVersions() const { - return kApiVersions1; + return Impl::getApiVersions(); } /** @@ -74,11 +73,10 @@ public: OperationContext* opCtx, const OpMsgRequest& opMsgRequest, boost::optional<ExplainOptions::Verbosity> explainVerbosity) override { - const auto aggregationRequest = aggregation_request_helper::parseFromBSON( - opMsgRequest.getDatabase().toString(), - opMsgRequest.body, - explainVerbosity, - APIParameters::get(opCtx).getAPIStrict().value_or(false)); + const auto aggregationRequest = + Impl::parseAggregationRequest(opMsgRequest, + explainVerbosity, + APIParameters::get(opCtx).getAPIStrict().value_or(false)); auto privileges = uassertStatusOK( auth::getPrivilegesForAggregate(AuthorizationSession::get(opCtx->getClient()), @@ -147,6 +145,8 @@ public: CommandHelpers::handleMarkKillOnClientDisconnect( opCtx, !Pipeline::aggHasWriteStage(_request.body)); + Impl::checkCanRunHere(opCtx); + auto bob = reply->getBodyBuilder(); _runAggCommand(opCtx, _dbName, _request.body, &bob); } @@ -159,10 +159,7 @@ public: } void doCheckAuthorization(OperationContext* opCtx) const override { - uassert(ErrorCodes::Unauthorized, - "unauthorized", - AuthorizationSession::get(opCtx->getClient()) - ->isAuthorizedForPrivileges(_privileges)); + Impl::doCheckAuthorization(opCtx, _privileges); } NamespaceString ns() const override { @@ -192,7 +189,7 @@ public: const AuthorizationContract* getAuthorizationContract() const final { return &::mongo::AggregateCommandRequest::kAuthorizationContract; } -} clusterPipelineCmd; +}; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_pipeline_cmd_s.cpp b/src/mongo/s/commands/cluster_pipeline_cmd_s.cpp new file mode 100644 index 00000000000..3009d9265e6 --- /dev/null +++ b/src/mongo/s/commands/cluster_pipeline_cmd_s.cpp @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/commands/cluster_pipeline_cmd.h" + +namespace mongo { +namespace { + +/** + * Implements the cluster aggregate command on mongos. + */ +struct ClusterPipelineCommandS { + static constexpr StringData kName = "aggregate"_sd; + + static const std::set<std::string>& getApiVersions() { + return kApiVersions1; + } + + static void doCheckAuthorization(OperationContext* opCtx, const PrivilegeVector& privileges) { + uassert( + ErrorCodes::Unauthorized, + "unauthorized", + AuthorizationSession::get(opCtx->getClient())->isAuthorizedForPrivileges(privileges)); + } + + static void checkCanRunHere(OperationContext* opCtx) { + // Can always run on a mongos. + } + + static AggregateCommandRequest parseAggregationRequest( + const OpMsgRequest& opMsgRequest, + boost::optional<ExplainOptions::Verbosity> explainVerbosity, + bool apiStrict) { + return aggregation_request_helper::parseFromBSON( + opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity, apiStrict); + } +}; +ClusterPipelineCommandBase<ClusterPipelineCommandS> clusterPipelineCmdS; + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/internal_transactions_test_commands.cpp b/src/mongo/s/commands/internal_transactions_test_commands.cpp index 9e827f8d35c..1843dd9c28e 100644 --- a/src/mongo/s/commands/internal_transactions_test_commands.cpp +++ b/src/mongo/s/commands/internal_transactions_test_commands.cpp @@ -32,6 +32,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/cluster_transaction_api.h" #include "mongo/db/commands.h" +#include "mongo/db/query/find_command_gen.h" #include "mongo/db/transaction_api.h" #include "mongo/logv2/log.h" #include "mongo/s/commands/internal_transactions_test_commands_gen.h" @@ -81,6 +82,24 @@ public: const auto& dbName = commandInfo.getDbName(); const auto& command = commandInfo.getCommand(); auto assertSucceeds = commandInfo.getAssertSucceeds(); + auto exhaustCursor = commandInfo.getExhaustCursor(); + + if (exhaustCursor == boost::optional<bool>(true)) { + // We can't call a getMore without knowing its cursor's id, so we + // use the exhaustiveFind helper to test getMores. Make an OpMsgRequest + // from the command to append $db, which FindCommandRequest expects. + auto findOpMsgRequest = OpMsgRequest::fromDBAndBody(dbName, command); + auto findCommand = FindCommandRequest::parse( + IDLParserErrorContext("FindCommandRequest", false /* apiStrict */), + findOpMsgRequest.body); + + auto docs = txnClient.exhaustiveFind(findCommand).get(); + + BSONObjBuilder resBob; + resBob.append("docs", std::move(docs)); + sharedBlock->responses.emplace_back(resBob.obj()); + continue; + } auto res = txnClient.runCommand(dbName, command).get(); sharedBlock->responses.emplace_back( diff --git a/src/mongo/s/commands/internal_transactions_test_commands.idl b/src/mongo/s/commands/internal_transactions_test_commands.idl index e34d4adbbb9..1e68776ba94 100644 --- a/src/mongo/s/commands/internal_transactions_test_commands.idl +++ b/src/mongo/s/commands/internal_transactions_test_commands.idl @@ -51,6 +51,9 @@ structs: assertSucceeds: type: bool default: true + exhaustCursor: + type: bool + optional: true commands: testInternalTransactions: |