1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
/**
* Tests that a change stream will correctly unwind createIndexes operations from applyOps when
* createIndexes is done in a transaction.
*
* @tags: [
* uses_transactions,
* requires_majority_read_concern,
* featureFlagChangeStreamsVisibility,
* requires_fcv_60,
* # In order to run this test with sharding we would have to create a transaction that creates
* # the collection, shards it, and then creates the index. However, sharding a collection in a
* # transaction is not allowed, and creating an index in a transaction on a collection that was
* # not created in that transaction is also not allowed. So this test only works with unsharded
* # collections.
* assumes_unsharded_collection
* ]
*/
(function() {
    load("jstests/libs/auto_retry_transaction_in_sharding.js");  // For withTxnAndAutoRetryOnMongos.
    load("jstests/libs/change_stream_util.js");                  // For ChangeStreamTest.
    load("jstests/libs/fixture_helpers.js");                     // For FixtureHelpers.isMongos.
    load('jstests/libs/collection_drop_recreate.js');            // 'assertDropCollection'.

    // Take the database name from the shell's 'db' handle rather than hard-coding it, so the
    // namespaces in the expected events always match the database the session operates on.
    const dbName = db.getName();
    const collName = jsTestName();
    const otherCollName = jsTestName() + "_2";
    const coll = db[collName];

    const otherDBName = jsTestName() + "_db";
    const otherDB = db.getSiblingDB(otherDBName);
    const otherDBCollName = "someColl";

    // All index builds below are issued through this one session so that they can be placed in
    // a single transaction.
    const session = db.getMongo().startSession();
    const sessionDB = session.getDatabase(dbName);
    const sessionOtherDB = session.getDatabase(otherDBName);
    const sessionColl = sessionDB[collName];
    const sessionOtherColl = sessionDB[otherCollName];
    const sessionOtherDBColl = sessionOtherDB[otherDBCollName];

    // Start from a clean slate: none of the three collections may exist before the transaction.
    assertDropCollection(sessionDB, collName);
    assertDropCollection(sessionDB, otherCollName);
    assertDropCollection(sessionOtherDB, otherDBCollName);

    // 'pipeline' embeds this very object, so the fields added to 'csOptions' further down
    // ('startAfter', 'allChangesForCluster') are intended to apply to the streams opened after
    // each mutation.
    const csOptions = {showExpandedEvents: true};
    const pipeline = [{$changeStream: csOptions}, {$project: {"lsid.uid": 0}}];

    let cst = new ChangeStreamTest(db);
    let changeStream = cst.startWatchingChanges({pipeline, collection: coll});

    // Remember where the first stream started so the later streams can begin at the same point.
    const testStartTime = changeStream.postBatchResumeToken;
    assert.neq(testStartTime, undefined);

    const txnOptions = {
        readConcern: {level: "local"},
        writeConcern: {w: "majority"},
    };

    // Create an index on each of the three collections inside one transaction.
    withTxnAndAutoRetryOnMongos(session, () => {
        assert.commandWorked(sessionColl.createIndex({unused: 1}));
        assert.commandWorked(sessionOtherColl.createIndex({unused: 1}));
        assert.commandWorked(sessionOtherDBColl.createIndex({unused: 1}));
    }, txnOptions);

    const lsid = session.getSessionId();
    const txnNumber = session.getTxnNumber_forTesting();

    // 'expectedChanges' grows as the scope of the stream widens: collection -> db -> cluster.
    const expectedChanges = [
        {operationType: "create", ns: {db: dbName, coll: collName}, lsid, txnNumber},
        {
            operationType: "createIndexes",
            ns: {db: dbName, coll: collName},
            "operationDescription":
                {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
            lsid,
            txnNumber,
        },
    ];

    // Test single coll changeStream.
    cst.assertNextChangesEqual({cursor: changeStream, expectedChanges});

    // Test whole db changeStream.
    const otherCollEvents = [
        {operationType: "create", ns: {db: dbName, coll: otherCollName}},
        {
            operationType: "createIndexes",
            ns: {db: dbName, coll: otherCollName},
            "operationDescription":
                {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
            lsid,
            txnNumber,
        },
    ];
    expectedChanges.push(...otherCollEvents);
    csOptions.startAfter = testStartTime;
    changeStream = cst.startWatchingChanges({pipeline, collection: 1});
    cst.assertNextChangesEqual({cursor: changeStream, expectedChanges});
    cst.cleanUp();

    // Test whole cluster changeStream.
    cst = new ChangeStreamTest(db.getSiblingDB("admin"));
    const otherDBEvents = [
        {operationType: "create", ns: {db: otherDBName, coll: otherDBCollName}},
        {
            operationType: "createIndexes",
            ns: {db: otherDBName, coll: otherDBCollName},
            "operationDescription":
                {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
            lsid,
            txnNumber,
        },
    ];
    expectedChanges.push(...otherDBEvents);
    csOptions.allChangesForCluster = true;
    changeStream = cst.startWatchingChanges({pipeline, collection: 1});
    cst.assertNextChangesEqual({cursor: changeStream, expectedChanges});
    cst.cleanUp();

    // Release the explicit session now that every assertion has run.
    session.endSession();
})();
|