diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2020-04-15 17:21:33 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-04 00:25:05 +0000 |
commit | 1abf05a44fbb0d072ebdb3fb133e35e0c17c57e3 (patch) | |
tree | dd71153ca5366c86cc1ebeb64b2983e2cb9eb6a0 | |
parent | 59668f23bc796f1457407e617572b7111fe46aea (diff) | |
download | mongo-1abf05a44fbb0d072ebdb3fb133e35e0c17c57e3.tar.gz |
SERVER-47581 Set 'useNewUpsert' on $mergeCursors aggregations
(cherry picked from commit dda2fb45cbf624c9270f8fad7f3c5c5a2f0834eb)
-rw-r--r-- | jstests/multiVersion/agg_merge_upsert_supplied_cluster.js | 194 | ||||
-rw-r--r-- | jstests/multiVersion/libs/multi_cluster.js | 27 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 7 |
3 files changed, 106 insertions, 122 deletions
diff --git a/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js index 25d4433b25c..a3a3f4034a1 100644 --- a/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js +++ b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js @@ -47,16 +47,12 @@ function refreshCluster(version, components, singleShard) { if (singleShard) { singleShard.upgradeSet({binVersion: version}); + st.waitUntilStable(); } else { - st.upgradeCluster(version, components); + const upgradeOptions = Object.assign({waitUntilStable: true}, components); + st.upgradeCluster(version, upgradeOptions); } - // Wait for the config server and shards to become available, and restart mongoS. - st.configRS.awaitSecondaryNodes(); - st.rs0.awaitSecondaryNodes(); - st.rs1.awaitSecondaryNodes(); - st.restartMongoses(); - // Having upgraded the cluster, reacquire references to each component. mongosDB = st.s.getDB(jsTestName()); sourceSharded = mongosDB.source_coll_sharded; @@ -69,7 +65,7 @@ function refreshCluster(version, components, singleShard) { // assert.soon timeout. This is necessary because there is a period after one shard's Primary steps // down during upgrade where a $merge on the other shard may still target the previous Primary. function tryWhileNotMaster(sourceColl, targetColl, pipeline, options) { - assert.soon(() => { + assert.soonNoExcept(() => { const aggCmdParams = Object.assign({pipeline: pipeline, cursor: {}}, options); const cmdRes = sourceColl.runCommand("aggregate", aggCmdParams); if (cmdRes.ok) { @@ -106,56 +102,24 @@ for (let i = -20; i < 20; ++i) { } // Define a series of test cases covering all $merge distributed planning scenarios. -const testCases = [ - // $merge from unsharded to unsharded, passthrough from mongoS and write locally. - { - sourceColl: () => sourceUnsharded, - targetColl: () => targetUnsharded, - preMergePipeline: [], - allowDiskUse: false, - disableExchange: false - }, - // $merge from unsharded to sharded, passthrough from mongoS and write cross-shard. - { - sourceColl: () => sourceUnsharded, - targetColl: () => targetSharded, - preMergePipeline: [], - allowDiskUse: false, - disableExchange: false - }, - // $merge from sharded to sharded, writes from shard to shard in parallel. - { - sourceColl: () => sourceSharded, - targetColl: () => targetSharded, - preMergePipeline: [], - allowDiskUse: false, - disableExchange: false - }, - // $group with exchange, sends input documents to relevant shard and $merges locally. - { - sourceColl: () => sourceSharded, - targetColl: () => targetSharded, - preMergePipeline: [{$group: {_id: "$_id"}}], - allowDiskUse: false, - disableExchange: false - }, - // $group, exchange prohibited, $merge is executed on mongoS. - { - sourceColl: () => sourceSharded, - targetColl: () => targetSharded, - preMergePipeline: [{$group: {_id: "$_id"}}], - allowDiskUse: false, - disableExchange: true - }, - // $group, exchange prohibited, $merge sent to single shard and writes cross-shard. - { - sourceColl: () => sourceSharded, - targetColl: () => targetSharded, - preMergePipeline: [{$group: {_id: "$_id"}}], - allowDiskUse: true, - disableExchange: true - }, -]; +const testCases = []; +for (let sourceColl of [sourceSharded, sourceUnsharded]) { + for (let targetColl of [targetSharded, targetUnsharded]) { + for (let preMergePipeline of [[], [{$group: {_id: "$_id"}}]]) { + for (let disableExchange of [false, true]) { + for (let allowDiskUse of [false, true]) { + testCases.push({ + sourceColl: () => sourceColl, + targetColl: () => targetColl, + preMergePipeline: preMergePipeline, + allowDiskUse: allowDiskUse, + disableExchange: disableExchange + }); + } + } + } + } +} // The 'whenMatched' pipeline to apply as part of the $merge. When the old 4.2.1 behaviour is in // effect, output documents will all have an _id field and the field added by this pipeline. @@ -166,61 +130,71 @@ const expectedOldBehaviourOutput = Array.from(sourceSharded.find().toArray(), (d return {_id: doc._id, docWasGeneratedFromWhenMatchedPipeline: true}; }); -for (let testCaseNum = 0; testCaseNum < testCases.length; ++testCaseNum) { - // Perform initial test-case setup. Disable the exchange optimization if appropriate. - const testCase = testCases[testCaseNum]; - assert.commandWorked(mongosDB.adminCommand( - {setParameter: 1, internalQueryDisableExchange: testCase.disableExchange})); - - // Construct the options object that will be supplied along with the pipeline. - const aggOptions = {allowDiskUse: testCase.allowDiskUse}; - - // Construct the final pipeline by appending $merge to the the testCase's preMergePipeline. - const finalPipeline = testCase.preMergePipeline.concat([{ - $merge: { - into: testCase.targetColl().getName(), - whenMatched: mergePipe, - whenNotMatched: "insert" +// Generate the array of output documents we expect to see under the new upsert behaviour. +const expectedNewBehaviourOutput = sourceSharded.find().toArray(); + +// Run all test cases, validating that the specified output is produced in each case. +function runAllTestCases(expectedOutput) { + for (let testCase of testCases) { + // Print out the current testcase in the logs. + jsTestLog(tojson(Object.assign({}, testCase, { + sourceColl: testCase.sourceColl().getName(), + targetColl: testCase.targetColl().getName() + }))); + + // Perform initial test-case setup. Disable the exchange optimization if appropriate. + assert.commandWorked(mongosDB.adminCommand( + {setParameter: 1, internalQueryDisableExchange: testCase.disableExchange})); + + // Construct the options object that will be supplied along with the pipeline. + const aggOptions = {allowDiskUse: testCase.allowDiskUse}; + + // Construct the final pipeline by appending $merge to the the testCase's preMergePipeline. + const finalPipeline = testCase.preMergePipeline.concat([{ + $merge: { + into: testCase.targetColl().getName(), + whenMatched: mergePipe, + whenNotMatched: "insert" + } + }]); + + // If exchange is disabled for this test, make sure that we don't see a plan that uses it. + if (testCase.disableExchange) { + const explainOut = testCase.sourceColl().explain().aggregate(finalPipeline, aggOptions); + assert.neq(explainOut.mergeType, "exchange"); } - }]); - - // Run a $merge with the whole cluster on 'preBackport42Version' and confirm that the output - // documents are produced using the old upsert behaviour. - tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); - assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput); - assert.commandWorked(testCase.targetColl().remove({})); - - // Upgrade a single shard to latest but leave the mongoS on 'preBackport42Version'. The upgraded - // shard continues to produce upsert requests that are compatible with the pre-backport shards. - refreshCluster(latestVersion, null, st.rs1); - tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); - assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput); - assert.commandWorked(testCase.targetColl().remove({})); - - // Upgrade the configs and the remaining shard to latest but leave mongoS on pre-backport 4.2. - // The shards continue to produce upsert requests that use the pre-backport behaviour. This is - // to ensure that the pipeline produces the same behaviour regardless of whether $merge is - // pushed down to the shards or run on the mongoS itself. - refreshCluster(latestVersion, {upgradeShards: true, upgradeConfigs: true}); - tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); - assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput); - assert.commandWorked(testCase.targetColl().remove({})); - - // Upgrade the mongoS to latest. We should now see that the $merge adopts the new behaviour, and - // inserts the exact source document rather than generating one from the whenMatched pipeline. - refreshCluster(latestVersion, {upgradeMongos: true}); - tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); - assert.sameMembers(testCase.targetColl().find().toArray(), - testCase.sourceColl().find().toArray()); - assert.commandWorked(testCase.targetColl().remove({})); - - // Finally, downgrade the cluster to pre-backport 4.2 in preparation for the next test case. No - // need to do this after the final test, as it will simply extend the runtime for no reason. - if (testCaseNum < testCases.length - 1) { - refreshCluster(preBackport42Version, - {upgradeMongos: true, upgradeShards: true, upgradeConfigs: true}); + + // Run the test case and confirm that the target collection matches the expected output. + tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); + assert.sameMembers(testCase.targetColl().find().toArray(), expectedOutput); + assert.commandWorked(testCase.targetColl().remove({})); } } +// Run all $merge test-cases with the whole cluster on 'preBackport42Version' and confirm that the +// output documents are produced using the old upsert behaviour. +jsTestLog(`UPGRADE: Running tests with whole cluster on ${preBackport42Version}`); +runAllTestCases(expectedOldBehaviourOutput); + +// Upgrade a single shard to latest but leave the mongoS on 'preBackport42Version'. The upgraded +// shard continues to produce upsert requests that are compatible with the pre-backport shards. +jsTestLog(`UPGRADE: rs1 to ${latestVersion}; rs0, configs & mongos on ${preBackport42Version}`); +refreshCluster(latestVersion, null, st.rs1); +runAllTestCases(expectedOldBehaviourOutput); + +// Upgrade the configs and the remaining shard to latest but leave mongoS on pre-backport 4.2. The +// shards continue to produce upsert requests that use the pre-backport behaviour. This is to ensure +// that the pipeline produces the same behaviour regardless of whether $merge is pushed down to the +// shards or run on the mongoS itself. +jsTestLog(`UPGRADE: shards and configs to ${latestVersion}; mongos on ${preBackport42Version}`); +refreshCluster(latestVersion, {upgradeShards: true, upgradeConfigs: true}); +runAllTestCases(expectedOldBehaviourOutput); + +// Upgrade the mongoS to latest. We should now see that the $merge adopts the new behaviour, and +// inserts the exact source document rather than generating one from the whenMatched pipeline. +jsTestLog(`UPGRADE: mongos to ${latestVersion}; whole cluster now on ${latestVersion}`); +refreshCluster(latestVersion, {upgradeMongos: true}); +runAllTestCases(expectedNewBehaviourOutput); + st.stop(); })();
\ No newline at end of file diff --git a/jstests/multiVersion/libs/multi_cluster.js b/jstests/multiVersion/libs/multi_cluster.js index c1c8074986a..5244fbe17a8 100644 --- a/jstests/multiVersion/libs/multi_cluster.js +++ b/jstests/multiVersion/libs/multi_cluster.js @@ -93,19 +93,22 @@ ShardingTest.prototype.upgradeCluster = function(binVersion, options) { } if (options.waitUntilStable) { - // Wait for the config server and shards to become available. - this.configRS.awaitSecondaryNodes(); - let shardPrimaries = []; - for (let rs of this._rs) { - rs.test.awaitSecondaryNodes(); - shardPrimaries.push(rs.test.getPrimary()); - } + this.waitUntilStable(); + } +}; - // Wait for the ReplicaSetMonitor on mongoS and each shard to reflect the state of all - // shards. - for (let client of [...this._mongos, ...shardPrimaries]) { - awaitRSClientHosts(client, shardPrimaries, {ok: true, ismaster: true}); - } +ShardingTest.prototype.waitUntilStable = function() { + // Wait for the config server and shards to become available. + this.configRS.awaitSecondaryNodes(); + let shardPrimaries = []; + for (let rs of this._rs) { + rs.test.awaitSecondaryNodes(); + shardPrimaries.push(rs.test.getPrimary()); + } + + // Wait for the ReplicaSetMonitor on mongoS and each shard to reflect the state of all shards. + for (let client of [...this._mongos, ...shardPrimaries]) { + awaitRSClientHosts(client, shardPrimaries, {ok: true, ismaster: true}); } }; diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 1343bdd56ae..e06d49d3850 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -125,6 +125,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, : Value(Document{CollationSpec::kSimpleSpec}); } + if (mergeCtx->inMongos) { + // TODO SERVER-44884: We set this flag to indicate that the shards should always use the new + // upsert mechanism when executing relevant $merge modes. After branching for 4.5, supported + // upgrade versions will all use the new mechanism, and we can remove this flag. + mergeCmd[AggregationRequest::kUseNewUpsert] = Value(true); + } + const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx); if (txnRouter && mergingShardContributesData) { // Don't include a readConcern since we can only include read concerns on the _first_ |