// path: root/jstests/sharding/resharding_oplog_sync_agg_resume_token.js
// blob: da85a2826aff6ff1a8bdae7c323650eed4331b0b
/**
 * Test that the postBatchResumeToken field is only included for the oplog namespace when
 * $_requestReshardingResumeToken is specified for an aggregate command.
 *
 * @tags: [
 * ]
 */
(function() {
"use strict";

/**
 * Returns true if timestamp 'ts1' is strictly greater than timestamp 'ts2'.
 *
 * Timestamps are ordered by their getTime() value first; getInc() only breaks ties between
 * timestamps that share the same getTime() value.
 *
 * @param {Timestamp} ts1 - object exposing getTime() and getInc().
 * @param {Timestamp} ts2 - object exposing getTime() and getInc().
 * @returns {boolean} true only when 'ts1' orders after 'ts2'; false when the two are equal.
 */
function timestampGreaterThan(ts1, ts2) {
    const time1 = ts1.getTime();
    const time2 = ts2.getTime();
    // Different time components decide the ordering on their own.
    if (time1 !== time2) {
        return time1 > time2;
    }
    // Same time component: fall back to the increment.
    return ts1.getInc() > ts2.getInc();
}

// Bring up a single-node replica set; the test aggregates over that node's "oplog.rs" collection
// in the "local" database.
const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();

const dbName = "test";
const collName = "foo";
const ns = `${dbName}.${collName}`;

// Insert documents to generate oplog entries. The documents carry x = 0..9.
const testDB = rst.getPrimary().getDB(dbName);
const testColl = testDB.getCollection(collName);
for (let i = 0; i < 10; i++) {
    assert.commandWorked(testColl.insert({x: i}));
}

// Handle on the "local" database, through which the oplog aggregations are issued.
const localDb = rst.getPrimary().getDB("local");

// Run aggregation pipeline on oplog with $_requestReshardingResumeToken set when the pipeline can
// be optimized away. The first batch is limited to one document; presumably this keeps the cursor
// open for the getMore issued below.
const resEnabled = assert.commandWorked(localDb.runCommand({
    aggregate: "oplog.rs",
    pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}],
    $_requestReshardingResumeToken: true,
    cursor: {batchSize: 1}
}));
assert(resEnabled.cursor.hasOwnProperty("postBatchResumeToken"), resEnabled);
assert(resEnabled.cursor.postBatchResumeToken.hasOwnProperty("ts"), resEnabled);

// Ensure that postBatchResumeToken attribute is returned for getMore command. The getMore reply is
// validated once, by the assert.commandWorked() wrapped around the command itself.
const cursorId = resEnabled.cursor.id;
const resGetMore =
    assert.commandWorked(localDb.runCommand({getMore: cursorId, collection: "oplog.rs"}));
assert(resGetMore.cursor.hasOwnProperty("postBatchResumeToken"), resGetMore);
assert(resGetMore.cursor.postBatchResumeToken.hasOwnProperty("ts"), resGetMore);

// Oplog aggregations whose replies must NOT carry a postBatchResumeToken:
//   1. $_requestReshardingResumeToken explicitly disabled;
//   2. $_requestReshardingResumeToken left unspecified, which defaults to disabled.
const aggregationsWithoutResumeToken = [
    {
        aggregate: "oplog.rs",
        pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}],
        $_requestReshardingResumeToken: false,
        cursor: {}
    },
    {aggregate: "oplog.rs", pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}], cursor: {}}
];

for (const aggregateCmd of aggregationsWithoutResumeToken) {
    const reply = localDb.runCommand(aggregateCmd);
    assert.commandWorked(reply);

    const replyHasToken = reply.cursor.hasOwnProperty("postBatchResumeToken");
    assert(!replyHasToken, reply);
}

