summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-02-25 14:52:13 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-25 15:56:46 +0000
commit586663fec7c3a7d4a8b0185ff24825bd15e80dff (patch)
tree57539dcde8d2a38184536582367a6c4f6c96a592
parentf01a90660cb0a0a22d6b2166cd8b70d7990a6b12 (diff)
downloadmongo-586663fec7c3a7d4a8b0185ff24825bd15e80dff.tar.gz
SERVER-62738 implement aggregate $_passthroughToShard option
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml16
-rw-r--r--jstests/noPassthrough/change_streams_per_shard_cursor.js216
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp1
-rw-r--r--src/mongo/db/pipeline/aggregate_command.idl11
-rw-r--r--src/mongo/db/pipeline/expression_context.h1
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp3
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.cpp14
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp40
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp119
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.h19
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp6
-rw-r--r--src/mongo/s/query/store_possible_cursor.h5
12 files changed, 436 insertions, 15 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
index db39ec22c09..6471aa541df 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
@@ -3,6 +3,22 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
+ exclude_files:
+ # TODO SERVER-63771 unblock all these tests.
+ - jstests/change_streams/apply_ops_resumability.js
+ - jstests/change_streams/apply_ops.js
+ - jstests/change_streams/ddl_create_event.js
+ - jstests/change_streams/does_not_implicitly_create_database.js
+ - jstests/change_streams/metadata_notifications.js
+ - jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
+ - jstests/change_streams/report_post_batch_resume_token.js
+ - jstests/change_streams/resume_from_high_water_mark_token.js
+ - jstests/change_streams/shell_helper.js
+ - jstests/change_streams/show_expanded_events.js
+ - jstests/change_streams/start_after_invalidation_exception.js
+ - jstests/change_streams/whole_db_resumability.js
+ - jstests/change_streams/whole_db_metadata_notifications.js
+ - jstests/change_streams/whole_db.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/jstests/noPassthrough/change_streams_per_shard_cursor.js b/jstests/noPassthrough/change_streams_per_shard_cursor.js
new file mode 100644
index 00000000000..0d1e0e2fceb
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_per_shard_cursor.js
@@ -0,0 +1,216 @@
+/**
+ * @tags: [
+ * requires_sharding,
+ * uses_change_streams,
+ * ]
+ */
+(function() {
+"use strict";
+
+const checkPerShardCursorEnabled = () => {
+ const conn = MongoRunner.runMongod();
+ const res = conn.adminCommand({
+ getParameter: 1,
+ featureFlagPerShardCursor: 1,
+ });
+ MongoRunner.stopMongod(conn);
+ return res.featureFlagPerShardCursor.value;
+};
+
+const dbName = jsTestName();
+const setupShardedCluster = (shards = 1) => {
+ const st = new ShardingTest(
+ {shards, mongos: 1, config: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}});
+ const sdb = st.s0.getDB(dbName);
+ assert.commandWorked(sdb.dropDatabase());
+
+ sdb.setProfilingLevel(0, -1);
+ st.shard0.getDB(dbName).setProfilingLevel(0, -1);
+
+ // Shard the relevant collections.
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.name);
+ if (shards === 2) {
+ // Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to
+ // shard1.
+ st.shardColl("coll", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ st.shardColl("coll2", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ } else {
+ assert(shards === 1, "only 1 or 2 shards supported");
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".coll", key: {_id: 1}}));
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: dbName + ".coll2", key: {_id: 1}}));
+ }
+
+ const shardId = st.shard0.shardName;
+ return [sdb, st, shardId];
+};
+
+const pscWatch = (db, coll, shardId, options = {}, csOptions = {}) => {
+ let cmd = {
+ aggregate: coll,
+ cursor: {},
+ pipeline: [{$changeStream: csOptions}],
+ $_passthroughToShard: {shard: shardId}
+ };
+ cmd = Object.assign({}, cmd, options);
+ if (options.pipeline) {
+ cmd.pipeline = [{$changeStream: csOptions}].concat(options.pipeline);
+ }
+ const resp = db.runCommand(cmd);
+ assert.commandWorked(resp);
+ if (options.explain) {
+ return resp;
+ }
+ return new DBCommandCursor(db, resp);
+};
+
+if (!checkPerShardCursorEnabled()) {
+ let [sdb, st, shardId] = setupShardedCluster();
+
+ // Should only work with feature flag on.
+ assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", shardId)), 6273800);
+ st.stop();
+ jsTestLog("Skipping the rest of the test because featureFlagPerSardCursor is not enabled");
+ return;
+}
+
+// Parsing
+let [sdb, st, shardId] = setupShardedCluster();
+
+// Should not allow pipeline without $changeStream.
+assert.commandFailedWithCode(sdb.runCommand({
+ aggregate: "coll",
+ cursor: {},
+ pipeline: [{$match: {perfect: true}}],
+ $_passthroughToShard: {shard: shardId}
+}),
+ 6273801);
+
+// $out can't passthrough so it's not allowed.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", shardId, {pipeline: [{$out: "h"}]})), 6273802);
+
+// Shard option should be specified.
+assert.commandFailedWithCode(
+ sdb.runCommand(
+ {aggregate: "coll", cursor: {}, pipeline: [{$changeStream: {}}], $_passthroughToShard: {}}),
+ 40414);
+
+// The shardId field should be a string.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", 42)),
+ ErrorCodes.TypeMismatch);
+// Can't open a per shard cursor on the config RS.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", "config")), 6273803);
+
+// The shardId should be a valid shard.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", "Dwane 'the Shard' Johnson")),
+ ErrorCodes.ShardNotFound);
+
+// Correctness.
+
+// Simple collection level watch
+// this insert shouldn't show up since it happens before we make a cursor.
+sdb.coll.insertOne({location: 1});
+let c = pscWatch(sdb, "coll", shardId);
+// these inserts should show up since they're after we make a cursor.
+for (let i = 1; i <= 4; i++) {
+ sdb.coll.insertOne({location: 2, i});
+ assert(!c.isExhausted());
+ assert(c.hasNext());
+ c.next();
+}
+assert(!c.hasNext());
+
+// Simple database level watch
+c = pscWatch(sdb, 1, shardId);
+
+sdb.coll.insertOne({location: 3});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 4});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Watching collection that doesn't exist yet.
+c = pscWatch(sdb, "toBeCreated", shardId);
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.s.adminCommand({shardCollection: dbName + ".toBeCreated", key: {_id: 1}});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.toBeCreated.insertOne({location: 8});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Explain output should not have a split pipeline. It should look like mongod explain output.
+let explainOut = pscWatch(sdb, "coll", shardId, {explain: true});
+assert(!explainOut.hasOwnProperty("splitPipeline"));
+assert.hasOwnProperty(explainOut, "stages");
+
+// If we getMore an invalidated cursor the cursor should have been closed on mongos and we should
+// get CursorNotFound, even if the invalidate event was never recieved by mongos.
+[[], [{$match: {f: "filter out invalidate event"}}]].forEach((pipeline) => {
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".toDrop", key: {_id: 1}}));
+ let c = pscWatch(sdb, "toDrop", shardId, {pipeline});
+ sdb.toDrop.insertOne({});
+ sdb.toDrop.drop();
+ assert.commandFailedWithCode(
+ assert.throws(() => {
+ assert.retry(() => {
+ c._runGetMoreCommand();
+ return false;
+ }, "change stream should have been invalidated by now", 4);
+ }),
+ ErrorCodes.CursorNotFound);
+});
+
+st.stop();
+
+// Isolated from events on other shards.
+[sdb, st, shardId] = setupShardedCluster(2);
+c = pscWatch(sdb, "coll", shardId);
+
+sdb.coll.insertOne({location: 5, _id: -2});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 6, _id: 2});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+// Isolated from events on other shards with whole db.
+c = pscWatch(sdb.getSiblingDB("admin"), 1, shardId, {}, {allChangesForCluster: true});
+
+sdb.coll.insertOne({location: 7, _id: -3});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 8, _id: -4});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 9, _id: 3});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.coll2.insertOne({location: 10, _id: 4});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.stop();
+})();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index cc5744e7334..5dfe03a12b5 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -450,6 +450,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
expCtx->inMultiDocumentTransaction = opCtx->inMultiDocumentTransaction();
expCtx->collationMatchesDefault = collationMatchesDefault;
+ expCtx->forPerShardCursor = request.getPassthroughToShard().has_value();
return expCtx;
}
diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl
index da834d159f5..a30ce427578 100644
--- a/src/mongo/db/pipeline/aggregate_command.idl
+++ b/src/mongo/db/pipeline/aggregate_command.idl
@@ -62,6 +62,15 @@ types:
serializer: ::mongo::serializeExplainToBSON
deserializer: ::mongo::parseExplainModeFromBSON
+structs:
+ PassthroughToShardOptions:
+ description: "options for commands requesting a per shard cursor"
+ fields:
+ shard:
+ description: "id of the shard to passthrough to"
+ type: string
+ unstable: true
+
commands:
aggregate:
description: "Represents the user-supplied options to the aggregate command."
@@ -227,7 +236,7 @@ commands:
unstable: true
$_passthroughToShard:
description: "An optional internal parameter for this request. If a shard key is specified, then that specific shard will be targeted."
- type: object
+ type: PassthroughToShardOptions
cpp_name: passthroughToShard
optional: true
unstable: true
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index f6ba5812e86..fbce57cbbd9 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -377,6 +377,7 @@ public:
bool fromMongos = false;
bool needsMerge = false;
bool inMongos = false;
+ bool forPerShardCursor = false;
bool allowDiskUse = false;
bool bypassDocumentValidation = false;
bool inMultiDocumentTransaction = false;
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 2b1d70b5681..e557f071b8a 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -79,7 +79,8 @@ PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId*
auto execState = getNextDocument(&docOut, nullptr);
if (execState == PlanExecutor::ADVANCED) {
// Include metadata if the output will be consumed by a merging node.
- *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson();
+ *objOut = _expCtx->needsMerge || _expCtx->forPerShardCursor ? docOut.toBsonWithMetaData()
+ : docOut.toBson();
}
return execState;
}
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index d478ef08d48..52a61ab9325 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -186,8 +186,9 @@ bool runAggregationMapReduce(OperationContext* opCtx,
pipelineBuilder,
cm,
involvedNamespaces,
- false, // hasChangeStream
- true); // allowedToPassthrough
+ false, // hasChangeStream
+ true, // allowedToPassthrough
+ false); // perShardCursor
try {
switch (targeter.policy) {
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kPassthrough: {
@@ -209,8 +210,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
kMongosRequired: {
// Pipelines generated from mapReduce should never be required to run on mongos.
- uasserted(31291, "Internal error during mapReduce translation");
- break;
+ MONGO_UNREACHABLE_TASSERT(31291);
}
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kAnyShard: {
@@ -233,6 +233,12 @@ bool runAggregationMapReduce(OperationContext* opCtx,
false)); // hasChangeStream
break;
}
+
+ case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
+ kSpecificShardOnly: {
+ // It should not be possible to pass $_passthroughToShard to a map reduce command.
+ MONGO_UNREACHABLE_TASSERT(6273803);
+ }
}
} catch (DBException& e) {
uassert(ErrorCodes::CommandNotSupportedOnView,
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 1950cbc0254..200cef980d2 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -325,14 +325,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
cm,
involvedNamespaces,
hasChangeStream,
- liteParsedPipeline.allowedToPassthroughFromMongos());
+ liteParsedPipeline.allowedToPassthroughFromMongos(),
+ request.getPassthroughToShard().has_value());
if (!expCtx) {
// When the AggregationTargeter chooses a "passthrough" policy, it does not call the
// 'pipelineBuilder' function, so we never get an expression context. Because this is a
// passthrough, we only need a bare minimum expression context anyway.
invariant(targeter.policy ==
- cluster_aggregation_planner::AggregationTargeter::kPassthrough);
+ cluster_aggregation_planner::AggregationTargeter::kPassthrough ||
+ targeter.policy ==
+ cluster_aggregation_planner::AggregationTargeter::kSpecificShardOnly);
expCtx = make_intrusive<ExpressionContext>(
opCtx, nullptr, namespaces.executionNss, boost::none, request.getLet());
}
@@ -390,6 +393,39 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
result,
hasChangeStream);
}
+ case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
+ kSpecificShardOnly: {
+ // Mark expCtx as tailable and await data so CCC behaves accordingly.
+ expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+
+ uassert(6273801,
+ "per shard cursor pipeline must contain $changeStream",
+ hasChangeStream);
+
+ // Make sure the rest of the pipeline can be pushed down.
+ auto pipeline = request.getPipeline();
+ std::vector<BSONObj> nonChangeStreamPart(pipeline.begin() + 1, pipeline.end());
+ LiteParsedPipeline nonChangeStreamLite(request.getNamespace(), nonChangeStreamPart);
+ uassert(6273802,
+ "$_passthroughToShard specified with a stage that is not allowed to "
+ "passthrough from mongos",
+ nonChangeStreamLite.allowedToPassthroughFromMongos());
+ ShardId shardId(std::string(request.getPassthroughToShard()->getShard()));
+ uassert(6273803,
+ "$_passthroughToShard not supported for queries against config replica set",
+ shardId != ShardId::kConfigServerId);
+
+ return cluster_aggregation_planner::runPipelineOnSpecificShardOnly(
+ expCtx,
+ namespaces,
+ *targeter.cm,
+ request.getExplain(),
+ aggregation_request_helper::serializeToCommandDoc(request),
+ privileges,
+ shardId,
+ true,
+ result);
+ }
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 6062c02e6ba..9430ddb8270 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
@@ -101,7 +102,9 @@ AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCt
ReadPreferenceSetting::get(opCtx),
sharded_agg_helpers::getDesiredRetryPolicy(opCtx));
const auto response = ars.next();
- invariant(ars.done());
+ tassert(6273807,
+ "requested and received data from just one shard, but results are still pending",
+ ars.done());
return response;
}
@@ -567,7 +570,14 @@ AggregationTargeter AggregationTargeter::make(
boost::optional<ChunkManager> cm,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
- bool allowedToPassthrough) {
+ bool allowedToPassthrough,
+ bool perShardCursor) {
+ if (perShardCursor) {
+ uassert(6273800,
+ "featureFlagPerShardCursor must be enabled to use $_passthroughToShard",
+ feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV());
+ return {TargetingPolicy::kSpecificShardOnly, nullptr, cm};
+ }
// Check if any of the involved collections are sharded.
bool involvesShardedCollections = [&]() {
@@ -616,6 +626,19 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
Document serializedCommand,
const PrivilegeVector& privileges,
BSONObjBuilder* out) {
+ if (feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()) {
+ return runPipelineOnSpecificShardOnly(expCtx,
+ namespaces,
+ cm,
+ explain,
+ serializedCommand,
+ privileges,
+ cm.dbPrimary(),
+ false,
+ out);
+ }
+ // TODO SERVER-58673 remove the if statement here, remove code below and just call
+ // runPipelineOnSpecificDirectly. make sure to clean up divergence between the two functions.
auto opCtx = expCtx->opCtx;
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
@@ -640,7 +663,9 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent);
auto response = ars.next();
- invariant(ars.done());
+ tassert(6273805,
+ "requested and received data from just one shard, but results are still pending",
+ ars.done());
uassertStatusOK(response.swResponse);
auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
@@ -649,7 +674,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
} else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
uassertStatusOK(
- commandStatus.withContext("command failed because can not establish a snapshot"));
+ commandStatus.withContext("command failed because establishing a snapshot failed"));
}
BSONObj result;
@@ -820,5 +845,91 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
return {collation.isEmpty() ? getCollation() : collation, getUUID()};
}
+Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const ChunkManager& cm,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ Document serializedCommand,
+ const PrivilegeVector& privileges,
+ ShardId shardId,
+ bool forPerShardCursor,
+ BSONObjBuilder* out) {
+ auto opCtx = expCtx->opCtx;
+
+ if (forPerShardCursor) {
+ tassert(6273804,
+ "Per shard cursors are supposed to pass fromMongos: false to shards",
+ !expCtx->inMongos);
+ }
+
+ // Format the command for the shard. This wraps the command as an explain if necessary, and
+ // rewrites the result into a format safe to forward to shards.
+ BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx,
+ serializedCommand,
+ explain,
+ nullptr, /* pipeline */
+ BSONObj(),
+ boost::none);
+
+ if (!forPerShardCursor && shardId != ShardId::kConfigServerId) {
+ cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED());
+ }
+ if (!forPerShardCursor) {
+ // Per shard cursors should not send any shard version info.
+ cmdObj = appendDbVersionIfPresent(std::move(cmdObj), cm.dbVersion());
+ }
+
+ MultiStatementTransactionRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ namespaces.executionNss.db().toString(),
+ {{shardId, cmdObj}},
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ auto response = ars.next();
+ tassert(6273806,
+ "requested and received data from just one shard, but results are still pending",
+ ars.done());
+
+ uassertStatusOK(response.swResponse);
+ auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
+
+ if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) {
+ uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
+ } else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
+ uassertStatusOK(
+ commandStatus.withContext("command failed because can not establish a snapshot"));
+ }
+
+ BSONObj result;
+ if (explain) {
+ // If this was an explain, then we get back an explain result object rather than a cursor.
+ result = response.swResponse.getValue().data;
+ } else {
+ result = uassertStatusOK(storePossibleCursor(
+ opCtx,
+ shardId,
+ *response.shardHostAndPort,
+ response.swResponse.getValue().data,
+ namespaces.requestedNss,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ privileges,
+ expCtx->tailableMode,
+ forPerShardCursor ? boost::optional<BSONObj>(change_stream_constants::kSortSpec)
+ : boost::none));
+ }
+
+ // First append the properly constructed writeConcernError. It will then be skipped
+ // in appendElementsUnique.
+ if (auto wcErrorElem = result["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out);
+ }
+
+ out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
+
+ return getStatusFromCommandResult(out->asTempObj());
+}
+
} // namespace cluster_aggregation_planner
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index cf8c52583ff..68c09cc1edb 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -82,12 +82,14 @@ struct AggregationTargeter {
boost::optional<ChunkManager> cm,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
- bool allowedToPassthrough);
+ bool allowedToPassthrough,
+ bool perShardCursor);
enum TargetingPolicy {
kPassthrough,
kMongosRequired,
kAnyShard,
+ kSpecificShardOnly,
} policy;
std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
@@ -125,5 +127,20 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
BSONObjBuilder* result,
bool hasChangeStream);
+/**
+ * Similar to runPipelineOnPrimaryShard but allows $changeStreams. Intended for use by per shard
+ * $changeStream cursors. Note: if forPerShardCursor is true shard versions will not be added to the
+ * request sent to mongod.
+ */
+Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const ChunkManager& cm,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ Document serializedCommand,
+ const PrivilegeVector& privileges,
+ ShardId shardId,
+ bool forPerShardCursor,
+ BSONObjBuilder* out);
+
} // namespace cluster_aggregation_planner
} // namespace mongo
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 9560a139bbe..dd9c5b43d44 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -77,7 +77,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
- TailableModeEnum tailableMode) {
+ TailableModeEnum tailableMode,
+ boost::optional<BSONObj> routerSort) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -115,6 +116,9 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
params.originatingPrivileges = std::move(privileges);
+ if (routerSort) {
+ params.sortToApplyOnRouter = *routerSort;
+ }
if (TransactionRouter::get(opCtx)) {
params.isAutoCommit = false;
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 59f0a79c38a..a80302bc17d 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -72,6 +72,8 @@ class TaskExecutor;
* @ cursorManager the ClusterCursorManager on which to register the resulting ClusterClientCursor
* @ privileges the PrivilegeVector of privileges needed for the original command, to be used for
* auth checking by GetMore
+ * @ routerSort the sort to apply on the router. With only one cursor this shouldn't be common, but
+ * is needed to set up change stream post-batch resume tokens correctly for per shard cursors.
*/
StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const ShardId& shardId,
@@ -81,7 +83,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
- TailableModeEnum tailableMode = TailableModeEnum::kNormal);
+ TailableModeEnum tailableMode = TailableModeEnum::kNormal,
+ boost::optional<BSONObj> routerSort = boost::none);
/**
* Convenience function which extracts all necessary information from the passed RemoteCursor, and