author    Nick Zolnierz <nicholas.zolnierz@mongodb.com>    2018-05-09 12:25:35 -0400
committer Nick Zolnierz <nicholas.zolnierz@mongodb.com>    2018-06-04 12:09:50 -0400
commit    753cedd024b9f4fe1a83632db792f29d3a7e4454 (patch)
tree      0cd094ebb24806358e02d111cc47dadf113484ff /jstests
parent    851c59e7bc5b54c0cf5feb683398a0eb6dffc20f (diff)
download  mongo-753cedd024b9f4fe1a83632db792f29d3a7e4454.tar.gz
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
(cherry picked from commit 55f4dbf94a1cce9d8642af9bba9ac4cc77627293)
Diffstat (limited to 'jstests')
-rw-r--r--  jstests/sharding/change_stream_invalidation.js             34
-rw-r--r--  jstests/sharding/change_streams_primary_shard_unaware.js    2
-rw-r--r--  jstests/sharding/change_streams_whole_db.js                 34
3 files changed, 52 insertions, 18 deletions
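
For context, the scenario this patch exercises can be reproduced with a small mongo shell snippet. This is only an illustrative sketch, assuming a MongoDB 4.0 shell; the database and collection names ("testDB", "coll") are hypothetical and not taken from the patch. It opens a whole-DB change stream, captures a resume token, drops a collection, and then resumes from the pre-drop token.

    // Illustrative sketch only -- "testDB" and "coll" are hypothetical names.
    const testDB = db.getSiblingDB("testDB");

    // Open a whole-DB change stream.
    let stream = testDB.watch();

    // Generate an event and capture its resume token.
    assert.writeOK(testDB.coll.insert({_id: 1, x: 1}));
    assert.soon(() => stream.hasNext());
    const resumeToken = stream.next()._id;

    // Dropping the collection eventually invalidates the stream.
    testDB.coll.drop();

    // Resuming from the pre-drop token should still return the dropped collection's
    // events in a total order, even though its shard key can no longer be looked up
    // from the catalog -- the behavior this change verifies.
    stream = testDB.watch([], {resumeAfter: resumeToken});
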
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js
index 396db586c4f..eb51724f8ea 100644
--- a/jstests/sharding/change_stream_invalidation.js
+++ b/jstests/sharding/change_stream_invalidation.js
@@ -30,50 +30,53 @@
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
- // Shard the test collection on _id.
+ // Shard the test collection on a field called 'shardKey'.
assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
// Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}}));
// Move the [0, MaxKey] chunk to st.shard1.shardName.
assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+ {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()}));
// Write a document to each chunk.
- assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}}));
- let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ let changeStream = mongosColl.watch();
// We awaited the replication of the first writes, so the change stream shouldn't return them.
- assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
- assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2}));
// Drop the collection and test that we return "invalidate" entry and close the cursor.
mongosColl.drop();
- st.rs0.awaitReplication();
- st.rs1.awaitReplication();
// Test that we see the two writes that happened before the invalidation.
assert.soon(() => changeStream.hasNext());
let next = changeStream.next();
assert.eq(next.operationType, "update");
- assert.eq(next.documentKey._id, -1);
+ assert.eq(next.documentKey.shardKey, -1);
const resumeTokenFromFirstUpdate = next._id;
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "update");
- assert.eq(next.documentKey._id, 1);
+ assert.eq(next.documentKey.shardKey, 1);
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {_id: 2});
- assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed");
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+ assert(changeStream.isExhausted());
// Test that it is not possible to resume a change stream after a collection has been dropped.
// Once it's been dropped, we won't be able to figure out the shard key.
@@ -82,7 +85,6 @@
assert.commandFailedWithCode(mongosDB.runCommand({
aggregate: mongosColl.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
- readConcern: {level: "majority"},
cursor: {}
}),
40615);
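
One incidental change in the hunk above is that the test now opens the stream with `mongosColl.watch()` instead of an explicit aggregation. As a rough sketch (default options assumed), the shell helper is shorthand for running an aggregation with a `$changeStream` stage:

    // Roughly equivalent ways to open a collection-level change stream (default options assumed).
    let viaHelper = mongosColl.watch();
    let viaAggregate = mongosColl.aggregate([{$changeStream: {}}]);
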
diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js
index ec4027bb800..89f8d0ebf6d 100644
--- a/jstests/sharding/change_streams_primary_shard_unaware.js
+++ b/jstests/sharding/change_streams_primary_shard_unaware.js
@@ -168,7 +168,7 @@
cstMongos2.assertNextChangesEqual({
cursor: cursorMongos2,
expectedChanges: [{
- documentKey: {_id: 2},
+ documentKey: {_id: 2, a: 2},
fullDocument: {_id: 2, a: 2},
ns: {db: mongos2DB.getName(), coll: mongos2Coll.getName()},
operationType: "insert",
diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js
index d43294e7772..bc7d559610a 100644
--- a/jstests/sharding/change_streams_whole_db.js
+++ b/jstests/sharding/change_streams_whole_db.js
@@ -5,6 +5,7 @@
load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+ load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection.
// For supportsMajorityReadConcern().
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
@@ -151,7 +152,38 @@
operationType: "insert",
},
];
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ // Store the resume token of the first insert to use after dropping the collection.
+ const resumeTokenBeforeDrop = results[0]._id;
+
+ // Write one more document to the collection that will be dropped, to be returned after
+ // resuming.
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4}));
+
+ // Drop the collection, invalidating the open change stream.
+ assertDropCollection(mongosDB, mongosCollShardedOnX.getName());
+
+ // Resume the change stream from before the collection drop, and verify that the documentKey
+ // field contains the extracted shard key from the resume token.
+ cursor = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {resumeAfter: resumeTokenBeforeDrop}},
+ {$match: {"ns.coll": mongosCollShardedOnX.getName()}}
+ ],
+ collection: 1
+ });
+ cst.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: [
+ {
+ documentKey: {_id: 4, x: 4},
+ fullDocument: {_id: 4, x: 4},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()},
+ operationType: "insert",
+ },
+ ]
+ });
cst.cleanUp();