path: root/jstests/change_streams/change_stream_invalidation.js
author    Nick Zolnierz <nicholas.zolnierz@mongodb.com>    2018-04-10 10:04:41 -0400
committer Nick Zolnierz <nicholas.zolnierz@mongodb.com>    2018-05-29 09:51:51 -0400
commit    a76082905d63ac8aaaae25e5c76812e6edf9bc07 (patch)
tree      5882b480ff3b7378a30f27106cd8fad6e8271407 /jstests/change_streams/change_stream_invalidation.js
parent    8150b50f14579b6cbd673f12968726670f6e1b78 (diff)
download  mongo-a76082905d63ac8aaaae25e5c76812e6edf9bc07.tar.gz
SERVER-32088: ChangeStream resumeAfter does not work on sharded collections if not all shards have chunks for the collection
Diffstat (limited to 'jstests/change_streams/change_stream_invalidation.js')
-rw-r--r--  jstests/change_streams/change_stream_invalidation.js | 113
1 files changed, 71 insertions, 42 deletions
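
For context, the resumeAfter flow that this patch exercises can be sketched in the mongo shell roughly as follows (the collection name and document values are illustrative placeholders, not taken from the test itself):

    // Open a change stream, generate one event, and capture its resume token (_id).
    const coll = db.getCollection("resume_example");
    let stream = coll.watch();
    assert.writeOK(coll.insert({_id: 1}));
    assert.soon(() => stream.hasNext());
    const resumeToken = stream.next()._id;

    // A new stream can pick up after that event by passing the token to resumeAfter.
    // The test below also passes an explicit simple collation so the resumed stream
    // remains legal even after the original collection has been dropped.
    stream = coll.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});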
diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js
index c525ece3f16..e955af3af87 100644
--- a/jstests/change_streams/change_stream_invalidation.js
+++ b/jstests/change_streams/change_stream_invalidation.js
@@ -3,26 +3,18 @@
(function() {
"use strict";
- load("jstests/libs/change_stream_util.js");
- load('jstests/libs/uuid_util.js');
load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- let cst = new ChangeStreamTest(db);
-
db.getMongo().forceReadMode('commands');
// Write a document to the collection and test that the change stream returns it
// and getMore command closes the cursor afterwards.
const collGetMore = assertDropAndRecreateCollection(db, "change_stream_getmore_invalidations");
// We awaited the replication of the first write, so the change stream shouldn't return it.
- // Use { w: "majority" } to deal with journaling correctly, even though we only have one node.
- assert.writeOK(collGetMore.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}}));
-
- let aggcursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collGetMore});
+ assert.writeOK(collGetMore.insert({_id: 0, a: 1}));
- const collGetMoreUuid = getUUIDFromListCollections(db, collGetMore.getName());
+ let changeStream = collGetMore.watch();
// Drop the collection and test that we return "invalidate" entry and close the cursor. However,
// we return all oplog entries preceding the drop.
@@ -33,45 +25,61 @@
assert.writeOK(collGetMore.remove({_id: 1}));
// Drop the collection.
assert.commandWorked(db.runCommand({drop: collGetMore.getName()}));
+
// We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor
// should be closed.
- let change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "insert", tojson(change));
- change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "update", tojson(change));
- change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "delete", tojson(change));
- cst.assertNextChangesEqual({
- cursor: aggcursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "insert");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "update");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "delete");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+ assert(changeStream.isExhausted());
jsTestLog("Testing aggregate command closes cursor for invalidate entries");
const collAgg = assertDropAndRecreateCollection(db, "change_stream_agg_invalidations");
- const collAggUuid = getUUIDFromListCollections(db, collAgg.getName());
+
// Get a valid resume token that the next aggregate command can use.
- aggcursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collAgg});
+ changeStream = collAgg.watch();
- assert.writeOK(collAgg.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(collAgg.insert({_id: 1}));
- change = cst.getOneChange(aggcursor, false);
+ assert.soon(() => changeStream.hasNext());
+ let change = changeStream.next();
+ assert.eq(change.operationType, "insert");
+ assert.eq(change.documentKey, {_id: 1});
const resumeToken = change._id;
- // It should not be possible to resume a change stream after a collection drop, even if the
- // invalidate has not been received.
+ // Insert another document after storing the resume token.
+ assert.writeOK(collAgg.insert({_id: 2}));
+
+ assert.soon(() => changeStream.hasNext());
+ change = changeStream.next();
+ assert.eq(change.operationType, "insert");
+ assert.eq(change.documentKey, {_id: 2});
+
+ // Drop the collection and invalidate the change stream.
assertDropCollection(db, collAgg.getName());
// Wait for two-phase drop to complete, so that the UUID no longer exists.
assert.soon(function() {
return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collAgg.getName());
});
- ChangeStreamTest.assertChangeStreamThrowsCode({
- db: db,
- collName: collAgg.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- expectedCode: 40615,
- doNotModifyInPassthroughs: true
- });
+
+ // Resume the change stream after the collection drop, up to and including the invalidate. This
+ // is allowed if an explicit collation is provided.
+ changeStream = collAgg.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ change = changeStream.next();
+ assert.eq(change.operationType, "insert");
+ assert.eq(change.documentKey, {_id: 2});
+
+ assert.soon(() => changeStream.hasNext());
+ change = changeStream.next();
+ assert.eq(change.operationType, "invalidate");
+ assert(changeStream.isExhausted());
// Test that it is possible to open a new change stream cursor on a collection that does not
// exist.
@@ -80,20 +88,41 @@
assertDropCollection(db, collDoesNotExistName);
// Cursor creation succeeds, but there are no results.
- aggcursor = cst.startWatchingChanges(
- {collection: collDoesNotExistName, pipeline: [{$changeStream: {}}]});
+ const cursorObj = assert
+ .commandWorked(db.runCommand({
+ aggregate: collDoesNotExistName,
+ pipeline: [{$changeStream: {}}],
+ cursor: {batchSize: 1},
+ }))
+ .cursor;
// We explicitly test getMore, to ensure that the getMore command for a non-existent collection
// does not return an error.
- aggcursor = cst.getNextBatch(aggcursor);
- assert.neq(aggcursor.id, 0);
- assert.eq(aggcursor.nextBatch.length, 0, tojson(aggcursor.nextBatch));
+ let getMoreResult =
+ assert
+ .commandWorked(db.runCommand(
+ {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1}))
+ .cursor;
+ assert.neq(getMoreResult.id, 0);
+ assert.eq(getMoreResult.nextBatch.length, 0, tojson(getMoreResult.nextBatch));
// After collection creation, we see oplog entries for the collection.
const collNowExists = assertCreateCollection(db, collDoesNotExistName);
assert.writeOK(collNowExists.insert({_id: 0}));
- change = cst.getOneChange(aggcursor);
- assert.eq(change.operationType, "insert", tojson(change));
- cst.cleanUp();
+ assert.soon(function() {
+ getMoreResult =
+ assert
+ .commandWorked(db.runCommand(
+ {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1}))
+ .cursor;
+ assert.neq(getMoreResult.id, 0);
+ return getMoreResult.nextBatch.length > 0;
+ }, "Timed out waiting for another result from getMore on non-existent collection.");
+ assert.eq(getMoreResult.nextBatch.length, 1);
+ assert.eq(getMoreResult.nextBatch[0].operationType, "insert");
+ assert.eq(getMoreResult.nextBatch[0].documentKey, {_id: 0});
+
+ assert.commandWorked(
+ db.runCommand({killCursors: collDoesNotExistName, cursors: [getMoreResult.id]}));
}());