summary | refs | log | tree | commit | diff
path: root/jstests/sharding/resharding_oplog_sync_agg_assert_min_oplog.js
blob: a06e2b633be792b17d688f617b6e0df5bd9e5158 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
 * Tests that the resharding operation will fail if a recipient shard would have missed oplog
 * entries from a donor shard.
 * @tags: [requires_fcv_47]
 */
(function() {
"use strict";

const rst = new ReplSetTest({
    // Set the syncdelay to 1s to speed up checkpointing.
    nodeOptions: {syncdelay: 1},
    nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]
});
// Set max oplog size to 1MB.
rst.startSet({oplogSize: 1});
rst.initiate();

jsTest.log("Inserting documents to generate oplog entries");
const primary = rst.getPrimary();
const testColl = primary.getDB("test").foo;
const localDb = primary.getDB("local");

/**
 * Runs an aggregation over the primary's oplog and returns the raw command response so that the
 * caller can assert on success or on a specific error code.
 *
 * @param {Array<Object>} pipeline - aggregation stages to run against "oplog.rs".
 * @param {boolean} requestResumeToken - value passed as $_requestReshardingResumeToken.
 * @returns {Object} the response of the aggregate command.
 */
const runOplogAggregate = (pipeline, requestResumeToken) => localDb.runCommand({
    aggregate: "oplog.rs",
    pipeline: pipeline,
    $_requestReshardingResumeToken: requestResumeToken,
    cursor: {}
});

// 400KB each so that oplog can keep at most two insert oplog entries.
const longString = "a".repeat(400 * 1024 - 1);

// fill the oplog with two large documents
assert.commandWorked(testColl.insert({_id: 0, longString: longString}));
assert.commandWorked(testColl.insert({_id: 1, longString: longString}));

const oplogEntry = localDb.oplog.rs.findOne({"op": "i", "o._id": 0});
// Fail with a clear message here rather than with a TypeError on 'oplogEntry.ts' below.
assert.neq(null, oplogEntry, "Expected to find the insert oplog entry for the document {_id: 0}");

// Every timestamp-based pipeline in this test starts at the oplog entry of the first insert.
const fromFirstInsertPipeline = [{$match: {ts: {$gte: oplogEntry.ts}}}];

jsTest.log("Run aggregation pipeline on oplog with $_requestReshardingResumeToken set");
assert.commandWorked(runOplogAggregate(fromFirstInsertPipeline, true));

let nextId = 2;
assert.soon(() => {
    // keep inserting documents until the oplog truncates
    assert.commandWorked(testColl.insert({_id: nextId, longString: longString}));
    nextId++;
    // The oplog has rolled over once its oldest remaining entry is newer than the first insert.
    return timestampCmp(localDb.oplog.rs.findOne().ts, oplogEntry.ts) > 0;
}, "Timeout waiting for oplog to roll over on primary");

// The requested minimum timestamp is no longer in the oplog, so the command must fail.
assert.commandFailedWithCode(runOplogAggregate(fromFirstInsertPipeline, true),
                             ErrorCodes.OplogQueryMinTsMissing);

jsTest.log(
    "Run aggregation pipeline on incomplete oplog with $_requestReshardingResumeToken set to false");
assert.commandWorked(runOplogAggregate(fromFirstInsertPipeline, false));

jsTest.log("Run non-$gte oplog aggregation pipeline with $_requestReshardingResumeToken set");
assert.commandFailedWithCode(runOplogAggregate([{$match: {"op": "i"}}], true),
                             ErrorCodes.InvalidOptions);

jsTest.log("End of test");

rst.stopSet();
})();