diff options
22 files changed, 532 insertions, 40 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml new file mode 100644 index 00000000000..6e512ba4f16 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml @@ -0,0 +1,62 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. + - jstests/change_streams/report_latest_observed_oplog_timestamp.js + exclude_with_any_tags: + ## + # The next tags correspond to the special errors thrown by the + # set_read_and_write_concerns.js override when it refuses to replace the readConcern or + # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be + # warranted. + ## + # "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos + - assumes_unsharded_collection + +executor: + archive: + hooks: + - CheckReplDBHash + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + defaultReadConcernLevel: null + enableMajorityReadConcern: '' + # Enable causal consistency for change streams suites using 1 node replica sets. See + # change_streams.yml for detailed explanation. + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/set_read_and_write_concerns.js'); + load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js'); + load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js'); + load('jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js'); + readMode: commands + hooks: + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + writePeriodicNoops: 1 + periodicNoopIntervalSecs: 1 + num_rs_nodes_per_shard: 1 + num_shards: 1 + enable_sharding: + - test diff --git a/etc/evergreen.yml b/etc/evergreen.yml index a7dbb0bb24a..d7cc573dcc7 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -4853,6 +4853,15 @@ tasks: resmoke_jobs_max: 1 - <<: *task_template + name: change_streams_per_shard_cursor_passthrough + tags: ["change_streams"] + commands: + - func: "do setup" + - func: "run tests" + vars: + resmoke_args: --suites=change_streams_per_shard_cursor_passthrough --storageEngine=wiredTiger + +- <<: *task_template name: disk_wiredtiger commands: - func: "do setup" diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index fa2658ed6f8..a062c9f83ba 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -202,9 +202,21 @@ assert.eq(latestChange.fullDocument, null); assertCreateCollection(db, coll.getName()); assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); -// Confirm that the next entry's post-image is null since new collection has a different +// Confirm that the update's post-image is null since new collection has a different // UUID. -latestChange = cst.getOneChange(cursorBeforeDrop); +const cursorOldUUID = cst.startWatchingChanges({ + collection: coll, + pipeline: [ + {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$match: {operationType: "update"}} + ], + aggregateOptions: {cursor: {batchSize: 0}} +}); + +// The next entry is the 'update' operation. Confirm that the next entry's post-image is null +// because the original collection (i.e. the collection that the 'update' was applied to) has +// been dropped and the new incarnation of the collection has a different UUID. +latestChange = cst.getOneChange(cursorOldUUID); assert.eq(latestChange.operationType, "update"); assert(latestChange.hasOwnProperty("fullDocument")); assert.eq(latestChange.fullDocument, null); diff --git a/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js new file mode 100644 index 00000000000..8e8779e7ca5 --- /dev/null +++ b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js @@ -0,0 +1,47 @@ +/** + * Overrides runCommand to use the $_passthroughToShard parameter. The changestreams per-shard + * cursor passthrough suite ensures changestream tests can still run correctly on a single-shard + * cluster. By adding this parameter, we pass through to that single shard, running the pipelines + * directly on that mongod. This will test the machinery of per-shard cursors via mongos. + */ + +(function() { +'use strict'; + +load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'. +load("jstests/libs/discover_topology.js"); // For 'DiscoverTopology'. + +// To be eligible, a command must be a changeStream request sent to a mongos. +const isEligibleForPerShardCursor = function(conn, cmdObj) { + if (!(cmdObj && cmdObj.aggregate && Array.isArray(cmdObj.pipeline) && + cmdObj.pipeline.length > 0 && typeof cmdObj.pipeline[0].$changeStream == "object" && + cmdObj.pipeline[0].$changeStream.constructor === Object)) { + return false; + } + return conn.isMongos(); +}; + +const discoverShardId = function(conn) { + const topology = DiscoverTopology.findConnectedNodes(conn); + const shards = topology.shards; + let shardName = Object.keys(shards)[0]; + return {shard: shardName}; +}; + +function runCommandWithPassthroughToShard( + conn, _dbName, _commandName, commandObj, func, makeFuncArgs) { + if (typeof commandObj !== "object" || commandObj === null) { + return func.apply(conn, makeFuncArgs(commandObj)); + } + if (!isEligibleForPerShardCursor(conn, commandObj)) { + return func.apply(conn, makeFuncArgs(commandObj)); + } + commandObj.$_passthroughToShard = discoverShardId(conn); + return func.apply(conn, makeFuncArgs(commandObj)); +} + +OverrideHelpers.prependOverrideInParallelShell( + "jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js"); + +OverrideHelpers.overrideRunCommand(runCommandWithPassthroughToShard); +}()); diff --git a/jstests/noPassthrough/change_streams_per_shard_cursor.js b/jstests/noPassthrough/change_streams_per_shard_cursor.js new file mode 100644 index 00000000000..41effab8619 --- /dev/null +++ b/jstests/noPassthrough/change_streams_per_shard_cursor.js @@ -0,0 +1,207 @@ +/** + * @tags: [ + * requires_sharding, + * uses_change_streams, + * ] + */ +(function() { +"use strict"; + +const dbName = jsTestName(); +const setupShardedCluster = (shards = 1) => { + const st = new ShardingTest( + {shards, mongos: 1, config: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}}); + const sdb = st.s0.getDB(dbName); + assert.commandWorked(sdb.dropDatabase()); + + sdb.getSiblingDB("admin").setProfilingLevel(0, -1); + st.shard0.getDB(dbName).setProfilingLevel(0, -1); + + // Shard the relevant collections. + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.name); + if (shards === 2) { + // Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to + // shard1. + st.shardColl("coll", {_id: 1}, {_id: 0}, {_id: 0}, dbName); + st.shardColl("coll2", {_id: 1}, {_id: 0}, {_id: 0}, dbName); + } else { + assert(shards === 1, "only 1 or 2 shards supported"); + assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".coll", key: {_id: 1}})); + assert.commandWorked( + st.s.adminCommand({shardCollection: dbName + ".coll2", key: {_id: 1}})); + } + + const shardId = st.shard0.shardName; + return [sdb, st, shardId]; +}; + +const pscWatch = (db, coll, shardId, options = {}, csOptions = {}) => { + let cmd = { + aggregate: coll, + cursor: {}, + pipeline: [{$changeStream: csOptions}], + $_passthroughToShard: {shard: shardId} + }; + cmd = Object.assign({}, cmd, options); + if (options.pipeline) { + cmd.pipeline = [{$changeStream: csOptions}].concat(options.pipeline); + } + const resp = db.runCommand(cmd); + assert.commandWorked(resp); + if (options.explain) { + return resp; + } + return new DBCommandCursor(db, resp); +}; + +// Parsing +let [sdb, st, shardId] = setupShardedCluster(); + +// Should not allow pipeline without $changeStream. +assert.commandFailedWithCode(sdb.runCommand({ + aggregate: "coll", + cursor: {}, + pipeline: [{$match: {perfect: true}}], + $_passthroughToShard: {shard: shardId} +}), + 6273801); + +// $out can't passthrough so it's not allowed. +assert.commandFailedWithCode( + assert.throws(() => pscWatch(sdb, "coll", shardId, {pipeline: [{$out: "h"}]})), 6273802); + +// Shard option should be specified. +assert.commandFailedWithCode( + sdb.runCommand( + {aggregate: "coll", cursor: {}, pipeline: [{$changeStream: {}}], $_passthroughToShard: {}}), + ErrorCodes.FailedToParse); + +// The shardId field should be a string. +assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", 42)), + ErrorCodes.TypeMismatch); +// Can't open a per shard cursor on the config RS. +assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", "config")), 6273803); + +// The shardId should be a valid shard. +assert.commandFailedWithCode( + assert.throws(() => pscWatch(sdb, "coll", "Dwane 'the Shard' Johnson")), + ErrorCodes.ShardNotFound); + +// Correctness. + +// Simple collection level watch +// this insert shouldn't show up since it happens before we make a cursor. +sdb.coll.insertOne({location: 1}); +let c = pscWatch(sdb, "coll", shardId); +// these inserts should show up since they're after we make a cursor. +for (let i = 1; i <= 4; i++) { + sdb.coll.insertOne({location: 2, i}); + assert(!c.isExhausted()); + assert.soon(() => c.hasNext()); + c.next(); +} +assert(!c.hasNext()); + +// Simple database level watch +c = pscWatch(sdb, 1, shardId); + +sdb.coll.insertOne({location: 3}); +assert(!c.isExhausted()); +assert.soon(() => c.hasNext()); +c.next(); + +sdb.coll2.insertOne({location: 4}); +assert(!c.isExhausted()); +assert.soon(() => c.hasNext()); +c.next(); + +assert(!c.hasNext()); + +// Watching collection that doesn't exist yet. +c = pscWatch(sdb, "toBeCreated", shardId); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +st.s.adminCommand({shardCollection: dbName + ".toBeCreated", key: {_id: 1}}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +sdb.toBeCreated.insertOne({location: 8}); +assert(!c.isExhausted()); +assert.soon(() => c.hasNext()); +c.next(); + +assert(!c.hasNext()); + +// Explain output should not have a split pipeline. It should look like mongod explain output. +let explainOut = pscWatch(sdb, "coll", shardId, {explain: true}); +assert(!explainOut.hasOwnProperty("splitPipeline")); +assert.hasOwnProperty(explainOut, "stages"); + +// If we getMore an invalidated cursor the cursor should have been closed on mongos and we should +// get CursorNotFound, even if the invalidate event was never recieved by mongos. +[[], [{$match: {f: "filter out invalidate event"}}]].forEach((pipeline) => { + assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".toDrop", key: {_id: 1}})); + let c = pscWatch(sdb, "toDrop", shardId, {pipeline}); + let cid = c._cursorid; + sdb.toDrop.insertOne({}); + sdb.toDrop.drop(); + assert.retry(() => { + // After an invalidate the cursorid gets set to 0 + if (c._cursorid == 0) { + return true; + } + c._runGetMoreCommand(); + return false; + }, "change stream should have been invalidated by now", 4); + let res = sdb.runCommand({getMore: cid, collection: "toDrop"}); + assert.eq(res.code, ErrorCodes.CursorNotFound); +}); + +// Test getMore batchSize: 1 actually gives you 1 +c = pscWatch(sdb, "coll", shardId); +let cid = c._cursorid; +sdb.coll.insertMany([{}, {}]); +let res = sdb.runCommand({getMore: cid, collection: "coll", batchSize: 1}); +assert.commandWorked(res); +assert.eq(res.cursor.nextBatch.length, 1); + +st.stop(); + +// Isolated from events on other shards. +[sdb, st, shardId] = setupShardedCluster(2); +c = pscWatch(sdb, "coll", shardId); + +sdb.coll.insertOne({location: 5, _id: -2}); +assert(!c.isExhausted()); +assert.soon(() => c.hasNext()); +c.next(); + +sdb.coll.insertOne({location: 6, _id: 2}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +// Isolated from events on other shards with whole db. +c = pscWatch(sdb.getSiblingDB("admin"), 1, shardId, {}, {allChangesForCluster: true}); + +sdb.coll.insertOne({location: 7, _id: -3}); +assert(!c.isExhausted()); +assert.soon(() => c.hasNext()); +c.next(); + +sdb.coll2.insertOne({location: 8, _id: -4}); +assert(!c.isExhausted()); +assert.soon(() => c.hasNext()); +c.next(); + +sdb.coll.insertOne({location: 9, _id: 3}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +sdb.coll2.insertOne({location: 10, _id: 4}); +assert(!c.isExhausted()); +assert(!c.hasNext()); + +st.stop(); +})(); diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp index 0de28de0b55..5bef2cd87ce 100644 --- a/src/mongo/db/exec/change_stream_proxy.cpp +++ b/src/mongo/db/exec/change_stream_proxy.cpp @@ -79,8 +79,9 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() { } BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON(const Document& event) const { - // If we are producing output to be merged on mongoS, then no stages can have modified the _id. - if (_includeMetaData) { + // If we are producing output to be merged on mongoS, then the _id cannot have been modified by + // any stage. + if (_includeMetaData && !_pipeline->getContext()->forPerShardCursor) { return event.toBsonWithMetaData(); } // Confirm that the document _id field matches the original resume token in the sort key field. @@ -97,6 +98,13 @@ BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON(const Document& event) << BSON("_id" << resumeToken) << " but found: " << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()), idField.binaryEqual(resumeToken)); + // If we are producing output to be merged on mongos, then metadata must also be returned. We + // must do the above check first since if the request is for a per shard cursor, a project can + // be pushed down to a shard. + if (_pipeline->getContext()->forPerShardCursor) { + return event.toBsonWithMetaData(); + } + return eventBSON; } diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index 8639307271d..0e3531db219 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -58,7 +58,8 @@ PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx, const char* stageTypeName) : PlanStage(stageTypeName, opCtx), _pipeline(std::move(pipeline)), - _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger + _includeMetaData(_pipeline->getContext()->needsMerge || + _pipeline->getContext()->forPerShardCursor), // send metadata to merger _ws(ws) { // We take over responsibility for disposing of the Pipeline, since it is required that // doDispose() will be called before destruction of this PipelineProxyStage. diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 9733168fb51..608554d4645 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -215,6 +215,31 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( } catch (const DBException& ex) { return ex.toStatus(); } + } else if (kPassthroughToShardName == fieldName) { + if (elem.type() != BSONType::Object) { + return {ErrorCodes::TypeMismatch, + str::stream() + << fieldName << " must be an object, not a " << typeName(elem.type())}; + } + for (auto&& innerElem : elem.Obj()) { + if (innerElem.fieldName() != "shard"_sd) { + return {ErrorCodes::FailedToParse, + str::stream() << fieldName << "." << innerElem.fieldName() + << " is an unexpected field"}; + } + } + auto shardIdElem = elem["shard"_sd]; + if (shardIdElem.eoo()) { + return {ErrorCodes::FailedToParse, + str::stream() + << fieldName + << ".shard was missing. Must specify a shard to pass through to"}; + } else if (shardIdElem.type() != BSONType::String) { + return {ErrorCodes::TypeMismatch, + str::stream() << fieldName << ".shard must be a string, not a " + << typeName(shardIdElem.type())}; + } + request.setPassthroughToShard(ShardId(shardIdElem.String())); } else if (bypassDocumentValidationCommandOption() == fieldName) { request.setBypassDocumentValidation(elem.trueValue()); } else if (WriteConcernOptions::kWriteConcernField == fieldName) { @@ -341,6 +366,9 @@ Document AggregationRequest::serializeToCommandObj() const { // Only serialize runtime constants if any were specified. {kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()}, {kUseNewUpsert, _useNewUpsert ? Value(true) : Value()}, - }; + {kPassthroughToShardName, + _passthroughToShard.has_value() + ? Value(Document{{"shard", _passthroughToShard->toString()}}) + : Value()}}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index b15a06f898c..e498d02c682 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -39,6 +39,7 @@ #include "mongo/db/pipeline/runtime_constants_gen.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/shard_id.h" namespace mongo { @@ -64,6 +65,7 @@ public: static constexpr StringData kHintName = "hint"_sd; static constexpr StringData kCommentName = "comment"_sd; static constexpr StringData kExchangeName = "exchange"_sd; + static constexpr StringData kPassthroughToShardName = "$_passthroughToShard"_sd; static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd; static constexpr StringData kUseNewUpsert = "useNewUpsert"_sd; @@ -243,6 +245,10 @@ public: return _useNewUpsert; } + boost::optional<ShardId> getPassthroughToShard() const { + return _passthroughToShard; + } + // // Setters for optional fields. // @@ -319,6 +325,10 @@ public: _useNewUpsert = useNewUpsert; } + void setPassthroughToShard(boost::optional<ShardId> shardId) { + _passthroughToShard = shardId; + } + private: // Required fields. const NamespaceString _nss; @@ -376,5 +386,7 @@ private: // Indicates whether the aggregation may use the new 'upsertSupplied' mechanism when running // $merge stages. Versions of mongoS from 4.2.2 onwards always set this flag. bool _useNewUpsert = false; + + boost::optional<ShardId> _passthroughToShard; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index 359ee994f4b..6e624272737 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -61,7 +61,8 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { "needsMerge: true, mergeByPBRT: true, bypassDocumentValidation: true, collation: {locale: " "'en_US'}, cursor: {batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: " "'linearizable'}, $queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment', " - "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}}"); + "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}, $_passthroughToShard: " + "{shard: 'foo'}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); ASSERT_FALSE(request.getExplain()); ASSERT_TRUE(request.shouldAllowDiskUse()); @@ -512,6 +513,20 @@ TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) { fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } + +TEST(AggregationRequestTest, ShouldRejectInvalidPassthroughToShard) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = + fromjson("{pipeline: [{$changeStream: {}}], cursor: {}, passthroughToShard: {foo: 'f'}}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); + const BSONObj inputBson2 = + fromjson("{pipeline: [{$changeStream: {}}], cursor: {}, passthroughToShard: {shard: 5}}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson2).getStatus()); + const BSONObj inputBson3 = + fromjson("{pipeline: [{$changeStream: {}}], cursor: {}, passthroughToShard: {}}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson3).getStatus()); +} + // // Ignore fields parsed elsewhere. // diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 1bc7186eec5..41db43edf93 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -56,6 +56,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, needsMerge = request.needsMerge(); mergeByPBRT = request.mergeByPBRT(); allowDiskUse = request.shouldAllowDiskUse(); + forPerShardCursor = request.getPassthroughToShard().has_value(); bypassDocumentValidation = request.shouldBypassDocumentValidation(); ns = request.getNamespaceString(); mongoProcessInterface = std::move(processInterface); diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 8717d4d518c..674b632d7f7 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -239,6 +239,7 @@ public: bool needsMerge = false; bool mergeByPBRT = false; bool inMongos = false; + bool forPerShardCursor = false; bool allowDiskUse = false; bool bypassDocumentValidation = false; bool inMultiDocumentTransaction = false; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index d9e10e6eecc..4f1ffd77e88 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -77,27 +77,49 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, const boost::optional<RuntimeConstants>& constants, Pipeline* pipeline, - BSONObj collationObj) { + BSONObj collationObj, + bool forPerShardCursor, + boost::optional<int> overrideBatchSize) { // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); + auto serializedCommand = request.serializeToCommandObj(); + MutableDocument targetedCmd(serializedCommand); if (pipeline) { targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); } + if (forPerShardCursor) { + // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This + // notifies the shards that the mongoS is capable of merging streams based on resume token. + // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4. + targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(true); + } + + if (overrideBatchSize.has_value()) { + if (serializedCommand[AggregationRequest::kCursorName].missing()) { + targetedCmd[AggregationRequest::kCursorName] = + Value(DOC(AggregationRequest::kBatchSizeName << Value(*overrideBatchSize))); + } else { + targetedCmd[AggregationRequest::kCursorName][AggregationRequest::kBatchSizeName] = + Value(*overrideBatchSize); + } + } return genericTransformForShards( - std::move(targetedCmd), opCtx, request, constants, collationObj); + std::move(targetedCmd), opCtx, request, constants, collationObj, forPerShardCursor); } BSONObj genericTransformForShards(MutableDocument&& cmdForShards, OperationContext* opCtx, const AggregationRequest& request, const boost::optional<RuntimeConstants>& constants, - BSONObj collationObj) { + BSONObj collationObj, + bool forPerShardCursor) { if (constants) { cmdForShards[AggregationRequest::kRuntimeConstants] = Value(constants.get().toBSON()); } - cmdForShards[AggregationRequest::kFromMongosName] = Value(true); + if (!forPerShardCursor) { + cmdForShards[AggregationRequest::kFromMongosName] = Value(true); + } // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. if (auto explainVerbosity = request.getExplain()) { @@ -286,8 +308,13 @@ DispatchShardPipelineResults dispatchShardPipeline( exchangeSpec, expCtx->getRuntimeConstants(), true) - : createPassthroughCommandForShard( - opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj); + : createPassthroughCommandForShard(opCtx, + aggRequest, + expCtx->getRuntimeConstants(), + pipeline.get(), + collationObj, + false, + boost::none); // In order for a $changeStream to work reliably, we need the shard registry to be at least as // current as the logical time at which the pipeline was serialized to 'targetedCommand' above. diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 15e0dd51c2e..ed362cf1e0c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -117,13 +117,16 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, const boost::optional<RuntimeConstants>& constants, Pipeline* pipeline, - BSONObj collationObj); + BSONObj collationObj, + bool forPerShardCursor, + boost::optional<int> overrideBatchSize); BSONObj genericTransformForShards(MutableDocument&& cmdForShards, OperationContext* opCtx, const AggregationRequest& request, const boost::optional<RuntimeConstants>& constants, - BSONObj collationObj); + BSONObj collationObj, + bool forPerShardCursor = false); /** * For a sharded collection, establishes remote cursors on each shard that may have results, and diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 74f2132b4b0..390ccae608c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -42,6 +42,7 @@ #include "mongo/db/curop.h" #include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/expression_context.h" @@ -863,6 +864,28 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, resolveInvolvedNamespaces(opCtx, litePipe, request.getNamespaceString()); auto status = [&]() { + if (request.getPassthroughToShard().has_value()) { + uassert(6273801, + "per shard cursor pipeline must contain $changeStream", + litePipe.hasChangeStream()); + + // Make sure the rest of the pipeline can be pushed down. + auto pipeline = request.getPipeline(); + std::vector<BSONObj> nonChangeStreamPart(pipeline.begin() + 1, pipeline.end()); + LiteParsedPipeline nonChangeStreamLite( + AggregationRequest(request.getNamespaceString(), nonChangeStreamPart)); + uassert(6273802, + "$_passthroughToShard specified with a stage that is not allowed to " + "passthrough from mongos", + nonChangeStreamLite.allowedToPassthroughFromMongos()); + ShardId shardId = *request.getPassthroughToShard(); + uassert(6273803, + "$_passthroughToShard not supported for queries against config replica set", + shardId != ShardRegistry::kConfigServerShardId); + + return aggPassthrough( + opCtx, namespaces, shardId, request, litePipe, privileges, result, true); + } // A pipeline is allowed to passthrough to the primary shard iff the following conditions // are met: // @@ -874,7 +897,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) { const auto primaryShardId = routingInfo->db().primary()->getId(); return aggPassthrough( - opCtx, namespaces, primaryShardId, request, litePipe, privileges, result); + opCtx, namespaces, primaryShardId, request, litePipe, privileges, result, false); } // Populate the collection UUID and the appropriate collation to use. @@ -967,19 +990,30 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, const PrivilegeVector& privileges, - BSONObjBuilder* out) { + BSONObjBuilder* out, + bool forPerShardCursor) { // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough( sharded_agg_helpers::createPassthroughCommandForShard( - opCtx, aggRequest, boost::none, nullptr, BSONObj())); + opCtx, + aggRequest, + boost::none, + nullptr, + BSONObj(), + forPerShardCursor, + forPerShardCursor ? boost::optional<int>(0) : boost::none)); + + uassert(6900400, + "shouldn't have fromMongos set for per shard cursor", + !forPerShardCursor || cmdObj[AggregationRequest::kFromMongosName].eoo()); MultiStatementTransactionRequestsSender ars( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), namespaces.executionNss.db().toString(), {{shardId, - shardId != ShardRegistry::kConfigServerShardId + shardId != ShardRegistry::kConfigServerShardId && !forPerShardCursor ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) : std::move(cmdObj)}}, ReadPreferenceSetting::get(opCtx), @@ -1005,16 +1039,18 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, auto tailMode = liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData : TailableModeEnum::kNormal; - result = uassertStatusOK( - storePossibleCursor(opCtx, - shardId, - *response.shardHostAndPort, - response.swResponse.getValue().data, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager(), - privileges, - tailMode)); + result = uassertStatusOK(storePossibleCursor( + opCtx, + shardId, + *response.shardHostAndPort, + response.swResponse.getValue().data, + namespaces.requestedNss, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + privileges, + tailMode, + forPerShardCursor ? boost::optional<BSONObj>(change_stream_constants::kSortSpec) + : boost::none)); } // First append the properly constructed writeConcernError. It will then be skipped diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index 9d605043def..2d02f0f6624 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -121,7 +121,8 @@ private: const AggregationRequest&, const LiteParsedPipeline&, const PrivilegeVector& privileges, - BSONObjBuilder* result); + BSONObjBuilder* result, + bool forPerShardCursor); }; } // namespace mongo diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 5b611623ea9..5b99a4f668e 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -126,7 +126,7 @@ public: * Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream; * otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child. */ - virtual BSONObj getPostBatchResumeToken() const { + virtual BSONObj getPostBatchResumeToken() { return _child ? _child->getPostBatchResumeToken() : BSONObj(); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index a7cead17d89..861c170277d 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -67,6 +67,10 @@ public: return _resultsMerger.getNumRemotes(); } + BSONObj getPostBatchResumeToken() final { + return _resultsMerger.getHighWaterMark(); + } + protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final { return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout); diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index aaaad0c3e96..deab88c2720 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -86,7 +86,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const { return 0; } -BSONObj RouterStagePipeline::getPostBatchResumeToken() const { +BSONObj RouterStagePipeline::getPostBatchResumeToken() { return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index de0dc25b310..88f5930e3d5 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -53,7 +53,7 @@ public: std::size_t getNumRemotes() const final; - BSONObj getPostBatchResumeToken() const final; + BSONObj getPostBatchResumeToken() final; protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 9877af451a2..d033898833a 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -77,7 +77,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, - TailableModeEnum tailableMode) { + TailableModeEnum tailableMode, + boost::optional<BSONObj> routerSort) { if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { return cmdResult; } @@ -104,14 +105,22 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, auto& remoteCursor = params.remotes.back(); remoteCursor.setShardId(shardId.toString()); remoteCursor.setHostAndPort(server); - remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(), - incomingCursorResponse.getValue().getCursorId(), - {})); + remoteCursor.setCursorResponse( + CursorResponse(incomingCursorResponse.getValue().getNSS(), + incomingCursorResponse.getValue().getCursorId(), + {}, /* batch */ + incomingCursorResponse.getValue().getNumReturnedSoFar(), + incomingCursorResponse.getValue().getLastOplogTimestamp(), + incomingCursorResponse.getValue().getPostBatchResumeToken(), + incomingCursorResponse.getValue().getWriteConcernError())); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.tailableMode = tailableMode; params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); params.originatingPrivileges = std::move(privileges); + if (routerSort) { + params.sort = *routerSort; + } if (TransactionRouter::get(opCtx)) { params.isAutoCommit = false; @@ -137,7 +146,13 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue(); CursorResponse outgoingCursorResponse( - requestedNss, clusterCursorId.getValue(), incomingCursorResponse.getValue().getBatch()); + requestedNss, + clusterCursorId.getValue(), + incomingCursorResponse.getValue().getBatch(), + incomingCursorResponse.getValue().getNumReturnedSoFar(), + incomingCursorResponse.getValue().getLastOplogTimestamp(), + incomingCursorResponse.getValue().getPostBatchResumeToken(), + incomingCursorResponse.getValue().getWriteConcernError()); return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); } diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index 43157322b0b..0bbaddfeaab 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -72,6 +72,8 @@ class TaskExecutor; * @ cursorManager the ClusterCursorManager on which to register the resulting ClusterClientCursor * @ privileges the PrivilegeVector of privileges needed for the original command, to be used for * auth checking by GetMore + * @ routerSort the sort to apply on the router. With only one cursor this shouldn't be common, but + * is needed to set up change stream post-batch resume tokens correctly for per shard cursors. */ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const ShardId& shardId, @@ -81,7 +83,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, - TailableModeEnum tailableMode = TailableModeEnum::kNormal); + TailableModeEnum tailableMode = TailableModeEnum::kNormal, + boost::optional<BSONObj> routerSort = boost::none); /** * Convenience function which extracts all necessary information from the passed RemoteCursor, and |