summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-03-31 17:08:44 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-31 18:54:46 +0000
commit75cbc1477e90fc4ac145a718a5b646724e483104 (patch)
treec1fa877ad1af94b693d6a93ef4faa30943f36436
parent176f398bd3cf651f349660bb82285eaf46252650 (diff)
downloadmongo-75cbc1477e90fc4ac145a718a5b646724e483104.tar.gz
SERVER-63495 Link cluster aggregate and getMore into mongod
-rw-r--r--jstests/auth/lib/commands_lib.js27
-rw-r--r--jstests/core/views/views_all_commands.js2
-rw-r--r--jstests/noPassthrough/cluster_commands_require_cluster_node.js5
-rw-r--r--jstests/replsets/db_reads_while_recovering_all_commands.js2
-rw-r--r--jstests/sharding/libs/last_lts_mongod_commands.js2
-rw-r--r--jstests/sharding/read_write_concern_defaults_application.js2
-rw-r--r--jstests/sharding/safe_secondary_reads_drop_recreate.js2
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js2
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js2
-rw-r--r--jstests/sharding/transaction_api_distributed_from_shard.js53
-rw-r--r--src/mongo/db/cluster_transaction_api.cpp6
-rw-r--r--src/mongo/db/commands.cpp2
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/cluster_getmore_cmd_d.cpp69
-rw-r--r--src/mongo/db/s/cluster_pipeline_cmd_d.cpp80
-rw-r--r--src/mongo/s/commands/SConscript4
-rw-r--r--src/mongo/s/commands/cluster_getmore_cmd.h (renamed from src/mongo/s/commands/cluster_getmore_cmd.cpp)24
-rw-r--r--src/mongo/s/commands/cluster_getmore_cmd_s.cpp60
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.h (renamed from src/mongo/s/commands/cluster_pipeline_cmd.cpp)31
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd_s.cpp67
-rw-r--r--src/mongo/s/commands/internal_transactions_test_commands.cpp19
-rw-r--r--src/mongo/s/commands/internal_transactions_test_commands.idl3
22 files changed, 433 insertions, 33 deletions
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index 743366749a5..d42234eef7f 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -1035,7 +1035,19 @@ var authCommandsLib = {
},
]
},
-
+ {
+ testname: "clusterAggregate",
+ command: {clusterAggregate: "foo", pipeline: [], cursor: {}},
+ skipSharded: true,
+ testcases: [
+ {
+ runOnDb: firstDbName,
+ roles: {__system: 1},
+ privileges: [{resource: {cluster: true}, actions: ["internal"]}],
+ expectFail: true,
+ },
+ ]
+ },
{
testname: "aggregate_readonly",
command: {aggregate: "foo", pipeline: [], cursor: {}},
@@ -4251,6 +4263,19 @@ var authCommandsLib = {
]
},
{
+ testname: "clusterGetMore",
+ command: {clusterGetMore: NumberLong(1), collection: "foo"},
+ skipSharded: true,
+ testcases: [
+ {
+ runOnDb: firstDbName,
+ roles: {__system: 1},
+ privileges: [{resource: {cluster: true}, actions: ["internal"]}],
+ expectFail: true,
+ },
+ ]
+ },
+ {
testname: "getMoreWithTerm",
command: {getMore: NumberLong("1"), collection: "foo", term: NumberLong(1)},
testcases: [{
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index aa28998a1f9..0fe746c8047 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -232,9 +232,11 @@ let viewsCommandTests = {
expectFailure: true,
},
clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"},
+ clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"},
clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"},
clusterDelete: {skip: "already tested by 'delete' tests on mongos"},
clusterFind: {skip: "already tested by 'find' tests on mongos"},
+ clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
collMod: {command: {collMod: "view", viewOn: "other", pipeline: []}},
diff --git a/jstests/noPassthrough/cluster_commands_require_cluster_node.js b/jstests/noPassthrough/cluster_commands_require_cluster_node.js
index fe146211efe..05a4e80685f 100644
--- a/jstests/noPassthrough/cluster_commands_require_cluster_node.js
+++ b/jstests/noPassthrough/cluster_commands_require_cluster_node.js
@@ -16,9 +16,14 @@ const kCollName = "bar";
// error was different than when a command is rejected for not having sharding enabled.
const clusterCommandsCases = [
{cmd: {clusterAbortTransaction: 1}, expectedErr: ErrorCodes.InvalidOptions},
+ {cmd: {clusterAggregate: kCollName, pipeline: [{$match: {}}], cursor: {}}},
{cmd: {clusterCommitTransaction: 1}, expectedErr: ErrorCodes.InvalidOptions},
{cmd: {clusterDelete: kCollName, deletes: [{q: {}, limit: 1}]}},
{cmd: {clusterFind: kCollName}},
+ {
+ cmd: {clusterGetMore: NumberLong(1), collection: kCollName},
+ expectedErr: ErrorCodes.CursorNotFound
+ },
{cmd: {clusterInsert: kCollName, documents: [{x: 1}]}},
{cmd: {clusterUpdate: kCollName, updates: [{q: {doesNotExist: 1}, u: {x: 1}}]}},
];
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 5c09d2361b6..6a71fb16347 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -121,9 +121,11 @@ const allCommands = {
clearLog: {skip: isNotAUserDataRead},
cloneCollectionAsCapped: {skip: isPrimaryOnly},
clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"},
+ clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"},
clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"},
clusterDelete: {skip: "already tested by 'delete' tests on mongos"},
clusterFind: {skip: "already tested by 'find' tests on mongos"},
+ clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
collMod: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/libs/last_lts_mongod_commands.js b/jstests/sharding/libs/last_lts_mongod_commands.js
index 224c14644ab..f289ba2720d 100644
--- a/jstests/sharding/libs/last_lts_mongod_commands.js
+++ b/jstests/sharding/libs/last_lts_mongod_commands.js
@@ -13,9 +13,11 @@ const commandsRemovedFromMongodSinceLastLTS = [
// test defined without always existing on the mongod being used.
const commandsAddedToMongodSinceLastLTS = [
"clusterAbortTransaction",
+ "clusterAggregate",
"clusterCommitTransaction",
"clusterDelete",
"clusterFind",
+ "clusterGetMore",
"clusterInsert",
"clusterUpdate",
"rotateCertificates",
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index a59f7bbb1bc..7642ce34e25 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -244,9 +244,11 @@ let testCases = {
checkWriteConcern: true,
},
clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"},
+ clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"},
clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"},
clusterDelete: {skip: "already tested by 'delete' tests on mongos"},
clusterFind: {skip: "already tested by 'find' tests on mongos"},
+ clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
collMod: {
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js
index 3ee63252205..4835739324b 100644
--- a/jstests/sharding/safe_secondary_reads_drop_recreate.js
+++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js
@@ -124,9 +124,11 @@ let testCases = {
clone: {skip: "primary only"},
cloneCollectionAsCapped: {skip: "primary only"},
clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"},
+ clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"},
clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"},
clusterDelete: {skip: "already tested by 'delete' tests on mongos"},
clusterFind: {skip: "already tested by 'find' tests on mongos"},
+ clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
collMod: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index fdaeacfed34..ee81222de94 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -140,9 +140,11 @@ let testCases = {
clone: {skip: "primary only"},
cloneCollectionAsCapped: {skip: "primary only"},
clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"},
+ clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"},
clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"},
clusterDelete: {skip: "already tested by 'delete' tests on mongos"},
clusterFind: {skip: "already tested by 'find' tests on mongos"},
+ clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
commitReshardCollection: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index c212b21c39e..23eac4fe2c2 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -127,9 +127,11 @@ let testCases = {
clone: {skip: "primary only"},
cloneCollectionAsCapped: {skip: "primary only"},
clusterAbortTransaction: {skip: "already tested by 'abortTransaction' tests on mongos"},
+ clusterAggregate: {skip: "already tested by 'aggregate' tests on mongos"},
clusterCommitTransaction: {skip: "already tested by 'commitTransaction' tests on mongos"},
clusterDelete: {skip: "already tested by 'delete' tests on mongos"},
clusterFind: {skip: "already tested by 'find' tests on mongos"},
+ clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
collMod: {skip: "primary only"},
diff --git a/jstests/sharding/transaction_api_distributed_from_shard.js b/jstests/sharding/transaction_api_distributed_from_shard.js
index e44fca1850e..d4b42290ade 100644
--- a/jstests/sharding/transaction_api_distributed_from_shard.js
+++ b/jstests/sharding/transaction_api_distributed_from_shard.js
@@ -26,6 +26,7 @@ function runTestSuccess() {
},
{dbName: kDbName, command: {delete: kCollName, deletes: [{q: {_id: 3}, limit: 1}]}},
{dbName: kDbName, command: {find: kCollName, singleBatch: true}},
+ {dbName: kDbName, command: {aggregate: kCollName, pipeline: [{$match: {}}], cursor: {}}},
];
// Insert initial data.
@@ -44,6 +45,10 @@ function runTestSuccess() {
assert.eq(res.responses[3], {n: 1, ok: 1}, tojson(res));
assert.sameMembers(
res.responses[4].cursor.firstBatch, [{_id: 1}, {_id: 2, updated: true}], tojson(res));
+ assert.eq(res.responses[4].cursor.id, 0, tojson(res));
+ assert.sameMembers(
+ res.responses[5].cursor.firstBatch, [{_id: 1}, {_id: 2, updated: true}], tojson(res));
+ assert.eq(res.responses[5].cursor.id, 0, tojson(res));
// The written documents should be visible outside the transaction.
assert.sameMembers(st.s.getCollection(kNs).find().toArray(),
@@ -84,12 +89,59 @@ function runTestFailure() {
assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */));
}
+function runTestGetMore() {
+ // Insert initial data.
+ const startVal = -50;
+ const numDocs = 100;
+
+ let bulk = st.s.getCollection(kNs).initializeUnorderedBulkOp();
+ for (let i = startVal; i < startVal + numDocs; i++) {
+ bulk.insert({_id: i});
+ }
+ assert.commandWorked(bulk.execute());
+
+ const commands = [
+ // Use a batch size < number of documents so the API must use getMores to exhaust the
+ // cursor.
+ {dbName: kDbName, command: {find: kCollName, batchSize: 17}, exhaustCursor: true},
+ ];
+
+ const commandMetricsBefore = shard0Primary.getDB(kDbName).serverStatus().metrics.commands;
+
+ const res = assert.commandWorked(
+ shard0Primary.adminCommand({testInternalTransactions: 1, commandInfos: commands}));
+ assert.eq(res.responses.length, 1, tojson(res));
+
+ // The response from an exhausted cursor is an array of BSON objects, so we don't assert the
+ // command worked.
+ assert.eq(res.responses[0].docs.length, numDocs, tojson(res));
+ for (let i = 0; i < numDocs; ++i) {
+ assert.eq(res.responses[0].docs[i]._id, startVal + i, tojson(res.responses[0].docs[i]));
+ }
+
+ // Verify getMores were used by checking serverStatus metrics.
+ const commandMetricsAfter = shard0Primary.getDB(kDbName).serverStatus().metrics.commands;
+
+ assert.gt(commandMetricsAfter.clusterFind.total, commandMetricsBefore.clusterFind.total);
+ if (!commandMetricsBefore.clusterGetMore) {
+ // The unsharded case runs before any cluster getMores are run.
+ assert.gt(commandMetricsAfter.clusterGetMore.total, 0);
+ } else {
+ assert.gt(commandMetricsAfter.clusterGetMore.total,
+ commandMetricsBefore.clusterGetMore.total);
+ }
+
+ // Clean up.
+ assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */));
+}
+
//
// Unsharded collection case.
//
runTestSuccess();
runTestFailure();
+runTestGetMore();
//
// Sharded collection case.
@@ -105,6 +157,7 @@ assert.commandWorked(st.s.adminCommand({moveChunk: kNs, find: {x: 0}, to: st.sha
runTestSuccess();
runTestFailure();
+runTestGetMore();
st.stop();
})();
diff --git a/src/mongo/db/cluster_transaction_api.cpp b/src/mongo/db/cluster_transaction_api.cpp
index 770d5bad816..befd2e09313 100644
--- a/src/mongo/db/cluster_transaction_api.cpp
+++ b/src/mongo/db/cluster_transaction_api.cpp
@@ -43,11 +43,13 @@ namespace {
StringMap<std::string> clusterCommandTranslations = {
{"abortTransaction", "clusterAbortTransaction"},
+ {"aggregate", "clusterAggregate"},
{"commitTransaction", "clusterCommitTransaction"},
{"delete", "clusterDelete"},
+ {"find", "clusterFind"},
+ {"getMore", "clusterGetMore"},
{"insert", "clusterInsert"},
- {"update", "clusterUpdate"},
- {"find", "clusterFind"}};
+ {"update", "clusterUpdate"}};
BSONObj replaceCommandNameWithClusterCommandName(BSONObj cmdObj) {
auto cmdName = cmdObj.firstElement().fieldNameStringData();
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index d1df81deb92..579576989e5 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -121,9 +121,11 @@ bool checkAuthorizationImplPreParse(OperationContext* opCtx,
const StringMap<int> txnCmdAllowlist = {{"abortTransaction", 1},
{"aggregate", 1},
{"clusterAbortTransaction", 1},
+ {"clusterAggregate", 1},
{"clusterCommitTransaction", 1},
{"clusterDelete", 1},
{"clusterFind", 1},
+ {"clusterGetMore", 1},
{"clusterInsert", 1},
{"clusterUpdate", 1},
{"commitTransaction", 1},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index bc2977f8f1c..38cae8e2be5 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -314,6 +314,8 @@ env.Library(
'cluster_abort_transaction_cmd_d.cpp',
'cluster_commit_transaction_cmd_d.cpp',
'cluster_find_cmd_d.cpp',
+ 'cluster_getmore_cmd_d.cpp',
+ 'cluster_pipeline_cmd_d.cpp',
'cluster_write_cmd_d.cpp',
'collmod_coordinator.cpp',
'collmod_coordinator_document.idl',
diff --git a/src/mongo/db/s/cluster_getmore_cmd_d.cpp b/src/mongo/db/s/cluster_getmore_cmd_d.cpp
new file mode 100644
index 00000000000..634ede87443
--- /dev/null
+++ b/src/mongo/db/s/cluster_getmore_cmd_d.cpp
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/commands/cluster_getmore_cmd.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * Implements the cluster getMore command on mongod.
+ */
+struct ClusterGetMoreCmdD {
+ static constexpr StringData kName = "clusterGetMore"_sd;
+
+ static const std::set<std::string>& getApiVersions() {
+ return kNoApiVersions;
+ }
+
+ static void doCheckAuthorization(OperationContext* opCtx,
+ const NamespaceString& nss,
+ long long cursorID,
+ bool hasTerm) {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+
+ static void checkCanRunHere(OperationContext* opCtx) {
+ Grid::get(opCtx)->assertShardingIsInitialized();
+
+ // A cluster command on the config server may attempt to use a ShardLocal to target itself,
+ // which triggers an invariant, so only shard servers can run this.
+ uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
+ }
+};
+ClusterGetMoreCmdBase<ClusterGetMoreCmdD> clusterGetMoreCmdD;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/cluster_pipeline_cmd_d.cpp b/src/mongo/db/s/cluster_pipeline_cmd_d.cpp
new file mode 100644
index 00000000000..cd78fa52e18
--- /dev/null
+++ b/src/mongo/db/s/cluster_pipeline_cmd_d.cpp
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/commands/cluster_pipeline_cmd.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * Implements the cluster aggregate command on mongod.
+ */
+struct ClusterPipelineCommandD {
+ static constexpr StringData kName = "clusterAggregate"_sd;
+
+ static const std::set<std::string>& getApiVersions() {
+ return kNoApiVersions;
+ }
+
+ static void doCheckAuthorization(OperationContext* opCtx, const PrivilegeVector& privileges) {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+
+ static void checkCanRunHere(OperationContext* opCtx) {
+ Grid::get(opCtx)->assertShardingIsInitialized();
+
+ // A cluster command on the config server may attempt to use a ShardLocal to target itself,
+ // which triggers an invariant, so only shard servers can run this.
+ uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
+ }
+
+ static AggregateCommandRequest parseAggregationRequest(
+ const OpMsgRequest& opMsgRequest,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity,
+ bool apiStrict) {
+ // Replace clusterAggregate in the request body because the parser doesn't recognize it.
+ auto modifiedRequestBody =
+ opMsgRequest.body.replaceFieldNames(BSON(AggregateCommandRequest::kCommandName << 1));
+ return aggregation_request_helper::parseFromBSON(opMsgRequest.getDatabase().toString(),
+ modifiedRequestBody,
+ explainVerbosity,
+ apiStrict);
+ }
+};
+ClusterPipelineCommandBase<ClusterPipelineCommandD> clusterPipelineCmdD;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index df97cd30619..b9f96eb6a29 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -61,7 +61,7 @@ env.Library(
'cluster_ftdc_commands.cpp',
'cluster_get_last_error_cmd.cpp',
'cluster_get_shard_version_cmd.cpp',
- 'cluster_getmore_cmd.cpp',
+ 'cluster_getmore_cmd_s.cpp',
'cluster_index_filter_cmd.cpp',
'cluster_is_db_grid_cmd.cpp',
'cluster_hello_cmd.cpp',
@@ -75,7 +75,7 @@ env.Library(
'cluster_map_reduce_cmd.cpp',
'cluster_multicast_cmd.cpp',
'cluster_netstat_cmd.cpp',
- 'cluster_pipeline_cmd.cpp',
+ 'cluster_pipeline_cmd_s.cpp',
'cluster_plan_cache_clear_cmd.cpp',
'cluster_profile_cmd.cpp',
'cluster_refine_collection_shard_key_cmd.cpp',
diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.h
index 7b7c9a8898b..45441cbdeb7 100644
--- a/src/mongo/s/commands/cluster_getmore_cmd.cpp
+++ b/src/mongo/s/commands/cluster_getmore_cmd.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,7 +27,7 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
+#pragma once
#include "mongo/db/api_parameters.h"
#include "mongo/db/auth/authorization_checks.h"
@@ -56,12 +56,13 @@ static const ReadConcernSupportResult kSupportsReadConcernResult{
* corresponding to the cursor id passed from the application. In order to generate these results,
* may issue getMore commands to remote nodes in one or more shards.
*/
-class ClusterGetMoreCmd final : public Command {
+template <typename Impl>
+class ClusterGetMoreCmdBase final : public Command {
public:
- ClusterGetMoreCmd() : Command("getMore") {}
+ ClusterGetMoreCmdBase() : Command(Impl::kName) {}
const std::set<std::string>& apiVersions() const {
- return kApiVersions1;
+ return Impl::getApiVersions();
}
std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
@@ -73,7 +74,7 @@ public:
public:
Invocation(Command* cmd, const OpMsgRequest& request)
: CommandInvocation(cmd),
- _cmd(GetMoreCommandRequest::parse({"getMore"}, request.body)) {}
+ _cmd(GetMoreCommandRequest::parse({Impl::kName}, request.body)) {}
private:
NamespaceString ns() const override {
@@ -90,15 +91,16 @@ public:
}
void doCheckAuthorization(OperationContext* opCtx) const override {
- uassertStatusOK(auth::checkAuthForGetMore(AuthorizationSession::get(opCtx->getClient()),
- ns(),
- _cmd.getCommandParameter(),
- _cmd.getTerm().is_initialized()));
+ Impl::doCheckAuthorization(
+ opCtx, ns(), _cmd.getCommandParameter(), _cmd.getTerm().is_initialized());
}
void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
// Counted as a getMore, not as a command.
globalOpCounters.gotGetMore();
+
+ Impl::checkCanRunHere(opCtx);
+
auto bob = reply->getBodyBuilder();
auto response = uassertStatusOK(ClusterFind::runGetMore(opCtx, _cmd));
response.addToBSON(CursorResponse::ResponseType::SubsequentResponse, &bob);
@@ -141,7 +143,7 @@ public:
LogicalOp getLogicalOp() const override {
return LogicalOp::opGetMore;
}
-} cmdGetMoreCluster;
+};
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_getmore_cmd_s.cpp b/src/mongo/s/commands/cluster_getmore_cmd_s.cpp
new file mode 100644
index 00000000000..63a04722d93
--- /dev/null
+++ b/src/mongo/s/commands/cluster_getmore_cmd_s.cpp
@@ -0,0 +1,60 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/commands/cluster_getmore_cmd.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * Implements the cluster getMore command on mongos.
+ */
+struct ClusterGetMoreCmdS {
+ static constexpr StringData kName = "getMore"_sd;
+
+ static const std::set<std::string>& getApiVersions() {
+ return kApiVersions1;
+ }
+
+ static void doCheckAuthorization(OperationContext* opCtx,
+ const NamespaceString& nss,
+ long long cursorID,
+ bool hasTerm) {
+ uassertStatusOK(auth::checkAuthForGetMore(
+ AuthorizationSession::get(opCtx->getClient()), nss, cursorID, hasTerm));
+ }
+
+ static void checkCanRunHere(OperationContext* opCtx) {
+ // Can always run on a mongos.
+ }
+};
+ClusterGetMoreCmdBase<ClusterGetMoreCmdS> clusterGetMoreCmdS;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.h
index aec0641e905..788a974803d 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,9 +27,7 @@
* it in the license file.
*/
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
-
-#include "mongo/platform/basic.h"
+#pragma once
#include "mongo/base/status.h"
#include "mongo/db/auth/authorization_checks.h"
@@ -43,12 +41,13 @@
namespace mongo {
namespace {
-class ClusterPipelineCommand final : public Command {
+template <typename Impl>
+class ClusterPipelineCommandBase final : public Command {
public:
- ClusterPipelineCommand() : Command("aggregate") {}
+ ClusterPipelineCommandBase() : Command(Impl::kName) {}
const std::set<std::string>& apiVersions() const {
- return kApiVersions1;
+ return Impl::getApiVersions();
}
/**
@@ -74,11 +73,10 @@ public:
OperationContext* opCtx,
const OpMsgRequest& opMsgRequest,
boost::optional<ExplainOptions::Verbosity> explainVerbosity) override {
- const auto aggregationRequest = aggregation_request_helper::parseFromBSON(
- opMsgRequest.getDatabase().toString(),
- opMsgRequest.body,
- explainVerbosity,
- APIParameters::get(opCtx).getAPIStrict().value_or(false));
+ const auto aggregationRequest =
+ Impl::parseAggregationRequest(opMsgRequest,
+ explainVerbosity,
+ APIParameters::get(opCtx).getAPIStrict().value_or(false));
auto privileges = uassertStatusOK(
auth::getPrivilegesForAggregate(AuthorizationSession::get(opCtx->getClient()),
@@ -147,6 +145,8 @@ public:
CommandHelpers::handleMarkKillOnClientDisconnect(
opCtx, !Pipeline::aggHasWriteStage(_request.body));
+ Impl::checkCanRunHere(opCtx);
+
auto bob = reply->getBodyBuilder();
_runAggCommand(opCtx, _dbName, _request.body, &bob);
}
@@ -159,10 +159,7 @@ public:
}
void doCheckAuthorization(OperationContext* opCtx) const override {
- uassert(ErrorCodes::Unauthorized,
- "unauthorized",
- AuthorizationSession::get(opCtx->getClient())
- ->isAuthorizedForPrivileges(_privileges));
+ Impl::doCheckAuthorization(opCtx, _privileges);
}
NamespaceString ns() const override {
@@ -192,7 +189,7 @@ public:
const AuthorizationContract* getAuthorizationContract() const final {
return &::mongo::AggregateCommandRequest::kAuthorizationContract;
}
-} clusterPipelineCmd;
+};
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd_s.cpp b/src/mongo/s/commands/cluster_pipeline_cmd_s.cpp
new file mode 100644
index 00000000000..3009d9265e6
--- /dev/null
+++ b/src/mongo/s/commands/cluster_pipeline_cmd_s.cpp
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/commands/cluster_pipeline_cmd.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * Implements the cluster aggregate command on mongos.
+ */
+struct ClusterPipelineCommandS {
+ static constexpr StringData kName = "aggregate"_sd;
+
+ static const std::set<std::string>& getApiVersions() {
+ return kApiVersions1;
+ }
+
+ static void doCheckAuthorization(OperationContext* opCtx, const PrivilegeVector& privileges) {
+ uassert(
+ ErrorCodes::Unauthorized,
+ "unauthorized",
+ AuthorizationSession::get(opCtx->getClient())->isAuthorizedForPrivileges(privileges));
+ }
+
+ static void checkCanRunHere(OperationContext* opCtx) {
+ // Can always run on a mongos.
+ }
+
+ static AggregateCommandRequest parseAggregationRequest(
+ const OpMsgRequest& opMsgRequest,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity,
+ bool apiStrict) {
+ return aggregation_request_helper::parseFromBSON(
+ opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity, apiStrict);
+ }
+};
+ClusterPipelineCommandBase<ClusterPipelineCommandS> clusterPipelineCmdS;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/internal_transactions_test_commands.cpp b/src/mongo/s/commands/internal_transactions_test_commands.cpp
index 9e827f8d35c..1843dd9c28e 100644
--- a/src/mongo/s/commands/internal_transactions_test_commands.cpp
+++ b/src/mongo/s/commands/internal_transactions_test_commands.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/cluster_transaction_api.h"
#include "mongo/db/commands.h"
+#include "mongo/db/query/find_command_gen.h"
#include "mongo/db/transaction_api.h"
#include "mongo/logv2/log.h"
#include "mongo/s/commands/internal_transactions_test_commands_gen.h"
@@ -81,6 +82,24 @@ public:
const auto& dbName = commandInfo.getDbName();
const auto& command = commandInfo.getCommand();
auto assertSucceeds = commandInfo.getAssertSucceeds();
+ auto exhaustCursor = commandInfo.getExhaustCursor();
+
+ if (exhaustCursor == boost::optional<bool>(true)) {
+ // We can't call a getMore without knowing its cursor's id, so we
+ // use the exhaustiveFind helper to test getMores. Make an OpMsgRequest
+ // from the command to append $db, which FindCommandRequest expects.
+ auto findOpMsgRequest = OpMsgRequest::fromDBAndBody(dbName, command);
+ auto findCommand = FindCommandRequest::parse(
+ IDLParserErrorContext("FindCommandRequest", false /* apiStrict */),
+ findOpMsgRequest.body);
+
+ auto docs = txnClient.exhaustiveFind(findCommand).get();
+
+ BSONObjBuilder resBob;
+ resBob.append("docs", std::move(docs));
+ sharedBlock->responses.emplace_back(resBob.obj());
+ continue;
+ }
auto res = txnClient.runCommand(dbName, command).get();
sharedBlock->responses.emplace_back(
diff --git a/src/mongo/s/commands/internal_transactions_test_commands.idl b/src/mongo/s/commands/internal_transactions_test_commands.idl
index e34d4adbbb9..1e68776ba94 100644
--- a/src/mongo/s/commands/internal_transactions_test_commands.idl
+++ b/src/mongo/s/commands/internal_transactions_test_commands.idl
@@ -51,6 +51,9 @@ structs:
assertSucceeds:
type: bool
default: true
+ exhaustCursor:
+ type: bool
+ optional: true
commands:
testInternalTransactions: