diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-10 10:04:41 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-29 09:51:51 -0400 |
commit | a76082905d63ac8aaaae25e5c76812e6edf9bc07 (patch) | |
tree | 5882b480ff3b7378a30f27106cd8fad6e8271407 /jstests/change_streams/change_stream_invalidation.js | |
parent | 8150b50f14579b6cbd673f12968726670f6e1b78 (diff) | |
download | mongo-a76082905d63ac8aaaae25e5c76812e6edf9bc07.tar.gz |
SERVER-32088: ChangeStream resumeAfter does not work on sharded collections if not all shards have chunks for the collection
Diffstat (limited to 'jstests/change_streams/change_stream_invalidation.js')
-rw-r--r-- | jstests/change_streams/change_stream_invalidation.js | 113 |
1 files changed, 71 insertions, 42 deletions
diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js index c525ece3f16..e955af3af87 100644 --- a/jstests/change_streams/change_stream_invalidation.js +++ b/jstests/change_streams/change_stream_invalidation.js @@ -3,26 +3,18 @@ (function() { "use strict"; - load("jstests/libs/change_stream_util.js"); - load('jstests/libs/uuid_util.js'); load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - let cst = new ChangeStreamTest(db); - db.getMongo().forceReadMode('commands'); // Write a document to the collection and test that the change stream returns it // and getMore command closes the cursor afterwards. const collGetMore = assertDropAndRecreateCollection(db, "change_stream_getmore_invalidations"); // We awaited the replication of the first write, so the change stream shouldn't return it. - // Use { w: "majority" } to deal with journaling correctly, even though we only have one node. - assert.writeOK(collGetMore.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}})); - - let aggcursor = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collGetMore}); + assert.writeOK(collGetMore.insert({_id: 0, a: 1})); - const collGetMoreUuid = getUUIDFromListCollections(db, collGetMore.getName()); + let changeStream = collGetMore.watch(); // Drop the collection and test that we return "invalidate" entry and close the cursor. However, // we return all oplog entries preceding the drop. @@ -33,45 +25,61 @@ assert.writeOK(collGetMore.remove({_id: 1})); // Drop the collection. assert.commandWorked(db.runCommand({drop: collGetMore.getName()})); + // We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor // should be closed. - let change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "insert", tojson(change)); - change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "update", tojson(change)); - change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "delete", tojson(change)); - cst.assertNextChangesEqual({ - cursor: aggcursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "insert"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "update"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "delete"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + assert(changeStream.isExhausted()); jsTestLog("Testing aggregate command closes cursor for invalidate entries"); const collAgg = assertDropAndRecreateCollection(db, "change_stream_agg_invalidations"); - const collAggUuid = getUUIDFromListCollections(db, collAgg.getName()); + // Get a valid resume token that the next aggregate command can use. - aggcursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collAgg}); + changeStream = collAgg.watch(); - assert.writeOK(collAgg.insert({_id: 1}, {writeConcern: {w: "majority"}})); + assert.writeOK(collAgg.insert({_id: 1})); - change = cst.getOneChange(aggcursor, false); + assert.soon(() => changeStream.hasNext()); + let change = changeStream.next(); + assert.eq(change.operationType, "insert"); + assert.eq(change.documentKey, {_id: 1}); const resumeToken = change._id; - // It should not possible to resume a change stream after a collection drop, even if the - // invalidate has not been received. + // Insert another document after storing the resume token. + assert.writeOK(collAgg.insert({_id: 2})); + + assert.soon(() => changeStream.hasNext()); + change = changeStream.next(); + assert.eq(change.operationType, "insert"); + assert.eq(change.documentKey, {_id: 2}); + + // Drop the collection and invalidate the change stream. assertDropCollection(db, collAgg.getName()); // Wait for two-phase drop to complete, so that the UUID no longer exists. assert.soon(function() { return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collAgg.getName()); }); - ChangeStreamTest.assertChangeStreamThrowsCode({ - db: db, - collName: collAgg.getName(), - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - expectedCode: 40615, - doNotModifyInPassthroughs: true - }); + + // Resume the change stream after the collection drop, up to and including the invalidate. This + // is allowed if an explicit collation is provided. + changeStream = collAgg.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + change = changeStream.next(); + assert.eq(change.operationType, "insert"); + assert.eq(change.documentKey, {_id: 2}); + + assert.soon(() => changeStream.hasNext()); + change = changeStream.next(); + assert.eq(change.operationType, "invalidate"); + assert(changeStream.isExhausted()); // Test that it is possible to open a new change stream cursor on a collection that does not // exist. @@ -80,20 +88,41 @@ assertDropCollection(db, collDoesNotExistName); // Cursor creation succeeds, but there are no results. - aggcursor = cst.startWatchingChanges( - {collection: collDoesNotExistName, pipeline: [{$changeStream: {}}]}); + const cursorObj = assert + .commandWorked(db.runCommand({ + aggregate: collDoesNotExistName, + pipeline: [{$changeStream: {}}], + cursor: {batchSize: 1}, + })) + .cursor; // We explicitly test getMore, to ensure that the getMore command for a non-existent collection // does not return an error. - aggcursor = cst.getNextBatch(aggcursor); - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 0, tojson(aggcursor.nextBatch)); + let getMoreResult = + assert + .commandWorked(db.runCommand( + {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1})) + .cursor; + assert.neq(getMoreResult.id, 0); + assert.eq(getMoreResult.nextBatch.length, 0, tojson(getMoreResult.nextBatch)); // After collection creation, we see oplog entries for the collection. const collNowExists = assertCreateCollection(db, collDoesNotExistName); assert.writeOK(collNowExists.insert({_id: 0})); - change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "insert", tojson(change)); - cst.cleanUp(); + assert.soon(function() { + getMoreResult = + assert + .commandWorked(db.runCommand( + {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1})) + .cursor; + assert.neq(getMoreResult.id, 0); + return getMoreResult.nextBatch.length > 0; + }, "Timed out waiting for another result from getMore on non-existent collection."); + assert.eq(getMoreResult.nextBatch.length, 1); + assert.eq(getMoreResult.nextBatch[0].operationType, "insert"); + assert.eq(getMoreResult.nextBatch[0].documentKey, {_id: 0}); + + assert.commandWorked( + db.runCommand({killCursors: collDoesNotExistName, cursors: [getMoreResult.id]})); }()); |