1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
/**
* This is a library for testing secondary reads against a replica set
*/
"use strict";
load("jstests/replsets/rslib.js");
function SecondaryReadsTest(name = "secondary_reads_test") {
let rst = performStandardSetup();
let dbName = name;
let primary = rst.getPrimary();
let primaryDB = primary.getDB(dbName);
let secondary = rst.getSecondary();
let secondaryDB = secondary.getDB(dbName);
secondaryDB.getMongo().setSlaveOk();
let readers = [];
let signalColl = "signalColl";
const testDoneId = "testDone";
/**
* Return an instance of ReplSetTest initialized with a standard
* two-node replica set running with the latest version.
*/
function performStandardSetup() {
let replSet = new ReplSetTest({name, nodes: 2});
replSet.startSet();
const nodes = replSet.nodeList();
replSet.initiate({
_id: name,
members: [{_id: 0, host: nodes[0]}, {_id: 1, host: nodes[1], priority: 0, votes: 0}]
});
return replSet;
}
this.startSecondaryReaders = function(nReaders, readFn) {
let read = function() {
db.getMongo().setSlaveOk();
db = db.getSiblingDB(TestData.dbName);
while (true) {
readFn();
let signalDoc = db.getCollection(TestData.signalColl)
.find({_id: TestData.testDoneId})
.itcount();
if (signalDoc != 0) {
print("signal doc found. quitting...");
quit();
}
}
};
TestData.dbName = dbName;
TestData.signalColl = signalColl;
TestData.testDoneId = testDoneId;
let serializedFn = "let readFn = " + readFn.toString() + ";(" + read.toString() + ")();";
for (let i = 0; i < nReaders; i++) {
readers.push(startParallelShell(serializedFn, secondary.port));
print("reader " + readers.length + " started");
}
};
let failPoint = "pauseBatchApplicationBeforeCompletion";
// This returns a function that should be called once after performing a replicated write on a
// primary. The write will start a batch on a secondary and immediately pause before completion.
// The returned function will return once the batch has reached the point where it has applied
// but not updated the last applied optime.
this.pauseSecondaryBatchApplication = function() {
clearRawMongoProgramOutput();
assert.commandWorked(
secondaryDB.adminCommand({configureFailPoint: failPoint, mode: "alwaysOn"}));
return function() {
assert.soon(function() {
return rawMongoProgramOutput().match(failPoint + " fail point enabled");
});
};
};
this.resumeSecondaryBatchApplication = function() {
assert.commandWorked(
secondaryDB.adminCommand({configureFailPoint: failPoint, mode: "off"}));
};
this.getPrimaryDB = function() {
return primaryDB;
};
this.getSecondaryDB = function() {
return secondaryDB;
};
this.stopReaders = function() {
print("signaling readers to stop...");
assert.gt(readers.length, 0, "no readers to stop");
assert.writeOK(primaryDB.getCollection(signalColl).insert({_id: testDoneId}));
for (let i = 0; i < readers.length; i++) {
const await = readers[i];
await ();
print("reader " + i + " done");
}
readers = [];
};
this.getReplset = function() {
return rst;
};
this.stop = function() {
if (readers.length > 0) {
this.stopReaders();
}
rst.stopSet();
};
}
|