summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-11-17 17:49:17 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-17 18:18:26 +0000
commitaeb1f951af29d0b577943b6318af20169a93134b (patch)
tree7265d099047c11d0efbdc2bd5a3f73b27ba830ce
parent9348dbff25128de576b94b76997fcefd997eb4b3 (diff)
downloadmongo-aeb1f951af29d0b577943b6318af20169a93134b.tar.gz
SERVER-70084 add per shard cursor multiversion sort key test and correct psc sortKey behavior
-rw-r--r--jstests/multiVersion/change_streams_multi_version_sortkey.js58
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp2
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp2
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp1
4 files changed, 40 insertions, 23 deletions
diff --git a/jstests/multiVersion/change_streams_multi_version_sortkey.js b/jstests/multiVersion/change_streams_multi_version_sortkey.js
index 24f34f82550..524c08a9536 100644
--- a/jstests/multiVersion/change_streams_multi_version_sortkey.js
+++ b/jstests/multiVersion/change_streams_multi_version_sortkey.js
@@ -47,28 +47,44 @@ st.shardColl(collName,
// 'changeStream' cursor, verify that the change stream results have monotonically increasing
// timestamps, and return the resume token.
var nextId = 0;
-function insertAndValidateChanges(coll, changeStream) {
+function insertAndValidateChanges(coll, options) {
+ const changeStream = coll.watch([], options);
+ // Also set up a per shard cursor.
+ const pscRes = coll.getDB().runCommand({
+ aggregate: coll.getName(),
+ cursor: {},
+ pipeline: [{$changeStream: options}],
+ $_passthroughToShard: {shard: st.shard0.shardName}
+ });
+ assert.commandWorked(pscRes);
+ const psc = new DBCommandCursor(coll.getDB(), pscRes);
+
const docsToInsert =
- Array.from({length: 10}, (_, i) => ({_id: nextId + i, shard: i % 2, val: i}));
+ Array.from({length: 10}, (_, i) => ({_id: nextId + i, shard: i % 2 + 1, val: i}));
nextId += docsToInsert.length;
assert.commandWorked(coll.insert(docsToInsert));
- const changeList = [];
- assert.soon(function() {
- while (changeStream.hasNext()) {
- const change = changeStream.next();
- changeList.push(change);
- }
-
- return changeList.length === docsToInsert.length;
- }, changeList);
-
- for (let i = 0; i + 1 < changeList.length; ++i) {
- assert(timestampCmp(changeList[i].clusterTime, changeList[i + 1].clusterTime) <= 0,
- "Change timestamps are not monotonically increasing: " + tojson(changeList));
- }
- // TODO SERVER-70084 update this test to check PSCs work with mixed (4.4/4.2) clusters.
+ [{changeStream, expectedNEvents: docsToInsert.length, type: "normal"},
+ {changeStream: psc, expectedNEvents: docsToInsert.length / 2, type: "per-shard"}]
+ .forEach(({changeStream, expectedNEvents, type}) => {
+ jsTestLog(`Validating ${type} change stream`);
+ const changeList = [];
+ assert.soon(function() {
+ while (changeStream.hasNext()) {
+ const change = changeStream.next();
+ changeList.push(change);
+ }
+ jsTestLog(`Have ${changeList.length} of ${expectedNEvents} expected events`);
+ return changeList.length === expectedNEvents;
+ }, changeList);
+
+ for (let i = 0; i + 1 < changeList.length; ++i) {
+ assert(
+ timestampCmp(changeList[i].clusterTime, changeList[i + 1].clusterTime) <= 0,
+ `Change timestamps are not monotonically increasing: ${tojson(changeList)}`);
+ }
+ });
return changeStream.getResumeToken();
}
@@ -77,7 +93,7 @@ function insertAndValidateChanges(coll, changeStream) {
// Open and read a change stream on the "last-stable" cluster.
//
let coll = mongosConn.getDB(dbName)[collName];
-let resumeToken = insertAndValidateChanges(coll, coll.watch());
+let resumeToken = insertAndValidateChanges(coll, {});
//
// Upgrade the config db and the shards to the "latest" binVersion.
@@ -90,7 +106,7 @@ st.upgradeCluster(
// Open and read a change stream on the upgraded cluster but still using a "last-stable" version of
// mongos and "last-stable" for the FCV.
//
-resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
+resumeToken = insertAndValidateChanges(coll, {resumeAfter: resumeToken});
//
// Upgrade mongos to the "latest" binVersion and then open and read a change stream, this time with
@@ -102,7 +118,7 @@ st.upgradeCluster(
mongosConn = st.s;
coll = mongosConn.getDB(dbName)[collName];
-resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
+resumeToken = insertAndValidateChanges(coll, {resumeAfter: resumeToken});
//
// Set the FCV to the "latest" version, and then open and read a change stream on the completely
@@ -116,7 +132,7 @@ checkFCV(st.rs1.getPrimary().getDB("admin"), latestFCV);
//
// Open and read a change stream on the upgraded cluster.
//
-resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
+resumeToken = insertAndValidateChanges(coll, {resumeAfter: resumeToken});
st.stop();
}());
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index d2fa515a6a6..b85fa27d6e0 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -668,7 +668,7 @@ Status runAggregate(OperationContext* opCtx,
// generated this query. We determine the version by checking for the "use44SortKeys" flag
// in the aggregation request.
// TODO (SERVER-43361): This check will be unnecessary after branching for 4.5.
- if (expCtx->fromMongos) {
+ if (expCtx->fromMongos || request.getPassthroughToShard().has_value()) {
if (request.getUse44SortKeys()) {
// This request originated with 4.4-or-newer mongoS, which can understand the new
// sort key format. Note: it's possible that merging will actually occur on a mongoD
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 03ce73828b3..18ca234437d 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -153,7 +153,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
Value(static_cast<long long>(*expCtx->opCtx->getTxnNumber()));
}
- if (expCtx->inMongos) {
+ if (expCtx->inMongos || expCtx->forPerShardCursor) {
// TODO (SERVER-43361): We set this flag to indicate to the shards that the mongos will be
// able to understand change stream sort keys in the new format. After branching for 4.5,
// there will only be one sort key format for changes streams, so there will be no need to
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index b5ea6ce04ed..a7b57ce974d 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -347,6 +347,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
kSpecificShardOnly: {
// Mark expCtx as tailable and await data so CCC behaves accordingly.
expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+ expCtx->forPerShardCursor = true;
uassert(6273801,
"per shard cursor pipeline must contain $changeStream",