1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
/**
* Tests that when $out/$merge is run on a secondary and the primary steps down, the command
* will fail with a `NotWritablePrimary` error.
*
* @tags: [
* requires_replication,
* ]
*/
(function() {
"use strict";
load("jstests/libs/fail_point_util.js"); // For configureFailPoint(), wait(), and off().
let replTest = new ReplSetTest({nodes: 2});
replTest.startSet();
replTest.initiate();
const nDocs = 100;
const dbName = jsTestName();
let primary = replTest.getPrimary();
let secondary = replTest.getSecondary();
let primaryDB = primary.getDB(dbName);
secondary.setReadPref("secondary");
const inputCollName = "inputColl";
const outputCollName = "outputColl";
const inputCollPrimary = primaryDB.getCollection(inputCollName);
for (let i = 0; i < nDocs; i++) {
assert.commandWorked(inputCollPrimary.insert({_id: i, a: i + 1}, {writeConcern: {w: 2}}));
}
replTest.awaitReplication();
/**
* Given an agg 'writeStage' (an $out or $merge), passed as a string, enables and waits for
* 'failpoint' to be reached by the aggregate containing 'writeStage' running on a secondary and
* verifies that the aggregate fails with a 'NotWritablePrimary' error when the primary is forced to
* step down.
*/
let runTest = function(writeStage, failpoint) {
let outFn = `
const sourceDB = db.getSiblingDB("${dbName}");
let cmdRes = sourceDB.runCommand({
aggregate: "${inputCollName}",
pipeline: [
{$addFields: {b: {$sum: ['$a', '$_id']}}},
${writeStage}],
cursor: {},
$readPreference: {mode: "secondary"}
});
assert.commandFailed(cmdRes);
assert(ErrorCodes.isNotPrimaryError(cmdRes.code), cmdRes);
`;
// Enable the fail point to stop the aggregate.
const failPoint = configureFailPoint(secondary, failpoint);
// Issue aggregate against the secondary.
let aggOnSecondary = startParallelShell(outFn, secondary.port);
// Wait for the aggregate to hit the fail point.
failPoint.wait();
// Force current primary to step down.
assert.commandWorked(primaryDB.adminCommand({"replSetStepDown": 60 * 60, "force": true}));
failPoint.off();
// Join the aggregate.
aggOnSecondary();
};
const mergeFailPoint = "hangWhileBuildingDocumentSourceMergeBatch";
const outFailPoint = "outWaitAfterTempCollectionCreation";
const mergeStage = `{$merge: {
into: "${outputCollName}",
whenMatched: "fail",
whenNotMatched: "insert",
}}`;
runTest(mergeStage, mergeFailPoint);
// Wait for the replica set to select a primary before running the next aggregate command.
replTest.awaitNodesAgreeOnPrimary();
primary = replTest.getPrimary();
primaryDB = primary.getDB(dbName);
secondary = replTest.getSecondary();
const outStage = `{$out: "${outputCollName}"}`;
runTest(outStage, outFailPoint);
replTest.stopSet();
})();
|