summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-09-02 16:50:44 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-07 22:46:38 +0000
commit4d5451b20fecaf4a054cca94de6962fcf28c577f (patch)
treedc2645876c2daac52bdf09f088bb876e9c3bc196
parent4b37709d13d1ee29cb0420f875af4cd19d3c2a6c (diff)
downloadmongo-4d5451b20fecaf4a054cca94de6962fcf28c577f.tar.gz
SERVER-69002 backport per shard cursors
SERVER-62400 SERVER-62681 SERVER-62738 SERVER-63781 SERVER-63774 SERVER-63772 SERVER-63773 SERVER-58673 (cherry-picked from commit fc54ebd0137a25ea664c022b51b685667dd037c7) (cherry-picked from commit 53d7bceee61f73a1d6959edb5d490c3b338f3c0d) (cherry-picked from commit 586663fec7c3a7d4a8b0185ff24825bd15e80dff) (cherry-picked from commit ef2a62dcc27461d2be1b619c75bc04effa1f2021) (cherry-picked from commit 4f3626ff4486e672569699dfde1cc0ae8c54d348) (cherry-picked from commit 0f7683455bc06b153f14368a3f05f0b69671717e) (cherry-picked from commit 11d01816f743d6764c4f12c42697f5edf813ce27) (cherry-picked from commit 1fe77b5bd9fb13f9eb74275359dcc4ba69f2d5e9)
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml58
-rw-r--r--etc/evergreen.yml7
-rw-r--r--jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js47
-rw-r--r--jstests/noPassthrough/change_streams_per_shard_cursor.js196
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp1
-rw-r--r--src/mongo/db/pipeline/aggregate_command.idl15
-rw-r--r--src/mongo/db/pipeline/expression_context.h1
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp3
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp21
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h3
-rw-r--r--src/mongo/db/query/query_feature_flags.idl15
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.cpp14
-rw-r--r--src/mongo/s/query/async_results_merger.cpp4
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp40
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp172
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.h19
-rw-r--r--src/mongo/s/query/router_exec_stage.h2
-rw-r--r--src/mongo/s/query/router_stage_merge.h4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp2
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h2
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp27
-rw-r--r--src/mongo/s/query/store_possible_cursor.h5
22 files changed, 562 insertions, 96 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
new file mode 100644
index 00000000000..db39ec22c09
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
@@ -0,0 +1,58 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_with_any_tags:
+ ##
+ # The next tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
+ - assumes_unsharded_collection
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: null
+ enableMajorityReadConcern: ''
+ # Enable causal consistency for change streams suites using 1 node replica sets. See
+ # change_streams.yml for detailed explanation.
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js');
+ load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js');
+ load('jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js');
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ writePeriodicNoops: 1
+ periodicNoopIntervalSecs: 1
+ coordinateCommitReturnImmediatelyAfterPersistingDecision: true
+ num_shards: 1
+ enable_sharding:
+ - test
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 10e4cd3c8b8..80ce89976f3 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -4028,6 +4028,13 @@ tasks:
resmoke_args: --suites=change_streams_multi_stmt_txn_sharded_collections_passthrough --storageEngine=wiredTiger
- <<: *task_template
+ name: change_streams_per_shard_cursor_passthrough
+ tags: ["change_streams"]
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+
+- <<: *task_template
name: disk_wiredtiger
commands:
- func: "do setup"
diff --git a/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js
new file mode 100644
index 00000000000..8e8779e7ca5
--- /dev/null
+++ b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js
@@ -0,0 +1,47 @@
+/**
+ * Overrides runCommand to use the $_passthroughToShard parameter. The changestreams per-shard
+ * cursor passthrough suite ensures changestream tests can still run correctly on a single-shard
+ * cluster. By adding this parameter, we pass through to that single shard, running the pipelines
+ * directly on that mongod. This will test the machinery of per-shard cursors via mongos.
+ */
+
+(function() {
+'use strict';
+
+load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'.
+load("jstests/libs/discover_topology.js"); // For 'DiscoverTopology'.
+
+// To be eligible, a command must be a changeStream request sent to a mongos.
+const isEligibleForPerShardCursor = function(conn, cmdObj) {
+ if (!(cmdObj && cmdObj.aggregate && Array.isArray(cmdObj.pipeline) &&
+ cmdObj.pipeline.length > 0 && typeof cmdObj.pipeline[0].$changeStream == "object" &&
+ cmdObj.pipeline[0].$changeStream.constructor === Object)) {
+ return false;
+ }
+ return conn.isMongos();
+};
+
+const discoverShardId = function(conn) {
+ const topology = DiscoverTopology.findConnectedNodes(conn);
+ const shards = topology.shards;
+ let shardName = Object.keys(shards)[0];
+ return {shard: shardName};
+};
+
+function runCommandWithPassthroughToShard(
+ conn, _dbName, _commandName, commandObj, func, makeFuncArgs) {
+ if (typeof commandObj !== "object" || commandObj === null) {
+ return func.apply(conn, makeFuncArgs(commandObj));
+ }
+ if (!isEligibleForPerShardCursor(conn, commandObj)) {
+ return func.apply(conn, makeFuncArgs(commandObj));
+ }
+ commandObj.$_passthroughToShard = discoverShardId(conn);
+ return func.apply(conn, makeFuncArgs(commandObj));
+}
+
+OverrideHelpers.prependOverrideInParallelShell(
+ "jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js");
+
+OverrideHelpers.overrideRunCommand(runCommandWithPassthroughToShard);
+}());
diff --git a/jstests/noPassthrough/change_streams_per_shard_cursor.js b/jstests/noPassthrough/change_streams_per_shard_cursor.js
new file mode 100644
index 00000000000..bece83e65c7
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_per_shard_cursor.js
@@ -0,0 +1,196 @@
+/**
+ * @tags: [
+ * requires_sharding,
+ * uses_change_streams,
+ * ]
+ */
+(function() {
+"use strict";
+
+const dbName = jsTestName();
+const setupShardedCluster = (shards = 1) => {
+ const st = new ShardingTest(
+ {shards, mongos: 1, config: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}});
+ const sdb = st.s0.getDB(dbName);
+ assert.commandWorked(sdb.dropDatabase());
+
+ sdb.setProfilingLevel(0, -1);
+ st.shard0.getDB(dbName).setProfilingLevel(0, -1);
+
+ // Shard the relevant collections.
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.name);
+ if (shards === 2) {
+ // Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to
+ // shard1.
+ st.shardColl("coll", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ st.shardColl("coll2", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ } else {
+ assert(shards === 1, "only 1 or 2 shards supported");
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".coll", key: {_id: 1}}));
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: dbName + ".coll2", key: {_id: 1}}));
+ }
+
+ const shardId = st.shard0.shardName;
+ return [sdb, st, shardId];
+};
+
+const pscWatch = (db, coll, shardId, options = {}, csOptions = {}) => {
+ let cmd = {
+ aggregate: coll,
+ cursor: {},
+ pipeline: [{$changeStream: csOptions}],
+ $_passthroughToShard: {shard: shardId}
+ };
+ cmd = Object.assign({}, cmd, options);
+ if (options.pipeline) {
+ cmd.pipeline = [{$changeStream: csOptions}].concat(options.pipeline);
+ }
+ const resp = db.runCommand(cmd);
+ assert.commandWorked(resp);
+ if (options.explain) {
+ return resp;
+ }
+ return new DBCommandCursor(db, resp);
+};
+
+// Parsing
+let [sdb, st, shardId] = setupShardedCluster();
+
+// Should not allow pipeline without $changeStream.
+assert.commandFailedWithCode(sdb.runCommand({
+ aggregate: "coll",
+ cursor: {},
+ pipeline: [{$match: {perfect: true}}],
+ $_passthroughToShard: {shard: shardId}
+}),
+ 6273801);
+
+// $out can't passthrough so it's not allowed.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", shardId, {pipeline: [{$out: "h"}]})), 6273802);
+
+// Shard option should be specified.
+assert.commandFailedWithCode(
+ sdb.runCommand(
+ {aggregate: "coll", cursor: {}, pipeline: [{$changeStream: {}}], $_passthroughToShard: {}}),
+ 40414);
+
+// The shardId field should be a string.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", 42)),
+ ErrorCodes.TypeMismatch);
+// Can't open a per shard cursor on the config RS.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", "config")), 6273803);
+
+// The shardId should be a valid shard.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", "Dwane 'the Shard' Johnson")),
+ ErrorCodes.ShardNotFound);
+
+// Correctness.
+
+// Simple collection level watch
+// this insert shouldn't show up since it happens before we make a cursor.
+sdb.coll.insertOne({location: 1});
+let c = pscWatch(sdb, "coll", shardId);
+// these inserts should show up since they're after we make a cursor.
+for (let i = 1; i <= 4; i++) {
+ sdb.coll.insertOne({location: 2, i});
+ assert(!c.isExhausted());
+ assert(c.hasNext());
+ c.next();
+}
+assert(!c.hasNext());
+
+// Simple database level watch
+c = pscWatch(sdb, 1, shardId);
+
+sdb.coll.insertOne({location: 3});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 4});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Watching collection that doesn't exist yet.
+c = pscWatch(sdb, "toBeCreated", shardId);
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.s.adminCommand({shardCollection: dbName + ".toBeCreated", key: {_id: 1}});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.toBeCreated.insertOne({location: 8});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Explain output should not have a split pipeline. It should look like mongod explain output.
+let explainOut = pscWatch(sdb, "coll", shardId, {explain: true});
+assert(!explainOut.hasOwnProperty("splitPipeline"));
+assert.hasOwnProperty(explainOut, "stages");
+
+// If we getMore an invalidated cursor the cursor should have been closed on mongos and we should
+// get CursorNotFound, even if the invalidate event was never recieved by mongos.
+[[], [{$match: {f: "filter out invalidate event"}}]].forEach((pipeline) => {
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".toDrop", key: {_id: 1}}));
+ let c = pscWatch(sdb, "toDrop", shardId, {pipeline});
+ sdb.toDrop.insertOne({});
+ sdb.toDrop.drop();
+ assert.commandFailedWithCode(
+ assert.throws(() => {
+ assert.retry(() => {
+ c._runGetMoreCommand();
+ return false;
+ }, "change stream should have been invalidated by now", 4);
+ }),
+ ErrorCodes.CursorNotFound);
+});
+
+st.stop();
+
+// Isolated from events on other shards.
+[sdb, st, shardId] = setupShardedCluster(2);
+c = pscWatch(sdb, "coll", shardId);
+
+sdb.coll.insertOne({location: 5, _id: -2});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 6, _id: 2});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+// Isolated from events on other shards with whole db.
+c = pscWatch(sdb.getSiblingDB("admin"), 1, shardId, {}, {allChangesForCluster: true});
+
+sdb.coll.insertOne({location: 7, _id: -3});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 8, _id: -4});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 9, _id: 3});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.coll2.insertOne({location: 10, _id: 4});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.stop();
+})();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 93012d0e40a..0393035063a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -463,6 +463,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
CurOp::get(opCtx)->dbProfileLevel() > 0);
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
expCtx->collationMatchesDefault = collationMatchesDefault;
+ expCtx->forPerShardCursor = request.getPassthroughToShard().has_value();
return expCtx;
}
diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl
index 1146df571e3..467f5cafb38 100644
--- a/src/mongo/db/pipeline/aggregate_command.idl
+++ b/src/mongo/db/pipeline/aggregate_command.idl
@@ -62,6 +62,15 @@ types:
serializer: ::mongo::serializeExplainToBSON
deserializer: ::mongo::parseExplainModeFromBSON
+structs:
+ PassthroughToShardOptions:
+ description: "options for commands requesting a per shard cursor"
+ fields:
+ shard:
+ description: "id of the shard to passthrough to"
+ type: string
+ unstable: true
+
commands:
aggregate:
description: "Represents the user-supplied options to the aggregate command."
@@ -230,3 +239,9 @@ commands:
type: bool
ignore: true
unstable: true
+ $_passthroughToShard:
+ description: "An optional internal parameter for this request. If a shard key is specified, then that specific shard will be targeted."
+ type: PassthroughToShardOptions
+ cpp_name: passthroughToShard
+ optional: true
+ unstable: true
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index c1dae29660a..6d54c695f8a 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -367,6 +367,7 @@ public:
bool fromMongos = false;
bool needsMerge = false;
bool inMongos = false;
+ bool forPerShardCursor = false;
bool allowDiskUse = false;
bool bypassDocumentValidation = false;
bool hasWhereClause = false;
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 01d31be0ff8..4bd0ee5cf18 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -77,7 +77,8 @@ PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId*
auto execState = getNextDocument(&docOut, nullptr);
if (execState == PlanExecutor::ADVANCED) {
// Include metadata if the output will be consumed by a merging node.
- *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson();
+ *objOut = _expCtx->needsMerge || _expCtx->forPerShardCursor ? docOut.toBsonWithMetaData()
+ : docOut.toBson();
}
return execState;
}
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 45ac9df6300..5ac435ce9a7 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -858,13 +858,24 @@ BSONObj createPassthroughCommandForShard(
Document serializedCommand,
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
Pipeline* pipeline,
- BSONObj collationObj) {
+ BSONObj collationObj,
+ boost::optional<int> overrideBatchSize) {
// Create the command for the shards.
MutableDocument targetedCmd(serializedCommand);
if (pipeline) {
targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize());
}
+ if (overrideBatchSize.has_value()) {
+ if (serializedCommand[AggregateCommandRequest::kCursorFieldName].missing()) {
+ targetedCmd[AggregateCommandRequest::kCursorFieldName] =
+ Value(DOC(SimpleCursorOptions::kBatchSizeFieldName << Value(*overrideBatchSize)));
+ } else {
+ targetedCmd[AggregateCommandRequest::kCursorFieldName]
+ [SimpleCursorOptions::kBatchSizeFieldName] = Value(*overrideBatchSize);
+ }
+ }
+
auto shardCommand =
genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, collationObj);
@@ -1007,8 +1018,12 @@ DispatchShardPipelineResults dispatchShardPipeline(
(splitPipelines
? createCommandForTargetedShards(
expCtx, serializedCommand, *splitPipelines, exchangeSpec, true /* needsMerge */)
- : createPassthroughCommandForShard(
- expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj));
+ : createPassthroughCommandForShard(expCtx,
+ serializedCommand,
+ expCtx->explain,
+ pipeline.get(),
+ collationObj,
+ boost::none));
// A $changeStream pipeline must run on all shards, and will also open an extra cursor on the
// config server in order to monitor for new shards. To guarantee that we do not miss any
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 1114c1a1d1a..e2369deb2db 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -131,7 +131,8 @@ BSONObj createPassthroughCommandForShard(
Document serializedCommand,
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
Pipeline* pipeline,
- BSONObj collationObj);
+ BSONObj collationObj,
+ boost::optional<int> overrideBatchSize);
BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Document serializedCommand,
diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl
index b89670b06ea..10e233aed2b 100644
--- a/src/mongo/db/query/query_feature_flags.idl
+++ b/src/mongo/db/query/query_feature_flags.idl
@@ -6,21 +6,21 @@
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Server Side Public License for more details.
#
# You should have received a copy of the Server Side Public License
-# along with this program. If not, see
+# along with this program. If not, see
# <http://www.mongodb.com/licensing/server-side-public-license>.
#
-# As a special exception, the copyright holders give permission to link the
+# As a special exception, the copyright holders give permission to link the
# code of portions of this program with the OpenSSL library under certain
# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the Server Side Public License in all respects for
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
# delete this exception statement from your version. If you delete this
# exception statement from all source files in the program, then also delete
# it in the license file.
@@ -58,4 +58,3 @@ feature_flags:
cpp_varname: gFeatureFlagShardedTimeSeriesUpdateDelete
default: true
version: 5.0
-
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index 7cba5b51bd5..996f726cd67 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -187,8 +187,9 @@ bool runAggregationMapReduce(OperationContext* opCtx,
pipelineBuilder,
cm,
involvedNamespaces,
- false, // hasChangeStream
- true); // allowedToPassthrough
+ false, // hasChangeStream
+ true, // allowedToPassthrough
+ false); // perShardCursor
try {
switch (targeter.policy) {
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kPassthrough: {
@@ -210,8 +211,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
kMongosRequired: {
// Pipelines generated from mapReduce should never be required to run on mongos.
- uasserted(31291, "Internal error during mapReduce translation");
- break;
+ MONGO_UNREACHABLE_TASSERT(31291);
}
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kAnyShard: {
@@ -234,6 +234,12 @@ bool runAggregationMapReduce(OperationContext* opCtx,
false)); // hasChangeStream
break;
}
+
+ case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
+ kSpecificShardOnly: {
+ // It should not be possible to pass $_passthroughToShard to a map reduce command.
+ MONGO_UNREACHABLE_TASSERT(6273803);
+ }
}
} catch (DBException& e) {
uassert(ErrorCodes::CommandNotSupportedOnView,
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 5ac4777ad48..c503cce6024 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -168,7 +168,9 @@ bool AsyncResultsMerger::remotesExhausted() const {
bool AsyncResultsMerger::_remotesExhausted(WithLock) const {
for (const auto& remote : _remotes) {
- if (!remote.exhausted()) {
+ // If any remote has been invalidated, we must force the batch-building code to make another
+ // attempt to retrieve more results. This will (correctly) throw via _assertNotInvalidated.
+ if (!remote.exhausted() || remote.invalidated) {
return false;
}
}
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index b9f68107a40..04a5040c7d8 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -323,14 +323,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
cm,
involvedNamespaces,
hasChangeStream,
- liteParsedPipeline.allowedToPassthroughFromMongos());
+ liteParsedPipeline.allowedToPassthroughFromMongos(),
+ request.getPassthroughToShard().has_value());
if (!expCtx) {
// When the AggregationTargeter chooses a "passthrough" policy, it does not call the
// 'pipelineBuilder' function, so we never get an expression context. Because this is a
// passthrough, we only need a bare minimum expression context anyway.
invariant(targeter.policy ==
- cluster_aggregation_planner::AggregationTargeter::kPassthrough);
+ cluster_aggregation_planner::AggregationTargeter::kPassthrough ||
+ targeter.policy ==
+ cluster_aggregation_planner::AggregationTargeter::kSpecificShardOnly);
expCtx = make_intrusive<ExpressionContext>(
opCtx, nullptr, namespaces.executionNss, boost::none, request.getLet());
}
@@ -388,6 +391,39 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
result,
hasChangeStream);
}
+ case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
+ kSpecificShardOnly: {
+ // Mark expCtx as tailable and await data so CCC behaves accordingly.
+ expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+
+ uassert(6273801,
+ "per shard cursor pipeline must contain $changeStream",
+ hasChangeStream);
+
+ // Make sure the rest of the pipeline can be pushed down.
+ auto pipeline = request.getPipeline();
+ std::vector<BSONObj> nonChangeStreamPart(pipeline.begin() + 1, pipeline.end());
+ LiteParsedPipeline nonChangeStreamLite(request.getNamespace(), nonChangeStreamPart);
+ uassert(6273802,
+ "$_passthroughToShard specified with a stage that is not allowed to "
+ "passthrough from mongos",
+ nonChangeStreamLite.allowedToPassthroughFromMongos());
+ ShardId shardId(std::string(request.getPassthroughToShard()->getShard()));
+ uassert(6273803,
+ "$_passthroughToShard not supported for queries against config replica set",
+ shardId != ShardId::kConfigServerId);
+
+ return cluster_aggregation_planner::runPipelineOnSpecificShardOnly(
+ expCtx,
+ namespaces,
+ boost::none,
+ request.getExplain(),
+ aggregation_request_helper::serializeToCommandDoc(request),
+ privileges,
+ shardId,
+ true,
+ result);
+ }
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index dcd4ffde2f0..2ccebf5870b 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
@@ -101,7 +102,9 @@ AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCt
ReadPreferenceSetting::get(opCtx),
sharded_agg_helpers::getDesiredRetryPolicy(opCtx));
const auto response = ars.next();
- invariant(ars.done());
+ tassert(6273807,
+ "requested and received data from just one shard, but results are still pending",
+ ars.done());
return response;
}
@@ -568,7 +571,11 @@ AggregationTargeter AggregationTargeter::make(
boost::optional<ChunkManager> cm,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
- bool allowedToPassthrough) {
+ bool allowedToPassthrough,
+ bool perShardCursor) {
+ if (perShardCursor) {
+ return {TargetingPolicy::kSpecificShardOnly, nullptr, cm};
+ }
// Check if any of the involved collections are sharded.
bool involvesShardedCollections = [&]() {
@@ -617,67 +624,15 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
Document serializedCommand,
const PrivilegeVector& privileges,
BSONObjBuilder* out) {
- auto opCtx = expCtx->opCtx;
-
- // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
- // explain if necessary, and rewrites the result into a format safe to forward to shards.
- BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx,
- serializedCommand,
- explain,
- nullptr, /* pipeline */
- BSONObj());
-
- const auto shardId = cm.dbPrimary();
- const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId)
- ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
- : std::move(cmdObj);
-
- MultiStatementTransactionRequestsSender ars(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- namespaces.executionNss.db().toString(),
- {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}},
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- auto response = ars.next();
- invariant(ars.done());
-
- uassertStatusOK(response.swResponse);
- auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
-
- if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) {
- uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
- } else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
- uassertStatusOK(
- commandStatus.withContext("command failed because can not establish a snapshot"));
- }
-
- BSONObj result;
- if (explain) {
- // If this was an explain, then we get back an explain result object rather than a cursor.
- result = response.swResponse.getValue().data;
- } else {
- result = uassertStatusOK(
- storePossibleCursor(opCtx,
- shardId,
- *response.shardHostAndPort,
- response.swResponse.getValue().data,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager(),
- privileges,
- TailableModeEnum::kNormal));
- }
-
- // First append the properly constructed writeConcernError. It will then be skipped
- // in appendElementsUnique.
- if (auto wcErrorElem = result["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out);
- }
-
- out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
-
- return getStatusFromCommandResult(out->asTempObj());
+ return runPipelineOnSpecificShardOnly(expCtx,
+ namespaces,
+ boost::optional<DatabaseVersion>(cm.dbVersion()),
+ explain,
+ serializedCommand,
+ privileges,
+ cm.dbPrimary(),
+ false,
+ out);
}
Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces,
@@ -820,5 +775,96 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
return {collation.isEmpty() ? getCollation() : collation, getUUID()};
}
+Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ boost::optional<DatabaseVersion> dbVersion,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ Document serializedCommand,
+ const PrivilegeVector& privileges,
+ ShardId shardId,
+ bool forPerShardCursor,
+ BSONObjBuilder* out) {
+ auto opCtx = expCtx->opCtx;
+
+ boost::optional<int> overrideBatchSize;
+ if (forPerShardCursor) {
+ tassert(6273804,
+ "Per shard cursors are supposed to pass fromMongos: false to shards",
+ !expCtx->inMongos);
+ // By using an initial batchSize of zero all of the events will get returned through
+ // the getMore path and have metadata stripped out.
+ overrideBatchSize = 0;
+ }
+
+ // Format the command for the shard. This wraps the command as an explain if necessary, and
+ // rewrites the result into a format safe to forward to shards.
+ BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx,
+ serializedCommand,
+ explain,
+ nullptr, /* pipeline */
+ BSONObj(),
+ overrideBatchSize);
+
+ if (!forPerShardCursor && shardId != ShardId::kConfigServerId) {
+ cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED());
+ }
+ if (!forPerShardCursor) {
+ // Unless this is a per shard cursor, we need to send shard version info.
+ tassert(6377400, "Missing shard versioning information", dbVersion.has_value());
+ cmdObj = appendDbVersionIfPresent(std::move(cmdObj), *dbVersion);
+ }
+
+ MultiStatementTransactionRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ namespaces.executionNss.db().toString(),
+ {{shardId, cmdObj}},
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ auto response = ars.next();
+ tassert(6273806,
+ "requested and received data from just one shard, but results are still pending",
+ ars.done());
+
+ uassertStatusOK(response.swResponse);
+ auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
+
+ if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) {
+ uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
+ } else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
+ uassertStatusOK(
+ commandStatus.withContext("command failed because can not establish a snapshot"));
+ }
+
+ BSONObj result;
+ if (explain) {
+ // If this was an explain, then we get back an explain result object rather than a cursor.
+ result = response.swResponse.getValue().data;
+ } else {
+ result = uassertStatusOK(storePossibleCursor(
+ opCtx,
+ shardId,
+ *response.shardHostAndPort,
+ response.swResponse.getValue().data,
+ namespaces.requestedNss,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ privileges,
+ expCtx->tailableMode,
+ forPerShardCursor ? boost::optional<BSONObj>(change_stream_constants::kSortSpec)
+ : boost::none));
+ }
+
+ // First append the properly constructed writeConcernError. It will then be skipped
+ // in appendElementsUnique.
+ if (auto wcErrorElem = result["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out);
+ }
+
+ out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
+
+ return getStatusFromCommandResult(out->asTempObj());
+}
+
} // namespace cluster_aggregation_planner
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index 8cc0ccc4cea..f8e05fa2081 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -82,12 +82,14 @@ struct AggregationTargeter {
boost::optional<ChunkManager> cm,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
- bool allowedToPassthrough);
+ bool allowedToPassthrough,
+ bool perShardCursor);
enum TargetingPolicy {
kPassthrough,
kMongosRequired,
kAnyShard,
+ kSpecificShardOnly,
} policy;
std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
@@ -125,5 +127,20 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
BSONObjBuilder* result,
bool hasChangeStream);
+/**
+ * Similar to runPipelineOnPrimaryShard but allows $changeStreams. Intended for use by per shard
+ * $changeStream cursors. Note: if forPerShardCursor is true shard versions will not be added to the
+ * request sent to mongod.
+ */
+Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ boost::optional<DatabaseVersion> dbVersion,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ Document serializedCommand,
+ const PrivilegeVector& privileges,
+ ShardId shardId,
+ bool forPerShardCursor,
+ BSONObjBuilder* out);
+
} // namespace cluster_aggregation_planner
} // namespace mongo
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 7c5ee7aefb6..73c33b96da6 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -134,7 +134,7 @@ public:
* Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream;
* otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child.
*/
- virtual BSONObj getPostBatchResumeToken() const {
+ virtual BSONObj getPostBatchResumeToken() {
return _child ? _child->getPostBatchResumeToken() : BSONObj();
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 0b088f12cbd..adb683fda81 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -71,6 +71,10 @@ public:
return _resultsMerger.getNumRemotes();
}
+ BSONObj getPostBatchResumeToken() final {
+ return _resultsMerger.getHighWaterMark();
+ }
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final {
return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout);
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 4208f8fb254..ed1eb80782d 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -86,7 +86,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const {
return 0;
}
-BSONObj RouterStagePipeline::getPostBatchResumeToken() const {
+BSONObj RouterStagePipeline::getPostBatchResumeToken() {
return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index de0dc25b310..88f5930e3d5 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -53,7 +53,7 @@ public:
std::size_t getNumRemotes() const final;
- BSONObj getPostBatchResumeToken() const final;
+ BSONObj getPostBatchResumeToken() final;
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 51c7612e059..bca770593b8 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -77,7 +77,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
- TailableModeEnum tailableMode) {
+ TailableModeEnum tailableMode,
+ boost::optional<BSONObj> routerSort) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -107,14 +108,21 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
auto& remoteCursor = params.remotes.back();
remoteCursor.setShardId(shardId.toString());
remoteCursor.setHostAndPort(server);
- remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(),
- incomingCursorResponse.getValue().getCursorId(),
- {}));
+ remoteCursor.setCursorResponse(
+ CursorResponse(incomingCursorResponse.getValue().getNSS(),
+ incomingCursorResponse.getValue().getCursorId(),
+ {}, /* batch */
+ incomingCursorResponse.getValue().getAtClusterTime(),
+ incomingCursorResponse.getValue().getNumReturnedSoFar(),
+ incomingCursorResponse.getValue().getPostBatchResumeToken()));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = tailableMode;
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
params.originatingPrivileges = std::move(privileges);
+ if (routerSort) {
+ params.sortToApplyOnRouter = *routerSort;
+ }
if (TransactionRouter::get(opCtx)) {
params.isAutoCommit = false;
@@ -139,10 +147,13 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue();
- CursorResponse outgoingCursorResponse(requestedNss,
- clusterCursorId.getValue(),
- incomingCursorResponse.getValue().getBatch(),
- incomingCursorResponse.getValue().getAtClusterTime());
+ CursorResponse outgoingCursorResponse(
+ requestedNss,
+ clusterCursorId.getValue(),
+ incomingCursorResponse.getValue().getBatch(),
+ incomingCursorResponse.getValue().getAtClusterTime(),
+ incomingCursorResponse.getValue().getNumReturnedSoFar(),
+ incomingCursorResponse.getValue().getPostBatchResumeToken());
return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
}
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 43157322b0b..0bbaddfeaab 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -72,6 +72,8 @@ class TaskExecutor;
* @ cursorManager the ClusterCursorManager on which to register the resulting ClusterClientCursor
* @ privileges the PrivilegeVector of privileges needed for the original command, to be used for
* auth checking by GetMore
+ * @ routerSort the sort to apply on the router. With only one cursor this shouldn't be common, but
+ * is needed to set up change stream post-batch resume tokens correctly for per shard cursors.
*/
StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const ShardId& shardId,
@@ -81,7 +83,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
- TailableModeEnum tailableMode = TailableModeEnum::kNormal);
+ TailableModeEnum tailableMode = TailableModeEnum::kNormal,
+ boost::optional<BSONObj> routerSort = boost::none);
/**
* Convenience function which extracts all necessary information from the passed RemoteCursor, and