summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-03 17:00:41 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-11 15:53:03 -0400
commitfff261ac550155065fce4b7b1529061f18980599 (patch)
tree09ce022d7b8319f1af3c2db2354427ecfe1aa389 /jstests/change_streams
parent0fa7bcb8bea5d4585fdbc1003b5116cd7bf28540 (diff)
downloadmongo-fff261ac550155065fce4b7b1529061f18980599.tar.gz
SERVER-29134: Support change streams on an entire database in a sharded cluster
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/change_stream.js10
-rw-r--r--jstests/change_streams/change_stream_shell_helper.js8
-rw-r--r--jstests/change_streams/change_stream_whole_db.js16
-rw-r--r--jstests/change_streams/change_stream_whole_db_invalidations.js29
-rw-r--r--jstests/change_streams/change_stream_whole_db_resumability.js19
5 files changed, 52 insertions, 30 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 4401df139e4..e7520d974b2 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -20,14 +20,14 @@
}
// Test that a change stream cannot be opened on 'system.' collections.
- assertInvalidChangeStreamNss("test", "system.users");
- assertInvalidChangeStreamNss("test", "system.profile");
- assertInvalidChangeStreamNss("test", "system.version");
+ assertInvalidChangeStreamNss(db.getName(), "system.users");
+ assertInvalidChangeStreamNss(db.getName(), "system.profile");
+ assertInvalidChangeStreamNss(db.getName(), "system.version");
// Test that a change stream can be opened on namespaces with 'system' in the name, but not
// considered an internal 'system dot' namespace.
- assertValidChangeStreamNss("test", "systemindexes");
- assertValidChangeStreamNss("test", "system_users");
+ assertValidChangeStreamNss(db.getName(), "systemindexes");
+ assertValidChangeStreamNss(db.getName(), "system_users");
// Similar test but for DB names that are not considered internal.
assert.writeOK(db.getSiblingDB("admincustomDB")["test"].insert({}));
diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js
index 49e4d1656bd..62d7ed1522e 100644
--- a/jstests/change_streams/change_stream_shell_helper.js
+++ b/jstests/change_streams/change_stream_shell_helper.js
@@ -67,7 +67,11 @@
jsTestLog("Testing watch() with pipeline");
cursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]);
wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]);
- assert.writeOK(coll.insert({_id: 1, x: 1}));
+
+ // Store the cluster time of the insert as the timestamp to start from.
+ const resumeTime =
+ assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]}))
+ .$clusterTime.clusterTime;
checkNextChange(cursor, {docId: 1});
checkNextChange(wholeDbCursor, {docId: 1});
@@ -80,8 +84,6 @@
checkNextChange(wholeDbCursor, {docId: 1});
jsTestLog("Testing watch() with pipeline and startAtClusterTime");
- // Store the cluster time of the last insert as the timestamp to start from.
- const resumeTime = db.runCommand({isMaster: 1}).$clusterTime.clusterTime;
cursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
{startAtClusterTime: {ts: resumeTime}});
wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js
index b24faeead15..4b2cd749c3e 100644
--- a/jstests/change_streams/change_stream_whole_db.js
+++ b/jstests/change_streams/change_stream_whole_db.js
@@ -5,20 +5,25 @@
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and
// assert[Valid|Invalid]ChangeStreamNss.
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
// Test that a change stream cannot be opened on the "admin", "config", or "local" databases.
// TODO SERVER-34086: $changeStream may run against 'admin' if 'allChangesForCluster' is true.
assertInvalidChangeStreamNss("admin", 1);
assertInvalidChangeStreamNss("config", 1);
- assertInvalidChangeStreamNss("local", 1);
+ if (!FixtureHelpers.isMongos(db)) {
+ assertInvalidChangeStreamNss("local", 1);
+ }
- // Test that a change stream can be opened before a database exists.
- assert.commandWorked(db.dropDatabase());
+ assertDropCollection(db, "t1");
+ assertDropCollection(db, "t2");
+
+ assertCreateCollection(db, "t1");
+ assertCreateCollection(db, "t2");
let cst = new ChangeStreamTest(db);
let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assertCreateCollection(db, "t1");
// Test that if there are no changes, we return an empty batch.
assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
@@ -45,8 +50,7 @@
// Dropping the database should invalidate the change stream.
assert.commandWorked(db.dropDatabase());
- expected = {operationType: "invalidate"};
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]});
cst.cleanUp();
}());
diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js
index a5523b7362c..0c7db3be0dc 100644
--- a/jstests/change_streams/change_stream_whole_db_invalidations.js
+++ b/jstests/change_streams/change_stream_whole_db_invalidations.js
@@ -5,6 +5,7 @@
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
const testDB = db.getSiblingDB(jsTestName());
let cst = new ChangeStreamTest(testDB);
@@ -79,19 +80,27 @@
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 1);
+ // Test that renaming a collection will invalidate the change stream. MongoDB does not allow
+ // renaming of sharded collections, so only perform this test if the collection is not sharded.
+ if (!FixtureHelpers.isSharded(coll)) {
+ assertDropCollection(testDB, coll.getName());
+
+ assertCreateCollection(testDB, coll.getName());
+ assertDropCollection(testDB, "renamed_coll");
+ aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+ assert.writeOK(coll.renameCollection("renamed_coll"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+ }
+
// Dropping a collection should invalidate the change stream.
assertDropCollection(testDB, coll.getName());
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- // Renaming a collection should invalidate the change stream.
- assertCreateCollection(testDB, coll.getName());
- assertDropCollection(testDB, "renamed_coll");
aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assert.writeOK(coll.renameCollection("renamed_coll"));
+ assertCreateCollection(testDB, coll.getName());
+ assertDropCollection(testDB, coll.getName());
cst.assertNextChangesEqual({
cursor: aggCursor,
expectedChanges: [{operationType: "invalidate"}],
diff --git a/jstests/change_streams/change_stream_whole_db_resumability.js b/jstests/change_streams/change_stream_whole_db_resumability.js
index e3b11ab9c60..fa6cd369f75 100644
--- a/jstests/change_streams/change_stream_whole_db_resumability.js
+++ b/jstests/change_streams/change_stream_whole_db_resumability.js
@@ -5,31 +5,38 @@
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- assertDropAndRecreateCollection(db, "resumeColl");
- const coll = db.resumeColl;
+ const coll = db[jsTestName() + "resume_coll"];
+ const otherColl = db[jsTestName() + "resume_coll_other"];
+ coll.drop();
+ otherColl.drop();
- // Note we do not project away 'id.ts' as it is part of the resume token.
let cst = new ChangeStreamTest(db);
let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- // Insert a document and save the resulting change stream.
+ // Insert a single document to each collection and save the resume token from the first insert.
assert.writeOK(coll.insert({_id: 1}));
+ assert.writeOK(otherColl.insert({_id: 2}));
const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
+ assert.eq(firstInsertChangeDoc.ns, {db: "test", coll: coll.getName()});
- // Test resume after an insert.
+ // Test resuming the change stream after the first insert should pick up the insert on the
+ // second collection.
resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
collection: 1,
aggregateOptions: {cursor: {batchSize: 0}},
});
- assert.writeOK(coll.insert({_id: 2}));
const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
+ assert.eq(secondInsertChangeDoc.ns, {db: "test", coll: otherColl.getName()});
+
+ // Insert a third document to the first collection and test that the change stream picks it up.
assert.writeOK(coll.insert({_id: 3}));
const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
+ assert.eq(thirdInsertChangeDoc.ns, {db: "test", coll: coll.getName()});
// Test resuming after the first insert again.
resumeCursor = cst.startWatchingChanges({