summaryrefslogtreecommitdiff
path: root/jstests/change_streams/report_post_batch_resume_token.js
blob: b8f975d2585815e5de57b415aab4b36d2da71f32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/**
 * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This
 * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams.
 * @tags: [uses_transactions]
 */
(function() {
"use strict";

load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js");           // For isMongos.

// Drop and recreate collections to ensure a clean run. 'testCollection' is the collection that
// every change stream in this test is opened on; 'otherCollection' only receives writes whose
// events the streams are asserted not to return.
const collName = "report_post_batch_resume_token";
const testCollection = assertDropAndRecreateCollection(db, collName);
const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
// Handle to the "admin" database; used further down to read maxBsonObjectSize from isMaster.
const adminDB = db.getSiblingDB("admin");

// Equality assertion that is skipped on sharded clusters: when FixtureHelpers.isMongos(db) reports
// that we are connected to a mongos, nothing is checked. Otherwise this forwards its arguments
// unchanged to assert.eq(val1, val2, errMsg).
assert.eqIfNotMongos = function(val1, val2, errMsg) {
    const skipCheck = FixtureHelpers.isMongos(db);
    if (skipCheck) {
        return;
    }
    assert.eq(val1, val2, errMsg);
};

let docId = 0;  // Tracks _id of documents inserted to ensure that we do not duplicate.
// Used to size the seed inserts (batchSize * 2 documents) and as the cursor batch size of the
// stream that is later resumed to read the transaction's events.
const batchSize = 2;

// Test that postBatchResumeToken is present on an initial aggregate of batchSize: 0. The first
// batch is asserted to be empty before the resume token is read from the cursor.
let csCursor = testCollection.watch([], {cursor: {batchSize: 0}});
assert.eq(csCursor.objsLeftInBatch(), 0);
let initialAggPBRT = csCursor.getResumeToken();
assert.neq(undefined, initialAggPBRT);

// Test that the PBRT does not advance beyond its initial value for a change stream whose
// startAtOperationTime is in the future, even as writes are made to the test collection.
// 4102444800 seconds after the Unix epoch is 2100-01-01T00:00:00Z.
const timestampIn2100 = Timestamp(4102444800, 1);
csCursor = testCollection.watch([], {startAtOperationTime: timestampIn2100});
assert.eq(csCursor.objsLeftInBatch(), 0);
initialAggPBRT = csCursor.getResumeToken();
assert.neq(undefined, initialAggPBRT);

// Verify that no events are returned and the PBRT does not advance or go backwards, even as
// documents are written into the collection. Each iteration checks the token first and inserts
// afterwards, so from the second iteration on the getMore runs with earlier writes already made.
for (let i = 0; i < 5; ++i) {
    assert(!csCursor.hasNext());  // Causes a getMore to be dispatched.
    const getMorePBRT = csCursor.getResumeToken();
    // A comparison result of 0 means the token is identical to the one from the initial aggregate.
    assert.eq(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
    assert.commandWorked(testCollection.insert({_id: docId++}));
}

// Test that postBatchResumeToken is present on empty initial aggregate batch. This stream is
// opened with default options, replacing the future-dated stream used above.
csCursor = testCollection.watch();
assert.eq(csCursor.objsLeftInBatch(), 0);
initialAggPBRT = csCursor.getResumeToken();
assert.neq(undefined, initialAggPBRT);

// Test that postBatchResumeToken is present on empty getMore batch.
assert(!csCursor.hasNext());  // Causes a getMore to be dispatched.
let getMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, getMorePBRT);
// The token from the getMore may equal the initial token but must never sort before it.
assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);

// Test that postBatchResumeToken advances with returned events. Insert one document into the
// collection and consume the resulting change stream event.
assert.commandWorked(testCollection.insert({_id: docId++}));
assert.soon(() => csCursor.hasNext());  // Causes a getMore to be dispatched.
// Use assert.eq, as every other batch-count check in this test does, so that a failure reports
// the observed and expected counts rather than only that a boolean expression was false.
assert.eq(csCursor.objsLeftInBatch(), 1);

// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal to
// the resume token of the last item in the batch and greater than the initial PBRT.
let resumeTokenFromDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
// When running in a sharded passthrough, we cannot guarantee that the retrieved event was the last
// item in the oplog, and so we cannot assert that the PBRT is equal to the event's resume token.
assert.eqIfNotMongos(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
// The strict advance past the initial token is asserted on every deployment type.
assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);

// Now seed the collection with enough documents to fit in two batches.
for (let i = 0; i < batchSize * 2; i++) {
    assert.commandWorked(testCollection.insert({_id: docId++}));
}

// Test that the PBRT for a resumed stream is the given resume token if no results are returned.
// The stream is resumed after the single event consumed above, with an initial batchSize of 0 so
// that the first response is empty.
csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: 0}});
assert.eq(csCursor.objsLeftInBatch(), 0);
initialAggPBRT = csCursor.getResumeToken();
assert.neq(undefined, initialAggPBRT);
assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0);

// Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the
// observed postBatchResumeToken advanced.
assert.soon(() => csCursor.hasNext());  // Causes a getMore to be dispatched.

// The postBatchResumeToken is again equal to the final token in the batch, and greater than the
// PBRT from the initial response.
let eventFromCursor = null;
// Consume only the events in the current batch, remembering the last event and its resume token.
while (csCursor.objsLeftInBatch()) {
    eventFromCursor = csCursor.next();
    resumeTokenFromDoc = eventFromCursor._id;
}
getMorePBRT = csCursor.getResumeToken();
// When running in a sharded passthrough, we cannot guarantee that the retrieved event was the last
// item in the oplog, and so we cannot assert that the PBRT is equal to the event's resume token.
assert.eqIfNotMongos(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0);
assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);

// Test that postBatchResumeToken advances with writes to an unrelated collection. First make
// sure there is nothing left in our cursor, and obtain the latest PBRT...
// The most recent insert into testCollection used _id (docId - 1), so keep consuming events until
// the event for that document has been seen.
while (eventFromCursor.fullDocument._id < (docId - 1)) {
    assert.soon(() => csCursor.hasNext());
    eventFromCursor = csCursor.next();
}
assert(!csCursor.hasNext());
let previousGetMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, previousGetMorePBRT);

// ... then test that it advances on an insert to an unrelated collection.
// docId is not incremented here: the document goes to the other collection, and the next insert
// into testCollection uses the same value.
assert.commandWorked(otherCollection.insert({_id: docId}));
// Retry until the token has moved strictly past the one recorded above. No event may ever be
// returned on this stream while waiting; the inner assert fails the test if one is.
assert.soon(() => {
    assert(!csCursor.hasNext());  // Causes a getMore to be dispatched.
    getMorePBRT = csCursor.getResumeToken();
    return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0;
});

// Insert two documents into the collection which are of the maximum BSON object size.
const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize;
assert.gt(bsonUserSizeLimit, 0);
for (let i = 0; i < 2; ++i) {
    const docToInsert = {_id: docId++, padding: ""};
    // Measure the document with empty padding, then fill 'padding' with exactly the number of
    // single-byte characters that remain below the limit.
    docToInsert.padding = "a".repeat(bsonUserSizeLimit - Object.bsonsize(docToInsert));
    assert.commandWorked(testCollection.insert(docToInsert));
}

// Test that we return the correct postBatchResumeToken in the event that the batch hits the
// byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result,
// because the second result cannot fit in the batch.
// NOTE(review): the cursor in use here was opened earlier in this file with
// cursor: {batchSize: 0}, not with 'batchSize' (2) -- confirm which batch size the sentence above
// refers to.
assert.soon(() => csCursor.hasNext());  // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);

// Obtain the resume token and the PBRT from the first document.
resumeTokenFromDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();

// Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order.
previousGetMorePBRT = getMorePBRT;
assert.soon(() => csCursor.hasNext());  // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
const resumeTokenFromSecondDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
// Expected ordering: first event's token <= first batch's PBRT < second event's token <= second
// batch's PBRT.
assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0);
assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0);
assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0);

