summaryrefslogtreecommitdiff
path: root/jstests/replsets/oplog_visibility.js
blob: c0d3e693c72316e0689006a3b43a17a77c5c154e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
 * Test oplog visibility enforcement of primaries and secondaries. This test uses a client to read
 * the oplog while there are concurrent writers. The client copies all the timestamps it sees and
 * verifies a later scan over the range returns the same values.
 */
(function() {
"use strict";

load("jstests/libs/parallelTester.js");  // for Thread.

// Bring up a three-node replica set with chaining allowed. The first node uses default options;
// the other two are given priority 0 in their member config.
// NOTE(review): presumably priority 0 keeps the first node as the only possible primary — confirm.
const makePriorityZeroNodeOptions = () => ({rsConfig: {priority: 0}});
const replTest = new ReplSetTest({
    name: "oplog_visibility",
    nodes: [{}, makePriorityZeroNodeOptions(), makePriorityZeroNodeOptions()],
    settings: {chainingAllowed: true}
});
replTest.startSet();
replTest.initiate();

// Configure the `sleepBeforeCommit` failpoint on every node with an activation probability of
// 0.01. A fresh command object is built for each node.
jsTestLog("Enabling `sleepBeforeCommit` failpoint.");
replTest.nodes.forEach((node) => {
    const enableFailPointCmd = {
        configureFailPoint: "sleepBeforeCommit",
        mode: {activationProbability: 0.01}
    };
    assert.commandWorked(node.adminCommand(enableFailPointCmd));
});

jsTestLog("Starting concurrent writers.");
// Shared latch: writer threads keep inserting while its count is above zero.
const stopLatch = new CountDownLatch(1);
const writers = [];

// Body executed by each writer thread. It relies only on its own parameters: the host to connect
// to, the name of the collection in the "test" database to insert into, and the stop latch.
const writerThreadBody = function(host, collName, latch) {
    const conn = new Mongo(host);
    const targetColl = conn.getDB("test").getCollection(collName);

    // Cap the amount of data being inserted to avoid rolling over a 10MiB oplog. It takes
    // ~70,000 "basic" ~150 byte oplog documents to fill a 10MiB oplog. Note this number is
    // for each of two writer threads.
    const maxDocsToInsert = 20 * 1000;

    // Insert documents with increasing _id values until the latch is released or the cap is hit.
    let numInserted = 0;
    for (; latch.getCount() > 0 && numInserted < maxDocsToInsert; ++numInserted) {
        targetColl.insert({_id: numInserted});
    }
    jsTestLog({"NumDocsWritten": numInserted});
};

// Start two writers, each connected to the current primary and writing to its own collection.
const numWriters = 2;
for (let writerIdx = 0; writerIdx < numWriters; ++writerIdx) {
    const writer =
        new Thread(writerThreadBody, replTest.getPrimary().host, `coll_${writerIdx}`, stopLatch);
    writer.start();
    writers.push(writer);
}

// Oplog visibility check for a single node. First record a run of oplog timestamps by reading
// through tailable cursors, then confirm that a fresh scan starting at the first recorded
// timestamp returns exactly the same sequence.
const testOplog = function(node) {
    const numTimestampsToCollect = 1000;
    const cursorBatchSize = 100;
    const maxReadsPerCursor = 200;

    const oplog = node.getDB("local").getCollection("oplog.rs");
    const timestamps = [];

    // Start from the newest entry currently in the oplog.
    let oplogStart = oplog.find().sort({$natural: -1}).limit(-1).next()["ts"];
    jsTestLog({"Node": node.host, "StartTs": oplogStart});

    while (timestamps.length < numTimestampsToCollect) {
        // Query with $gte to validate continuity: the first document returned must be the entry
        // the previous cursor ended on. That first document is consumed by the assertion below
        // and is not recorded again.
        const tailCursor = oplog.find({ts: {$gte: oplogStart}})
                               .sort({$natural: 1})
                               .tailable(true)
                               .batchSize(cursorBatchSize);
        assert(tailCursor.hasNext());
        assert.eq(oplogStart, tailCursor.next()["ts"]);

        // Read up to `maxReadsPerCursor` documents from this cursor before issuing a fresh
        // query. With a batch size of `cursorBatchSize` this exercises getMores, which use
        // different code paths for establishing their oplog reader transactions.
        let readsOnCursor = 0;
        while (readsOnCursor < maxReadsPerCursor && timestamps.length < numTimestampsToCollect) {
            let hasMore;
            try {
                hasMore = tailCursor.hasNext();
            } catch (exc) {
                // An error while probing the cursor ends the use of this cursor; the outer loop
                // then re-queries from the last recorded timestamp.
                break;
            }
            if (!hasMore) {
                break;
            }

            oplogStart = tailCursor.next()["ts"];
            timestamps.push(oplogStart);
            ++readsOnCursor;
        }
    }

    jsTestLog({
        "Verifying": node.host,
        "StartTs": timestamps[0],
        "EndTs": timestamps[numTimestampsToCollect - 1]
    });

    // Builds the assertion message for position `idx`: the value actually seen, the recorded
    // timestamp at that position, and its recorded neighbours (null at either end).
    const describeMismatch = function(idx, actualTs) {
        const prevObserved = idx > 0 ? timestamps[idx - 1] : null;
        const nextObserved = idx + 1 < timestamps.length ? timestamps[idx + 1] : null;
        return tojson({
            "Missing": actualTs,
            "ObservedTs": timestamps[idx],
            "ObservedIdx": idx,
            "PrevObserved": prevObserved,
            "NextObserved": nextObserved
        });
    };

    // Re-scan from the first recorded timestamp with a regular cursor and compare one by one.
    const verifyCursor = oplog.find({ts: {$gte: timestamps[0]}}).sort({$natural: 1});
    timestamps.forEach((observedTs, idx) => {
        assert(verifyCursor.hasNext(), describeMismatch(idx, 'cursor returned no data'));
        const actualTs = verifyCursor.next()["ts"];
        assert.eq(actualTs, observedTs, describeMismatch(idx, actualTs));
    });
};

// Run the check against every node of the replica set, one node at a time.
for (const node of replTest.nodes) {
    jsTestLog({"Testing": node.host});
    testOplog(node);
}
// Release the latch so the writer loops exit, wait for every writer thread to finish, and only
// then shut the replica set down.
jsTestLog("Stopping writers.");
stopLatch.countDown();
for (const writer of writers) {
    writer.join();
}

replTest.stopSet();
})();