summaryrefslogtreecommitdiff
path: root/jstests/replsets/oplog_visibility.js
blob: ee62e2475e806ba992be4f0c34d19c5c181650b6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
 * Test oplog visibility enforcement of primaries and secondaries. This test uses a client to read
 * the oplog while there are concurrent writers. The client copies all the timestamps it sees and
 * verifies a later scan over the range returns the same values.
 */
(function() {
"use strict";

load("jstests/libs/parallelTester.js");  // for Thread.

const replTest = new ReplSetTest({
    name: "oplog_visibility",
    nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
    settings: {chainingAllowed: true}
});
replTest.startSet();
replTest.initiate();

jsTestLog("Enabling `sleepBeforeCommit` failpoint.");
for (let node of replTest.nodes) {
    assert.commandWorked(node.adminCommand(
        {configureFailPoint: "sleepBeforeCommit", mode: {activationProbability: 0.01}}));
}

jsTestLog("Starting concurrent writers.");
let stopLatch = new CountDownLatch(1);
let writers = [];
for (let idx = 0; idx < 2; ++idx) {
    let coll = "coll_" + idx;
    let writer = new Thread(function(host, coll, stopLatch) {
        const conn = new Mongo(host);
        let id = 0;

        // Cap the amount of data being inserted to avoid rolling over a 10MiB oplog. It takes
        // ~70,000 "basic" ~150 byte oplog documents to fill a 10MiB oplog. Note this number is
        // for each of two writer threads.
        const maxDocsToInsert = 20 * 1000;
        while (stopLatch.getCount() > 0 && id < maxDocsToInsert) {
            conn.getDB("test").getCollection(coll).insert({_id: id});
            id++;
        }
        jsTestLog({"NumDocsWritten": id});
    }, replTest.getPrimary().host, coll, stopLatch);

    writer.start();
    writers.push(writer);
}

for (let node of replTest.nodes) {
    let testOplog = function(node) {
        let timestamps = [];

        let local = node.getDB("local");
        let oplogStart =
            local.getCollection("oplog.rs").find().sort({$natural: -1}).limit(-1).next()["ts"];
        jsTestLog({"Node": node.host, "StartTs": oplogStart});

        while (timestamps.length < 1000) {
            // Query with $gte to validate continuinity. Do not add this first record to the
            // recorded timestamps. Its value was already added in the last cursor.
            let cursor = local.getCollection("oplog.rs")
                             .find({ts: {$gte: oplogStart}})
                             .sort({$natural: 1})
                             .tailable(true)
                             .batchSize(100);
            assert(cursor.hasNext());
            assert.eq(oplogStart, cursor.next()["ts"]);

            // While this method wants to capture 1000 timestamps, the cursor has a batch size
            // of 100 and this loop makes 200 iterations before getting a new cursor from a
            // fresh query. The goal is to exercise getMores, which use different code paths
            // for establishing their oplog reader transactions.
            for (let num = 0; num < 200 && timestamps.length < 1000; ++num) {
                try {
                    if (cursor.hasNext() == false) {
                        break;
                    }
                } catch (exc) {
                    break;
                }
                let ts = cursor.next()["ts"];
                timestamps.push(ts);
                oplogStart = ts;
            }
        }

        jsTestLog({"Verifying": node.host, "StartTs": timestamps[0], "EndTs": timestamps[999]});
        oplogStart = timestamps[0];
        let cursor =
            local.getCollection("oplog.rs").find({ts: {$gte: oplogStart}}).sort({$natural: 1});
        for (let observedTsIdx = 0; observedTsIdx < timestamps.length; ++observedTsIdx) {
            let observedTs = timestamps[observedTsIdx];
            assert(cursor.hasNext());
            let doc = cursor.next();
            let actualTs = doc["ts"];
            assert.eq(actualTs, observedTs, function() {
                let prev = null;
                let next = null;
                if (observedTsIdx > 0) {
                    prev = timestamps[observedTsIdx - 1];
                }
                if (observedTsIdx + 1 < timestamps.length) {
                    next = timestamps[observedTsIdx + 1];
                }

                return tojson({
                    "Missing": actualTs,
                    "ObservedIdx": observedTsIdx,
                    "PrevObserved": prev,
                    "NextObserved": next
                });
            });
        }
    };

    jsTestLog({"Testing": node.host});
    testOplog(node);
}
jsTestLog("Stopping writers.");
stopLatch.countDown();
writers.forEach((writer) => {
    writer.join();
});

replTest.stopSet();
})();