// Source: jstests/change_streams/apply_ops.js (blob e8a1308833786f58114392c94773e4f3b29bce65)
// Tests that a change stream will correctly unwind applyOps entries generated by a transaction.
// @tags: [uses_transactions, requires_snapshot_read, requires_majority_read_concern]

(function() {
"use strict";

load("jstests/libs/auto_retry_transaction_in_sharding.js");  // For withTxnAndAutoRetryOnMongos.
load("jstests/libs/change_stream_util.js");                  // For ChangeStreamTest.
load("jstests/libs/collection_drop_recreate.js");            // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js");                     // For FixtureHelpers.isMongos.

// Names of the additional namespaces written to by the transaction. Writes to them must not show
// up in the single-collection stream opened on the main test collection.
const otherCollName = "change_stream_apply_ops_2";
const otherDbName = "change_stream_apply_ops_db";
const otherDbCollName = "someColl";

// Start from freshly (re)created collections: the watched one, a sibling in the same database,
// and one in a different database.
const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
assertDropAndRecreateCollection(db, otherCollName);
const otherDb = db.getSiblingDB(otherDbName);
assertDropAndRecreateCollection(otherDb, otherDbCollName);

// Seed a document that the transaction will later delete.
const kDeletedDocumentId = 0;
const seedInsertCmd = {
    documents: [{_id: kDeletedDocumentId, a: "I was here before the transaction"}],
    writeConcern: {w: "majority"}
};
const insertRes = assert.commandWorked(coll.runCommand("insert", seedInsertCmd));

// The test start time is the clusterTime of the seed insert, bumped by one increment.
const testStartTime = insertRes.$clusterTime.clusterTime;
++testStartTime.i;

// Open the single-collection stream. Passthroughs must not modify it, because a collection drop
// only invalidates single-collection change streams.
let cst = new ChangeStreamTest(db);
const watchedCollPipeline = [{$changeStream: {}}, {$project: {"lsid.uid": 0}}];
let changeStream = cst.startWatchingChanges({
    collection: coll,
    pipeline: watchedCollPipeline,
    doNotModifyInPassthroughs: true,
});

// The session is started with causal consistency disabled; the transaction reads at "snapshot"
// level and commits with majority write concern.
const sessionOptions = {causalConsistency: false};
const txnOptions = {readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}};
const session = db.getMongo().startSession(sessionOptions);

// Resolve every collection handle before the transaction begins. In sharded passthroughs,
// accessing db[collname] may attempt to implicitly shard the collection, which is not allowed
// inside a transaction.
const sessionDb = session.getDatabase(db.getName());
const sessionColl = sessionDb[coll.getName()];
const sessionOtherColl = sessionDb[otherCollName];
const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName];

// The writes performed inside the transaction, in the order the test expects to observe them.
const runTxnWrites = () => {
    // Two inserts on the main test collection.
    for (const docId of [1, 2]) {
        assert.commandWorked(sessionColl.insert({_id: docId, a: 0}));
    }

    // An insert on a collection that is not being watched; the single-collection change stream
    // should skip it.
    assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"}));

    // An insert on a collection in a different database; both the single-collection and the
    // single-db change streams should skip it.
    assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"}));

    // Finally, update one of the new documents and delete the pre-existing one.
    assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
    assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId}));
};

withTxnAndAutoRetryOnMongos(session, runTxnWrites, txnOptions);

// Run an "external" applyOps (one that is not part of a transaction) against the watched
// collection. Its entries carry no 'lsid' or 'txnNumber', so the change stream must not unwind
// them and the inserted document should never be reported. The applyOps command does not exist on
// mongoS, so this step is skipped in sharded passthroughs.
if (!FixtureHelpers.isMongos(db)) {
    const externalInsertOp = {
        op: "i",
        ns: coll.getFullName(),
        o: {_id: 3, a: "SHOULD NOT READ THIS"}
    };
    assert.commandWorked(db.runCommand({applyOps: [externalInsertOp]}));
}

// Dropping the collection triggers an "invalidate" event at the end of the stream.
const dropWatchedCollCmd = {
    drop: coll.getName()
};
assert.commandWorked(db.runCommand(dropWatchedCollCmd));

// Helpers for the fields that the expected events have in common. Each call produces a fresh
// object so that no two expected events share nested state.
const watchedNs = () => ({db: db.getName(), coll: coll.getName()});
const asTxnEvent = (eventFields) => Object.assign(
    {}, eventFields, {lsid: session.getSessionId(), txnNumber: session.getTxnNumber_forTesting()});

