author     Mickey. J Winters <mickey.winters@mongodb.com>  2022-11-08 18:34:59 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-11-08 23:00:12 +0000
commit     792f237975470ab61e3e9cd4b54ede87ed9377d8 (patch)
tree       66d4dfa05fbb932f2bc3615766f6e4f89f5940a5
parent     a7d307ecb1a97c286c2e1084a92dead054ebd270 (diff)
download   mongo-792f237975470ab61e3e9cd4b54ede87ed9377d8.tar.gz
SERVER-69004 backport per shard cursors

SERVER-62400 SERVER-62681 SERVER-62738 SERVER-63781 SERVER-63774 SERVER-63772
SERVER-63773 SERVER-58673 SERVER-70633 SERVER-69785

both modified: src/mongo/s/query/cluster_aggregate.cpp
both modified: src/mongo/s/query/cluster_aggregation_planner.cpp
both modified: src/mongo/s/query/cluster_aggregation_planner.h

These files had irreconcilable differences and the changes had to be
reimplemented by hand; this is due to some refactoring and AggregationTargeter
not existing.

(cherry-picked from commit e424f3ed1266bcd772c52bcf23c518b2fe6c83ab)
(cherry-picked from commit ba8f9a28c8769dfe10f73b190c943ee4a57ee7a1)
(cherry-picked from commit 1219ff764c932f16a68c7ba1afd9b925f9f876c8)
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml | 62
-rw-r--r--  etc/evergreen.yml | 9
-rw-r--r--  jstests/change_streams/lookup_post_image.js | 16
-rw-r--r--  jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js | 47
-rw-r--r--  jstests/noPassthrough/change_streams_per_shard_cursor.js | 207
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.cpp | 12
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.cpp | 3
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.cpp | 30
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.h | 12
-rw-r--r--  src/mongo/db/pipeline/aggregation_request_test.cpp | 17
-rw-r--r--  src/mongo/db/pipeline/expression_context.cpp | 1
-rw-r--r--  src/mongo/db/pipeline/expression_context.h | 1
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp | 41
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.h | 7
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp | 64
-rw-r--r--  src/mongo/s/query/cluster_aggregate.h | 3
-rw-r--r--  src/mongo/s/query/router_exec_stage.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_merge.h | 4
-rw-r--r--  src/mongo/s/query/router_stage_pipeline.cpp | 2
-rw-r--r--  src/mongo/s/query/router_stage_pipeline.h | 2
-rw-r--r--  src/mongo/s/query/store_possible_cursor.cpp | 25
-rw-r--r--  src/mongo/s/query/store_possible_cursor.h | 5
22 files changed, 532 insertions, 40 deletions
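
In short, this backport lets a driver or shell ask mongos to open a change stream cursor against a single shard instead of a merged, cluster-wide stream. A minimal sketch of the new request shape follows; the connection db and the shard name "shard01" are hypothetical, and the real usage is exercised by the tests in this patch.

    // Ask mongos to open the $changeStream directly on one shard's mongod.
    const res = db.runCommand({
        aggregate: "coll",
        pipeline: [{$changeStream: {}}],
        cursor: {},
        $_passthroughToShard: {shard: "shard01"}  // hypothetical shard name
    });
    assert.commandWorked(res);
    // Events are then consumed with ordinary getMore commands on res.cursor.id.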
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
new file mode 100644
index 00000000000..6e512ba4f16
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
@@ -0,0 +1,62 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
+ - jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ exclude_with_any_tags:
+ ##
+ # The next tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
+ - assumes_unsharded_collection
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: null
+ enableMajorityReadConcern: ''
+ # Enable causal consistency for change streams suites using 1 node replica sets. See
+ # change_streams.yml for detailed explanation.
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js');
+ load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js');
+ load('jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js');
+ readMode: commands
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ writePeriodicNoops: 1
+ periodicNoopIntervalSecs: 1
+ num_rs_nodes_per_shard: 1
+ num_shards: 1
+ enable_sharding:
+ - test
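
The suite reruns the existing jstests/change_streams tests unchanged; only the override loaded in the eval block above rewrites how their commands are sent. A typical test in scope looks roughly like this sketch (collection and field names are hypothetical):

    // Runs as-is in this suite; the override rewrites the underlying
    // aggregate command to target the single shard directly.
    const cs = db.coll.watch();
    assert.writeOK(db.coll.insert({x: 1}));
    assert.soon(() => cs.hasNext());
    assert.eq(cs.next().operationType, "insert");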
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index a7dbb0bb24a..d7cc573dcc7 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -4853,6 +4853,15 @@ tasks:
resmoke_jobs_max: 1
- <<: *task_template
+ name: change_streams_per_shard_cursor_passthrough
+ tags: ["change_streams"]
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_per_shard_cursor_passthrough --storageEngine=wiredTiger
+
+- <<: *task_template
name: disk_wiredtiger
commands:
- func: "do setup"
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index fa2658ed6f8..a062c9f83ba 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -202,9 +202,21 @@ assert.eq(latestChange.fullDocument, null);
assertCreateCollection(db, coll.getName());
assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
-// Confirm that the next entry's post-image is null since new collection has a different
+// Confirm that the update's post-image is null since new collection has a different
// UUID.
-latestChange = cst.getOneChange(cursorBeforeDrop);
+const cursorOldUUID = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [
+ {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$match: {operationType: "update"}}
+ ],
+ aggregateOptions: {cursor: {batchSize: 0}}
+});
+
+// The next entry is the 'update' operation. Confirm that the next entry's post-image is null
+// because the original collection (i.e. the collection that the 'update' was applied to) has
+// been dropped and the new incarnation of the collection has a different UUID.
+latestChange = cst.getOneChange(cursorOldUUID);
assert.eq(latestChange.operationType, "update");
assert(latestChange.hasOwnProperty("fullDocument"));
assert.eq(latestChange.fullDocument, null);
diff --git a/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js
new file mode 100644
index 00000000000..8e8779e7ca5
--- /dev/null
+++ b/jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js
@@ -0,0 +1,47 @@
+/**
+ * Overrides runCommand to use the $_passthroughToShard parameter. The changestreams per-shard
+ * cursor passthrough suite ensures changestream tests can still run correctly on a single-shard
+ * cluster. By adding this parameter, we pass through to that single shard, running the pipelines
+ * directly on that mongod. This will test the machinery of per-shard cursors via mongos.
+ */
+
+(function() {
+'use strict';
+
+load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'.
+load("jstests/libs/discover_topology.js"); // For 'DiscoverTopology'.
+
+// To be eligible, a command must be a changeStream request sent to a mongos.
+const isEligibleForPerShardCursor = function(conn, cmdObj) {
+ if (!(cmdObj && cmdObj.aggregate && Array.isArray(cmdObj.pipeline) &&
+ cmdObj.pipeline.length > 0 && typeof cmdObj.pipeline[0].$changeStream == "object" &&
+ cmdObj.pipeline[0].$changeStream.constructor === Object)) {
+ return false;
+ }
+ return conn.isMongos();
+};
+
+const discoverShardId = function(conn) {
+ const topology = DiscoverTopology.findConnectedNodes(conn);
+ const shards = topology.shards;
+ let shardName = Object.keys(shards)[0];
+ return {shard: shardName};
+};
+
+function runCommandWithPassthroughToShard(
+ conn, _dbName, _commandName, commandObj, func, makeFuncArgs) {
+ if (typeof commandObj !== "object" || commandObj === null) {
+ return func.apply(conn, makeFuncArgs(commandObj));
+ }
+ if (!isEligibleForPerShardCursor(conn, commandObj)) {
+ return func.apply(conn, makeFuncArgs(commandObj));
+ }
+ commandObj.$_passthroughToShard = discoverShardId(conn);
+ return func.apply(conn, makeFuncArgs(commandObj));
+}
+
+OverrideHelpers.prependOverrideInParallelShell(
+ "jstests/libs/override_methods/implicit_passthrough_to_shard_changestreams.js");
+
+OverrideHelpers.overrideRunCommand(runCommandWithPassthroughToShard);
+}());
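
To illustrate the override's effect: an eligible command (a $changeStream aggregate sent to mongos) is rewritten roughly as follows. The shard name shown is hypothetical; at runtime it is discovered from the cluster topology.

    // Before: a normal change stream aggregate addressed to mongos.
    let cmd = {aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {}};
    // After: the override appends the per-shard cursor option, so mongos
    // passes the pipeline through to the lone shard's mongod.
    cmd.$_passthroughToShard = {shard: "shard-rs0"};  // hypothetical shard name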
diff --git a/jstests/noPassthrough/change_streams_per_shard_cursor.js b/jstests/noPassthrough/change_streams_per_shard_cursor.js
new file mode 100644
index 00000000000..41effab8619
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_per_shard_cursor.js
@@ -0,0 +1,207 @@
+/**
+ * @tags: [
+ * requires_sharding,
+ * uses_change_streams,
+ * ]
+ */
+(function() {
+"use strict";
+
+const dbName = jsTestName();
+const setupShardedCluster = (shards = 1) => {
+ const st = new ShardingTest(
+ {shards, mongos: 1, config: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}});
+ const sdb = st.s0.getDB(dbName);
+ assert.commandWorked(sdb.dropDatabase());
+
+ sdb.getSiblingDB("admin").setProfilingLevel(0, -1);
+ st.shard0.getDB(dbName).setProfilingLevel(0, -1);
+
+ // Shard the relevant collections.
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.name);
+ if (shards === 2) {
+ // Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to
+ // shard1.
+ st.shardColl("coll", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ st.shardColl("coll2", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ } else {
+ assert(shards === 1, "only 1 or 2 shards supported");
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".coll", key: {_id: 1}}));
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: dbName + ".coll2", key: {_id: 1}}));
+ }
+
+ const shardId = st.shard0.shardName;
+ return [sdb, st, shardId];
+};
+
+const pscWatch = (db, coll, shardId, options = {}, csOptions = {}) => {
+ let cmd = {
+ aggregate: coll,
+ cursor: {},
+ pipeline: [{$changeStream: csOptions}],
+ $_passthroughToShard: {shard: shardId}
+ };
+ cmd = Object.assign({}, cmd, options);
+ if (options.pipeline) {
+ cmd.pipeline = [{$changeStream: csOptions}].concat(options.pipeline);
+ }
+ const resp = db.runCommand(cmd);
+ assert.commandWorked(resp);
+ if (options.explain) {
+ return resp;
+ }
+ return new DBCommandCursor(db, resp);
+};
+
+// Parsing
+let [sdb, st, shardId] = setupShardedCluster();
+
+// Should not allow pipeline without $changeStream.
+assert.commandFailedWithCode(sdb.runCommand({
+ aggregate: "coll",
+ cursor: {},
+ pipeline: [{$match: {perfect: true}}],
+ $_passthroughToShard: {shard: shardId}
+}),
+ 6273801);
+
+// $out can't passthrough so it's not allowed.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", shardId, {pipeline: [{$out: "h"}]})), 6273802);
+
+// Shard option should be specified.
+assert.commandFailedWithCode(
+ sdb.runCommand(
+ {aggregate: "coll", cursor: {}, pipeline: [{$changeStream: {}}], $_passthroughToShard: {}}),
+ ErrorCodes.FailedToParse);
+
+// The shardId field should be a string.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", 42)),
+ ErrorCodes.TypeMismatch);
+// Can't open a per shard cursor on the config RS.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", "config")), 6273803);
+
+// The shardId should be a valid shard.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", "Dwane 'the Shard' Johnson")),
+ ErrorCodes.ShardNotFound);
+
+// Correctness.
+
+// Simple collection level watch
+// this insert shouldn't show up since it happens before we make a cursor.
+sdb.coll.insertOne({location: 1});
+let c = pscWatch(sdb, "coll", shardId);
+// these inserts should show up since they're after we make a cursor.
+for (let i = 1; i <= 4; i++) {
+ sdb.coll.insertOne({location: 2, i});
+ assert(!c.isExhausted());
+ assert.soon(() => c.hasNext());
+ c.next();
+}
+assert(!c.hasNext());
+
+// Simple database level watch
+c = pscWatch(sdb, 1, shardId);
+
+sdb.coll.insertOne({location: 3});
+assert(!c.isExhausted());
+assert.soon(() => c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 4});
+assert(!c.isExhausted());
+assert.soon(() => c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Watching collection that doesn't exist yet.
+c = pscWatch(sdb, "toBeCreated", shardId);
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.s.adminCommand({shardCollection: dbName + ".toBeCreated", key: {_id: 1}});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.toBeCreated.insertOne({location: 8});
+assert(!c.isExhausted());
+assert.soon(() => c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Explain output should not have a split pipeline. It should look like mongod explain output.
+let explainOut = pscWatch(sdb, "coll", shardId, {explain: true});
+assert(!explainOut.hasOwnProperty("splitPipeline"));
+assert.hasOwnProperty(explainOut, "stages");
+
+// If we getMore an invalidated cursor, the cursor should have been closed on mongos and we should
+// get CursorNotFound, even if the invalidate event was never received by mongos.
+[[], [{$match: {f: "filter out invalidate event"}}]].forEach((pipeline) => {
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".toDrop", key: {_id: 1}}));
+ let c = pscWatch(sdb, "toDrop", shardId, {pipeline});
+ let cid = c._cursorid;
+ sdb.toDrop.insertOne({});
+ sdb.toDrop.drop();
+ assert.retry(() => {
+ // After an invalidate the cursorid gets set to 0
+ if (c._cursorid == 0) {
+ return true;
+ }
+ c._runGetMoreCommand();
+ return false;
+ }, "change stream should have been invalidated by now", 4);
+ let res = sdb.runCommand({getMore: cid, collection: "toDrop"});
+ assert.eq(res.code, ErrorCodes.CursorNotFound);
+});
+
+// Test getMore batchSize: 1 actually gives you 1
+c = pscWatch(sdb, "coll", shardId);
+let cid = c._cursorid;
+sdb.coll.insertMany([{}, {}]);
+let res = sdb.runCommand({getMore: cid, collection: "coll", batchSize: 1});
+assert.commandWorked(res);
+assert.eq(res.cursor.nextBatch.length, 1);
+
+st.stop();
+
+// Isolated from events on other shards.
+[sdb, st, shardId] = setupShardedCluster(2);
+c = pscWatch(sdb, "coll", shardId);
+
+sdb.coll.insertOne({location: 5, _id: -2});
+assert(!c.isExhausted());
+assert.soon(() => c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 6, _id: 2});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+// Isolated from events on other shards with whole db.
+c = pscWatch(sdb.getSiblingDB("admin"), 1, shardId, {}, {allChangesForCluster: true});
+
+sdb.coll.insertOne({location: 7, _id: -3});
+assert(!c.isExhausted());
+assert.soon(() => c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 8, _id: -4});
+assert(!c.isExhausted());
+assert.soon(() => c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 9, _id: 3});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.coll2.insertOne({location: 10, _id: 4});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.stop();
+})();
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
index 0de28de0b55..5bef2cd87ce 100644
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -79,8 +79,9 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
}
BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON(const Document& event) const {
- // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
- if (_includeMetaData) {
+ // If we are producing output to be merged on mongoS, then the _id cannot have been modified by
+ // any stage.
+ if (_includeMetaData && !_pipeline->getContext()->forPerShardCursor) {
return event.toBsonWithMetaData();
}
// Confirm that the document _id field matches the original resume token in the sort key field.
@@ -97,6 +98,13 @@ BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON(const Document& event)
<< BSON("_id" << resumeToken) << " but found: "
<< (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
idField.binaryEqual(resumeToken));
+ // If we are producing output to be merged on mongos, then metadata must also be returned. We
+ // must do the above check first since if the request is for a per shard cursor, a project can
+ // be pushed down to a shard.
+ if (_pipeline->getContext()->forPerShardCursor) {
+ return event.toBsonWithMetaData();
+ }
+
return eventBSON;
}
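
The ordering above matters because each change event's _id is its resume token, and a per-shard cursor may have a $project pushed down to the shard, so the _id check must run before metadata is appended. From the client's perspective the invariant looks like this sketch, reusing the pscWatch helper from the test file above:

    const c = pscWatch(sdb, "coll", shardId);
    sdb.coll.insertOne({location: 11});
    assert.soon(() => c.hasNext());
    const event = c.next();
    // The event's _id doubles as its resume token; a per-shard stream can be
    // resumed from it just like a regular change stream.
    const resumed = pscWatch(sdb, "coll", shardId, {}, {resumeAfter: event._id});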
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index 8639307271d..0e3531db219 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -58,7 +58,8 @@ PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
const char* stageTypeName)
: PlanStage(stageTypeName, opCtx),
_pipeline(std::move(pipeline)),
- _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger
+ _includeMetaData(_pipeline->getContext()->needsMerge ||
+ _pipeline->getContext()->forPerShardCursor), // send metadata to merger
_ws(ws) {
// We take over responsibility for disposing of the Pipeline, since it is required that
// doDispose() will be called before destruction of this PipelineProxyStage.
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 9733168fb51..608554d4645 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -215,6 +215,31 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
} catch (const DBException& ex) {
return ex.toStatus();
}
+ } else if (kPassthroughToShardName == fieldName) {
+ if (elem.type() != BSONType::Object) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream()
+ << fieldName << " must be an object, not a " << typeName(elem.type())};
+ }
+ for (auto&& innerElem : elem.Obj()) {
+ if (innerElem.fieldName() != "shard"_sd) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << fieldName << "." << innerElem.fieldName()
+ << " is an unexpected field"};
+ }
+ }
+ auto shardIdElem = elem["shard"_sd];
+ if (shardIdElem.eoo()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream()
+ << fieldName
+ << ".shard was missing. Must specify a shard to pass through to"};
+ } else if (shardIdElem.type() != BSONType::String) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << fieldName << ".shard must be a string, not a "
+ << typeName(shardIdElem.type())};
+ }
+ request.setPassthroughToShard(ShardId(shardIdElem.String()));
} else if (bypassDocumentValidationCommandOption() == fieldName) {
request.setBypassDocumentValidation(elem.trueValue());
} else if (WriteConcernOptions::kWriteConcernField == fieldName) {
@@ -341,6 +366,9 @@ Document AggregationRequest::serializeToCommandObj() const {
// Only serialize runtime constants if any were specified.
{kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()},
{kUseNewUpsert, _useNewUpsert ? Value(true) : Value()},
- };
+ {kPassthroughToShardName,
+ _passthroughToShard.has_value()
+ ? Value(Document{{"shard", _passthroughToShard->toString()}})
+ : Value()}};
}
} // namespace mongo
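
The parse rules above surface to clients roughly as follows; a sketch mirroring the unit tests later in this patch (shard names hypothetical):

    assert.commandFailedWithCode(
        sdb.runCommand({aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {},
                        $_passthroughToShard: "shard01"}),   // not an object
        ErrorCodes.TypeMismatch);
    assert.commandFailedWithCode(
        sdb.runCommand({aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {},
                        $_passthroughToShard: {}}),          // missing 'shard'
        ErrorCodes.FailedToParse);
    assert.commandFailedWithCode(
        sdb.runCommand({aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {},
                        $_passthroughToShard: {shard: 42}}), // shard id must be a string
        ErrorCodes.TypeMismatch);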
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index b15a06f898c..e498d02c682 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -39,6 +39,7 @@
#include "mongo/db/pipeline/runtime_constants_gen.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/s/shard_id.h"
namespace mongo {
@@ -64,6 +65,7 @@ public:
static constexpr StringData kHintName = "hint"_sd;
static constexpr StringData kCommentName = "comment"_sd;
static constexpr StringData kExchangeName = "exchange"_sd;
+ static constexpr StringData kPassthroughToShardName = "$_passthroughToShard"_sd;
static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd;
static constexpr StringData kUseNewUpsert = "useNewUpsert"_sd;
@@ -243,6 +245,10 @@ public:
return _useNewUpsert;
}
+ boost::optional<ShardId> getPassthroughToShard() const {
+ return _passthroughToShard;
+ }
+
//
// Setters for optional fields.
//
@@ -319,6 +325,10 @@ public:
_useNewUpsert = useNewUpsert;
}
+ void setPassthroughToShard(boost::optional<ShardId> shardId) {
+ _passthroughToShard = shardId;
+ }
+
private:
// Required fields.
const NamespaceString _nss;
@@ -376,5 +386,7 @@ private:
// Indicates whether the aggregation may use the new 'upsertSupplied' mechanism when running
// $merge stages. Versions of mongoS from 4.2.2 onwards always set this flag.
bool _useNewUpsert = false;
+
+ boost::optional<ShardId> _passthroughToShard;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 359ee994f4b..6e624272737 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -61,7 +61,8 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
"needsMerge: true, mergeByPBRT: true, bypassDocumentValidation: true, collation: {locale: "
"'en_US'}, cursor: {batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: "
"'linearizable'}, $queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment', "
- "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}}");
+ "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}, $_passthroughToShard: "
+ "{shard: 'foo'}}");
auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
ASSERT_FALSE(request.getExplain());
ASSERT_TRUE(request.shouldAllowDiskUse());
@@ -512,6 +513,20 @@ TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) {
fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
+
+TEST(AggregationRequestTest, ShouldRejectInvalidPassthroughToShard) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson(
+ "{pipeline: [{$changeStream: {}}], cursor: {}, $_passthroughToShard: {foo: 'f'}}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+ const BSONObj inputBson2 = fromjson(
+ "{pipeline: [{$changeStream: {}}], cursor: {}, $_passthroughToShard: {shard: 5}}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson2).getStatus());
+ const BSONObj inputBson3 =
+ fromjson("{pipeline: [{$changeStream: {}}], cursor: {}, $_passthroughToShard: {}}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson3).getStatus());
+}
+
//
// Ignore fields parsed elsewhere.
//
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 1bc7186eec5..41db43edf93 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -56,6 +56,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
needsMerge = request.needsMerge();
mergeByPBRT = request.mergeByPBRT();
allowDiskUse = request.shouldAllowDiskUse();
+ forPerShardCursor = request.getPassthroughToShard().has_value();
bypassDocumentValidation = request.shouldBypassDocumentValidation();
ns = request.getNamespaceString();
mongoProcessInterface = std::move(processInterface);
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 8717d4d518c..674b632d7f7 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -239,6 +239,7 @@ public:
bool needsMerge = false;
bool mergeByPBRT = false;
bool inMongos = false;
+ bool forPerShardCursor = false;
bool allowDiskUse = false;
bool bypassDocumentValidation = false;
bool inMultiDocumentTransaction = false;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index d9e10e6eecc..4f1ffd77e88 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -77,27 +77,49 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
const boost::optional<RuntimeConstants>& constants,
Pipeline* pipeline,
- BSONObj collationObj) {
+ BSONObj collationObj,
+ bool forPerShardCursor,
+ boost::optional<int> overrideBatchSize) {
// Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
+ auto serializedCommand = request.serializeToCommandObj();
+ MutableDocument targetedCmd(serializedCommand);
if (pipeline) {
targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
}
+ if (forPerShardCursor) {
+ // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This
+ // notifies the shards that the mongoS is capable of merging streams based on resume token.
+ // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4.
+ targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(true);
+ }
+
+ if (overrideBatchSize.has_value()) {
+ if (serializedCommand[AggregationRequest::kCursorName].missing()) {
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << Value(*overrideBatchSize)));
+ } else {
+ targetedCmd[AggregationRequest::kCursorName][AggregationRequest::kBatchSizeName] =
+ Value(*overrideBatchSize);
+ }
+ }
return genericTransformForShards(
- std::move(targetedCmd), opCtx, request, constants, collationObj);
+ std::move(targetedCmd), opCtx, request, constants, collationObj, forPerShardCursor);
}
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
OperationContext* opCtx,
const AggregationRequest& request,
const boost::optional<RuntimeConstants>& constants,
- BSONObj collationObj) {
+ BSONObj collationObj,
+ bool forPerShardCursor) {
if (constants) {
cmdForShards[AggregationRequest::kRuntimeConstants] = Value(constants.get().toBSON());
}
- cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+ if (!forPerShardCursor) {
+ cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+ }
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (auto explainVerbosity = request.getExplain()) {
@@ -286,8 +308,13 @@ DispatchShardPipelineResults dispatchShardPipeline(
exchangeSpec,
expCtx->getRuntimeConstants(),
true)
- : createPassthroughCommandForShard(
- opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj);
+ : createPassthroughCommandForShard(opCtx,
+ aggRequest,
+ expCtx->getRuntimeConstants(),
+ pipeline.get(),
+ collationObj,
+ false,
+ boost::none);
// In order for a $changeStream to work reliably, we need the shard registry to be at least as
// current as the logical time at which the pipeline was serialized to 'targetedCommand' above.
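
With these changes, the command mongos forwards for a per-shard cursor differs from an ordinary primary-shard passthrough in three ways: mergeByPBRT is set, fromMongos is omitted, and the initial batch size can be overridden. Approximately (a sketch, not the exact serialized form):

    // What the shard's mongod receives for a per-shard cursor; an ordinary
    // passthrough would instead carry fromMongos: true and keep the client's
    // cursor options.
    const forwardedCmd = {
        aggregate: "coll",
        pipeline: [{$changeStream: {}}],
        mergeByPBRT: true,      // shard emits mergeable resume-token metadata
        cursor: {batchSize: 0}  // overridden by the caller (see cluster_aggregate.cpp)
    };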
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 15e0dd51c2e..ed362cf1e0c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -117,13 +117,16 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
const boost::optional<RuntimeConstants>& constants,
Pipeline* pipeline,
- BSONObj collationObj);
+ BSONObj collationObj,
+ bool forPerShardCursor,
+ boost::optional<int> overrideBatchSize);
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
OperationContext* opCtx,
const AggregationRequest& request,
const boost::optional<RuntimeConstants>& constants,
- BSONObj collationObj);
+ BSONObj collationObj,
+ bool forPerShardCursor = false);
/**
* For a sharded collection, establishes remote cursors on each shard that may have results, and
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 74f2132b4b0..390ccae608c 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -863,6 +864,28 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
resolveInvolvedNamespaces(opCtx, litePipe, request.getNamespaceString());
auto status = [&]() {
+ if (request.getPassthroughToShard().has_value()) {
+ uassert(6273801,
+ "per shard cursor pipeline must contain $changeStream",
+ litePipe.hasChangeStream());
+
+ // Make sure the rest of the pipeline can be pushed down.
+ auto pipeline = request.getPipeline();
+ std::vector<BSONObj> nonChangeStreamPart(pipeline.begin() + 1, pipeline.end());
+ LiteParsedPipeline nonChangeStreamLite(
+ AggregationRequest(request.getNamespaceString(), nonChangeStreamPart));
+ uassert(6273802,
+ "$_passthroughToShard specified with a stage that is not allowed to "
+ "passthrough from mongos",
+ nonChangeStreamLite.allowedToPassthroughFromMongos());
+ ShardId shardId = *request.getPassthroughToShard();
+ uassert(6273803,
+ "$_passthroughToShard not supported for queries against config replica set",
+ shardId != ShardRegistry::kConfigServerShardId);
+
+ return aggPassthrough(
+ opCtx, namespaces, shardId, request, litePipe, privileges, result, true);
+ }
// A pipeline is allowed to passthrough to the primary shard iff the following conditions
// are met:
//
@@ -874,7 +897,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) {
const auto primaryShardId = routingInfo->db().primary()->getId();
return aggPassthrough(
- opCtx, namespaces, primaryShardId, request, litePipe, privileges, result);
+ opCtx, namespaces, primaryShardId, request, litePipe, privileges, result, false);
}
// Populate the collection UUID and the appropriate collation to use.
@@ -967,19 +990,30 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
const PrivilegeVector& privileges,
- BSONObjBuilder* out) {
+ BSONObjBuilder* out,
+ bool forPerShardCursor) {
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
sharded_agg_helpers::createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, nullptr, BSONObj()));
+ opCtx,
+ aggRequest,
+ boost::none,
+ nullptr,
+ BSONObj(),
+ forPerShardCursor,
+ forPerShardCursor ? boost::optional<int>(0) : boost::none));
+
+ uassert(6900400,
+ "shouldn't have fromMongos set for per shard cursor",
+ !forPerShardCursor || cmdObj[AggregationRequest::kFromMongosName].eoo());
MultiStatementTransactionRequestsSender ars(
opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
namespaces.executionNss.db().toString(),
{{shardId,
- shardId != ShardRegistry::kConfigServerShardId
+ shardId != ShardRegistry::kConfigServerShardId && !forPerShardCursor
? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
: std::move(cmdObj)}},
ReadPreferenceSetting::get(opCtx),
@@ -1005,16 +1039,18 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
auto tailMode = liteParsedPipeline.hasChangeStream()
? TailableModeEnum::kTailableAndAwaitData
: TailableModeEnum::kNormal;
- result = uassertStatusOK(
- storePossibleCursor(opCtx,
- shardId,
- *response.shardHostAndPort,
- response.swResponse.getValue().data,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager(),
- privileges,
- tailMode));
+ result = uassertStatusOK(storePossibleCursor(
+ opCtx,
+ shardId,
+ *response.shardHostAndPort,
+ response.swResponse.getValue().data,
+ namespaces.requestedNss,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ privileges,
+ tailMode,
+ forPerShardCursor ? boost::optional<BSONObj>(change_stream_constants::kSortSpec)
+ : boost::none));
}
// First append the properly constructed writeConcernError. It will then be skipped
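
Because aggPassthrough forces an initial batchSize of 0 for per-shard cursors, the first aggregate reply carries an empty batch and all events arrive via getMore against the cursor mongos registered. A sketch:

    const res = assert.commandWorked(sdb.runCommand({
        aggregate: "coll",
        pipeline: [{$changeStream: {}}],
        cursor: {},
        $_passthroughToShard: {shard: shardId}
    }));
    assert.eq(res.cursor.firstBatch.length, 0);  // initial batchSize forced to 0
    const more = assert.commandWorked(
        sdb.runCommand({getMore: res.cursor.id, collection: "coll"}));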
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index 9d605043def..2d02f0f6624 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -121,7 +121,8 @@ private:
const AggregationRequest&,
const LiteParsedPipeline&,
const PrivilegeVector& privileges,
- BSONObjBuilder* result);
+ BSONObjBuilder* result,
+ bool forPerShardCursor);
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 5b611623ea9..5b99a4f668e 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -126,7 +126,7 @@ public:
* Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream;
* otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child.
*/
- virtual BSONObj getPostBatchResumeToken() const {
+ virtual BSONObj getPostBatchResumeToken() {
return _child ? _child->getPostBatchResumeToken() : BSONObj();
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index a7cead17d89..861c170277d 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -67,6 +67,10 @@ public:
return _resultsMerger.getNumRemotes();
}
+ BSONObj getPostBatchResumeToken() final {
+ return _resultsMerger.getHighWaterMark();
+ }
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final {
return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout);
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index aaaad0c3e96..deab88c2720 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -86,7 +86,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const {
return 0;
}
-BSONObj RouterStagePipeline::getPostBatchResumeToken() const {
+BSONObj RouterStagePipeline::getPostBatchResumeToken() {
return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index de0dc25b310..88f5930e3d5 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -53,7 +53,7 @@ public:
std::size_t getNumRemotes() const final;
- BSONObj getPostBatchResumeToken() const final;
+ BSONObj getPostBatchResumeToken() final;
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 9877af451a2..d033898833a 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -77,7 +77,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
- TailableModeEnum tailableMode) {
+ TailableModeEnum tailableMode,
+ boost::optional<BSONObj> routerSort) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -104,14 +105,22 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
auto& remoteCursor = params.remotes.back();
remoteCursor.setShardId(shardId.toString());
remoteCursor.setHostAndPort(server);
- remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(),
- incomingCursorResponse.getValue().getCursorId(),
- {}));
+ remoteCursor.setCursorResponse(
+ CursorResponse(incomingCursorResponse.getValue().getNSS(),
+ incomingCursorResponse.getValue().getCursorId(),
+ {}, /* batch */
+ incomingCursorResponse.getValue().getNumReturnedSoFar(),
+ incomingCursorResponse.getValue().getLastOplogTimestamp(),
+ incomingCursorResponse.getValue().getPostBatchResumeToken(),
+ incomingCursorResponse.getValue().getWriteConcernError()));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = tailableMode;
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
params.originatingPrivileges = std::move(privileges);
+ if (routerSort) {
+ params.sort = *routerSort;
+ }
if (TransactionRouter::get(opCtx)) {
params.isAutoCommit = false;
@@ -137,7 +146,13 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue();
CursorResponse outgoingCursorResponse(
- requestedNss, clusterCursorId.getValue(), incomingCursorResponse.getValue().getBatch());
+ requestedNss,
+ clusterCursorId.getValue(),
+ incomingCursorResponse.getValue().getBatch(),
+ incomingCursorResponse.getValue().getNumReturnedSoFar(),
+ incomingCursorResponse.getValue().getLastOplogTimestamp(),
+ incomingCursorResponse.getValue().getPostBatchResumeToken(),
+ incomingCursorResponse.getValue().getWriteConcernError());
return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
}
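
The extra CursorResponse fields are what make per-shard cursors resumable: the shard's postBatchResumeToken now survives the trip through mongos, so a client can resume from a batch boundary even when the batch itself was empty. A sketch under that assumption (cursorId is hypothetical, and pscWatch is the test helper from earlier in this patch):

    const res = assert.commandWorked(
        sdb.runCommand({getMore: cursorId, collection: "coll"}));
    const pbrt = res.cursor.postBatchResumeToken;
    // Resume the per-shard stream from the token mongos forwarded.
    const resumed = pscWatch(sdb, "coll", shardId, {}, {startAfter: pbrt});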
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 43157322b0b..0bbaddfeaab 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -72,6 +72,8 @@ class TaskExecutor;
* @ cursorManager the ClusterCursorManager on which to register the resulting ClusterClientCursor
* @ privileges the PrivilegeVector of privileges needed for the original command, to be used for
* auth checking by GetMore
+ * @ routerSort the sort to apply on the router. With only a single remote cursor this is rarely
+ * needed, but it is required to set up change stream post-batch resume tokens correctly for
+ * per-shard cursors.
*/
StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const ShardId& shardId,
@@ -81,7 +83,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
- TailableModeEnum tailableMode = TailableModeEnum::kNormal);
+ TailableModeEnum tailableMode = TailableModeEnum::kNormal,
+ boost::optional<BSONObj> routerSort = boost::none);
/**
* Convenience function which extracts all necessary information from the passed RemoteCursor, and