summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_streams_per_shard_cursor.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/noPassthrough/change_streams_per_shard_cursor.js')
-rw-r--r--jstests/noPassthrough/change_streams_per_shard_cursor.js216
1 files changed, 216 insertions, 0 deletions
diff --git a/jstests/noPassthrough/change_streams_per_shard_cursor.js b/jstests/noPassthrough/change_streams_per_shard_cursor.js
new file mode 100644
index 00000000000..0d1e0e2fceb
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_per_shard_cursor.js
@@ -0,0 +1,216 @@
+/**
+ * @tags: [
+ * requires_sharding,
+ * uses_change_streams,
+ * ]
+ */
+(function() {
+"use strict";
+
+const checkPerShardCursorEnabled = () => {
+ const conn = MongoRunner.runMongod();
+ const res = conn.adminCommand({
+ getParameter: 1,
+ featureFlagPerShardCursor: 1,
+ });
+ MongoRunner.stopMongod(conn);
+ return res.featureFlagPerShardCursor.value;
+};
+
+const dbName = jsTestName();
+const setupShardedCluster = (shards = 1) => {
+ const st = new ShardingTest(
+ {shards, mongos: 1, config: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}});
+ const sdb = st.s0.getDB(dbName);
+ assert.commandWorked(sdb.dropDatabase());
+
+ sdb.setProfilingLevel(0, -1);
+ st.shard0.getDB(dbName).setProfilingLevel(0, -1);
+
+ // Shard the relevant collections.
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.name);
+ if (shards === 2) {
+ // Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to
+ // shard1.
+ st.shardColl("coll", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ st.shardColl("coll2", {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ } else {
+ assert(shards === 1, "only 1 or 2 shards supported");
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".coll", key: {_id: 1}}));
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: dbName + ".coll2", key: {_id: 1}}));
+ }
+
+ const shardId = st.shard0.shardName;
+ return [sdb, st, shardId];
+};
+
+const pscWatch = (db, coll, shardId, options = {}, csOptions = {}) => {
+ let cmd = {
+ aggregate: coll,
+ cursor: {},
+ pipeline: [{$changeStream: csOptions}],
+ $_passthroughToShard: {shard: shardId}
+ };
+ cmd = Object.assign({}, cmd, options);
+ if (options.pipeline) {
+ cmd.pipeline = [{$changeStream: csOptions}].concat(options.pipeline);
+ }
+ const resp = db.runCommand(cmd);
+ assert.commandWorked(resp);
+ if (options.explain) {
+ return resp;
+ }
+ return new DBCommandCursor(db, resp);
+};
+
+if (!checkPerShardCursorEnabled()) {
+ let [sdb, st, shardId] = setupShardedCluster();
+
+ // Should only work with feature flag on.
+ assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", shardId)), 6273800);
+ st.stop();
+ jsTestLog("Skipping the rest of the test because featureFlagPerSardCursor is not enabled");
+ return;
+}
+
+// Parsing
+let [sdb, st, shardId] = setupShardedCluster();
+
+// Should not allow pipeline without $changeStream.
+assert.commandFailedWithCode(sdb.runCommand({
+ aggregate: "coll",
+ cursor: {},
+ pipeline: [{$match: {perfect: true}}],
+ $_passthroughToShard: {shard: shardId}
+}),
+ 6273801);
+
+// $out can't passthrough so it's not allowed.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", shardId, {pipeline: [{$out: "h"}]})), 6273802);
+
+// Shard option should be specified.
+assert.commandFailedWithCode(
+ sdb.runCommand(
+ {aggregate: "coll", cursor: {}, pipeline: [{$changeStream: {}}], $_passthroughToShard: {}}),
+ 40414);
+
+// The shardId field should be a string.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", 42)),
+ ErrorCodes.TypeMismatch);
+// Can't open a per shard cursor on the config RS.
+assert.commandFailedWithCode(assert.throws(() => pscWatch(sdb, "coll", "config")), 6273803);
+
+// The shardId should be a valid shard.
+assert.commandFailedWithCode(
+ assert.throws(() => pscWatch(sdb, "coll", "Dwane 'the Shard' Johnson")),
+ ErrorCodes.ShardNotFound);
+
+// Correctness.
+
+// Simple collection level watch
+// this insert shouldn't show up since it happens before we make a cursor.
+sdb.coll.insertOne({location: 1});
+let c = pscWatch(sdb, "coll", shardId);
+// these inserts should show up since they're after we make a cursor.
+for (let i = 1; i <= 4; i++) {
+ sdb.coll.insertOne({location: 2, i});
+ assert(!c.isExhausted());
+ assert(c.hasNext());
+ c.next();
+}
+assert(!c.hasNext());
+
+// Simple database level watch
+c = pscWatch(sdb, 1, shardId);
+
+sdb.coll.insertOne({location: 3});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 4});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Watching collection that doesn't exist yet.
+c = pscWatch(sdb, "toBeCreated", shardId);
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.s.adminCommand({shardCollection: dbName + ".toBeCreated", key: {_id: 1}});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.toBeCreated.insertOne({location: 8});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+assert(!c.hasNext());
+
+// Explain output should not have a split pipeline. It should look like mongod explain output.
+let explainOut = pscWatch(sdb, "coll", shardId, {explain: true});
+assert(!explainOut.hasOwnProperty("splitPipeline"));
+assert.hasOwnProperty(explainOut, "stages");
+
+// If we getMore an invalidated cursor the cursor should have been closed on mongos and we should
+// get CursorNotFound, even if the invalidate event was never recieved by mongos.
+[[], [{$match: {f: "filter out invalidate event"}}]].forEach((pipeline) => {
+ assert.commandWorked(st.s.adminCommand({shardCollection: dbName + ".toDrop", key: {_id: 1}}));
+ let c = pscWatch(sdb, "toDrop", shardId, {pipeline});
+ sdb.toDrop.insertOne({});
+ sdb.toDrop.drop();
+ assert.commandFailedWithCode(
+ assert.throws(() => {
+ assert.retry(() => {
+ c._runGetMoreCommand();
+ return false;
+ }, "change stream should have been invalidated by now", 4);
+ }),
+ ErrorCodes.CursorNotFound);
+});
+
+st.stop();
+
+// Isolated from events on other shards.
+[sdb, st, shardId] = setupShardedCluster(2);
+c = pscWatch(sdb, "coll", shardId);
+
+sdb.coll.insertOne({location: 5, _id: -2});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 6, _id: 2});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+// Isolated from events on other shards with whole db.
+c = pscWatch(sdb.getSiblingDB("admin"), 1, shardId, {}, {allChangesForCluster: true});
+
+sdb.coll.insertOne({location: 7, _id: -3});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll2.insertOne({location: 8, _id: -4});
+assert(!c.isExhausted());
+assert(c.hasNext());
+c.next();
+
+sdb.coll.insertOne({location: 9, _id: 3});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+sdb.coll2.insertOne({location: 10, _id: 4});
+assert(!c.isExhausted());
+assert(!c.hasNext());
+
+st.stop();
+})();