// The events a single-collection stream is expected to report for the operations above: the four
// transactional writes on the watched collection, followed by the collection drop.
const expectedChanges = [
    asTxnEvent({
        documentKey: {_id: 1},
        fullDocument: {_id: 1, a: 0},
        ns: watchedNs(),
        operationType: "insert",
    }),
    asTxnEvent({
        documentKey: {_id: 2},
        fullDocument: {_id: 2, a: 0},
        ns: watchedNs(),
        operationType: "insert",
    }),
    asTxnEvent({
        documentKey: {_id: 1},
        ns: watchedNs(),
        operationType: "update",
        updateDescription: {removedFields: [], updatedFields: {a: 1}},
    }),
    asTxnEvent({
        documentKey: {_id: kDeletedDocumentId},
        ns: watchedNs(),
        operationType: "delete",
    }),
    {
        operationType: "drop",
        ns: watchedNs(),
    },
];

/**
 * Asserts that the next events on 'cursor' match 'expectedChanges', delegating to whichever
 * ChangeStreamTest instance 'cst' refers to at the time of the call.
 *
 * If we are running in a sharded passthrough, then this may have been a multi-shard transaction.
 * Change streams will interleave the txn events from across the shards in (clusterTime,
 * txnOpIndex) order, and so may not reflect the ordering of writes in the test. We thus verify
 * that exactly the expected set of events are observed, but we relax the ordering requirements.
 *
 * @param {Object} args
 * @param args.cursor - The change stream cursor to read from.
 * @param {Array} args.expectedChanges - The events expected next on the cursor.
 * @param {boolean} [args.expectInvalidate] - Forwarded unchanged to the ChangeStreamTest method.
 * @returns Whatever the selected ChangeStreamTest assertion method returns.
 */
function assertNextChangesEqual({cursor, expectedChanges, expectInvalidate}) {
    const methodName = FixtureHelpers.isMongos(db) ? "assertNextChangesEqualUnordered"
                                                   : "assertNextChangesEqual";
    // Call the method on 'cst' itself rather than through a detached function reference, so that
    // the receiver ('this') is preserved however ChangeStreamTest happens to implement it.
    return cst[methodName](
        {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: expectInvalidate});
}

//
// Test behavior of single-collection change streams with apply ops.
//

// Verify that the stream returns the expected sequence of changes. The return value is not
// needed: the helper itself asserts on the events it reads.
assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});

// Single collection change stream should also be invalidated by the drop.
assertNextChangesEqual({
    cursor: changeStream,
    expectedChanges: [{operationType: "invalidate"}],
    expectInvalidate: true
});

//
// Test behavior of whole-db change streams with apply ops.
//

// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard.
// One drop is already in expectedChanges, so append one more for every additional shard.
for (let shardOrdinal = 1; shardOrdinal < FixtureHelpers.numberOfShardsForCollection(coll);
     ++shardOrdinal) {
    const extraDropEvent = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}};
    expectedChanges.push(extraDropEvent);
}

// The insert on db.otherColl becomes visible to a whole-db stream. Slot it in at index 2, i.e.
// directly after the two inserts on the main collection.
const otherCollInsertEvent = {
    documentKey: {_id: 111},
    fullDocument: {_id: 111, a: "Doc on other collection"},
    ns: {db: db.getName(), coll: otherCollName},
    operationType: "insert",
    lsid: session.getSessionId(),
    txnNumber: session.getTxnNumber_forTesting(),
};
expectedChanges.splice(2, 0, otherCollInsertEvent);

// Verify that a whole-db stream, started from the recorded test start time, returns the expected
// sequence of changes, including the insert on the other collection but NOT the changes on the
// other DB or the manual applyOps.
const wholeDbPipeline =
    [{$changeStream: {startAtOperationTime: testStartTime}}, {$project: {"lsid.uid": 0}}];
changeStream = cst.startWatchingChanges({pipeline: wholeDbPipeline, collection: 1});
assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});

//
// Test behavior of whole-cluster change streams with apply ops.
//

// Add an entry for the insert on otherDb.otherDbColl into expectedChanges. Index 3 places it
// directly after the other-collection insert that was spliced in at index 2.
expectedChanges.splice(3, 0, {
    documentKey: {_id: 222},
    fullDocument: {_id: 222, a: "Doc on other DB"},
    ns: {db: otherDbName, coll: otherDbCollName},
    operationType: "insert",
    lsid: session.getSessionId(),
    txnNumber: session.getTxnNumber_forTesting(),
});

// 'cst' is about to be replaced by an instance bound to the admin database. Clean up the current
// instance first: the final cleanUp() call below only reaches whichever instance 'cst' refers to
// at that point, so without this the earlier instance would never be cleaned up.
cst.cleanUp();

// Verify that a whole-cluster stream returns the expected sequence of changes, including the
// inserts on the other collection and the other database, but NOT the manual applyOps.
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
changeStream = cst.startWatchingChanges({
    pipeline: [
        {$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}},
        {$project: {"lsid.uid": 0}}
    ],
    collection: 1
});
assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});

cst.cleanUp();
}());