summaryrefslogtreecommitdiff
path: root/jstests/change_streams/ddl_create_index_txn.js
blob: dce99f94038d4e6ac4e35211b8187b43ee63d8f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
 * Tests that a change stream will correctly unwind createIndexes operations from applyOps when
 * createIndexes is done in a transaction.
 *
 * @tags: [
 *     uses_transactions,
 *     requires_majority_read_concern,
 *     featureFlagChangeStreamsVisibility,
 *     requires_fcv_60,
 *     # In order to run this test with sharding we would have to create a transaction that creates
 *     # the collection, shards it, and then creates the index. however sharding a collection in a
 *     # transaction is not allowed and creating an index in a transaction on a collection that was
 *     # not created in that transaction is also not allowed. so this test only works with unsharded
 *     # collections.
 *     assumes_unsharded_collection
 * ]
 */
(function() {

load("jstests/libs/auto_retry_transaction_in_sharding.js");  // For withTxnAndAutoRetryOnMongos.
load("jstests/libs/change_stream_util.js");                  // For ChangeStreamTest.
load("jstests/libs/fixture_helpers.js");                     // For FixtureHelpers.isMongos.
load('jstests/libs/collection_drop_recreate.js');            // 'assertDropCollection'.

const dbName = "test";
const collName = jsTestName();
const otherCollName = jsTestName() + "_2";
const coll = db[jsTestName()];

const otherDBName = jsTestName() + "_db";
const otherDB = db.getSiblingDB(otherDBName);
const otherDBCollName = "someColl";

const session = db.getMongo().startSession();

const sessionDB = session.getDatabase(db.getName());
const sessionOtherDB = session.getDatabase(otherDBName);
const sessionColl = sessionDB[collName];
const sessionOtherColl = sessionDB[otherCollName];
const sessionOtherDBColl = sessionOtherDB[otherDBCollName];

assertDropCollection(sessionDB, collName);
assertDropCollection(sessionDB, otherCollName);
assertDropCollection(sessionOtherDB, otherDBCollName);

let csOptions = {showExpandedEvents: true};
const pipeline = [{$changeStream: csOptions}, {$project: {"lsid.uid": 0}}];

let cst = new ChangeStreamTest(db);
let changeStream = cst.startWatchingChanges({pipeline, collection: coll});

const testStartTime = changeStream.postBatchResumeToken;
assert.neq(testStartTime, undefined);

const txnOptions = {
    readConcern: {level: "local"},
    writeConcern: {w: "majority"}
};

withTxnAndAutoRetryOnMongos(session, () => {
    assert.commandWorked(sessionColl.createIndex({unused: 1}));
    assert.commandWorked(sessionOtherColl.createIndex({unused: 1}));
    assert.commandWorked(sessionOtherDBColl.createIndex({unused: 1}));
}, txnOptions);

const lsid = session.getSessionId();
const txnNumber = session.getTxnNumber_forTesting();

const expectedChanges = [
    {operationType: "create", ns: {db: dbName, coll: collName}, lsid, txnNumber},
    {
        operationType: "createIndexes",
        ns: {db: dbName, coll: collName},
        "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
        lsid,
        txnNumber
    }
];

// Test single coll changeStream.
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges});

// Test whole db changeStream.
const otherCollEvents = [
    {operationType: "create", ns: {db: dbName, coll: otherCollName}},
    {
        operationType: "createIndexes",
        ns: {db: dbName, coll: otherCollName},
        "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
        lsid,
        txnNumber
    }
];
expectedChanges.push(...otherCollEvents);
csOptions.startAfter = testStartTime;
changeStream = cst.startWatchingChanges({pipeline, collection: 1});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});

cst.cleanUp();
cst = new ChangeStreamTest(db.getSiblingDB("admin"));

// Test whole cluster changeStream.
const otherDBEvents = [
    {operationType: "create", ns: {db: otherDBName, coll: otherDBCollName}},
    {
        operationType: "createIndexes",
        ns: {db: otherDBName, coll: otherDBCollName},
        "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
        lsid,
        txnNumber
    }
];
expectedChanges.push(...otherDBEvents);
csOptions.allChangesForCluster = true;
changeStream = cst.startWatchingChanges({pipeline, collection: 1});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});

cst.cleanUp();
})();