1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
/**
* Tests mongoD-specific semantics of postBatchResumeToken for $changeStream aggregations.
* @tags: [
* requires_majority_read_concern,
* uses_transactions,
* ]
*/
(function() {
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
// Create a new single-node replica set, and ensure that it can support $changeStream.
const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
const db = rst.getPrimary().getDB(jsTestName());
const collName = "report_post_batch_resume_token";
const testCollection = assertDropAndRecreateCollection(db, collName);
const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
const oplogColl = db.getSiblingDB("local").oplog.rs;
let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate.
const batchSize = 2;
// Helper function to perform generic comparisons and dump the oplog on failure.
function assertCompare(cmpFn, left, right, cmpOp, cmpVal) {
assert[cmpOp](cmpFn(left, right),
cmpVal,
{left: left, right: right, oplogEntries: oplogColl.find().toArray()});
}
// Start watching the test collection in order to capture a resume token.
let csCursor = testCollection.watch();
// Write some documents to the test collection and get the resume token from the first doc.
for (let i = 0; i < 5; ++i) {
assert.commandWorked(testCollection.insert({_id: docId++}));
}
const resumeTokenFromDoc = csCursor.next()._id;
csCursor.close();
// Test that postBatchResumeToken is present on a non-empty initial aggregate batch.
assert.soon(() => {
csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc});
csCursor.close(); // We don't need any results after the initial batch.
return csCursor.objsLeftInBatch();
});
while (csCursor.objsLeftInBatch()) {
csCursor.next();
}
let initialAggPBRT = csCursor.getResumeToken();
assert.neq(undefined, initialAggPBRT);
// Test that the PBRT is correctly updated when reading events from within a transaction.
const session = db.getMongo().startSession();
const sessionDB = session.getDatabase(db.getName());
const sessionColl = sessionDB[testCollection.getName()];
const sessionOtherColl = sessionDB[otherCollection.getName()];
session.startTransaction();
// Open a stream of batchSize:2 and grab the PBRT of the initial batch.
csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}});
initialAggPBRT = csCursor.getResumeToken();
assert.eq(csCursor.objsLeftInBatch(), 0);
// Write 3 documents to testCollection and 1 to the unrelated collection within the transaction.
for (let i = 0; i < 3; ++i) {
assert.commandWorked(sessionColl.insert({_id: docId++}));
}
assert.commandWorked(sessionOtherColl.insert({}));
assert.commandWorked(session.commitTransaction_forTesting());
session.endSession();
// Grab the next 2 events, which should be the first 2 events in the transaction.
assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 2);
// The clusterTime should be the same on each, but the resume token keeps advancing.
const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next();
const txnClusterTime = txnEvent1.clusterTime;
assertCompare(timestampCmp, txnEvent2.clusterTime, txnClusterTime, "eq", 0);
assertCompare(bsonWoCompare, txnEvent1._id, initialAggPBRT, "gt", 0);
assertCompare(bsonWoCompare, txnEvent2._id, txnEvent1._id, "gt", 0);
// The PBRT of the first transaction batch is equal to the last document's resumeToken.
let getMorePBRT = csCursor.getResumeToken();
assertCompare(bsonWoCompare, getMorePBRT, txnEvent2._id, "eq", 0);
// Save this PBRT so that we can test resuming from it later on.
const resumePBRT = getMorePBRT;
// Now get the next batch. This contains the third of the four transaction operations.
let previousGetMorePBRT = getMorePBRT;
assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
// The clusterTime of this event is the same as the two events from the previous batch, but its
// resume token is greater than the previous PBRT.
const txnEvent3 = csCursor.next();
assertCompare(timestampCmp, txnEvent3.clusterTime, txnClusterTime, "eq", 0);
assertCompare(bsonWoCompare, txnEvent3._id, previousGetMorePBRT, "gt", 0);
// Because we wrote to the unrelated collection, the final event in the transaction does not
// appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
// last event in the batch, because the unrelated event is within the same transaction and
// therefore has the same clusterTime.
getMorePBRT = csCursor.getResumeToken();
assertCompare(bsonWoCompare, getMorePBRT, txnEvent3._id, "eq", 0);
// Confirm that resuming from the PBRT of the first batch gives us the third transaction write.
csCursor = testCollection.watch([], {resumeAfter: resumePBRT});
assert.docEq(csCursor.next(), txnEvent3);
assert(!csCursor.hasNext());
rst.stopSet();
})();
|