author     Bernard Gorman <bernard.gorman@gmail.com>         2020-04-15 17:21:33 +0100
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-05-04 00:25:05 +0000
commit     1abf05a44fbb0d072ebdb3fb133e35e0c17c57e3
tree       dd71153ca5366c86cc1ebeb64b2983e2cb9eb6a0
parent     59668f23bc796f1457407e617572b7111fe46aea
SERVER-47581 Set 'useNewUpsert' on $mergeCursors aggregations
(cherry picked from commit dda2fb45cbf624c9270f8fad7f3c5c5a2f0834eb)
-rw-r--r--  jstests/multiVersion/agg_merge_upsert_supplied_cluster.js  194
-rw-r--r--  jstests/multiVersion/libs/multi_cluster.js                   27
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp                       7
3 files changed, 106 insertions(+), 122 deletions(-)
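
For context on the behaviour being tested: when a $merge with whenNotMatched: "insert" has to upsert an unmatched document, pre-backport 4.2 shards generate the inserted document from the whenMatched pipeline, while the backported "supplied document" upsert inserts the source document unchanged. A minimal shell sketch of the difference, using a hypothetical whenMatched pipeline in place of the test's 'mergePipe':

// Hypothetical stand-in for the test's 'mergePipe' whenMatched pipeline.
const whenMatchedPipe = [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}];
db.source_coll.aggregate(
    [{$merge: {into: "target_coll", whenMatched: whenMatchedPipe, whenNotMatched: "insert"}}]);
// Pre-backport behaviour: unmatched documents are generated by the pipeline, so the target holds
// documents of the form {_id: ..., docWasGeneratedFromWhenMatchedPipeline: true}.
// New behaviour: the exact source documents are inserted into the target collection.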
diff --git a/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js
index 25d4433b25c..a3a3f4034a1 100644
--- a/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js
+++ b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js
@@ -47,16 +47,12 @@ function refreshCluster(version, components, singleShard) {
if (singleShard) {
singleShard.upgradeSet({binVersion: version});
+ st.waitUntilStable();
} else {
- st.upgradeCluster(version, components);
+ const upgradeOptions = Object.assign({waitUntilStable: true}, components);
+ st.upgradeCluster(version, upgradeOptions);
}
- // Wait for the config server and shards to become available, and restart mongoS.
- st.configRS.awaitSecondaryNodes();
- st.rs0.awaitSecondaryNodes();
- st.rs1.awaitSecondaryNodes();
- st.restartMongoses();
-
// Having upgraded the cluster, reacquire references to each component.
mongosDB = st.s.getDB(jsTestName());
sourceSharded = mongosDB.source_coll_sharded;
@@ -69,7 +65,7 @@ function refreshCluster(version, components, singleShard) {
// assert.soon timeout. This is necessary because there is a period after one shard's Primary steps
// down during upgrade where a $merge on the other shard may still target the previous Primary.
function tryWhileNotMaster(sourceColl, targetColl, pipeline, options) {
- assert.soon(() => {
+ assert.soonNoExcept(() => {
const aggCmdParams = Object.assign({pipeline: pipeline, cursor: {}}, options);
const cmdRes = sourceColl.runCommand("aggregate", aggCmdParams);
if (cmdRes.ok) {
@@ -106,56 +102,24 @@ for (let i = -20; i < 20; ++i) {
}
// Define a series of test cases covering all $merge distributed planning scenarios.
-const testCases = [
- // $merge from unsharded to unsharded, passthrough from mongoS and write locally.
- {
- sourceColl: () => sourceUnsharded,
- targetColl: () => targetUnsharded,
- preMergePipeline: [],
- allowDiskUse: false,
- disableExchange: false
- },
- // $merge from unsharded to sharded, passthrough from mongoS and write cross-shard.
- {
- sourceColl: () => sourceUnsharded,
- targetColl: () => targetSharded,
- preMergePipeline: [],
- allowDiskUse: false,
- disableExchange: false
- },
- // $merge from sharded to sharded, writes from shard to shard in parallel.
- {
- sourceColl: () => sourceSharded,
- targetColl: () => targetSharded,
- preMergePipeline: [],
- allowDiskUse: false,
- disableExchange: false
- },
- // $group with exchange, sends input documents to relevant shard and $merges locally.
- {
- sourceColl: () => sourceSharded,
- targetColl: () => targetSharded,
- preMergePipeline: [{$group: {_id: "$_id"}}],
- allowDiskUse: false,
- disableExchange: false
- },
- // $group, exchange prohibited, $merge is executed on mongoS.
- {
- sourceColl: () => sourceSharded,
- targetColl: () => targetSharded,
- preMergePipeline: [{$group: {_id: "$_id"}}],
- allowDiskUse: false,
- disableExchange: true
- },
- // $group, exchange prohibited, $merge sent to single shard and writes cross-shard.
- {
- sourceColl: () => sourceSharded,
- targetColl: () => targetSharded,
- preMergePipeline: [{$group: {_id: "$_id"}}],
- allowDiskUse: true,
- disableExchange: true
- },
-];
+const testCases = [];
+for (let sourceColl of [sourceSharded, sourceUnsharded]) {
+ for (let targetColl of [targetSharded, targetUnsharded]) {
+ for (let preMergePipeline of [[], [{$group: {_id: "$_id"}}]]) {
+ for (let disableExchange of [false, true]) {
+ for (let allowDiskUse of [false, true]) {
+ testCases.push({
+ sourceColl: () => sourceColl,
+ targetColl: () => targetColl,
+ preMergePipeline: preMergePipeline,
+ allowDiskUse: allowDiskUse,
+ disableExchange: disableExchange
+ });
+ }
+ }
+ }
+ }
+}
// The 'whenMatched' pipeline to apply as part of the $merge. When the old 4.2.1 behaviour is in
// effect, output documents will all have an _id field and the field added by this pipeline.
@@ -166,61 +130,71 @@ const expectedOldBehaviourOutput = Array.from(sourceSharded.find().toArray(), (d
return {_id: doc._id, docWasGeneratedFromWhenMatchedPipeline: true};
});
-for (let testCaseNum = 0; testCaseNum < testCases.length; ++testCaseNum) {
- // Perform initial test-case setup. Disable the exchange optimization if appropriate.
- const testCase = testCases[testCaseNum];
- assert.commandWorked(mongosDB.adminCommand(
- {setParameter: 1, internalQueryDisableExchange: testCase.disableExchange}));
-
- // Construct the options object that will be supplied along with the pipeline.
- const aggOptions = {allowDiskUse: testCase.allowDiskUse};
-
- // Construct the final pipeline by appending $merge to the testCase's preMergePipeline.
- const finalPipeline = testCase.preMergePipeline.concat([{
- $merge: {
- into: testCase.targetColl().getName(),
- whenMatched: mergePipe,
- whenNotMatched: "insert"
+// Generate the array of output documents we expect to see under the new upsert behaviour.
+const expectedNewBehaviourOutput = sourceSharded.find().toArray();
+
+// Run all test cases, validating that the specified output is produced in each case.
+function runAllTestCases(expectedOutput) {
+ for (let testCase of testCases) {
+ // Print out the current testcase in the logs.
+ jsTestLog(tojson(Object.assign({}, testCase, {
+ sourceColl: testCase.sourceColl().getName(),
+ targetColl: testCase.targetColl().getName()
+ })));
+
+ // Perform initial test-case setup. Disable the exchange optimization if appropriate.
+ assert.commandWorked(mongosDB.adminCommand(
+ {setParameter: 1, internalQueryDisableExchange: testCase.disableExchange}));
+
+ // Construct the options object that will be supplied along with the pipeline.
+ const aggOptions = {allowDiskUse: testCase.allowDiskUse};
+
+ // Construct the final pipeline by appending $merge to the testCase's preMergePipeline.
+ const finalPipeline = testCase.preMergePipeline.concat([{
+ $merge: {
+ into: testCase.targetColl().getName(),
+ whenMatched: mergePipe,
+ whenNotMatched: "insert"
+ }
+ }]);
+
+ // If exchange is disabled for this test, make sure that we don't see a plan that uses it.
+ if (testCase.disableExchange) {
+ const explainOut = testCase.sourceColl().explain().aggregate(finalPipeline, aggOptions);
+ assert.neq(explainOut.mergeType, "exchange");
}
- }]);
-
- // Run a $merge with the whole cluster on 'preBackport42Version' and confirm that the output
- // documents are produced using the old upsert behaviour.
- tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
- assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput);
- assert.commandWorked(testCase.targetColl().remove({}));
-
- // Upgrade a single shard to latest but leave the mongoS on 'preBackport42Version'. The upgraded
- // shard continues to produce upsert requests that are compatible with the pre-backport shards.
- refreshCluster(latestVersion, null, st.rs1);
- tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
- assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput);
- assert.commandWorked(testCase.targetColl().remove({}));
-
- // Upgrade the configs and the remaining shard to latest but leave mongoS on pre-backport 4.2.
- // The shards continue to produce upsert requests that use the pre-backport behaviour. This is
- // to ensure that the pipeline produces the same behaviour regardless of whether $merge is
- // pushed down to the shards or run on the mongoS itself.
- refreshCluster(latestVersion, {upgradeShards: true, upgradeConfigs: true});
- tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
- assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput);
- assert.commandWorked(testCase.targetColl().remove({}));
-
- // Upgrade the mongoS to latest. We should now see that the $merge adopts the new behaviour, and
- // inserts the exact source document rather than generating one from the whenMatched pipeline.
- refreshCluster(latestVersion, {upgradeMongos: true});
- tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
- assert.sameMembers(testCase.targetColl().find().toArray(),
- testCase.sourceColl().find().toArray());
- assert.commandWorked(testCase.targetColl().remove({}));
-
- // Finally, downgrade the cluster to pre-backport 4.2 in preparation for the next test case. No
- // need to do this after the final test, as it will simply extend the runtime for no reason.
- if (testCaseNum < testCases.length - 1) {
- refreshCluster(preBackport42Version,
- {upgradeMongos: true, upgradeShards: true, upgradeConfigs: true});
+
+ // Run the test case and confirm that the target collection matches the expected output.
+ tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
+ assert.sameMembers(testCase.targetColl().find().toArray(), expectedOutput);
+ assert.commandWorked(testCase.targetColl().remove({}));
}
}
+// Run all $merge test-cases with the whole cluster on 'preBackport42Version' and confirm that the
+// output documents are produced using the old upsert behaviour.
+jsTestLog(`UPGRADE: Running tests with whole cluster on ${preBackport42Version}`);
+runAllTestCases(expectedOldBehaviourOutput);
+
+// Upgrade a single shard to latest but leave the mongoS on 'preBackport42Version'. The upgraded
+// shard continues to produce upsert requests that are compatible with the pre-backport shards.
+jsTestLog(`UPGRADE: rs1 to ${latestVersion}; rs0, configs & mongos on ${preBackport42Version}`);
+refreshCluster(latestVersion, null, st.rs1);
+runAllTestCases(expectedOldBehaviourOutput);
+
+// Upgrade the configs and the remaining shard to latest but leave mongoS on pre-backport 4.2. The
+// shards continue to produce upsert requests that use the pre-backport behaviour. This is to ensure
+// that the pipeline produces the same behaviour regardless of whether $merge is pushed down to the
+// shards or run on the mongoS itself.
+jsTestLog(`UPGRADE: shards and configs to ${latestVersion}; mongos on ${preBackport42Version}`);
+refreshCluster(latestVersion, {upgradeShards: true, upgradeConfigs: true});
+runAllTestCases(expectedOldBehaviourOutput);
+
+// Upgrade the mongoS to latest. We should now see that the $merge adopts the new behaviour, and
+// inserts the exact source document rather than generating one from the whenMatched pipeline.
+jsTestLog(`UPGRADE: mongos to ${latestVersion}; whole cluster now on ${latestVersion}`);
+refreshCluster(latestVersion, {upgradeMongos: true});
+runAllTestCases(expectedNewBehaviourOutput);
+
st.stop();
})();
\ No newline at end of file
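
A note on the refactored test matrix above: the six hand-written planning scenarios are replaced by a cross-product over the five planning dimensions, and every cluster state now runs the full matrix via runAllTestCases(). A quick sanity check on the resulting size, assuming the variable names from the diff:

// 2 source collections x 2 target collections x 2 pre-merge pipelines
// x 2 exchange settings x 2 allowDiskUse settings = 32 combinations.
assert.eq(testCases.length, 2 * 2 * 2 * 2 * 2);  // 32 cases, up from the original 6.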
diff --git a/jstests/multiVersion/libs/multi_cluster.js b/jstests/multiVersion/libs/multi_cluster.js
index c1c8074986a..5244fbe17a8 100644
--- a/jstests/multiVersion/libs/multi_cluster.js
+++ b/jstests/multiVersion/libs/multi_cluster.js
@@ -93,19 +93,22 @@ ShardingTest.prototype.upgradeCluster = function(binVersion, options) {
}
if (options.waitUntilStable) {
- // Wait for the config server and shards to become available.
- this.configRS.awaitSecondaryNodes();
- let shardPrimaries = [];
- for (let rs of this._rs) {
- rs.test.awaitSecondaryNodes();
- shardPrimaries.push(rs.test.getPrimary());
- }
+ this.waitUntilStable();
+ }
+};
- // Wait for the ReplicaSetMonitor on mongoS and each shard to reflect the state of all
- // shards.
- for (let client of [...this._mongos, ...shardPrimaries]) {
- awaitRSClientHosts(client, shardPrimaries, {ok: true, ismaster: true});
- }
+ShardingTest.prototype.waitUntilStable = function() {
+ // Wait for the config server and shards to become available.
+ this.configRS.awaitSecondaryNodes();
+ let shardPrimaries = [];
+ for (let rs of this._rs) {
+ rs.test.awaitSecondaryNodes();
+ shardPrimaries.push(rs.test.getPrimary());
+ }
+
+ // Wait for the ReplicaSetMonitor on mongoS and each shard to reflect the state of all shards.
+ for (let client of [...this._mongos, ...shardPrimaries]) {
+ awaitRSClientHosts(client, shardPrimaries, {ok: true, ismaster: true});
}
};
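
A hedged usage sketch of the helper extracted above, mirroring how the upgraded test drives it (version strings and component options are illustrative):

// Upgrade selected components and wait for the cluster to settle in one call...
st.upgradeCluster("latest", {upgradeShards: true, upgradeConfigs: true, waitUntilStable: true});

// ...or upgrade a single shard's replica set directly and wait explicitly.
st.rs1.upgradeSet({binVersion: "latest"});
st.waitUntilStable();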
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 1343bdd56ae..e06d49d3850 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -125,6 +125,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
: Value(Document{CollationSpec::kSimpleSpec});
}
+ if (mergeCtx->inMongos) {
+ // TODO SERVER-44884: We set this flag to indicate that the shards should always use the new
+ // upsert mechanism when executing relevant $merge modes. After branching for 4.5, supported
+ // upgrade versions will all use the new mechanism, and we can remove this flag.
+ mergeCmd[AggregationRequest::kUseNewUpsert] = Value(true);
+ }
+
const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx);
if (txnRouter && mergingShardContributesData) {
// Don't include a readConcern since we can only include read concerns on the _first_
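
Finally, a rough illustration of where the new flag ends up: createCommandForMergingShard() attaches it to the aggregation command that mongoS dispatches to the merging shard. The shape below is heavily simplified and the surrounding fields are assumptions; only the useNewUpsert flag itself comes from this patch:

// Simplified sketch of a merging-shard command; real commands also carry remote cursor
// descriptors, collation, read/write concern, session information, etc.
const mergingShardCmd = {
    aggregate: "source_coll",  // placeholder namespace
    pipeline: [{$mergeCursors: {/* remote cursors omitted */}} /* , ...merging stages... */],
    cursor: {},
    useNewUpsert: true  // added by this patch whenever the merging half is built on mongoS
};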