// Sharded collection passthroughs use prepared transactions, which require majority read concern.
// If the collection is sharded and majority read concern is disabled, skip the transaction tests.
// Support is probed by issuing a find with majority read concern; only the error code of the
// response is inspected, and only when the collection is sharded.
const rcCmdRes = testCollection.runCommand("find", {readConcern: {level: "majority"}});
if (FixtureHelpers.isSharded(testCollection) &&
    rcCmdRes.code === ErrorCodes.ReadConcernMajorityNotEnabled) {
    jsTestLog("Skipping transaction tests since majority read concern is disabled.");
    return;  // Leaves the enclosing function, so none of the transaction checks below run.
}

// Test that the PBRT is correctly updated when reading events from within a transaction.
const session = db.getMongo().startSession();
const sessionDB = session.getDatabase(db.getName());

// Session-bound handles to the same two collections used by the rest of this test.
const sessionColl = sessionDB[testCollection.getName()];
const sessionOtherColl = sessionDB[otherCollection.getName()];
session.startTransaction();

// Write 3 documents to testCollection and 1 to the unrelated collection within the transaction.
for (let i = 0; i < 3; ++i) {
    assert.commandWorked(sessionColl.insert({_id: docId++}));
}
// The write to the unrelated collection is the last operation in the transaction; docId is not
// incremented for it.
assert.commandWorked(sessionOtherColl.insert({_id: docId}));
assert.commandWorked(session.commitTransaction_forTesting());
session.endSession();

// Grab the next 2 events, which should be the first 2 events in the transaction. As of SERVER-37364
// the co-ordinator of a distributed transaction returns before all participants have acknowledged
// the decision, and so not all events may yet be majority-visible. We therefore wait until we see
// both expected events in the first set of results retrieved from the transaction.
previousGetMorePBRT = getMorePBRT;
// Each retry opens a fresh stream from the same resume point, so a partially-visible first batch
// is discarded and re-read rather than consumed.
assert.soon(() => {
    // Start a new stream from the most recent resume token we retrieved.
    csCursor = testCollection.watch(
        [], {resumeAfter: previousGetMorePBRT, cursor: {batchSize: batchSize}});
    // Wait until we see the first results from the stream.
    assert.soon(() => csCursor.hasNext());
    // There should be two distinct events in the batch.
    return csCursor.objsLeftInBatch() === 2;
});

// The clusterTime should be the same on each, but the resume token keeps advancing.
const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next();
const txnClusterTime = txnEvent1.clusterTime;
// On a sharded cluster, the events in the txn may be spread across multiple shards. Events from
// each shard will all have the same clusterTime, but the clusterTimes may differ between shards.
// Therefore, we cannot guarantee that the clusterTime of txnEvent2 is always the same as the
// clusterTime of txnEvent1, since the events may have occurred on different shards.
assert.eqIfNotMongos(txnEvent2.clusterTime, txnClusterTime);
// Both events must sort after the token we resumed from, and in the order they were returned.
assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0);
assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);

// The PBRT of the first transaction batch is equal to the last document's resumeToken. We have
// more events to return from the transaction, and so the PBRT cannot have advanced any further.
getMorePBRT = csCursor.getResumeToken();
assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0);

// Now get the next batch. This contains the third of the four transaction operations.
previousGetMorePBRT = getMorePBRT;
assert.soon(() => csCursor.hasNext());  // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);

// The clusterTime of this event is the same as the two events from the previous batch, but its
// resume token is greater than the previous PBRT.
const txnEvent3 = csCursor.next();
// As before, we cannot guarantee that the clusterTime of txnEvent3 is always the same as that of
// txnEvent1 when running in a sharded cluster. However, the PBRT should advance in any deployment.
assert.eqIfNotMongos(txnEvent3.clusterTime, txnClusterTime);
assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);

// Because we wrote to the unrelated collection, the final event in the transaction does not
// appear in the batch. Confirm that the postBatchResumeToken has been set correctly.
// Only a lower bound is asserted: the PBRT must be at or beyond the third event's resume token.
getMorePBRT = csCursor.getResumeToken();
assert.gte(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
})();