summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/merge_out_on_secondary_fail_on_stepdown.js
blob: ad2010bcf0ff7c7742861b81da5b96fa66a3d34a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
 * Tests that when $out/$merge is run on a secondary and the primary steps down, the command
 * will fail with a `NotWritablePrimary` error.
 *
 * @tags: [
 *   requires_replication,
 * ]
 */
(function() {
"use strict";

load("jstests/libs/fail_point_util.js");  // For configureFailPoint(), wait(), and off().

let replTest = new ReplSetTest({nodes: 2});
replTest.startSet();
replTest.initiate();
const nDocs = 100;
const dbName = jsTestName();
let primary = replTest.getPrimary();
let secondary = replTest.getSecondary();
let primaryDB = primary.getDB(dbName);
secondary.setReadPref("secondary");

const inputCollName = "inputColl";
const outputCollName = "outputColl";
const inputCollPrimary = primaryDB.getCollection(inputCollName);

for (let i = 0; i < nDocs; i++) {
    assert.commandWorked(inputCollPrimary.insert({_id: i, a: i + 1}, {writeConcern: {w: 2}}));
}

replTest.awaitReplication();

/**
 * Given an agg 'writeStage' (an $out or $merge), passed as a string, enables and waits for
 * 'failpoint' to be reached by the aggregate containing 'writeStage' running on a secondary and
 * verifies that the aggregate fails with a 'NotWritablePrimary' error when the primary is forced to
 * step down.
 */
let runTest = function(writeStage, failpoint) {
    let outFn = `
    const sourceDB = db.getSiblingDB("${dbName}");
    let cmdRes = sourceDB.runCommand({
        aggregate: "${inputCollName}",
        pipeline: [
        {$addFields: {b: {$sum: ['$a', '$_id']}}},
         ${writeStage}],
        cursor: {},
        $readPreference: {mode: "secondary"}
    });
    
    assert.commandFailed(cmdRes);
    assert(ErrorCodes.isNotPrimaryError(cmdRes.code), cmdRes);
    `;

    // Enable the fail point to stop the aggregate.
    const failPoint = configureFailPoint(secondary, failpoint);

    // Issue aggregate against the secondary.
    let aggOnSecondary = startParallelShell(outFn, secondary.port);
    // Wait for the aggregate to hit the fail point.
    failPoint.wait();

    // Force current primary to step down.
    assert.commandWorked(primaryDB.adminCommand({"replSetStepDown": 60 * 60, "force": true}));

    failPoint.off();

    // Join the aggregate.
    aggOnSecondary();
};

const mergeFailPoint = "hangWhileBuildingDocumentSourceMergeBatch";
const outFailPoint = "outWaitAfterTempCollectionCreation";

const mergeStage = `{$merge: {
into: "${outputCollName}",
whenMatched: "fail",
whenNotMatched: "insert",
}}`;

runTest(mergeStage, mergeFailPoint);

// Wait for the replica set to select a primary before running the next aggregate command.
replTest.awaitNodesAgreeOnPrimary();
primary = replTest.getPrimary();
primaryDB = primary.getDB(dbName);
secondary = replTest.getSecondary();

const outStage = `{$out: "${outputCollName}"}`;
runTest(outStage, outFailPoint);

replTest.stopSet();
})();