summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-10 10:04:41 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-05-29 09:51:51 -0400
commita76082905d63ac8aaaae25e5c76812e6edf9bc07 (patch)
tree5882b480ff3b7378a30f27106cd8fad6e8271407 /jstests/change_streams
parent8150b50f14579b6cbd673f12968726670f6e1b78 (diff)
downloadmongo-a76082905d63ac8aaaae25e5c76812e6edf9bc07.tar.gz
SERVER-32088: ChangeStream resumeAfter does not work on sharded collections if not all shards have chunks for the collection
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/change_stream_collation.js112
-rw-r--r--jstests/change_streams/change_stream_invalidation.js113
-rw-r--r--jstests/change_streams/change_stream_rename_resumability.js7
-rw-r--r--jstests/change_streams/lookup_post_image.js24
4 files changed, 211 insertions, 45 deletions
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js
index ceb1eb463dc..4375da16843 100644
--- a/jstests/change_streams/change_stream_collation.js
+++ b/jstests/change_streams/change_stream_collation.js
@@ -5,7 +5,8 @@
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'.
+ load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest' and
+ // 'runCommandChangeStreamPassthroughAware'.
load("jstests/libs/fixture_helpers.js"); // For 'isMongos'.
if (FixtureHelpers.isMongos(db)) {
@@ -230,6 +231,115 @@
assert.docEq(cursor.next(), {docId: 1});
assert(!cursor.hasNext());
}());
+
+ // Test that resuming a change stream on a collection that has been dropped requires the
+ // user to explicitly specify the collation. This is testing that we cannot resume if we
+ // need to retrieve the collection metadata.
+ (function() {
+ const collName = "change_stream_case_insensitive";
+ let caseInsensitiveCollection =
+ assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
+
+ let changeStream = caseInsensitiveCollection.watch(
+ [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive});
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
+
+ assert.soon(() => changeStream.hasNext());
+ const next = changeStream.next();
+ assert.docEq(next.documentKey, {_id: 0});
+ const resumeToken = next._id;
+
+ // Insert a second document to see after resuming.
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
+
+ // Drop the collection to invalidate the stream.
+ assertDropCollection(db, collName);
+
+ // Test that a $changeStream is allowed to resume on the dropped collection if an
+ // explicit collation is provided, even if it doesn't match the original collection
+ // default collation.
+ changeStream = caseInsensitiveCollection.watch(
+ [{$match: {"fullDocument.text": "ABC"}}],
+ {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+
+ // Test that a pipeline without an explicit collation is not allowed to resume the
+ // change stream after the collection has been dropped. Do not modify this command in
+ // the passthrough suite(s) since whole-db and whole-cluster change streams are allowed
+ // to resume without an explicit collation.
+ assert.commandFailedWithCode(
+ runCommandChangeStreamPassthroughAware(
+ db,
+ {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {},
+ },
+ true), // doNotModifyInPassthroughs
+ ErrorCodes.InvalidResumeToken);
+ }());
+
+ // Test that the default collation of a new version of the collection is not applied when
+ // resuming a change stream from before a collection drop.
+ (function() {
+ const collName = "change_stream_case_insensitive";
+ let caseInsensitiveCollection =
+ assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
+
+ let changeStream = caseInsensitiveCollection.watch(
+ [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive});
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
+
+ assert.soon(() => changeStream.hasNext());
+ const next = changeStream.next();
+ assert.docEq(next.documentKey, {_id: 0});
+ const resumeToken = next._id;
+
+ // Insert a second document to see after resuming.
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
+
+ // Recreate the collection with a different collation.
+ caseInsensitiveCollection = assertDropAndRecreateCollection(
+ db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}});
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"}));
+
+ // Verify that the stream sees the insert before the drop and then is exhausted. We
+ // won't see the invalidate because the pipeline has a $match stage after the
+ // $changeStream.
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"});
+ assert(changeStream.isExhausted());
+
+ // Test that a pipeline with an explicit collation is allowed to resume from before the
+ // collection is dropped and recreated.
+ changeStream = caseInsensitiveCollection.watch(
+ [{$match: {"fullDocument.text": "ABC"}}],
+ {resumeAfter: resumeToken, collation: {locale: "fr"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+ assert(changeStream.isExhausted());
+
+ // Test that a pipeline without an explicit collation is not allowed to resume,
+ // even though the collection has been recreated with the same default collation as it
+ // had previously. Do not modify this command in the passthrough suite(s) since whole-db
+ // and whole-cluster change streams are allowed to resume without an explicit collation.
+ assert.commandFailedWithCode(
+ runCommandChangeStreamPassthroughAware(
+ db,
+ {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+ },
+ true), // doNotModifyInPassthroughs
+ ErrorCodes.InvalidResumeToken);
+ }());
+
} finally {
if (FixtureHelpers.isMongos(db)) {
// TODO: SERVER-33944 Change streams on sharded collection with non-simple default
diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js
index c525ece3f16..e955af3af87 100644
--- a/jstests/change_streams/change_stream_invalidation.js
+++ b/jstests/change_streams/change_stream_invalidation.js
@@ -3,26 +3,18 @@
(function() {
"use strict";
- load("jstests/libs/change_stream_util.js");
- load('jstests/libs/uuid_util.js');
load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- let cst = new ChangeStreamTest(db);
-
db.getMongo().forceReadMode('commands');
// Write a document to the collection and test that the change stream returns it
// and getMore command closes the cursor afterwards.
const collGetMore = assertDropAndRecreateCollection(db, "change_stream_getmore_invalidations");
// We awaited the replication of the first write, so the change stream shouldn't return it.
- // Use { w: "majority" } to deal with journaling correctly, even though we only have one node.
- assert.writeOK(collGetMore.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}}));
-
- let aggcursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collGetMore});
+ assert.writeOK(collGetMore.insert({_id: 0, a: 1}));
- const collGetMoreUuid = getUUIDFromListCollections(db, collGetMore.getName());
+ let changeStream = collGetMore.watch();
// Drop the collection and test that we return "invalidate" entry and close the cursor. However,
// we return all oplog entries preceding the drop.
@@ -33,45 +25,61 @@
assert.writeOK(collGetMore.remove({_id: 1}));
// Drop the collection.
assert.commandWorked(db.runCommand({drop: collGetMore.getName()}));
+
// We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor
// should be closed.
- let change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "insert", tojson(change));
- change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "update", tojson(change));
- change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "delete", tojson(change));
- cst.assertNextChangesEqual({
- cursor: aggcursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "insert");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "update");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "delete");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+ assert(changeStream.isExhausted());
jsTestLog("Testing aggregate command closes cursor for invalidate entries");
const collAgg = assertDropAndRecreateCollection(db, "change_stream_agg_invalidations");
- const collAggUuid = getUUIDFromListCollections(db, collAgg.getName());
+
// Get a valid resume token that the next aggregate command can use.
- aggcursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collAgg});
+ changeStream = collAgg.watch();
- assert.writeOK(collAgg.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(collAgg.insert({_id: 1}));
- change = cst.getOneChange(aggcursor, false);
+ assert.soon(() => changeStream.hasNext());
+ let change = changeStream.next();
+ assert.eq(change.operationType, "insert");
+ assert.eq(change.documentKey, {_id: 1});
const resumeToken = change._id;
- // It should not possible to resume a change stream after a collection drop, even if the
- // invalidate has not been received.
+ // Insert another document after storing the resume token.
+ assert.writeOK(collAgg.insert({_id: 2}));
+
+ assert.soon(() => changeStream.hasNext());
+ change = changeStream.next();
+ assert.eq(change.operationType, "insert");
+ assert.eq(change.documentKey, {_id: 2});
+
+ // Drop the collection and invalidate the change stream.
assertDropCollection(db, collAgg.getName());
// Wait for two-phase drop to complete, so that the UUID no longer exists.
assert.soon(function() {
return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collAgg.getName());
});
- ChangeStreamTest.assertChangeStreamThrowsCode({
- db: db,
- collName: collAgg.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- expectedCode: 40615,
- doNotModifyInPassthroughs: true
- });
+
+ // Resume the change stream after the collection drop, up to and including the invalidate. This
+ // is allowed if an explicit collation is provided.
+ changeStream = collAgg.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ change = changeStream.next();
+ assert.eq(change.operationType, "insert");
+ assert.eq(change.documentKey, {_id: 2});
+
+ assert.soon(() => changeStream.hasNext());
+ change = changeStream.next();
+ assert.eq(change.operationType, "invalidate");
+ assert(changeStream.isExhausted());
// Test that it is possible to open a new change stream cursor on a collection that does not
// exist.
@@ -80,20 +88,41 @@
assertDropCollection(db, collDoesNotExistName);
// Cursor creation succeeds, but there are no results.
- aggcursor = cst.startWatchingChanges(
- {collection: collDoesNotExistName, pipeline: [{$changeStream: {}}]});
+ const cursorObj = assert
+ .commandWorked(db.runCommand({
+ aggregate: collDoesNotExistName,
+ pipeline: [{$changeStream: {}}],
+ cursor: {batchSize: 1},
+ }))
+ .cursor;
// We explicitly test getMore, to ensure that the getMore command for a non-existent collection
// does not return an error.
- aggcursor = cst.getNextBatch(aggcursor);
- assert.neq(aggcursor.id, 0);
- assert.eq(aggcursor.nextBatch.length, 0, tojson(aggcursor.nextBatch));
+ let getMoreResult =
+ assert
+ .commandWorked(db.runCommand(
+ {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1}))
+ .cursor;
+ assert.neq(getMoreResult.id, 0);
+ assert.eq(getMoreResult.nextBatch.length, 0, tojson(getMoreResult.nextBatch));
// After collection creation, we see oplog entries for the collection.
const collNowExists = assertCreateCollection(db, collDoesNotExistName);
assert.writeOK(collNowExists.insert({_id: 0}));
- change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "insert", tojson(change));
- cst.cleanUp();
+ assert.soon(function() {
+ getMoreResult =
+ assert
+ .commandWorked(db.runCommand(
+ {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1}))
+ .cursor;
+ assert.neq(getMoreResult.id, 0);
+ return getMoreResult.nextBatch.length > 0;
+ }, "Timed out waiting for another result from getMore on non-existent collection.");
+ assert.eq(getMoreResult.nextBatch.length, 1);
+ assert.eq(getMoreResult.nextBatch[0].operationType, "insert");
+ assert.eq(getMoreResult.nextBatch[0].documentKey, {_id: 0});
+
+ assert.commandWorked(
+ db.runCommand({killCursors: collDoesNotExistName, cursors: [getMoreResult.id]}));
}());
diff --git a/jstests/change_streams/change_stream_rename_resumability.js b/jstests/change_streams/change_stream_rename_resumability.js
index 5d7e8d3c153..46a3b3575b0 100644
--- a/jstests/change_streams/change_stream_rename_resumability.js
+++ b/jstests/change_streams/change_stream_rename_resumability.js
@@ -5,7 +5,7 @@
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- const coll = assertDropAndRecreateCollection(db, "change_stream_invalidate_resumability");
+ let coll = assertDropAndRecreateCollection(db, "change_stream_invalidate_resumability");
// Drop the collection we'll rename to _before_ starting the changeStream, so that we don't
// get accidentally an invalidate when running on the whole DB or cluster.
@@ -19,8 +19,11 @@
assert.commandWorked(coll.renameCollection(coll.getName() + "_renamed"));
+ // Update 'coll' to point to the renamed collection.
+ coll = db[coll.getName() + "_renamed"];
+
// Insert another document after the rename.
- assert.commandWorked(coll.insert({_id: 2}));
+ assert.writeOK(coll.insert({_id: 2}));
// We should get 2 oplog entries of type insert and invalidate.
assert.soon(() => cursor.hasNext());
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index 61e51f989d2..ec7f47d57f6 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -169,6 +169,30 @@
assert(latestChange.hasOwnProperty("fullDocument"));
assert.eq(latestChange.fullDocument, null);
+ // Test that we can resume a change stream with 'fullDocument: updateLookup' after the
+ // collection has been dropped. This is only allowed if an explicit collation is provided.
+ cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [
+ {$changeStream: {resumeAfter: deleteDocResumePoint, fullDocument: "updateLookup"}},
+ {$match: {operationType: {$ne: "delete"}}}
+ ],
+ aggregateOptions: {cursor: {batchSize: 0}, collation: {locale: "simple"}}
+ });
+
+ // Check the next $changeStream entry; this is the test document inserted above.
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "insert");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
+
+ // The next entry is the 'update' operation. Because the collection has been dropped, our
+ // attempt to look up the post-image results in a null document.
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "update");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, null);
+
// Test that looking up the post image of an update after the collection has been dropped
// and created again will result in 'fullDocument' with a value of null. This must be done
// using getMore because new cursors cannot be established after a collection drop.