diff options
Diffstat (limited to 'jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js')
-rw-r--r-- | jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js b/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js new file mode 100644 index 00000000000..5139bff5b56 --- /dev/null +++ b/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js @@ -0,0 +1,109 @@ +/** + * Test that mongoS explicitly sets the value of $_generateV2ResumeTokens to 'false' on the commands + * it sends to the shards if no value was specified by the client, and that if a value was + * specified, it forwards it to the shards. On a replica set, no explicit value is set; the + * aggregation simply treats it as default-false. + * TODO SERVER-65370: remove or rework this test when v2 tokens become the default. + * @tags: [ + * uses_change_streams, + * requires_sharding, + * requires_replication, + * requires_wiredtiger, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow. + +// Create a sharding fixture with a single one-node replset shard and a one-node replset config +// server. The latter is to ensure that there is only one node that the internal new-shard monitor +// $changeStream can be sent to, since it is dispatched with secondaryPreferred readPreference. +const st = new ShardingTest({shards: 1, rs: {nodes: 1}, config: {nodes: 1}}); + +const mongosDB = st.s.getDB("test"); +const shardDB = st.rs0.getPrimary().getDB(mongosDB.getName()); +const configDB = st.configRS.getPrimary().getDB("config"); + +const mongosColl = assertDropAndRecreateCollection(mongosDB, jsTestName()); +const shardColl = shardDB[mongosColl.getName()]; +const configColl = configDB.shards; + +// Enable profiling on the shard and config server. +assert.commandWorked(shardDB.setProfilingLevel(2)); +assert.commandWorked(configDB.setProfilingLevel(2)); + +// Create one stream on mongoS that returns v1 tokens, the default. +const v1MongosStream = mongosColl.watch([], {comment: "v1MongosStream"}); + +// Create a second stream on mongoS that explicitly requests v2 tokens. +const v2MongosStream = + mongosColl.watch([], {comment: "v2MongosStream", $_generateV2ResumeTokens: true}); + +// Create a stream directly on the shard which returns the default v1 tokens. +const v1ShardStream = shardColl.watch([], {comment: "v1ShardStream"}); + +// Insert a test document into the collection. +assert.commandWorked(mongosColl.insert({_id: 1})); + +// Wait until all streams have encountered the insert operation. +assert.soon(() => v1MongosStream.hasNext() && v2MongosStream.hasNext() && v1ShardStream.hasNext()); + +// Confirm that in a sharded cluster, mongoS explicitly sets $_generateV2ResumeTokens to false on +// the command that it sends to the shards if nothing was specified by the client. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: shardDB, + filter: { + "originatingCommand.aggregate": mongosColl.getName(), + "originatingCommand.comment": "v1MongosStream", + "originatingCommand.$_generateV2ResumeTokens": false + } +}); + +// Confirm that we also set $_generateV2ResumeTokens to false on the internal new-shard monitoring +// $changeStream that we dispatch to the config servers. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: configDB, + filter: { + "originatingCommand.aggregate": configColl.getName(), + "originatingCommand.comment": "v1MongosStream", + "originatingCommand.$_generateV2ResumeTokens": false + } +}); + +// Confirm that mongoS correctly forwards the value of $_generateV2ResumeTokens to the shards if it +// is specified by the client. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: shardDB, + filter: { + "originatingCommand.aggregate": mongosColl.getName(), + "originatingCommand.comment": "v2MongosStream", + "originatingCommand.$_generateV2ResumeTokens": true + } +}); + +// Confirm that we also forward the value of $_generateV2ResumeTokens to the config servers. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: configDB, + filter: { + "originatingCommand.aggregate": configColl.getName(), + "originatingCommand.comment": "v2MongosStream", + "originatingCommand.$_generateV2ResumeTokens": true + } +}); + +// Confirm that on a replica set - in this case, a direct connection to the shard - no value is set +// for $_generateV2ResumeTokens if the client did not specify one. The aggregation defaults to +// treating the value as false. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: shardDB, + filter: { + "originatingCommand.aggregate": mongosColl.getName(), + "originatingCommand.comment": "v1ShardStream", + "originatingCommand.$_generateV2ResumeTokens": {$exists: false} + } +}); + +st.stop(); +})();
\ No newline at end of file |