From 586663fec7c3a7d4a8b0185ff24825bd15e80dff Mon Sep 17 00:00:00 2001 From: "Mickey. J Winters" Date: Fri, 25 Feb 2022 14:52:13 +0000 Subject: SERVER-62738 implement aggregate $_passthroughToShard option --- ...change_streams_per_shard_cursor_passthrough.yml | 16 ++ .../change_streams_per_shard_cursor.js | 216 +++++++++++++++++++++ src/mongo/db/commands/run_aggregate.cpp | 1 + src/mongo/db/pipeline/aggregate_command.idl | 11 +- src/mongo/db/pipeline/expression_context.h | 1 + src/mongo/db/pipeline/plan_executor_pipeline.cpp | 3 +- src/mongo/s/commands/cluster_map_reduce_agg.cpp | 14 +- src/mongo/s/query/cluster_aggregate.cpp | 40 +++- src/mongo/s/query/cluster_aggregation_planner.cpp | 119 +++++++++++- src/mongo/s/query/cluster_aggregation_planner.h | 19 +- src/mongo/s/query/store_possible_cursor.cpp | 6 +- src/mongo/s/query/store_possible_cursor.h | 5 +- 12 files changed, 436 insertions(+), 15 deletions(-) create mode 100644 jstests/noPassthrough/change_streams_per_shard_cursor.js diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml index db39ec22c09..6471aa541df 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml @@ -3,6 +3,22 @@ test_kind: js_test selector: roots: - jstests/change_streams/**/*.js + exclude_files: + # TODO SERVER-63771 unblock all these tests. + - jstests/change_streams/apply_ops_resumability.js + - jstests/change_streams/apply_ops.js + - jstests/change_streams/ddl_create_event.js + - jstests/change_streams/does_not_implicitly_create_database.js + - jstests/change_streams/metadata_notifications.js + - jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js + - jstests/change_streams/report_post_batch_resume_token.js + - jstests/change_streams/resume_from_high_water_mark_token.js + - jstests/change_streams/shell_helper.js + - jstests/change_streams/show_expanded_events.js + - jstests/change_streams/start_after_invalidation_exception.js + - jstests/change_streams/whole_db_resumability.js + - jstests/change_streams/whole_db_metadata_notifications.js + - jstests/change_streams/whole_db.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/jstests/noPassthrough/change_streams_per_shard_cursor.js b/jstests/noPassthrough/change_streams_per_shard_cursor.js new file mode 100644 index 00000000000..0d1e0e2fceb --- /dev/null +++ b/jstests/noPassthrough/change_streams_per_shard_cursor.js @@ -0,0 +1,216 @@ +/** + * @tags: [ + * requires_sharding, + * uses_change_streams, + * ] + */ +(function() { +"use strict"; + +const checkPerShardCursorEnabled = () => { + const conn = MongoRunner.runMongod(); + const res = conn.adminCommand({ + getParameter: 1, + featureFlagPerShardCursor: 1, + }); + MongoRunner.stopMongod(conn); + return res.featureFlagPerShardCursor.value; +}; + +const dbName = jsTestName(); +const setupShardedCluster = (shards = 1) => { + const st = new ShardingTest( + {shards, mongos: 1, config: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}}); + const sdb = st.s0.getDB(dbName); + assert.commandWorked(sdb.dropDatabase()); + + sdb.setProfilingLevel(0, -1); + st.shard0.getDB(dbName).setProfilingLevel(0, -1); + + // Shard the relevant collections. + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.name); + if (shards === 2) { + // Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to + // shard1. + st.shardColl("coll", {_id: 1}, {_id: 0}, {_id: 0}, dbName); + st.shardColl("coll2", {_id: 1}, {_id: 0}, {_id: 0}, dbName); + } else { + assert(shards === 1, "only 1 or 2 shards supported"); + assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".coll", key: {_id: 1}})); + assert.commandWorked( + st.s.adminCommand({shardCollection: dbName + ".coll2", key: {_id: 1}})); + } + + const shardId = st.shard0.shardName; + return [sdb, st, shardId]; +}; + +const pscWatch = (db, coll, shardId, options = {}, csOptions = {}) => { + let cmd = { + aggregate: coll, + cursor: {}, + pipeline: [{$changeStream: csOptions}], + $_passthroughToShard: {shard: shardId} + }; + cmd = Object.assign({}, cmd, options); + if (options.pipeline) { + cmd.pipeline = [{$changeStream: csOptions}].concat(options.pipeline); + } + const resp = db.runCommand(cmd); + assert.commandWorked(resp); + if (options.explain) { + return resp; + } + return new DBCommandCursor(db, resp); +}; + +if (!checkPerShardCursorEnabled()) { + let [sdb, st, shardId] = setupShardedCluster(); + + // Should only work with feature flag on. + assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", shardId)), 6273800); + st.stop(); + jsTestLog("Skipping the rest of the test because featureFlagPerSardCursor is not enabled"); + return; +} + +// Parsing +let [sdb, st, shardId] = setupShardedCluster(); + +// Should not allow pipeline without $changeStream. +assert.commandFailedWithCode(sdb.runCommand({ + aggregate: "coll", + cursor: {}, + pipeline: [{$match: {perfect: true}}], + $_passthroughToShard: {shard: shardId} +}), + 6273801); + +// $out can't passthrough so it's not allowed. +assert.commandFailedWithCode( + assert.throws(() => pscWatch(sdb, "coll", shardId, {pipeline: [{$out: "h"}]})), 6273802); + +// Shard option should be specified. +assert.commandFailedWithCode( + sdb.runCommand( + {aggregate: "coll", cursor: {}, pipeline: [{$changeStream: {}}], $_passthroughToShard: {}}), + 40414); + +// The shardId field should be a string. +assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", 42)), + ErrorCodes.TypeMismatch); +// Can't open a per shard cursor on the config RS. +assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", "config")), 6273803); + +// The shardId should be a valid shard. +assert.commandFailedWithCode( + assert.throws(() => pscWatch(sdb, "coll", "Dwane 'the Shard' Johnson")), + ErrorCodes.ShardNotFound); + +// Correctness. + +// Simple collection level watch +// this insert shouldn't show up since it happens before we make a cursor. +sdb.coll.insertOne({location: 1}); +let c = pscWatch(sdb, "coll", shardId); +// these inserts should show up since they're after we make a cursor. +for (let i = 1; i <= 4; i++) { + sdb.coll.insertOne({location: 2, i}); + assert(!c.isExhausted()); + assert(c.hasNext()); + c.next(); +} +assert(!c.hasNext()); + +// Simple database level watch +c = pscWatch(sdb, 1, shardId); + +sdb.coll.insertOne({location: 3}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll2.insertOne({location: 4}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +assert(!c.hasNext()); + +// Watching collection that doesn't exist yet. +c = pscWatch(sdb, "toBeCreated", shardId); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +st.s.adminCommand({shardCollection: dbName + ".toBeCreated", key: {_id: 1}}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +sdb.toBeCreated.insertOne({location: 8}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +assert(!c.hasNext()); + +// Explain output should not have a split pipeline. It should look like mongod explain output. +let explainOut = pscWatch(sdb, "coll", shardId, {explain: true}); +assert(!explainOut.hasOwnProperty("splitPipeline")); +assert.hasOwnProperty(explainOut, "stages"); + +// If we getMore an invalidated cursor the cursor should have been closed on mongos and we should +// get CursorNotFound, even if the invalidate event was never recieved by mongos. +[[], [{$match: {f: "filter out invalidate event"}}]].forEach((pipeline) => { + assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".toDrop", key: {_id: 1}})); + let c = pscWatch(sdb, "toDrop", shardId, {pipeline}); + sdb.toDrop.insertOne({}); + sdb.toDrop.drop(); + assert.commandFailedWithCode( + assert.throws(() => { + assert.retry(() => { + c._runGetMoreCommand(); + return false; + }, "change stream should have been invalidated by now", 4); + }), + ErrorCodes.CursorNotFound); +}); + +st.stop(); + +// Isolated from events on other shards. +[sdb, st, shardId] = setupShardedCluster(2); +c = pscWatch(sdb, "coll", shardId); + +sdb.coll.insertOne({location: 5, _id: -2}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll.insertOne({location: 6, _id: 2}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +// Isolated from events on other shards with whole db. +c = pscWatch(sdb.getSiblingDB("admin"), 1, shardId, {}, {allChangesForCluster: true}); + +sdb.coll.insertOne({location: 7, _id: -3}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll2.insertOne({location: 8, _id: -4}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll.insertOne({location: 9, _id: 3}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +sdb.coll2.insertOne({location: 10, _id: 4}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +st.stop(); +})(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index cc5744e7334..5dfe03a12b5 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -450,6 +450,7 @@ boost::intrusive_ptr makeExpressionContext( expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; expCtx->inMultiDocumentTransaction = opCtx->inMultiDocumentTransaction(); expCtx->collationMatchesDefault = collationMatchesDefault; + expCtx->forPerShardCursor = request.getPassthroughToShard().has_value(); return expCtx; } diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index da834d159f5..a30ce427578 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -62,6 +62,15 @@ types: serializer: ::mongo::serializeExplainToBSON deserializer: ::mongo::parseExplainModeFromBSON +structs: + PassthroughToShardOptions: + description: "options for commands requesting a per shard cursor" + fields: + shard: + description: "id of the shard to passthrough to" + type: string + unstable: true + commands: aggregate: description: "Represents the user-supplied options to the aggregate command." @@ -227,7 +236,7 @@ commands: unstable: true $_passthroughToShard: description: "An optional internal parameter for this request. If a shard key is specified, then that specific shard will be targeted." - type: object + type: PassthroughToShardOptions cpp_name: passthroughToShard optional: true unstable: true diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index f6ba5812e86..fbce57cbbd9 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -377,6 +377,7 @@ public: bool fromMongos = false; bool needsMerge = false; bool inMongos = false; + bool forPerShardCursor = false; bool allowDiskUse = false; bool bypassDocumentValidation = false; bool inMultiDocumentTransaction = false; diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index 2b1d70b5681..e557f071b8a 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -79,7 +79,8 @@ PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId* auto execState = getNextDocument(&docOut, nullptr); if (execState == PlanExecutor::ADVANCED) { // Include metadata if the output will be consumed by a merging node. - *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson(); + *objOut = _expCtx->needsMerge || _expCtx->forPerShardCursor ? docOut.toBsonWithMetaData() + : docOut.toBson(); } return execState; } diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index d478ef08d48..52a61ab9325 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -186,8 +186,9 @@ bool runAggregationMapReduce(OperationContext* opCtx, pipelineBuilder, cm, involvedNamespaces, - false, // hasChangeStream - true); // allowedToPassthrough + false, // hasChangeStream + true, // allowedToPassthrough + false); // perShardCursor try { switch (targeter.policy) { case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kPassthrough: { @@ -209,8 +210,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: kMongosRequired: { // Pipelines generated from mapReduce should never be required to run on mongos. - uasserted(31291, "Internal error during mapReduce translation"); - break; + MONGO_UNREACHABLE_TASSERT(31291); } case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kAnyShard: { @@ -233,6 +233,12 @@ bool runAggregationMapReduce(OperationContext* opCtx, false)); // hasChangeStream break; } + + case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: + kSpecificShardOnly: { + // It should not be possible to pass $_passthroughToShard to a map reduce command. + MONGO_UNREACHABLE_TASSERT(6273803); + } } } catch (DBException& e) { uassert(ErrorCodes::CommandNotSupportedOnView, diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 1950cbc0254..200cef980d2 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -325,14 +325,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, cm, involvedNamespaces, hasChangeStream, - liteParsedPipeline.allowedToPassthroughFromMongos()); + liteParsedPipeline.allowedToPassthroughFromMongos(), + request.getPassthroughToShard().has_value()); if (!expCtx) { // When the AggregationTargeter chooses a "passthrough" policy, it does not call the // 'pipelineBuilder' function, so we never get an expression context. Because this is a // passthrough, we only need a bare minimum expression context anyway. invariant(targeter.policy == - cluster_aggregation_planner::AggregationTargeter::kPassthrough); + cluster_aggregation_planner::AggregationTargeter::kPassthrough || + targeter.policy == + cluster_aggregation_planner::AggregationTargeter::kSpecificShardOnly); expCtx = make_intrusive( opCtx, nullptr, namespaces.executionNss, boost::none, request.getLet()); } @@ -390,6 +393,39 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, result, hasChangeStream); } + case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: + kSpecificShardOnly: { + // Mark expCtx as tailable and await data so CCC behaves accordingly. + expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; + + uassert(6273801, + "per shard cursor pipeline must contain $changeStream", + hasChangeStream); + + // Make sure the rest of the pipeline can be pushed down. + auto pipeline = request.getPipeline(); + std::vector nonChangeStreamPart(pipeline.begin() + 1, pipeline.end()); + LiteParsedPipeline nonChangeStreamLite(request.getNamespace(), nonChangeStreamPart); + uassert(6273802, + "$_passthroughToShard specified with a stage that is not allowed to " + "passthrough from mongos", + nonChangeStreamLite.allowedToPassthroughFromMongos()); + ShardId shardId(std::string(request.getPassthroughToShard()->getShard())); + uassert(6273803, + "$_passthroughToShard not supported for queries against config replica set", + shardId != ShardId::kConfigServerId); + + return cluster_aggregation_planner::runPipelineOnSpecificShardOnly( + expCtx, + namespaces, + *targeter.cm, + request.getExplain(), + aggregation_request_helper::serializeToCommandDoc(request), + privileges, + shardId, + true, + result); + } MONGO_UNREACHABLE; } diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 6062c02e6ba..9430ddb8270 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_skip.h" @@ -101,7 +102,9 @@ AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCt ReadPreferenceSetting::get(opCtx), sharded_agg_helpers::getDesiredRetryPolicy(opCtx)); const auto response = ars.next(); - invariant(ars.done()); + tassert(6273807, + "requested and received data from just one shard, but results are still pending", + ars.done()); return response; } @@ -567,7 +570,14 @@ AggregationTargeter AggregationTargeter::make( boost::optional cm, stdx::unordered_set involvedNamespaces, bool hasChangeStream, - bool allowedToPassthrough) { + bool allowedToPassthrough, + bool perShardCursor) { + if (perShardCursor) { + uassert(6273800, + "featureFlagPerShardCursor must be enabled to use $_passthroughToShard", + feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()); + return {TargetingPolicy::kSpecificShardOnly, nullptr, cm}; + } // Check if any of the involved collections are sharded. bool involvesShardedCollections = [&]() { @@ -616,6 +626,19 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr& Document serializedCommand, const PrivilegeVector& privileges, BSONObjBuilder* out) { + if (feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()) { + return runPipelineOnSpecificShardOnly(expCtx, + namespaces, + cm, + explain, + serializedCommand, + privileges, + cm.dbPrimary(), + false, + out); + } + // TODO SERVER-58673 remove the if statement here, remove code below and just call + // runPipelineOnSpecificDirectly. make sure to clean up divergence between the two functions. auto opCtx = expCtx->opCtx; // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an @@ -640,7 +663,9 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr& ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); auto response = ars.next(); - invariant(ars.done()); + tassert(6273805, + "requested and received data from just one shard, but results are still pending", + ars.done()); uassertStatusOK(response.swResponse); auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); @@ -649,7 +674,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr& uassertStatusOK(commandStatus.withContext("command failed because of stale config")); } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { uassertStatusOK( - commandStatus.withContext("command failed because can not establish a snapshot")); + commandStatus.withContext("command failed because establishing a snapshot failed")); } BSONObj result; @@ -820,5 +845,91 @@ std::pair> getCollationAndUUID( return {collation.isEmpty() ? getCollation() : collation, getUUID()}; } +Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr& expCtx, + const ClusterAggregate::Namespaces& namespaces, + const ChunkManager& cm, + boost::optional explain, + Document serializedCommand, + const PrivilegeVector& privileges, + ShardId shardId, + bool forPerShardCursor, + BSONObjBuilder* out) { + auto opCtx = expCtx->opCtx; + + if (forPerShardCursor) { + tassert(6273804, + "Per shard cursors are supposed to pass fromMongos: false to shards", + !expCtx->inMongos); + } + + // Format the command for the shard. This wraps the command as an explain if necessary, and + // rewrites the result into a format safe to forward to shards. + BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx, + serializedCommand, + explain, + nullptr, /* pipeline */ + BSONObj(), + boost::none); + + if (!forPerShardCursor && shardId != ShardId::kConfigServerId) { + cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()); + } + if (!forPerShardCursor) { + // Per shard cursors should not send any shard version info. + cmdObj = appendDbVersionIfPresent(std::move(cmdObj), cm.dbVersion()); + } + + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + namespaces.executionNss.db().toString(), + {{shardId, cmdObj}}, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + auto response = ars.next(); + tassert(6273806, + "requested and received data from just one shard, but results are still pending", + ars.done()); + + uassertStatusOK(response.swResponse); + auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); + + if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) { + uassertStatusOK(commandStatus.withContext("command failed because of stale config")); + } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { + uassertStatusOK( + commandStatus.withContext("command failed because can not establish a snapshot")); + } + + BSONObj result; + if (explain) { + // If this was an explain, then we get back an explain result object rather than a cursor. + result = response.swResponse.getValue().data; + } else { + result = uassertStatusOK(storePossibleCursor( + opCtx, + shardId, + *response.shardHostAndPort, + response.swResponse.getValue().data, + namespaces.requestedNss, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + privileges, + expCtx->tailableMode, + forPerShardCursor ? boost::optional(change_stream_constants::kSortSpec) + : boost::none)); + } + + // First append the properly constructed writeConcernError. It will then be skipped + // in appendElementsUnique. + if (auto wcErrorElem = result["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out); + } + + out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result)); + + return getStatusFromCommandResult(out->asTempObj()); +} + } // namespace cluster_aggregation_planner } // namespace mongo diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index cf8c52583ff..68c09cc1edb 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -82,12 +82,14 @@ struct AggregationTargeter { boost::optional cm, stdx::unordered_set involvedNamespaces, bool hasChangeStream, - bool allowedToPassthrough); + bool allowedToPassthrough, + bool perShardCursor); enum TargetingPolicy { kPassthrough, kMongosRequired, kAnyShard, + kSpecificShardOnly, } policy; std::unique_ptr pipeline; @@ -125,5 +127,20 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, BSONObjBuilder* result, bool hasChangeStream); +/** + * Similar to runPipelineOnPrimaryShard but allows $changeStreams. Intended for use by per shard + * $changeStream cursors. Note: if forPerShardCursor is true shard versions will not be added to the + * request sent to mongod. + */ +Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr& expCtx, + const ClusterAggregate::Namespaces& namespaces, + const ChunkManager& cm, + boost::optional explain, + Document serializedCommand, + const PrivilegeVector& privileges, + ShardId shardId, + bool forPerShardCursor, + BSONObjBuilder* out); + } // namespace cluster_aggregation_planner } // namespace mongo diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 9560a139bbe..dd9c5b43d44 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -77,7 +77,8 @@ StatusWith storePossibleCursor(OperationContext* opCtx, std::shared_ptr executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, - TailableModeEnum tailableMode) { + TailableModeEnum tailableMode, + boost::optional routerSort) { if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { return cmdResult; } @@ -115,6 +116,9 @@ StatusWith storePossibleCursor(OperationContext* opCtx, params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); params.originatingPrivileges = std::move(privileges); + if (routerSort) { + params.sortToApplyOnRouter = *routerSort; + } if (TransactionRouter::get(opCtx)) { params.isAutoCommit = false; diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index 59f0a79c38a..a80302bc17d 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -72,6 +72,8 @@ class TaskExecutor; * @ cursorManager the ClusterCursorManager on which to register the resulting ClusterClientCursor * @ privileges the PrivilegeVector of privileges needed for the original command, to be used for * auth checking by GetMore + * @ routerSort the sort to apply on the router. With only one cursor this shouldn't be common, but + * is needed to set up change stream post-batch resume tokens correctly for per shard cursors. */ StatusWith storePossibleCursor(OperationContext* opCtx, const ShardId& shardId, @@ -81,7 +83,8 @@ StatusWith storePossibleCursor(OperationContext* opCtx, std::shared_ptr executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, - TailableModeEnum tailableMode = TailableModeEnum::kNormal); + TailableModeEnum tailableMode = TailableModeEnum::kNormal, + boost::optional routerSort = boost::none); /** * Convenience function which extracts all necessary information from the passed RemoteCursor, and -- cgit v1.2.1