1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
/**
* Test oplog visibility enforcement of primaries and secondaries. This test uses a client to read
* the oplog while there are concurrent writers. The client copies all the timestamps it sees and
* verifies a later scan over the range returns the same values.
*
* @tags: [requires_document_locking]
*/
(function() {
    "use strict";

    load("jstests/libs/parallelTester.js");  // for ScopedThread.

    // Number of oplog timestamps each node's reader records before verifying them.
    const kNumTimestampsToObserve = 1000;
    // Maximum number of documents pulled from one cursor before a fresh query is issued.
    const kMaxReadsPerCursor = 200;
    // Batch size of the tailing cursor. It is smaller than `kMaxReadsPerCursor` so that a single
    // cursor has to issue getMores.
    const kCursorBatchSize = 100;
    // Number of concurrent writer threads inserting into the primary.
    const kNumWriters = 2;

    /**
     * Tails `local.oplog.rs` on `node` until `kNumTimestampsToObserve` timestamps have been
     * recorded, then re-scans the oplog from the first recorded timestamp and asserts that the
     * rescan returns exactly the recorded timestamps, in the same order.
     *
     * @param {Mongo} node - connection to the replica set member whose oplog is checked.
     */
    function testOplog(node) {
        const timestamps = [];
        const oplog = node.getDB("local").getCollection("oplog.rs");

        // Start from the newest oplog entry currently visible on this node.
        let oplogStart = oplog.find().sort({$natural: -1}).limit(-1).next()["ts"];
        jsTestLog({"Node": node.host, "StartTs": oplogStart});

        while (timestamps.length < kNumTimestampsToObserve) {
            // Query with $gte to validate continuity. Do not add this first record to the
            // recorded timestamps. Its value was already added in the last cursor.
            const cursor = oplog.find({ts: {$gte: oplogStart}})
                               .sort({$natural: 1})
                               .tailable(true)
                               .batchSize(kCursorBatchSize);
            assert(cursor.hasNext());
            assert.eq(oplogStart, cursor.next()["ts"]);

            // While this method wants to capture 1000 timestamps, the cursor has a batch size
            // of 100 and this loop makes 200 iterations before getting a new cursor from a
            // fresh query. The goal is to exercise getMores, which use different code paths
            // for establishing their oplog reader transactions.
            for (let num = 0;
                 num < kMaxReadsPerCursor && timestamps.length < kNumTimestampsToObserve;
                 ++num) {
                try {
                    if (!cursor.hasNext()) {
                        break;
                    }
                } catch (exc) {
                    // Deliberately best-effort: a failed `hasNext` just abandons this cursor and
                    // the outer loop re-queries from the last timestamp seen.
                    // NOTE(review): presumably the tailable cursor can die here - confirm.
                    break;
                }

                const ts = cursor.next()["ts"];
                timestamps.push(ts);
                oplogStart = ts;
            }
        }

        jsTestLog({
            "Verifying": node.host,
            "StartTs": timestamps[0],
            "EndTs": timestamps[timestamps.length - 1]
        });

        // Second pass: a plain (non-tailable) scan starting at the first recorded timestamp must
        // yield the same sequence that the tailing reader observed.
        const verifyCursor = oplog.find({ts: {$gte: timestamps[0]}}).sort({$natural: 1});

        // Use a numeric index rather than `for...in`: `for...in` yields string keys, which made
        // `idx + 1` a string concatenation and reported the wrong "NextObserved" entry.
        for (let observedTsIdx = 0; observedTsIdx < timestamps.length; ++observedTsIdx) {
            const observedTs = timestamps[observedTsIdx];
            assert(verifyCursor.hasNext());
            const actualTs = verifyCursor.next()["ts"];

            // The message is built lazily so the neighbours are only looked up on failure.
            assert.eq(actualTs, observedTs, function() {
                const prev = observedTsIdx > 0 ? timestamps[observedTsIdx - 1] : null;
                const next = observedTsIdx + 1 < timestamps.length
                    ? timestamps[observedTsIdx + 1]
                    : null;
                return tojson({
                    "Missing": actualTs,
                    "ObservedIdx": observedTsIdx,
                    "PrevObserved": prev,
                    "NextObserved": next
                });
            });
        }
    }

    const replTest = new ReplSetTest({
        name: "oplog_visibility",
        nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
        settings: {chainingAllowed: true}
    });
    replTest.startSet();
    replTest.initiate();

    // The failpoint is enabled on every node with a 1% activation probability.
    // NOTE(review): presumably it delays commits so that oplog entries become visible out of
    // order more often - confirm against the server's failpoint definition.
    jsTestLog("Enabling `sleepBeforeCommit` failpoint.");
    for (const node of replTest.nodes) {
        assert.commandWorked(node.adminCommand(
            {configureFailPoint: "sleepBeforeCommit", mode: {activationProbability: 0.01}}));
    }

    jsTestLog("Starting concurrent writers.");
    const stopLatch = new CountDownLatch(1);
    const writers = [];
    for (let idx = 0; idx < kNumWriters; ++idx) {
        const coll = "coll_" + idx;
        // The thread body receives everything it needs as arguments; it opens its own connection
        // to the primary and inserts until the latch is released or the cap is reached.
        const writer = new ScopedThread(function(host, coll, stopLatch) {
            const conn = new Mongo(host);
            let id = 0;

            // Cap the amount of data being inserted to avoid rolling over a 10MiB oplog. It takes
            // ~70,000 "basic" ~150 byte oplog documents to fill a 10MiB oplog. Note this number is
            // for each of two writer threads.
            const maxDocsToInsert = 20 * 1000;
            while (stopLatch.getCount() > 0 && id < maxDocsToInsert) {
                conn.getDB("test").getCollection(coll).insert({_id: id});
                id++;
            }
            jsTestLog({"NumDocsWritten": id});
        }, replTest.getPrimary().host, coll, stopLatch);

        writer.start();
        writers.push(writer);
    }

    // Check every member (primary and secondaries) while the writers are running.
    for (const node of replTest.nodes) {
        jsTestLog({"Testing": node.host});
        testOplog(node);
    }

    jsTestLog("Stopping writers.");
    stopLatch.countDown();
    writers.forEach((writer) => {
        writer.join();
    });

    replTest.stopSet();
})();
|