1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
/**
* Confirms that resuming from an event which has the same clusterTime as a transaction on another
* shard does not cause the resume attempt to be prematurely rejected. Reproduction script for the
* bug described in SERVER-40094.
* @tags: [
* requires_sharding,
* uses_multi_shard_transaction,
* uses_transactions,
* ]
*/
(function() {
"use strict";
// Asserts that the expected operation type and documentKey are found on the change stream
// cursor. Returns the change stream document.
function assertWriteVisible({cursor, opType, docKey}) {
assert.soon(() => cursor.hasNext());
const changeDoc = cursor.next();
assert.eq(opType, changeDoc.operationType, changeDoc);
assert.eq(docKey, changeDoc.documentKey, changeDoc);
return changeDoc;
}
// Create a new cluster with 2 shards. Enable 1-second period no-ops to ensure that all relevant
// events eventually become available.
const st = new ShardingTest({
shards: 2,
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
});
const mongosDB = st.s.getDB(jsTestName());
const mongosColl = mongosDB.test;
// Enable sharding on the test DB and ensure its primary is shard0.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
// Shard on {shard:1}, split at {shard:1}, and move the upper chunk to shard1.
st.shardColl(mongosColl, {shard: 1}, {shard: 1}, {shard: 1}, mongosDB.getName(), true);
// Seed each shard with one document.
assert.commandWorked(
mongosColl.insert([{shard: 0, _id: "initial_doc"}, {shard: 1, _id: "initial doc"}]));
// Start a transaction which will be used to write documents across both shards.
const session = mongosDB.getMongo().startSession();
const sessionDB = session.getDatabase(mongosDB.getName());
const sessionColl = sessionDB[mongosColl.getName()];
session.startTransaction({readConcern: {level: "majority"}});
// Open a change stream on the test collection. We will capture events in 'changeList'.
const changeStreamCursor = mongosColl.watch();
const changeList = [];
// Insert four documents on each shard under the transaction.
assert.commandWorked(
sessionColl.insert([{shard: 0, _id: "txn1-doc-0"}, {shard: 1, _id: "txn1-doc-1"}]));
assert.commandWorked(
sessionColl.insert([{shard: 0, _id: "txn1-doc-2"}, {shard: 1, _id: "txn1-doc-3"}]));
assert.commandWorked(
sessionColl.insert([{shard: 0, _id: "txn1-doc-4"}, {shard: 1, _id: "txn1-doc-5"}]));
assert.commandWorked(
sessionColl.insert([{shard: 0, _id: "txn1-doc-6"}, {shard: 1, _id: "txn1-doc-7"}]));
// Commit the transaction.
assert.commandWorked(session.commitTransaction_forTesting());
// Read the stream of events, capture them in 'changeList', and confirm that all events occurred
// at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that
// all events occurred at the same clusterTime on both shards, even in the case where all events
// occur within a single transaction. We expect, however, that this will be true in the vast
// majority of test runs, and so there is value in retaining this test.
for (let i = 0; i < 8; ++i) {
assert.soon(() => changeStreamCursor.hasNext());
changeList.push(changeStreamCursor.next());
}
const clusterTime = changeList[0].clusterTime;
for (let event of changeList) {
assert.gte(event.clusterTime, clusterTime);
}
// Test that resuming from each event returns the expected set of subsequent documents.
for (let i = 0; i < changeList.length; ++i) {
const resumeCursor = mongosColl.watch([], {startAfter: changeList[i]._id});
// Confirm that the first event in the resumed stream matches the next event recorded in
// 'changeList' from the original stream. The order of the events should be stable across
// resumes from any point.
for (let x = (i + 1); x < changeList.length; ++x) {
const expectedChangeDoc = changeList[x];
assertWriteVisible({
cursor: resumeCursor,
opType: expectedChangeDoc.operationType,
docKey: expectedChangeDoc.documentKey
});
}
assert(!resumeCursor.hasNext(), () => `Unexpected event: ${tojson(resumeCursor.next())}`);
resumeCursor.close();
}
st.stop();
})();
|