summaryrefslogtreecommitdiff
path: root/jstests/change_streams/whole_db_metadata_notifications.js
blob: 1500402bc1c8d50269fdbbde390086f4028c66ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// Tests of metadata notifications for a $changeStream on a whole database.
// Do not run in whole-cluster passthrough since this test assumes that the change stream will be
// invalidated by a database drop.
// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
"use strict";

load("jstests/libs/change_stream_util.js");        // For ChangeStreamTest.
load('jstests/replsets/libs/two_phase_drops.js');  // For 'TwoPhaseDropCollectionTest'.
load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js");           // For FixtureHelpers.

// The database watched by every change stream opened in this test. Start from a clean slate and
// fail fast if the cleanup itself fails, so that stale collections cannot leak into the stream.
const testDB = db.getSiblingDB(jsTestName());
assert.commandWorked(testDB.dropDatabase());
const cst = new ChangeStreamTest(testDB);

// Write to a collection, then drop and recreate it, and verify that a whole-db change stream
// reports every operation (including the 'drop') and remains usable afterwards.
const collName = "test";
let coll = assertDropAndRecreateCollection(testDB, collName);

// 'collection: 1' opens the stream on the whole database rather than on a single collection.
let aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});

// Create oplog entries of type insert, update, and delete.
assert.commandWorked(coll.insert({_id: 1}));
assert.commandWorked(coll.update({_id: 1}, {$set: {a: 1}}));
assert.commandWorked(coll.remove({_id: 1}));

// Drop and recreate the collection.
const collAgg = assertDropAndRecreateCollection(testDB, collName);

// We should get 4 oplog entries of type insert, update, delete, and drop.
let change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "insert", tojson(change));
change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "update", tojson(change));
change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "delete", tojson(change));
change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "drop", tojson(change));

// Get a valid resume token that the next change stream can use. The token is taken from the
// insert into the recreated collection, so verify that this is the event we actually received.
assert.commandWorked(collAgg.insert({_id: 1}));

change = cst.getOneChange(aggCursor, false);
assert.eq(change.operationType, "insert", tojson(change));
const resumeToken = change._id;

// For whole-db streams, it is possible to resume at a point before a collection is dropped.
assertDropCollection(testDB, collAgg.getName());
// Wait for two-phase drop to complete, so that the UUID no longer exists.
assert.soon(function() {
    return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB, collAgg.getName());
});
// Resuming must succeed even though the collection that produced the token has been dropped.
assert.commandWorked(testDB.runCommand(
    {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}));

// Test that invalidation entries for other databases are filtered out.
const otherDB = testDB.getSiblingDB(jsTestName() + "other");
const otherDBColl = otherDB[collName + "_other"];
assert.commandWorked(otherDBColl.insert({_id: 0}));

// Create collection on the database being watched.
coll = assertDropAndRecreateCollection(testDB, collName);

// Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be
// upconverted to a cluster-wide stream, which would return an entry for the dropped collection
// in the other database.
aggCursor = cst.startWatchingChanges(
    {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true});

// Drop the collection on the other database, this should *not* invalidate the change stream.
assertDropCollection(otherDB, otherDBColl.getName());

// Insert into the collection in the watched database, and verify the change stream is able to
// pick it up.
assert.commandWorked(coll.insert({_id: 1}));
change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 1, tojson(change));

