summaryrefslogtreecommitdiff
path: root/jstests/replsets/oplog_visibility.js
blob: ccdcf5c6d93903e43e4fa21f4ffb9e32ba8f2e39 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
 * Test oplog visibility enforcement on primaries and secondaries. This test uses a client to read
 * the oplog through tailable cursors while there are concurrent writers. The client records every
 * timestamp it observes and then verifies that a later scan over the same range returns the same
 * values, in the same order.
 *
 * @tags: [requires_document_locking]
 */
(function() {
    "use strict";

    load("jstests/libs/parallelTester.js");  // for ScopedThread.

    // Number of oplog timestamps each node must yield through tailable cursors before the same
    // range is re-scanned and compared.
    const kNumTimestampsToCollect = 1000;
    // Number of entries consumed from one tailable cursor before a fresh query is issued. With a
    // batch size of 100 this forces several getMores per cursor; getMores use different code
    // paths for establishing their oplog reader transactions than the initial query does.
    const kMaxEntriesPerCursor = 200;
    const kCursorBatchSize = 100;

    /**
     * Reads kNumTimestampsToCollect oplog timestamps from `node` using a sequence of tailable
     * cursors. Collection starts just after the newest oplog entry present when this is called.
     *
     * @param {Mongo} node - the replica set member whose oplog is read.
     * @returns {Array} the observed `ts` values, in the order the cursors returned them.
     */
    const collectOplogTimestamps = function(node) {
        const timestamps = [];
        const oplog = node.getDB("local").getCollection("oplog.rs");

        let oplogStart = oplog.find().sort({$natural: -1}).limit(-1).next()["ts"];
        jsTestLog({"Node": node.host, "StartTs": oplogStart});

        while (timestamps.length < kNumTimestampsToCollect) {
            // Query with $gte to validate continuity: the first entry returned must be the last
            // one already seen. It is not recorded again. On the first pass, it is the starting
            // point, which is intentionally not recorded at all.
            const cursor = oplog.find({ts: {$gte: oplogStart}})
                               .sort({$natural: 1})
                               .tailable(true)
                               .batchSize(kCursorBatchSize);
            assert(cursor.hasNext());
            assert.eq(oplogStart, cursor.next()["ts"]);

            for (let num = 0;
                 num < kMaxEntriesPerCursor && timestamps.length < kNumTimestampsToCollect;
                 ++num) {
                let hasNext;
                try {
                    hasNext = cursor.hasNext();
                } catch (exc) {
                    // Best effort: if iterating the tailable cursor throws, reopen a new cursor
                    // from the last observed timestamp instead of failing the test.
                    jsTestLog({"Node": node.host, "ReopeningCursorAfterError": String(exc)});
                    break;
                }
                if (!hasNext) {
                    break;
                }

                const ts = cursor.next()["ts"];
                timestamps.push(ts);
                oplogStart = ts;
            }
        }

        return timestamps;
    };

    /**
     * Re-scans the oplog of `node` from timestamps[0] with a regular (non-tailable) cursor. It
     * asserts that the scan's leading entries match `timestamps` one-for-one and in order. Any
     * entry the tailable cursors skipped therefore shows up as a mismatch.
     *
     * @param {Mongo} node - the replica set member whose oplog is re-scanned.
     * @param {Array} timestamps - non-empty list of observed `ts` values, in oplog order.
     */
    const verifyOplogTimestamps = function(node, timestamps) {
        const oplogStart = timestamps[0];
        jsTestLog({
            "Verifying": node.host,
            "StartTs": oplogStart,
            "EndTs": timestamps[timestamps.length - 1]
        });

        const cursor =
            node.getDB("local").getCollection("oplog.rs").find({ts: {$gte: oplogStart}}).sort({
                $natural: 1
            });

        // Use a numeric index. A `for...in` key would be a string, and `idx + 1` would then
        // concatenate strings ("5" + 1 === "51") and report the wrong neighbor.
        for (let observedTsIdx = 0; observedTsIdx < timestamps.length; ++observedTsIdx) {
            const observedTs = timestamps[observedTsIdx];
            assert(cursor.hasNext());
            const actualTs = cursor.next()["ts"];
            assert.eq(actualTs, observedTs, function() {
                const prev = observedTsIdx > 0 ? timestamps[observedTsIdx - 1] : null;
                const next =
                    observedTsIdx + 1 < timestamps.length ? timestamps[observedTsIdx + 1] : null;

                return tojson({
                    "Missing": actualTs,
                    "ObservedIdx": observedTsIdx,
                    "PrevObserved": prev,
                    "NextObserved": next
                });
            });
        }
    };

    const replTest = new ReplSetTest({
        name: "oplog_visibility",
        nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
        settings: {chainingAllowed: true}
    });
    replTest.startSet();
    replTest.initiate();

    jsTestLog("Enabling `sleepBeforeCommit` failpoint.");
    for (const node of replTest.nodes) {
        assert.commandWorked(node.adminCommand(
            {configureFailPoint: "sleepBeforeCommit", mode: {activationProbability: 0.01}}));
    }

    jsTestLog("Starting concurrent writers.");
    const stopLatch = new CountDownLatch(1);
    const writers = [];
    for (let idx = 0; idx < 2; ++idx) {
        const coll = "coll_" + idx;
        // NOTE: everything the thread body needs is passed as an argument. This presumably is
        // because ScopedThread runs it outside this closure, so keep it free of outer variables.
        const writer = new ScopedThread(function(host, coll, stopLatch) {
            const conn = new Mongo(host);
            let id = 0;

            // Cap the amount of data being inserted to avoid rolling over a 10MiB oplog. It takes
            // ~70,000 "basic" ~150 byte oplog documents to fill a 10MiB oplog. Note this number is
            // for each of two writer threads.
            const maxDocsToInsert = 20 * 1000;
            while (stopLatch.getCount() > 0 && id < maxDocsToInsert) {
                conn.getDB("test").getCollection(coll).insert({_id: id});
                id++;
            }
            jsTestLog({"NumDocsWritten": id});
        }, replTest.getPrimary().host, coll, stopLatch);

        writer.start();
        writers.push(writer);
    }

    try {
        for (const node of replTest.nodes) {
            jsTestLog({"Testing": node.host});
            verifyOplogTimestamps(node, collectOplogTimestamps(node));
        }
    } finally {
        // Release and join the writers even when a check above failed, so their insert loops
        // (which poll stopLatch) exit instead of running on to maxDocsToInsert.
        jsTestLog("Stopping writers.");
        stopLatch.countDown();
        writers.forEach((writer) => {
            writer.join();
        });
    }

    replTest.stopSet();
})();