1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
/**
* Tests that the resharding operation will fail if a recipient shard would have missed oplog
* entries from a donor shard.
* @tags: [requires_fcv_47]
*/
(function() {
"use strict";
const rst = new ReplSetTest({
// Set the syncdelay to 1s to speed up checkpointing.
nodeOptions: {syncdelay: 1},
nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]
});
// Set max oplog size to 1MB.
rst.startSet({oplogSize: 1});
rst.initiate();
jsTest.log("Inserting documents to generate oplog entries");
let testDB = rst.getPrimary().getDB("test");
let testColl = testDB.foo;
const localDb = rst.getPrimary().getDB("local");
// 400KB each so that oplog can keep at most two insert oplog entries.
const longString = new Array(400 * 1024).join("a");
// fill the oplog with two large documents
assert.commandWorked(testColl.insert({_id: 0, longString: longString}));
assert.commandWorked(testColl.insert({_id: 1, longString: longString}));
let oplogEntry = localDb.oplog.rs.findOne({"op": "i", "o._id": 0});
jsTest.log("Run aggregation pipeline on oplog with $_requestReshardingResumeToken set");
assert.commandWorked(localDb.runCommand({
aggregate: "oplog.rs",
pipeline: [{$match: {ts: {$gte: oplogEntry.ts}}}],
$_requestReshardingResumeToken: true,
cursor: {}
}));
let id = 2;
assert.soon(() => {
// keep inserting documents until the oplog truncates
assert.commandWorked(testColl.insert({_id: id, longString: longString}));
id++;
return timestampCmp(localDb.oplog.rs.findOne().ts, oplogEntry.ts) == 1;
}, "Timeout waiting for oplog to roll over on primary");
assert.commandFailedWithCode(localDb.runCommand({
aggregate: "oplog.rs",
pipeline: [{$match: {ts: {$gte: oplogEntry.ts}}}],
$_requestReshardingResumeToken: true,
cursor: {}
}),
ErrorCodes.OplogQueryMinTsMissing);
jsTest.log(
"Run aggregation pipeline on incomplete oplog with $_requestReshardingResumeToken set to false");
assert.commandWorked(localDb.runCommand({
aggregate: "oplog.rs",
pipeline: [{$match: {ts: {$gte: oplogEntry.ts}}}],
$_requestReshardingResumeToken: false,
cursor: {}
}));
jsTest.log("Run non-$gte oplog aggregation pipeline with $_requestReshardingResumeToken set");
assert.commandFailedWithCode(localDb.runCommand({
aggregate: "oplog.rs",
pipeline: [{$match: {"op": "i"}}],
$_requestReshardingResumeToken: true,
cursor: {}
}),
ErrorCodes.InvalidOptions);
jsTest.log("End of test");
rst.stopSet();
})();
|