diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2022-09-02 16:50:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-07 22:46:38 +0000 |
commit | 4d5451b20fecaf4a054cca94de6962fcf28c577f (patch) | |
tree | dc2645876c2daac52bdf09f088bb876e9c3bc196 | |
parent | 4b37709d13d1ee29cb0420f875af4cd19d3c2a6c (diff) | |
download | mongo-4d5451b20fecaf4a054cca94de6962fcf28c577f.tar.gz |
SERVER-69002 backport per shard cursors
SERVER-62400
SERVER-62681
SERVER-62738
SERVER-63781
SERVER-63774
SERVER-63772
SERVER-63773
SERVER-58673
(cherry-picked from commit fc54ebd0137a25ea664c022b51b685667dd037c7)
(cherry-picked from commit 53d7bceee61f73a1d6959edb5d490c3b338f3c0d)
(cherry-picked from commit 586663fec7c3a7d4a8b0185ff24825bd15e80dff)
(cherry-picked from commit ef2a62dcc27461d2be1b619c75bc04effa1f2021)
(cherry-picked from commit 4f3626ff4486e672569699dfde1cc0ae8c54d348)
(cherry-picked from commit 0f7683455bc06b153f14368a3f05f0b69671717e)
(cherry-picked from commit 11d01816f743d6764c4f12c42697f5edf813ce27)
(cherry-picked from commit 1fe77b5bd9fb13f9eb74275359dcc4ba69f2d5e9)
22 files changed, 562 insertions, 96 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml new file mode 100644 index 00000000000..db39ec22c09 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml @@ -0,0 +1,58 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_with_any_tags: + ## + # The next tags correspond to the special errors thrown by the + # set_read_and_write_concerns.js override when it refuses to replace the readConcern or + # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be + # warranted. + ## + # "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos + - assumes_unsharded_collection + +executor: + archive: + hooks: + - CheckReplDBHash + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + defaultReadConcernLevel: null + enableMajorityReadConcern: '' + # Enable causal consistency for change streams suites using 1 node replica sets. See + # change_streams.yml for detailed explanation. + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/set_read_and_write_concerns.js'); + load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js'); + load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js'); + load('jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js'); + hooks: + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + writePeriodicNoops: 1 + periodicNoopIntervalSecs: 1 + coordinateCommitReturnImmediatelyAfterPersistingDecision: true + num_shards: 1 + enable_sharding: + - test diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 10e4cd3c8b8..80ce89976f3 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -4028,6 +4028,13 @@ tasks: resmoke_args: --suites=change_streams_multi_stmt_txn_sharded_collections_passthrough --storageEngine=wiredTiger - <<: *task_template + name: change_streams_per_shard_cursor_passthrough + tags: ["change_streams"] + commands: + - func: "do setup" + - func: "run tests" + +- <<: *task_template name: disk_wiredtiger commands: - func: "do setup" diff --git a/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js new file mode 100644 index 00000000000..8e8779e7ca5 --- /dev/null +++ b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js @@ -0,0 +1,47 @@ +/** + * Overrides runCommand to use the $_passthroughToShard parameter. The changestreams per-shard + * cursor passthrough suite ensures changestream tests can still run correctly on a single-shard + * cluster. By adding this parameter, we pass through to that single shard, running the pipelines + * directly on that mongod. This will test the machinery of per-shard cursors via mongos. + */ + +(function() { +'use strict'; + +load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'. +load("jstests/libs/discover_topology.js"); // For 'DiscoverTopology'. + +// To be eligible, a command must be a changeStream request sent to a mongos. +const isEligibleForPerShardCursor = function(conn, cmdObj) { + if (!(cmdObj && cmdObj.aggregate && Array.isArray(cmdObj.pipeline) && + cmdObj.pipeline.length > 0 && typeof cmdObj.pipeline[0].$changeStream == "object" && + cmdObj.pipeline[0].$changeStream.constructor === Object)) { + return false; + } + return conn.isMongos(); +}; + +const discoverShardId = function(conn) { + const topology = DiscoverTopology.findConnectedNodes(conn); + const shards = topology.shards; + let shardName = Object.keys(shards)[0]; + return {shard: shardName}; +}; + +function runCommandWithPassthroughToShard( + conn, _dbName, _commandName, commandObj, func, makeFuncArgs) { + if (typeof commandObj !== "object" || commandObj === null) { + return func.apply(conn, makeFuncArgs(commandObj)); + } + if (!isEligibleForPerShardCursor(conn, commandObj)) { + return func.apply(conn, makeFuncArgs(commandObj)); + } + commandObj.$_passthroughToShard = discoverShardId(conn); + return func.apply(conn, makeFuncArgs(commandObj)); +} + +OverrideHelpers.prependOverrideInParallelShell( + "jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js"); + +OverrideHelpers.overrideRunCommand(runCommandWithPassthroughToShard); +}()); diff --git a/jstests/noPassthrough/change_streams_per_shard_cursor.js b/jstests/noPassthrough/change_streams_per_shard_cursor.js new file mode 100644 index 00000000000..bece83e65c7 --- /dev/null +++ b/jstests/noPassthrough/change_streams_per_shard_cursor.js @@ -0,0 +1,196 @@ +/** + * @tags: [ + * requires_sharding, + * uses_change_streams, + * ] + */ +(function() { +"use strict"; + +const dbName = jsTestName(); +const setupShardedCluster = (shards = 1) => { + const st = new ShardingTest( + {shards, mongos: 1, config: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}}); + const sdb = st.s0.getDB(dbName); + assert.commandWorked(sdb.dropDatabase()); + + sdb.setProfilingLevel(0, -1); + st.shard0.getDB(dbName).setProfilingLevel(0, -1); + + // Shard the relevant collections. + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.name); + if (shards === 2) { + // Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to + // shard1. + st.shardColl("coll", {_id: 1}, {_id: 0}, {_id: 0}, dbName); + st.shardColl("coll2", {_id: 1}, {_id: 0}, {_id: 0}, dbName); + } else { + assert(shards === 1, "only 1 or 2 shards supported"); + assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".coll", key: {_id: 1}})); + assert.commandWorked( + st.s.adminCommand({shardCollection: dbName + ".coll2", key: {_id: 1}})); + } + + const shardId = st.shard0.shardName; + return [sdb, st, shardId]; +}; + +const pscWatch = (db, coll, shardId, options = {}, csOptions = {}) => { + let cmd = { + aggregate: coll, + cursor: {}, + pipeline: [{$changeStream: csOptions}], + $_passthroughToShard: {shard: shardId} + }; + cmd = Object.assign({}, cmd, options); + if (options.pipeline) { + cmd.pipeline = [{$changeStream: csOptions}].concat(options.pipeline); + } + const resp = db.runCommand(cmd); + assert.commandWorked(resp); + if (options.explain) { + return resp; + } + return new DBCommandCursor(db, resp); +}; + +// Parsing +let [sdb, st, shardId] = setupShardedCluster(); + +// Should not allow pipeline without $changeStream. +assert.commandFailedWithCode(sdb.runCommand({ + aggregate: "coll", + cursor: {}, + pipeline: [{$match: {perfect: true}}], + $_passthroughToShard: {shard: shardId} +}), + 6273801); + +// $out can't passthrough so it's not allowed. +assert.commandFailedWithCode( + assert.throws(() => pscWatch(sdb, "coll", shardId, {pipeline: [{$out: "h"}]})), 6273802); + +// Shard option should be specified. +assert.commandFailedWithCode( + sdb.runCommand( + {aggregate: "coll", cursor: {}, pipeline: [{$changeStream: {}}], $_passthroughToShard: {}}), + 40414); + +// The shardId field should be a string. +assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", 42)), + ErrorCodes.TypeMismatch); +// Can't open a per shard cursor on the config RS. +assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", "config")), 6273803); + +// The shardId should be a valid shard. +assert.commandFailedWithCode( + assert.throws(() => pscWatch(sdb, "coll", "Dwane 'the Shard' Johnson")), + ErrorCodes.ShardNotFound); + +// Correctness. + +// Simple collection level watch +// this insert shouldn't show up since it happens before we make a cursor. +sdb.coll.insertOne({location: 1}); +let c = pscWatch(sdb, "coll", shardId); +// these inserts should show up since they're after we make a cursor. +for (let i = 1; i <= 4; i++) { + sdb.coll.insertOne({location: 2, i}); + assert(!c.isExhausted()); + assert(c.hasNext()); + c.next(); +} +assert(!c.hasNext()); + +// Simple database level watch +c = pscWatch(sdb, 1, shardId); + +sdb.coll.insertOne({location: 3}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll2.insertOne({location: 4}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +assert(!c.hasNext()); + +// Watching collection that doesn't exist yet. +c = pscWatch(sdb, "toBeCreated", shardId); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +st.s.adminCommand({shardCollection: dbName + ".toBeCreated", key: {_id: 1}}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +sdb.toBeCreated.insertOne({location: 8}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +assert(!c.hasNext()); + +// Explain output should not have a split pipeline. It should look like mongod explain output. +let explainOut = pscWatch(sdb, "coll", shardId, {explain: true}); +assert(!explainOut.hasOwnProperty("splitPipeline")); +assert.hasOwnProperty(explainOut, "stages"); + +// If we getMore an invalidated cursor the cursor should have been closed on mongos and we should +// get CursorNotFound, even if the invalidate event was never recieved by mongos. +[[], [{$match: {f: "filter out invalidate event"}}]].forEach((pipeline) => { + assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".toDrop", key: {_id: 1}})); + let c = pscWatch(sdb, "toDrop", shardId, {pipeline}); + sdb.toDrop.insertOne({}); + sdb.toDrop.drop(); + assert.commandFailedWithCode( + assert.throws(() => { + assert.retry(() => { + c._runGetMoreCommand(); + return false; + }, "change stream should have been invalidated by now", 4); + }), + ErrorCodes.CursorNotFound); +}); + +st.stop(); + +// Isolated from events on other shards. +[sdb, st, shardId] = setupShardedCluster(2); +c = pscWatch(sdb, "coll", shardId); + +sdb.coll.insertOne({location: 5, _id: -2}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll.insertOne({location: 6, _id: 2}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +// Isolated from events on other shards with whole db. +c = pscWatch(sdb.getSiblingDB("admin"), 1, shardId, {}, {allChangesForCluster: true}); + +sdb.coll.insertOne({location: 7, _id: -3}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll2.insertOne({location: 8, _id: -4}); +assert(!c.isExhausted()); +assert(c.hasNext()); +c.next(); + +sdb.coll.insertOne({location: 9, _id: 3}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +sdb.coll2.insertOne({location: 10, _id: 4}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +st.stop(); +})(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 93012d0e40a..0393035063a 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -463,6 +463,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( CurOp::get(opCtx)->dbProfileLevel() > 0); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; expCtx->collationMatchesDefault = collationMatchesDefault; + expCtx->forPerShardCursor = request.getPassthroughToShard().has_value(); return expCtx; } diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index 1146df571e3..467f5cafb38 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -62,6 +62,15 @@ types: serializer: ::mongo::serializeExplainToBSON deserializer: ::mongo::parseExplainModeFromBSON +structs: + PassthroughToShardOptions: + description: "options for commands requesting a per shard cursor" + fields: + shard: + description: "id of the shard to passthrough to" + type: string + unstable: true + commands: aggregate: description: "Represents the user-supplied options to the aggregate command." @@ -230,3 +239,9 @@ commands: type: bool ignore: true unstable: true + $_passthroughToShard: + description: "An optional internal parameter for this request. If a shard key is specified, then that specific shard will be targeted." + type: PassthroughToShardOptions + cpp_name: passthroughToShard + optional: true + unstable: true diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index c1dae29660a..6d54c695f8a 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -367,6 +367,7 @@ public: bool fromMongos = false; bool needsMerge = false; bool inMongos = false; + bool forPerShardCursor = false; bool allowDiskUse = false; bool bypassDocumentValidation = false; bool hasWhereClause = false; diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index 01d31be0ff8..4bd0ee5cf18 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -77,7 +77,8 @@ PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId* auto execState = getNextDocument(&docOut, nullptr); if (execState == PlanExecutor::ADVANCED) { // Include metadata if the output will be consumed by a merging node. - *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson(); + *objOut = _expCtx->needsMerge || _expCtx->forPerShardCursor ? docOut.toBsonWithMetaData() + : docOut.toBson(); } return execState; } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 45ac9df6300..5ac435ce9a7 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -858,13 +858,24 @@ BSONObj createPassthroughCommandForShard( Document serializedCommand, boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, - BSONObj collationObj) { + BSONObj collationObj, + boost::optional<int> overrideBatchSize) { // Create the command for the shards. MutableDocument targetedCmd(serializedCommand); if (pipeline) { targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize()); } + if (overrideBatchSize.has_value()) { + if (serializedCommand[AggregateCommandRequest::kCursorFieldName].missing()) { + targetedCmd[AggregateCommandRequest::kCursorFieldName] = + Value(DOC(SimpleCursorOptions::kBatchSizeFieldName << Value(*overrideBatchSize))); + } else { + targetedCmd[AggregateCommandRequest::kCursorFieldName] + [SimpleCursorOptions::kBatchSizeFieldName] = Value(*overrideBatchSize); + } + } + auto shardCommand = genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, collationObj); @@ -1007,8 +1018,12 @@ DispatchShardPipelineResults dispatchShardPipeline( (splitPipelines ? createCommandForTargetedShards( expCtx, serializedCommand, *splitPipelines, exchangeSpec, true /* needsMerge */) - : createPassthroughCommandForShard( - expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj)); + : createPassthroughCommandForShard(expCtx, + serializedCommand, + expCtx->explain, + pipeline.get(), + collationObj, + boost::none)); // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the // config server in order to monitor for new shards. To guarantee that we do not miss any diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 1114c1a1d1a..e2369deb2db 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -131,7 +131,8 @@ BSONObj createPassthroughCommandForShard( Document serializedCommand, boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, - BSONObj collationObj); + BSONObj collationObj, + boost::optional<int> overrideBatchSize); BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx, Document serializedCommand, diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl index b89670b06ea..10e233aed2b 100644 --- a/src/mongo/db/query/query_feature_flags.idl +++ b/src/mongo/db/query/query_feature_flags.idl @@ -6,21 +6,21 @@ # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # Server Side Public License for more details. # # You should have received a copy of the Server Side Public License -# along with this program. If not, see +# along with this program. If not, see # <http://www.mongodb.com/licensing/server-side-public-license>. # -# As a special exception, the copyright holders give permission to link the +# As a special exception, the copyright holders give permission to link the # code of portions of this program with the OpenSSL library under certain # conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for # all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, # delete this exception statement from your version. If you delete this # exception statement from all source files in the program, then also delete # it in the license file. @@ -58,4 +58,3 @@ feature_flags: cpp_varname: gFeatureFlagShardedTimeSeriesUpdateDelete default: true version: 5.0 - diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 7cba5b51bd5..996f726cd67 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -187,8 +187,9 @@ bool runAggregationMapReduce(OperationContext* opCtx, pipelineBuilder, cm, involvedNamespaces, - false, // hasChangeStream - true); // allowedToPassthrough + false, // hasChangeStream + true, // allowedToPassthrough + false); // perShardCursor try { switch (targeter.policy) { case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kPassthrough: { @@ -210,8 +211,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: kMongosRequired: { // Pipelines generated from mapReduce should never be required to run on mongos. - uasserted(31291, "Internal error during mapReduce translation"); - break; + MONGO_UNREACHABLE_TASSERT(31291); } case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kAnyShard: { @@ -234,6 +234,12 @@ bool runAggregationMapReduce(OperationContext* opCtx, false)); // hasChangeStream break; } + + case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: + kSpecificShardOnly: { + // It should not be possible to pass $_passthroughToShard to a map reduce command. + MONGO_UNREACHABLE_TASSERT(6273803); + } } } catch (DBException& e) { uassert(ErrorCodes::CommandNotSupportedOnView, diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 5ac4777ad48..c503cce6024 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -168,7 +168,9 @@ bool AsyncResultsMerger::remotesExhausted() const { bool AsyncResultsMerger::_remotesExhausted(WithLock) const { for (const auto& remote : _remotes) { - if (!remote.exhausted()) { + // If any remote has been invalidated, we must force the batch-building code to make another + // attempt to retrieve more results. This will (correctly) throw via _assertNotInvalidated. + if (!remote.exhausted() || remote.invalidated) { return false; } } diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index b9f68107a40..04a5040c7d8 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -323,14 +323,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, cm, involvedNamespaces, hasChangeStream, - liteParsedPipeline.allowedToPassthroughFromMongos()); + liteParsedPipeline.allowedToPassthroughFromMongos(), + request.getPassthroughToShard().has_value()); if (!expCtx) { // When the AggregationTargeter chooses a "passthrough" policy, it does not call the // 'pipelineBuilder' function, so we never get an expression context. Because this is a // passthrough, we only need a bare minimum expression context anyway. invariant(targeter.policy == - cluster_aggregation_planner::AggregationTargeter::kPassthrough); + cluster_aggregation_planner::AggregationTargeter::kPassthrough || + targeter.policy == + cluster_aggregation_planner::AggregationTargeter::kSpecificShardOnly); expCtx = make_intrusive<ExpressionContext>( opCtx, nullptr, namespaces.executionNss, boost::none, request.getLet()); } @@ -388,6 +391,39 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, result, hasChangeStream); } + case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: + kSpecificShardOnly: { + // Mark expCtx as tailable and await data so CCC behaves accordingly. + expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; + + uassert(6273801, + "per shard cursor pipeline must contain $changeStream", + hasChangeStream); + + // Make sure the rest of the pipeline can be pushed down. + auto pipeline = request.getPipeline(); + std::vector<BSONObj> nonChangeStreamPart(pipeline.begin() + 1, pipeline.end()); + LiteParsedPipeline nonChangeStreamLite(request.getNamespace(), nonChangeStreamPart); + uassert(6273802, + "$_passthroughToShard specified with a stage that is not allowed to " + "passthrough from mongos", + nonChangeStreamLite.allowedToPassthroughFromMongos()); + ShardId shardId(std::string(request.getPassthroughToShard()->getShard())); + uassert(6273803, + "$_passthroughToShard not supported for queries against config replica set", + shardId != ShardId::kConfigServerId); + + return cluster_aggregation_planner::runPipelineOnSpecificShardOnly( + expCtx, + namespaces, + boost::none, + request.getExplain(), + aggregation_request_helper::serializeToCommandDoc(request), + privileges, + shardId, + true, + result); + } MONGO_UNREACHABLE; } diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index dcd4ffde2f0..2ccebf5870b 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_skip.h" @@ -101,7 +102,9 @@ AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCt ReadPreferenceSetting::get(opCtx), sharded_agg_helpers::getDesiredRetryPolicy(opCtx)); const auto response = ars.next(); - invariant(ars.done()); + tassert(6273807, + "requested and received data from just one shard, but results are still pending", + ars.done()); return response; } @@ -568,7 +571,11 @@ AggregationTargeter AggregationTargeter::make( boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, - bool allowedToPassthrough) { + bool allowedToPassthrough, + bool perShardCursor) { + if (perShardCursor) { + return {TargetingPolicy::kSpecificShardOnly, nullptr, cm}; + } // Check if any of the involved collections are sharded. bool involvesShardedCollections = [&]() { @@ -617,67 +624,15 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& Document serializedCommand, const PrivilegeVector& privileges, BSONObjBuilder* out) { - auto opCtx = expCtx->opCtx; - - // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an - // explain if necessary, and rewrites the result into a format safe to forward to shards. - BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx, - serializedCommand, - explain, - nullptr, /* pipeline */ - BSONObj()); - - const auto shardId = cm.dbPrimary(); - const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId) - ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) - : std::move(cmdObj); - - MultiStatementTransactionRequestsSender ars( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - namespaces.executionNss.db().toString(), - {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}}, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - auto response = ars.next(); - invariant(ars.done()); - - uassertStatusOK(response.swResponse); - auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); - - if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) { - uassertStatusOK(commandStatus.withContext("command failed because of stale config")); - } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { - uassertStatusOK( - commandStatus.withContext("command failed because can not establish a snapshot")); - } - - BSONObj result; - if (explain) { - // If this was an explain, then we get back an explain result object rather than a cursor. - result = response.swResponse.getValue().data; - } else { - result = uassertStatusOK( - storePossibleCursor(opCtx, - shardId, - *response.shardHostAndPort, - response.swResponse.getValue().data, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager(), - privileges, - TailableModeEnum::kNormal)); - } - - // First append the properly constructed writeConcernError. It will then be skipped - // in appendElementsUnique. - if (auto wcErrorElem = result["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out); - } - - out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result)); - - return getStatusFromCommandResult(out->asTempObj()); + return runPipelineOnSpecificShardOnly(expCtx, + namespaces, + boost::optional<DatabaseVersion>(cm.dbVersion()), + explain, + serializedCommand, + privileges, + cm.dbPrimary(), + false, + out); } Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces, @@ -820,5 +775,96 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( return {collation.isEmpty() ? getCollation() : collation, getUUID()}; } +Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ClusterAggregate::Namespaces& namespaces, + boost::optional<DatabaseVersion> dbVersion, + boost::optional<ExplainOptions::Verbosity> explain, + Document serializedCommand, + const PrivilegeVector& privileges, + ShardId shardId, + bool forPerShardCursor, + BSONObjBuilder* out) { + auto opCtx = expCtx->opCtx; + + boost::optional<int> overrideBatchSize; + if (forPerShardCursor) { + tassert(6273804, + "Per shard cursors are supposed to pass fromMongos: false to shards", + !expCtx->inMongos); + // By using an initial batchSize of zero all of the events will get returned through + // the getMore path and have metadata stripped out. + overrideBatchSize = 0; + } + + // Format the command for the shard. This wraps the command as an explain if necessary, and + // rewrites the result into a format safe to forward to shards. + BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx, + serializedCommand, + explain, + nullptr, /* pipeline */ + BSONObj(), + overrideBatchSize); + + if (!forPerShardCursor && shardId != ShardId::kConfigServerId) { + cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()); + } + if (!forPerShardCursor) { + // Unless this is a per shard cursor, we need to send shard version info. + tassert(6377400, "Missing shard versioning information", dbVersion.has_value()); + cmdObj = appendDbVersionIfPresent(std::move(cmdObj), *dbVersion); + } + + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + namespaces.executionNss.db().toString(), + {{shardId, cmdObj}}, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + auto response = ars.next(); + tassert(6273806, + "requested and received data from just one shard, but results are still pending", + ars.done()); + + uassertStatusOK(response.swResponse); + auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); + + if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) { + uassertStatusOK(commandStatus.withContext("command failed because of stale config")); + } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { + uassertStatusOK( + commandStatus.withContext("command failed because can not establish a snapshot")); + } + + BSONObj result; + if (explain) { + // If this was an explain, then we get back an explain result object rather than a cursor. + result = response.swResponse.getValue().data; + } else { + result = uassertStatusOK(storePossibleCursor( + opCtx, + shardId, + *response.shardHostAndPort, + response.swResponse.getValue().data, + namespaces.requestedNss, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + privileges, + expCtx->tailableMode, + forPerShardCursor ? boost::optional<BSONObj>(change_stream_constants::kSortSpec) + : boost::none)); + } + + // First append the properly constructed writeConcernError. It will then be skipped + // in appendElementsUnique. + if (auto wcErrorElem = result["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out); + } + + out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result)); + + return getStatusFromCommandResult(out->asTempObj()); +} + } // namespace cluster_aggregation_planner } // namespace mongo diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index 8cc0ccc4cea..f8e05fa2081 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -82,12 +82,14 @@ struct AggregationTargeter { boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, - bool allowedToPassthrough); + bool allowedToPassthrough, + bool perShardCursor); enum TargetingPolicy { kPassthrough, kMongosRequired, kAnyShard, + kSpecificShardOnly, } policy; std::unique_ptr<Pipeline, PipelineDeleter> pipeline; @@ -125,5 +127,20 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, BSONObjBuilder* result, bool hasChangeStream); +/** + * Similar to runPipelineOnPrimaryShard but allows $changeStreams. Intended for use by per shard + * $changeStream cursors. Note: if forPerShardCursor is true shard versions will not be added to the + * request sent to mongod. + */ +Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ClusterAggregate::Namespaces& namespaces, + boost::optional<DatabaseVersion> dbVersion, + boost::optional<ExplainOptions::Verbosity> explain, + Document serializedCommand, + const PrivilegeVector& privileges, + ShardId shardId, + bool forPerShardCursor, + BSONObjBuilder* out); + } // namespace cluster_aggregation_planner } // namespace mongo diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 7c5ee7aefb6..73c33b96da6 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -134,7 +134,7 @@ public: * Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream; * otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child. */ - virtual BSONObj getPostBatchResumeToken() const { + virtual BSONObj getPostBatchResumeToken() { return _child ? _child->getPostBatchResumeToken() : BSONObj(); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 0b088f12cbd..adb683fda81 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -71,6 +71,10 @@ public: return _resultsMerger.getNumRemotes(); } + BSONObj getPostBatchResumeToken() final { + return _resultsMerger.getHighWaterMark(); + } + protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final { return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout); diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 4208f8fb254..ed1eb80782d 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -86,7 +86,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const { return 0; } -BSONObj RouterStagePipeline::getPostBatchResumeToken() const { +BSONObj RouterStagePipeline::getPostBatchResumeToken() { return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index de0dc25b310..88f5930e3d5 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -53,7 +53,7 @@ public: std::size_t getNumRemotes() const final; - BSONObj getPostBatchResumeToken() const final; + BSONObj getPostBatchResumeToken() final; protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 51c7612e059..bca770593b8 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -77,7 +77,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, - TailableModeEnum tailableMode) { + TailableModeEnum tailableMode, + boost::optional<BSONObj> routerSort) { if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { return cmdResult; } @@ -107,14 +108,21 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, auto& remoteCursor = params.remotes.back(); remoteCursor.setShardId(shardId.toString()); remoteCursor.setHostAndPort(server); - remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(), - incomingCursorResponse.getValue().getCursorId(), - {})); + remoteCursor.setCursorResponse( + CursorResponse(incomingCursorResponse.getValue().getNSS(), + incomingCursorResponse.getValue().getCursorId(), + {}, /* batch */ + incomingCursorResponse.getValue().getAtClusterTime(), + incomingCursorResponse.getValue().getNumReturnedSoFar(), + incomingCursorResponse.getValue().getPostBatchResumeToken())); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.tailableMode = tailableMode; params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); params.originatingPrivileges = std::move(privileges); + if (routerSort) { + params.sortToApplyOnRouter = *routerSort; + } if (TransactionRouter::get(opCtx)) { params.isAutoCommit = false; @@ -139,10 +147,13 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue(); - CursorResponse outgoingCursorResponse(requestedNss, - clusterCursorId.getValue(), - incomingCursorResponse.getValue().getBatch(), - incomingCursorResponse.getValue().getAtClusterTime()); + CursorResponse outgoingCursorResponse( + requestedNss, + clusterCursorId.getValue(), + incomingCursorResponse.getValue().getBatch(), + incomingCursorResponse.getValue().getAtClusterTime(), + incomingCursorResponse.getValue().getNumReturnedSoFar(), + incomingCursorResponse.getValue().getPostBatchResumeToken()); return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); } diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index 43157322b0b..0bbaddfeaab 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -72,6 +72,8 @@ class TaskExecutor; * @ cursorManager the ClusterCursorManager on which to register the resulting ClusterClientCursor * @ privileges the PrivilegeVector of privileges needed for the original command, to be used for * auth checking by GetMore + * @ routerSort the sort to apply on the router. With only one cursor this shouldn't be common, but + * is needed to set up change stream post-batch resume tokens correctly for per shard cursors. */ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const ShardId& shardId, @@ -81,7 +83,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, - TailableModeEnum tailableMode = TailableModeEnum::kNormal); + TailableModeEnum tailableMode = TailableModeEnum::kNormal, + boost::optional<BSONObj> routerSort = boost::none); /** * Convenience function which extracts all necessary information from the passed RemoteCursor, and |