// Aggregations that the test requires to be rejected, listed in the order in which they are run.
// Each entry names the database handle that issues the command, the command itself, the error code
// that must come back, and the message reported if the expectation is not met.
const rejectedAggregations = [
    {
        // Aggregation pipeline on non-oplog with $_requestReshardingResumeToken set.
        targetDb: localDb,
        cmd: {
            aggregate: ns,
            pipeline: [{$limit: 100}],
            $_requestReshardingResumeToken: true,
            cursor: {}
        },
        expectedCode: ErrorCodes.FailedToParse,
        failureMsg: "$_requestReshardingResumeToken set on non-oplog should fail"
    },
    {
        // $changeStream on oplog with $_requestReshardingResumeToken set.
        targetDb: localDb,
        cmd: {
            aggregate: "oplog.rs",
            pipeline: [{$changeStream: {}}],
            $_requestReshardingResumeToken: true,
            cursor: {}
        },
        expectedCode: ErrorCodes.InvalidNamespace,
        failureMsg: "$changeStream on oplog should fail"
    },
    {
        // $changeStream with $_requestReshardingResumeToken set on non-oplog collection.
        targetDb: testDB,
        cmd: {
            aggregate: collName,
            pipeline: [{$changeStream: {}}],
            $_requestReshardingResumeToken: true,
            cursor: {}
        },
        expectedCode: ErrorCodes.FailedToParse,
        failureMsg: "$_requestReshardingResumeToken set with $changeStream should fail"
    }
];

for (const {targetDb, cmd, expectedCode, failureMsg} of rejectedAggregations) {
    assert.commandFailedWithCode(targetDb.runCommand(cmd), expectedCode, failureMsg);
}

// Request an empty first batch (batchSize: 0) from the oplog. The reply must still carry a
// postBatchResumeToken, and its 'ts' is expected to be Timestamp(0, 0).
const emptyBatchCmd = {
    aggregate: "oplog.rs",
    pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}],
    $_requestReshardingResumeToken: true,
    cursor: {batchSize: 0}
};
const emptyBatchReply = assert.commandWorked(localDb.runCommand(emptyBatchCmd));

assert(emptyBatchReply.cursor.hasOwnProperty("postBatchResumeToken"), emptyBatchReply);
const emptyBatchToken = emptyBatchReply.cursor.postBatchResumeToken;
assert(emptyBatchToken.hasOwnProperty("ts"), emptyBatchReply);
assert.eq(emptyBatchToken.ts, new Timestamp(0, 0));

// Run aggregation pipeline on oplog with $_requestReshardingResumeToken set when the pipeline
// cannot be optimized away.
const batchSize = 5;
let result = localDb.runCommand({
    aggregate: "oplog.rs",
    // The $_internalInhibitOptimization prevents the pipeline from being optimized away as a simple
    // plan executor. This is necessary to force the pipeline to be evaluated using
    // PlanExecutorPipeline.
    pipeline: [
        {$match: {ts: {$gte: Timestamp(0, 0)}, "o.x": {$lt: 8}}},
        {$_internalInhibitOptimization: {}}
    ],
    $_requestReshardingResumeToken: true,
    cursor: {batchSize: batchSize}
});
assert.commandWorked(result);
assert(result.cursor.hasOwnProperty("postBatchResumeToken"), result);
assert(result.cursor.postBatchResumeToken.hasOwnProperty("ts"), result);
// A full first batch also guarantees that the last-element lookup below is valid.
assert.eq(result.cursor.firstBatch.length, batchSize, result);

// Verify that the postBatchResumeToken is equal to the 'ts' of the last record.
assert.eq(result.cursor.postBatchResumeToken.ts,
          result.cursor.firstBatch[result.cursor.firstBatch.length - 1].ts);

// Ensure that postBatchResumeToken attribute is returned for getMore command by reading the second
// batch. There are not enough matching documents left in the oplog to fill an entire batch, so we
// expect the PBRT to exceed the ts of the final entry.
result =
    assert.commandWorked(localDb.runCommand({getMore: result.cursor.id, collection: "oplog.rs"}));
