diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2022-11-17 17:49:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-17 18:18:26 +0000 |
commit | aeb1f951af29d0b577943b6318af20169a93134b (patch) | |
tree | 7265d099047c11d0efbdc2bd5a3f73b27ba830ce | |
parent | 9348dbff25128de576b94b76997fcefd997eb4b3 (diff) | |
download | mongo-aeb1f951af29d0b577943b6318af20169a93134b.tar.gz |
SERVER-70084 add per shard cursor multiversion sort key test and correct psc sortKey behavior
-rw-r--r-- | jstests/multiVersion/change_streams_multi_version_sortkey.js | 58 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 1 |
4 files changed, 40 insertions, 23 deletions
diff --git a/jstests/multiVersion/change_streams_multi_version_sortkey.js b/jstests/multiVersion/change_streams_multi_version_sortkey.js index 24f34f82550..524c08a9536 100644 --- a/jstests/multiVersion/change_streams_multi_version_sortkey.js +++ b/jstests/multiVersion/change_streams_multi_version_sortkey.js @@ -47,28 +47,44 @@ st.shardColl(collName, // 'changeStream' cursor, verify that the change stream results have monotonically increasing // timestamps, and return the resume token. var nextId = 0; -function insertAndValidateChanges(coll, changeStream) { +function insertAndValidateChanges(coll, options) { + const changeStream = coll.watch([], options); + // Also set up a per shard cursor. + const pscRes = coll.getDB().runCommand({ + aggregate: coll.getName(), + cursor: {}, + pipeline: [{$changeStream: options}], + $_passthroughToShard: {shard: st.shard0.shardName} + }); + assert.commandWorked(pscRes); + const psc = new DBCommandCursor(coll.getDB(), pscRes); + const docsToInsert = - Array.from({length: 10}, (_, i) => ({_id: nextId + i, shard: i % 2, val: i})); + Array.from({length: 10}, (_, i) => ({_id: nextId + i, shard: i % 2 + 1, val: i})); nextId += docsToInsert.length; assert.commandWorked(coll.insert(docsToInsert)); - const changeList = []; - assert.soon(function() { - while (changeStream.hasNext()) { - const change = changeStream.next(); - changeList.push(change); - } - - return changeList.length === docsToInsert.length; - }, changeList); - - for (let i = 0; i + 1 < changeList.length; ++i) { - assert(timestampCmp(changeList[i].clusterTime, changeList[i + 1].clusterTime) <= 0, - "Change timestamps are not monotonically increasing: " + tojson(changeList)); - } - // TODO SERVER-70084 update this test to check PSCs work with mixed (4.4/4.2) clusters. + [{changeStream, expectedNEvents: docsToInsert.length, type: "normal"}, + {changeStream: psc, expectedNEvents: docsToInsert.length / 2, type: "per-shard"}] + .forEach(({changeStream, expectedNEvents, type}) => { + jsTestLog(`Validating ${type} change stream`); + const changeList = []; + assert.soon(function() { + while (changeStream.hasNext()) { + const change = changeStream.next(); + changeList.push(change); + } + jsTestLog(`Have ${changeList.length} of ${expectedNEvents} expected events`); + return changeList.length === expectedNEvents; + }, changeList); + + for (let i = 0; i + 1 < changeList.length; ++i) { + assert( + timestampCmp(changeList[i].clusterTime, changeList[i + 1].clusterTime) <= 0, + `Change timestamps are not monotonically increasing: ${tojson(changeList)}`); + } + }); return changeStream.getResumeToken(); } @@ -77,7 +93,7 @@ function insertAndValidateChanges(coll, changeStream) { // Open and read a change stream on the "last-stable" cluster. // let coll = mongosConn.getDB(dbName)[collName]; -let resumeToken = insertAndValidateChanges(coll, coll.watch()); +let resumeToken = insertAndValidateChanges(coll, {}); // // Upgrade the config db and the shards to the "latest" binVersion. @@ -90,7 +106,7 @@ st.upgradeCluster( // Open and read a change stream on the upgraded cluster but still using a "last-stable" version of // mongos and "last-stable" for the FCV. // -resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken})); +resumeToken = insertAndValidateChanges(coll, {resumeAfter: resumeToken}); // // Upgrade mongos to the "latest" binVersion and then open and read a change stream, this time with @@ -102,7 +118,7 @@ st.upgradeCluster( mongosConn = st.s; coll = mongosConn.getDB(dbName)[collName]; -resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken})); +resumeToken = insertAndValidateChanges(coll, {resumeAfter: resumeToken}); // // Set the FCV to the "latest" version, and then open and read a change stream on the completely @@ -116,7 +132,7 @@ checkFCV(st.rs1.getPrimary().getDB("admin"), latestFCV); // // Open and read a change stream on the upgraded cluster. // -resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken})); +resumeToken = insertAndValidateChanges(coll, {resumeAfter: resumeToken}); st.stop(); }()); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index d2fa515a6a6..b85fa27d6e0 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -668,7 +668,7 @@ Status runAggregate(OperationContext* opCtx, // generated this query. We determine the version by checking for the "use44SortKeys" flag // in the aggregation request. // TODO (SERVER-43361): This check will be unnecessary after branching for 4.5. - if (expCtx->fromMongos) { + if (expCtx->fromMongos || request.getPassthroughToShard().has_value()) { if (request.getUse44SortKeys()) { // This request originated with 4.4-or-newer mongoS, which can understand the new // sort key format. Note: it's possible that merging will actually occur on a mongoD diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 03ce73828b3..18ca234437d 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -153,7 +153,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, Value(static_cast<long long>(*expCtx->opCtx->getTxnNumber())); } - if (expCtx->inMongos) { + if (expCtx->inMongos || expCtx->forPerShardCursor) { // TODO (SERVER-43361): We set this flag to indicate to the shards that the mongos will be // able to understand change stream sort keys in the new format. After branching for 4.5, // there will only be one sort key format for changes streams, so there will be no need to diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index b5ea6ce04ed..a7b57ce974d 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -347,6 +347,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, kSpecificShardOnly: { // Mark expCtx as tailable and await data so CCC behaves accordingly. expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; + expCtx->forPerShardCursor = true; uassert(6273801, "per shard cursor pipeline must contain $changeStream", |