1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
/**
* Tests that $merge with {whenMatched: [], whenNotMatched: 'insert'} is handled correctly while a
* single replica set is upgraded from a pre-backport version of 4.2 to the latest version.
*/
(function() {
"use strict";

load("jstests/multiVersion/libs/multi_rs.js");  // For upgradeSet.
load("jstests/replsets/rslib.js");              // For startSetIfSupportsReadMajority.

// Binary versions that the replica set runs over the course of the test.
const preBackport42Version = "4.2.1";
const latestVersion = "latest";

// Every node starts out on the pre-backport binary.
const rst = new ReplSetTest({nodes: 3, nodeOptions: {binVersion: preBackport42Version}});

if (!startSetIfSupportsReadMajority(rst)) {
    jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
    rst.stopSet();
    return;
}
rst.initiate();

// Handles to the test database and its collections, always resolved through the current primary.
let testDB;
let sourceColl;
let targetColl;

// (Re)acquires the database and collection handles from whichever node is currently primary.
function acquireCollections() {
    testDB = rst.getPrimary().getDB(jsTestName());
    sourceColl = testDB.source_coll;
    targetColl = testDB.target_coll;
}
acquireCollections();

// Moves the whole replica set (or, when 'secondariesOnly' is truthy, only its secondaries) to the
// given binary version, calls awaitSecondaryNodes(), and then refreshes the collection handles
// because the previously held references belong to the pre-restart connections.
function refreshReplSet(version, secondariesOnly) {
    const upgradeOptions = {binVersion: version};
    if (!secondariesOnly) {
        rst.upgradeSet(upgradeOptions);
    } else {
        rst.upgradeSecondaries(rst.getPrimary(), upgradeOptions);
    }
    rst.awaitSecondaryNodes();
    acquireCollections();
}

// Populate the source collection with documents whose _id runs from -20 up to and including 19.
let nextId = -20;
while (nextId < 20) {
    assert.commandWorked(sourceColl.insert({_id: nextId}));
    ++nextId;
}

// The 'whenMatched' pipeline handed to $merge. Under the old 4.2.1 behaviour every output document
// consists of an _id plus the marker field that this pipeline adds.
const whenMatchedPipeline = [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}];

// The documents we expect in the target collection while the old upsert behaviour is in effect.
const expectedOldBehaviourOutput = sourceColl.find().toArray().map(
    ({_id}) => ({_id, docWasGeneratedFromWhenMatchedPipeline: true}));

// The aggregation run in every phase of the test; its output depends on the upsert mode in use.
const mergeSpec = {
    into: targetColl.getName(),
    whenMatched: whenMatchedPipeline,
    whenNotMatched: "insert"
};
const finalPipeline = [{$merge: mergeSpec}];

// Runs the $merge via the current primary, compares the target collection against the documents
// returned by 'getExpectedDocs' (invoked only after the target has been read), and finally empties
// the target collection so that the next phase starts from a clean slate.
function runMergeAndVerify(getExpectedDocs) {
    sourceColl.aggregate(finalPipeline);
    assert.sameMembers(targetColl.find().toArray(), getExpectedDocs());
    assert.commandWorked(targetColl.remove({}));
}

// Phase 1: the entire set is on 'preBackport42Version', so the old upsert behaviour applies.
runMergeAndVerify(() => expectedOldBehaviourOutput);

// Phase 2: only the Secondaries are upgraded. The Primary is still on 'preBackport42Version', so
// the output documents are still produced by the old upsert behaviour.
refreshReplSet(latestVersion, true);
runMergeAndVerify(() => expectedOldBehaviourOutput);

// $merge cannot be run on a Secondary, which rules out an upgraded Secondary ever sending an
// 'upsertSupplied' request to the pre-backport Primary.
const aggregateOnSecondary = () =>
    rst.getSecondaries()[0].getCollection(sourceColl.getFullName()).aggregate(finalPipeline);
assert.throws(aggregateOnSecondary);

// Phase 3: with the Primary on latest as well, $merge adopts the new behaviour and inserts the
// exact source document instead of generating one from the whenMatched pipeline.
refreshReplSet(latestVersion);
runMergeAndVerify(() => sourceColl.find().toArray());

rst.stopSet();
})();
|