// Test that renaming a collection generates a 'rename' entry for the 'from' collection. MongoDB
// does not allow renaming of sharded collections, so only perform this test if the collection
// is not sharded.
if (!FixtureHelpers.isSharded(coll)) {
    assertDropAndRecreateCollection(testDB, coll.getName());
    assertDropCollection(testDB, "renamed_coll");
    aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
    assert.commandWorked(coll.renameCollection("renamed_coll"));
    cst.assertNextChangesEqual({
        cursor: aggCursor,
        expectedChanges: [{
            operationType: "rename",
            ns: {db: testDB.getName(), coll: coll.getName()},
            to: {db: testDB.getName(), coll: "renamed_coll"}
        }]
    });

    // Repeat the test, this time using the 'dropTarget' option with an existing target
    // collection.
    coll = testDB["renamed_coll"];
    assertCreateCollection(testDB, collName);
    assert.commandWorked(testDB[collName].insert({_id: 0}));
    assert.commandWorked(coll.renameCollection(collName, true /* dropTarget */));
    cst.assertNextChangesEqual({
        cursor: aggCursor,
        expectedChanges: [
            {
                operationType: "insert",
                ns: {db: testDB.getName(), coll: collName},
                documentKey: {_id: 0},
                fullDocument: {_id: 0}
            },
            {
                operationType: "rename",
                ns: {db: testDB.getName(), coll: "renamed_coll"},
                to: {db: testDB.getName(), coll: collName}
            }
        ]
    });

    coll = testDB[collName];
    // Test renaming a collection from the database being watched to a different database. Do
    // not run this in the mongos passthrough suites since we cannot guarantee the primary shard
    // of the target database, and renameCollection requires the source and destination to be on
    // the same shard.
    if (!FixtureHelpers.isMongos(testDB)) {
        // Use a distinct name so this handle is not confused with the outer 'otherDB', which
        // refers to a different database.
        const renameTargetDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
        // Create target collection to ensure the database exists.
        const collOtherDB = assertCreateCollection(renameTargetDB, "test");
        assertDropCollection(renameTargetDB, "test");
        assert.commandWorked(testDB.adminCommand(
            {renameCollection: coll.getFullName(), to: collOtherDB.getFullName()}));
        // Rename across databases drops the source collection after the collection is copied
        // over.
        cst.assertNextChangesEqual({
            cursor: aggCursor,
            expectedChanges:
                [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}]
        });

        // Test renaming a collection from a different database to the database being watched.
        assert.commandWorked(testDB.adminCommand(
            {renameCollection: collOtherDB.getFullName(), to: coll.getFullName()}));
        // Do not check the 'ns' field since it will contain the namespace of the temp
        // collection created when renaming a collection across databases.
        change = cst.getOneChange(aggCursor);
        assert.eq(change.operationType, "rename", tojson(change));
        assert.eq(change.to, {db: testDB.getName(), coll: coll.getName()}, tojson(change));
    }

    // Test the behavior of a change stream watching the target collection of a $out aggregation
    // stage.
    coll.aggregate([{$out: "renamed_coll"}]);
    // Note that $out will first create a temp collection, and then rename the temp collection
    // to the target. Do not explicitly check the 'ns' field.
    const rename = cst.getOneChange(aggCursor);
    assert.eq(rename.operationType, "rename", tojson(rename));
    assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));

    // The change stream should not be invalidated by the rename(s): the next batch is empty,
    // and a subsequent insert is still delivered on the same cursor.
    assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
    assert.commandWorked(coll.insert({_id: 2}));
    change = cst.getOneChange(aggCursor);
    assert.eq(change.operationType, "insert", tojson(change));

    // Drop the new collection to avoid an additional 'drop' notification when the database is
    // dropped.
    assertDropCollection(testDB, "renamed_coll");
    cst.assertNextChangesEqual({
        cursor: aggCursor,
        expectedChanges:
            [{operationType: "drop", ns: {db: testDB.getName(), coll: "renamed_coll"}}],
    });
}

// Dropping a collection should return a 'drop' entry.
assertDropCollection(testDB, coll.getName());
cst.assertNextChangesEqual({
    cursor: aggCursor,
    expectedChanges: [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}],
});

// Operations on internal "system" collections should be filtered out and not included in the
// change stream.
aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
// Creating a view will generate an insert entry on the "system.views" collection.
assert.commandWorked(testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
// Drop the "system.views" collection.
assertDropCollection(testDB, "system.views");
// Verify that the change stream does not report the insertion into "system.views", and is
// not invalidated by dropping the system collection. Instead, it correctly reports the next
// write to the test collection.
assert.commandWorked(coll.insert({_id: 0}));
change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change));

// Test that renaming a "system" collection *does* return a notification if the target of
// the rename is a non-system collection.
assert.commandWorked(testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
assert.commandWorked(testDB.system.views.renameCollection("non_system_collection"));
cst.assertNextChangesEqual({
    cursor: aggCursor,
    expectedChanges: [{
        operationType: "rename",
        ns: {db: testDB.getName(), coll: "system.views"},
        to: {db: testDB.getName(), coll: "non_system_collection"}
    }],
});

// Test that renaming a "system" collection to a different "system" collection does not
// result in a notification in the change stream.
aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
assert.commandWorked(testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
// Note that the target of the rename must be a valid "system" collection.
assert.commandWorked(testDB.system.views.renameCollection("system.users"));
// Verify that the change stream filters out the rename above, instead returning the next insert
// to the test collection.
assert.commandWorked(coll.insert({_id: 1}));
change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change));

// Test that renaming a user collection to a "system" collection *is* returned in the change
// stream.
assert.commandWorked(coll.renameCollection("system.views"));
cst.assertNextChangesEqual({
    cursor: aggCursor,
    expectedChanges: [{
        operationType: "rename",
        ns: {db: testDB.getName(), coll: coll.getName()},
        to: {db: testDB.getName(), coll: "system.views"}
    }],
});

// Drop the "system.views" collection to avoid view catalog errors in subsequent tests. Only the
// drop of the non-system collection is expected to appear in the stream.
assertDropCollection(testDB, "system.views");
assertDropCollection(testDB, "non_system_collection");
cst.assertNextChangesEqual({
    cursor: aggCursor,
    expectedChanges: [
        {operationType: "drop", ns: {db: testDB.getName(), coll: "non_system_collection"}},
    ]
});

// Dropping the database should generate a 'dropDatabase' notification followed by an
// 'invalidate'.
assert.commandWorked(testDB.dropDatabase());
cst.assertDatabaseDrop({cursor: aggCursor, db: testDB});
cst.assertNextChangesEqual({cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}]});

cst.cleanUp();
}());