assert(result.cursor.hasOwnProperty("postBatchResumeToken"), result);
assert(result.cursor.postBatchResumeToken.hasOwnProperty("ts"), result);
let resultsBatch = result.cursor.nextBatch;
assert(resultsBatch.length < batchSize, result);
// The comparison below reads the last record of the batch, so fail with the full reply attached if
// the batch is unexpectedly empty instead of throwing a TypeError on 'undefined.ts'.
assert.gt(resultsBatch.length, 0, result);

// Verify that the postBatchResumeToken is greater than the 'ts' of the last read record since
// the documents in the rest of the collection do not match the filter "o.x": {$lt: 8}.
assert(timestampGreaterThan(result.cursor.postBatchResumeToken.ts,
                            resultsBatch[resultsBatch.length - 1].ts),
       "postBatchResumeToken value should be greater than 'ts' of the last record");

// Read all records in one batch. No batchSize is given, so the cursor uses the server's default.
result = localDb.runCommand({
    aggregate: "oplog.rs",
    pipeline: [
        {$match: {ts: {$gte: Timestamp(0, 0)}, "o.x": {$lt: 2}}},
        {$_internalInhibitOptimization: {}}
    ],
    $_requestReshardingResumeToken: true,
    cursor: {}
});
assert.commandWorked(result);
assert(result.cursor.hasOwnProperty("postBatchResumeToken"), result);
assert(result.cursor.postBatchResumeToken.hasOwnProperty("ts"), result);
resultsBatch = result.cursor.firstBatch;
// The comparison below reads the last record of the batch, so fail with the full reply attached if
// the batch is unexpectedly empty instead of throwing a TypeError on 'undefined.ts'.
assert.gt(resultsBatch.length, 0, result);

// Verify that the postBatchResumeToken is greater than the 'ts' of the last read record since
// the documents in the rest of the collection do not match the filter "o.x": {$lt: 2}.
assert(timestampGreaterThan(result.cursor.postBatchResumeToken.ts,
                            resultsBatch[resultsBatch.length - 1].ts),
       "postBatchResumeToken value should be greater than 'ts' of the last record");

// Request an empty first batch (batchSize: 0) through a pipeline that $_internalInhibitOptimization
// keeps from being optimized away. The reply must still carry a postBatchResumeToken, and its 'ts'
// is expected to be Timestamp(0, 0).
const unoptimizedEmptyBatchCmd = {
    aggregate: "oplog.rs",
    pipeline: [
        {$match: {ts: {$gte: Timestamp(0, 0)}, "o.x": {$lt: 2}}},
        {$_internalInhibitOptimization: {}}
    ],
    $_requestReshardingResumeToken: true,
    cursor: {batchSize: 0}
};
result = assert.commandWorked(localDb.runCommand(unoptimizedEmptyBatchCmd));

assert(result.cursor.hasOwnProperty("postBatchResumeToken"), result);
const unoptimizedEmptyBatchToken = result.cursor.postBatchResumeToken;
assert(unoptimizedEmptyBatchToken.hasOwnProperty("ts"), result);
assert.eq(unoptimizedEmptyBatchToken.ts, new Timestamp(0, 0));

// With $_requestReshardingResumeToken explicitly set to false, a pipeline that is kept from being
// optimized away must not report a postBatchResumeToken.
const unoptimizedDisabledCmd = {
    aggregate: "oplog.rs",
    pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}, {$_internalInhibitOptimization: {}}],
    $_requestReshardingResumeToken: false,
    cursor: {batchSize: 5}
};
result = assert.commandWorked(localDb.runCommand(unoptimizedDisabledCmd));

const unoptimizedReplyHasToken = result.cursor.hasOwnProperty("postBatchResumeToken");
assert(!unoptimizedReplyHasToken, result);

// All checks are done; stop the replica set started at the beginning of the test.
rst.stopSet();
})();