// Tests of metadata notifications for a $changeStream on a whole cluster.
// When run in sharded+transaction passthrough suites, it is possible that the two unsharded
// collections will live on different shards. Multi-shard transactions require majority read
// concern, which is why this test needs the tag below.
// @tags: [requires_majority_read_concern]
(function() {
"use strict";

load("jstests/libs/change_stream_util.js");        // For ChangeStreamTest.
load('jstests/replsets/libs/two_phase_drops.js');  // For 'TwoPhaseDropCollectionTest'.
load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js");           // For FixtureHelpers.

// Define two databases. We will conduct our tests by creating one collection in each.
const testDB1 = db.getSiblingDB(jsTestName()), testDB2 = db.getSiblingDB(jsTestName() + "_other");
const adminDB = db.getSiblingDB("admin");

assert.commandWorked(testDB1.dropDatabase());
assert.commandWorked(testDB2.dropDatabase());

// Create one collection on each database.
let [db1Coll, db2Coll] =
    [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));

// Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened
// on admin.
let cst = new ChangeStreamTest(adminDB);
let aggCursor = cst.startWatchingAllChangesForCluster();

// Generate oplog entries of type insert, update, and delete across both databases.
for (let coll of [db1Coll, db2Coll]) {
    assert.commandWorked(coll.insert({_id: 1}));
    assert.commandWorked(coll.update({_id: 1}, {$set: {a: 1}}));
    assert.commandWorked(coll.remove({_id: 1}));
}

// Drop the second database, which should generate a 'drop' entry for the collection followed
// by a 'dropDatabase' entry.
assert.commandWorked(testDB2.dropDatabase());

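// Bucket the change notifications by the database they were reported on.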
const changes = {
    [testDB1.getName()]: [],
    [testDB2.getName()]: []
};

for (let i = 0; i < 6; i++) {
    const change = cst.getOneChange(aggCursor);
    changes[change.ns.db].push(change);
}

// We should get 6 oplog entries: an insert, an update, and a delete from each database.
for (let expectedDB of [testDB1, testDB2]) {
    const dbChanges = changes[expectedDB.getName()];
    assert.eq(dbChanges[0].operationType, "insert", tojson(changes));
    assert.eq(dbChanges[1].operationType, "update", tojson(changes));
    assert.eq(dbChanges[2].operationType, "delete", tojson(changes));
}
cst.assertDatabaseDrop({cursor: aggCursor, db: testDB2});

// Test that a cluster-wide change stream can be resumed using a token from a collection which
// has been dropped.
db1Coll = assertDropAndRecreateCollection(testDB1, db1Coll.getName());

// Get a valid resume token that the next change stream can use.
aggCursor = cst.startWatchingAllChangesForCluster();

assert.commandWorked(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));

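// Note: the second argument to getOneChange() is 'expectInvalidate'; we pass false since this
// insert should not invalidate the cluster-wide stream.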
let change = cst.getOneChange(aggCursor, false);
const resumeToken = change._id;

// For cluster-wide streams, it is possible to resume at a point before a collection is dropped,
// even if the "drop" notification has not been received on the original stream yet.
assertDropCollection(db1Coll, db1Coll.getName());
// Wait for two-phase drop to complete, so that the UUID no longer exists.
assert.soon(function() {
    return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1,
                                                                         db1Coll.getName());
});
assert.commandWorked(adminDB.runCommand({
    aggregate: 1,
    pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}],
    cursor: {}
}));

// Test that collection drops from any database result in "drop" notifications for the stream.
[db1Coll, db2Coll] =
    [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));
let _idForTest = 0;
for (let collToInvalidate of [db1Coll, db2Coll]) {
    // Start watching all changes in the cluster.
    aggCursor = cst.startWatchingAllChangesForCluster();

    let testDB = collToInvalidate.getDB();

    // Insert into the collections on both databases, and verify the change stream is able to
    // pick them up.
    for (let collToWrite of [db1Coll, db2Coll]) {
        assert.commandWorked(collToWrite.insert({_id: _idForTest}));
        change = cst.getOneChange(aggCursor);
        assert.eq(change.operationType, "insert", tojson(change));
        assert.eq(change.documentKey._id, _idForTest);
        assert.eq(change.ns.db, collToWrite.getDB().getName());
        _idForTest++;
    }

    // Renaming the collection should generate a 'rename' notification. Skip this test when
    // running on a sharded collection, since these cannot be renamed.
    if (!FixtureHelpers.isSharded(collToInvalidate)) {
        assertDropAndRecreateCollection(testDB, collToInvalidate.getName());
        const collName = collToInvalidate.getName();

        // Start watching all changes in the cluster.
        aggCursor = cst.startWatchingAllChangesForCluster();
        assert.commandWorked(collToInvalidate.renameCollection("renamed_coll"));
        cst.assertNextChangesEqual({
            cursor: aggCursor,
            expectedChanges: [
                {
                    operationType: "rename",
                    ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
                    to: {db: testDB.getName(), coll: "renamed_coll"}
                },
            ]
        });

        // Repeat the test, this time using the 'dropTarget' option with an existing target
        // collection.
        collToInvalidate = testDB.getCollection("renamed_coll");
        assertDropAndRecreateCollection(testDB, collName);
        assert.commandWorked(testDB[collName].insert({_id: 0}));
        assert.commandWorked(collToInvalidate.renameCollection(collName, true /* dropTarget */));
        cst.assertNextChangesEqual({
            cursor: aggCursor,
            expectedChanges: [
                {
                    operationType: "insert",
                    ns: {db: testDB.getName(), coll: collName},
                    documentKey: {_id: 0},
                    fullDocument: {_id: 0}
                },
                {
                    operationType: "rename",
                    ns: {db: testDB.getName(), coll: "renamed_coll"},
                    to: {db: testDB.getName(), coll: collName}
                }
            ]
        });

        collToInvalidate = testDB[collName];

        // Test renaming a collection to a different database. Do not run this in the mongos
        // passthrough suites since we cannot guarantee the primary shard of the target database
        // and renameCollection requires the source and destination to be on the same shard.
        if (!FixtureHelpers.isMongos(testDB)) {
            const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
            // Ensure the target database exists.
            const collOtherDB = assertDropAndRecreateCollection(otherDB, "test");
            assertDropCollection(otherDB, collOtherDB.getName());
            aggCursor = cst.startWatchingAllChangesForCluster();
            assert.commandWorked(testDB.adminCommand(
                {renameCollection: collToInvalidate.getFullName(), to: collOtherDB.getFullName()}));
            // Do not check the 'ns' field since it will contain the namespace of the temp
            // collection created when renaming a collection across databases.
            change = cst.getOneChange(aggCursor);
            assert.eq(change.operationType, "rename", tojson(change));
            assert.eq(
                change.to, {db: otherDB.getName(), coll: collOtherDB.getName()}, tojson(change));
            // Rename across databases also drops the source collection after the collection is
            // copied over.
            cst.assertNextChangesEqual({
                cursor: aggCursor,
                expectedChanges: [{
                    operationType: "drop",
                    ns: {db: testDB.getName(), coll: collToInvalidate.getName()}
                }]
            });
        }

        // Test the behavior of a change stream watching the target collection of a $out
        // aggregation stage.
        collToInvalidate.aggregate([{$out: "renamed_coll"}]);
        // Do not check the 'ns' field since it will contain the namespace of the temp
        // collection created by the $out stage, before renaming to 'renamed_coll'.
        const rename = cst.getOneChange(aggCursor);
        assert.eq(rename.operationType, "rename", tojson(rename));
        assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));

        // The change stream should not be invalidated by the rename(s).
        assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
        assert.commandWorked(collToInvalidate.insert({_id: 2}));
        assert.eq(cst.getOneChange(aggCursor).operationType, "insert");

        // Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
        assertDropCollection(testDB, "system.views");

        // Recreate the test collection for the remainder of the test.
        assert.commandWorked(collToInvalidate.insert({_id: 0}));
        cst.assertNextChangesEqual({
            cursor: aggCursor,
            expectedChanges: [{
                operationType: "insert",
                ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
                documentKey: {_id: 0},
                fullDocument: {_id: 0}
            }]
        });
    }

    // Dropping a collection should generate a 'drop' entry.
    assertDropCollection(testDB, collToInvalidate.getName());
    // Insert into the test collection to queue up another change after the drop. This is needed
    // since the number of 'drop' notifications is not deterministic in the sharded passthrough
    // suites.
    assert.commandWorked(collToInvalidate.insert({_id: 0}));
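    // consumeDropUpTo() swallows one or more 'drop' notifications until it sees the expected
    // insert event.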
    cst.consumeDropUpTo({
        cursor: aggCursor,
        dropType: "drop",
        expectedNext: {
            documentKey: {_id: 0},
            fullDocument: {_id: 0},
            ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
            operationType: "insert",
        },
    });

    // Operations on internal "system" collections should be filtered out and not included in
    // the change stream.
    aggCursor = cst.startWatchingAllChangesForCluster();
    // Creating a view will generate an insert entry on the "system.views" collection.
    assert.commandWorked(
        testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
    // Drop the "system.views" collection.
    assertDropCollection(testDB, "system.views");
    // Verify that the change stream does not report the insertion into "system.views", and is
    // not invalidated by dropping the system collection. Instead, it correctly reports the next
    // write to the test collection.
    assert.commandWorked(collToInvalidate.insert({_id: 1}));
    change = cst.getOneChange(aggCursor);
    assert.eq(change.operationType, "insert", tojson(change));
    assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()});
}

cst.cleanUp();
}());