summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-06-29 12:39:29 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-07-03 13:51:37 -0400
commitc457b59f9599c61144ceee50b5190aec8f8b888b (patch)
treea961111959966fd10abbd6fa7a1dfa203871d81e
parentf4ad4f148a0ac37957bc854aa41c9ed53deceed9 (diff)
downloadmongo-c457b59f9599c61144ceee50b5190aec8f8b888b.tar.gz
SERVER-35917 Blacklist resumeAfter from sharded change streams suite
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml3
-rw-r--r--jstests/change_streams/change_stream.js47
-rw-r--r--jstests/change_streams/change_stream_shell_helper.js2
-rw-r--r--jstests/change_streams/lookup_post_image.js93
-rw-r--r--jstests/change_streams/lookup_post_image_resume_after.js108
-rw-r--r--jstests/change_streams/resume_after.js57
6 files changed, 172 insertions, 138 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index 827f88fe878..1c305f62970 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -20,6 +20,9 @@ selector:
# "Cowardly refusing to run test with overridden write concern when it uses a command that can
# only perform w=1 writes: ..."
- requires_eval_command
+ # SERVER-32088 Resuming a change stream may not work if not all shards have chunks for the
+ # collection, which can happen if the initial sharding of the collection has a failed migration.
+ - uses_resume_after
executor:
archive:
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 59e8df872f1..7570ecca52f 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -156,52 +156,5 @@
db.getMongo().forceReadMode('commands');
}
- jsTestLog("Testing resumability");
- assertDropAndRecreateCollection(db, "resume1");
-
- // Note we do not project away 'id.ts' as it is part of the resume token.
- let resumeCursor = cst.startWatchingChanges(
- {pipeline: [{$changeStream: {}}], collection: db.resume1, includeToken: true});
-
- // Insert a document and save the resulting change stream.
- assert.writeOK(db.resume1.insert({_id: 1}));
- const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
-
- jsTestLog("Testing resume after one document.");
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
- collection: db.resume1,
- includeToken: true,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
-
- jsTestLog("Inserting additional documents.");
- assert.writeOK(db.resume1.insert({_id: 2}));
- const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
- assert.writeOK(db.resume1.insert({_id: 3}));
- const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
-
- jsTestLog("Testing resume after first document of three.");
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
- collection: db.resume1,
- includeToken: true,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
- assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
-
- jsTestLog("Testing resume after second document of three.");
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}],
- collection: db.resume1,
- includeToken: true,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
-
cst.cleanUp();
}());
diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js
index 58d20645ae1..949366fa04e 100644
--- a/jstests/change_streams/change_stream_shell_helper.js
+++ b/jstests/change_streams/change_stream_shell_helper.js
@@ -1,5 +1,5 @@
// Test DBCollection.watch() shell helper and its options.
-
+// @tags: [uses_resume_after]
(function() {
"use strict";
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index bbf2b8027c7..ce22137f09c 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -116,92 +116,6 @@
assert.eq(latestChange.operationType, "update");
assert(latestChange.hasOwnProperty("fullDocument"));
assert.eq(latestChange.fullDocument, null);
- const deleteDocResumePoint = latestChange._id;
-
- // Test that looking up the post image of an update after the collection has been dropped will
- // result in 'fullDocument' with a value of null. This must be done using getMore because new
- // cursors cannot be established after a collection drop.
- assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
- assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}}));
-
- // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet.
- cursor = cst.startWatchingChanges({
- collection: coll,
- pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
- {$match: {operationType: {$ne: "delete"}}}
- ],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
-
- // Save another stream to test post-image lookup after the collection is recreated.
- let cursorBeforeDrop = cst.startWatchingChanges({
- collection: coll,
- pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
- {$match: {operationType: {$ne: "delete"}}}
- ],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
-
- // Retrieve the 'insert' operation from the latter stream. This is necessary on a sharded
- // collection so that the documentKey is retrieved before the collection is recreated;
- // otherwise, per SERVER-31691, a uassert will occur.
- // TODO SERVER-31847: all remaining operations on the old UUID should be visible even if we have
- // not retrieved the first oplog entry before the collection is recreated.
- latestChange = cst.getOneChange(cursorBeforeDrop);
- assert.eq(latestChange.operationType, "insert");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
-
- // Drop the collection and wait until two-phase drop finishes.
- assertDropCollection(db, coll.getName());
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName());
- });
- // If this test is running with secondary read preference, it's necessary for the drop
- // to propagate to all secondary nodes and be available for majority reads before we can
- // assume looking up the document will fail.
- FixtureHelpers.awaitLastOpCommitted();
-
- // Check the next $changeStream entry; this is the test document inserted above.
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "insert");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
-
- // The next entry is the 'update' operation. Because the collection has been dropped, our
- // attempt to look up the post-image results in a null document.
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "update");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, null);
-
- // Test establishing new cursors with resume token on dropped collections fails.
- let res = db.runCommand({
- aggregate: coll.getName(),
- pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
- {$match: {operationType: "update"}}
- ],
- cursor: {batchSize: 0}
- });
- assert.commandFailedWithCode(res, 40615);
-
- // Test that looking up the post image of an update after the collection has been dropped and
- // created again will result in 'fullDocument' with a value of null. This must be done using
- // getMore because new cursors cannot be established after a collection drop.
-
- // Insert a document with the same _id, verify the change stream won't return it due to
- // different UUID.
- assertCreateCollection(db, coll.getName());
- assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
-
- // Confirm that the next entry's post-image is null since new collection has a different UUID.
- latestChange = cst.getOneChange(cursorBeforeDrop);
- assert.eq(latestChange.operationType, "update");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, null);
// Test that invalidate entries don't have 'fullDocument' even if 'updateLookup' is specified.
const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate");
@@ -213,10 +127,9 @@
assert.writeOK(collInvalidate.insert({_id: "testing invalidate"}));
assertDropCollection(db, collInvalidate.getName());
// Wait until two-phase drop finishes.
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
- db, collInvalidate.getName());
- });
+ assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+ db, collInvalidate.getName()));
+
latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
latestChange = cst.getOneChange(cursor, true);
diff --git a/jstests/change_streams/lookup_post_image_resume_after.js b/jstests/change_streams/lookup_post_image_resume_after.js
new file mode 100644
index 00000000000..b2d8e345679
--- /dev/null
+++ b/jstests/change_streams/lookup_post_image_resume_after.js
@@ -0,0 +1,108 @@
+// Tests the behavior of using fullDocument: "updateLookup" with a resumeToken, possibly from far
+// enough in the past that the document doesn't exist yet.
+// @tags: [uses_resume_after]
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js");
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/fixture_helpers.js"); // For awaitLastOpCommitted().
+ load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'.
+
+ let cst = new ChangeStreamTest(db);
+ const coll = assertDropAndRecreateCollection(db, "post_image_resume_after");
+ const streamToGetResumeToken = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {}}],
+ includeToken: true,
+ aggregateOptions: {cursor: {batchSize: 0}}
+ });
+ assert.writeOK(coll.insert({_id: "for resuming later"}));
+ const resumePoint = cst.getOneChange(streamToGetResumeToken)._id;
+
+ // Test that looking up the post image of an update after the collection has been dropped will
+ // result in 'fullDocument' with a value of null. This must be done using getMore because new
+ // cursors cannot be established after a collection drop.
+ assert.writeOK(coll.insert({_id: "TARGET"}));
+ assert.writeOK(coll.update({_id: "TARGET"}, {$set: {updated: true}}));
+
+ // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet.
+ const firstResumeCursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {fullDocument: "updateLookup", resumeAfter: resumePoint}}],
+ aggregateOptions: {cursor: {batchSize: 0}}
+ });
+
+ // Save another stream to test post-image lookup after the collection is recreated. We have to
+ // create this before we drop the collection because otherwise the resume token's UUID will not
+ // match that of the collection.
+ const secondResumeCursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {fullDocument: "updateLookup", resumeAfter: resumePoint}}],
+ aggregateOptions: {cursor: {batchSize: 0}}
+ });
+
+ // Drop the collection and wait until two-phase drop finishes.
+ assertDropCollection(db, coll.getName());
+ assert.soon(
+ () => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName()));
+
+ // If this test is running with secondary read preference, it's necessary for the drop to
+ // propagate to all secondary nodes and be available for majority reads before we can assume
+ // looking up the document will fail.
+ FixtureHelpers.awaitLastOpCommitted();
+
+ // Check the next $changeStream entry; this is the test document inserted above.
+ let latestChange = cst.getOneChange(firstResumeCursor);
+ assert.eq(latestChange.operationType, "insert");
+ assert.eq(latestChange.fullDocument, {_id: "TARGET"});
+
+ // The next entry is the 'update' operation. Because the collection has been dropped, our
+ // attempt to look up the post-image results in a null document.
+ latestChange = cst.getOneChange(firstResumeCursor);
+ assert.eq(latestChange.operationType, "update");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, null);
+
+ // Test establishing new cursors with resume token on dropped collections fails.
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [
+ {$changeStream: {fullDocument: "updateLookup", resumeAfter: resumePoint}},
+ {$match: {operationType: "update"}}
+ ],
+ cursor: {batchSize: 0}
+ }),
+ 40615);
+
+ // Before we re-create the collection we must consume the insert notification. This is to
+ // prevent the change stream from throwing an assertion when it goes to look up the shard key
+ // for the collection and finds that it has a mismatching UUID. It should proceed without error
+ // if the collection doesn't exist (has no UUID).
+ cst.assertNextChangesEqual({
+ cursor: secondResumeCursor,
+ expectedChanges: [{
+ documentKey: {_id: "TARGET"},
+ fullDocument: {_id: "TARGET"},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert"
+ }]
+ });
+
+    // Recreate the collection (implicitly, via insert) with a new document, and test that the
+    // change stream won't return it because the new collection has a different UUID.
+ assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
+
+ cst.assertNextChangesEqual({
+ cursor: secondResumeCursor,
+ expectedChanges: [{
+ documentKey: {_id: "TARGET"},
+ fullDocument: null,
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "update",
+ updateDescription: {updatedFields: {updated: true}, removedFields: []}
+ }]
+ });
+
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/resume_after.js b/jstests/change_streams/resume_after.js
new file mode 100644
index 00000000000..41a9f665969
--- /dev/null
+++ b/jstests/change_streams/resume_after.js
@@ -0,0 +1,57 @@
+// Tests the ability to resume a change stream at different points in the stream.
+// @tags: [uses_resume_after]
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/change_stream_util.js");
+
+ let cst = new ChangeStreamTest(db);
+ assertDropAndRecreateCollection(db, "resume_after");
+
+    // Note we do not project away '_id.ts' as it is part of the resume token.
+ let resumeCursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: db.resume_after, includeToken: true});
+
+ // Insert a document and save the resulting change stream.
+ assert.writeOK(db.resume_after.insert({_id: 1}));
+ const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
+
+ jsTestLog("Testing resume after one document.");
+ resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
+ collection: db.resume_after,
+ includeToken: true,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+
+ jsTestLog("Inserting additional documents.");
+ assert.writeOK(db.resume_after.insert({_id: 2}));
+ const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
+ assert.writeOK(db.resume_after.insert({_id: 3}));
+ const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
+
+ jsTestLog("Testing resume after first document of three.");
+ resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
+ collection: db.resume_after,
+ includeToken: true,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+ jsTestLog("Testing resume after second document of three.");
+ resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}],
+ collection: db.resume_after,
+ includeToken: true,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+ cst.cleanUp();
+}());