summaryrefslogtreecommitdiff
path: root/jstests/multiVersion/agg_merge_upsert_supplied_replset.js
blob: 2ca233e8e48e2a23bac841c54943628b81186c55 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
 * Tests that $merge with {whenMatched: [], whenNotMatched: 'insert'} is handled correctly during
 * upgrade from and downgrade to a pre-backport version of 4.2 on a single replica set.
 */
(function() {
"use strict";

load("jstests/multiVersion/libs/multi_rs.js");  // For upgradeSet.
load("jstests/replsets/rslib.js");              // For startSetIfSupportsReadMajority.

const preBackport42Version = "4.2.1";
const latestVersion = "latest";

const rst = new ReplSetTest({
    nodes: 3,
    nodeOptions: {binVersion: preBackport42Version},
});
if (!startSetIfSupportsReadMajority(rst)) {
    jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
    rst.stopSet();
    return;
}
rst.initiate();

// Obtain references to the test database and create the test collection.
let testDB = rst.getPrimary().getDB(jsTestName());
let sourceColl = testDB.source_coll;
let targetColl = testDB.target_coll;

// Up- or downgrades the replset and then refreshes our references to the test collection.
function refreshReplSet(version, secondariesOnly) {
    // Upgrade the set and wait for it to become available again.
    if (secondariesOnly) {
        rst.upgradeSecondaries(rst.getPrimary(), {binVersion: version});
    } else {
        rst.upgradeSet({binVersion: version});
    }
    rst.awaitSecondaryNodes();

    // Having upgraded the set, reacquire references to the db and collection.
    testDB = rst.getPrimary().getDB(jsTestName());
    sourceColl = testDB.source_coll;
    targetColl = testDB.target_coll;
}

// Insert a set of test data.
for (let i = -20; i < 20; ++i) {
    assert.commandWorked(sourceColl.insert({_id: i}));
}

// The 'whenMatched' pipeline to apply as part of the $merge. When the old 4.2.1 behaviour is in
// effect, output documents will all have an _id field and the field added by this pipeline.
const mergePipe = [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}];

// Generate the array of output documents we expect to see under the old upsert behaviour.
const expectedOldBehaviourOutput = Array.from(sourceColl.find().toArray(), (doc) => {
    return {_id: doc._id, docWasGeneratedFromWhenMatchedPipeline: true};
});

// The pipeline to run for each test. Results in different output depending on upsert mode used.
const finalPipeline =
    [{$merge: {into: targetColl.getName(), whenMatched: mergePipe, whenNotMatched: "insert"}}];

// Run a $merge with the whole cluster on 'preBackport42Version' and confirm that the output
// documents are produced using the old upsert behaviour.
sourceColl.aggregate(finalPipeline);
assert.sameMembers(targetColl.find().toArray(), expectedOldBehaviourOutput);
assert.commandWorked(targetColl.remove({}));

// Upgrade the Secondaries but leave the Primary on 'preBackport42Version'. The set continues to
// produce output documents using the old upsert behaviour.
refreshReplSet(latestVersion, true);
sourceColl.aggregate(finalPipeline);
assert.sameMembers(targetColl.find().toArray(), expectedOldBehaviourOutput);
assert.commandWorked(targetColl.remove({}));

// Since we cannot run $merge on a Secondary, we cannot end up in a situation where an upgraded
// Secondary issues an 'upsertSupplied' request to the pre-backport Primary.
assert.throws(
    () => rst.getSecondaries()[0].getCollection(sourceColl.getFullName()).aggregate(finalPipeline));

// Upgrade the Primary to latest. We should now see that the $merge adopts the new behaviour, and
// inserts the exact source document rather than generating one from the whenMatched pipeline.
refreshReplSet(latestVersion);
sourceColl.aggregate(finalPipeline);
assert.sameMembers(targetColl.find().toArray(), sourceColl.find().toArray());
assert.commandWorked(targetColl.remove({}));

rst.stopSet();
})();