diff options
author | Martin Neupauer <martin.neupauer@10gen.com> | 2018-01-09 10:57:15 -0500 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-01-19 17:42:54 -0500 |
commit | 194ec4857fa0db8085da88e22eaae96687902d66 (patch) | |
tree | 7973e4852e590ebda38f6e39823fff50a97b466d /jstests | |
parent | eaa820e25fed6b52ea3e9b9eade337e85aa91386 (diff) | |
download | mongo-194ec4857fa0db8085da88e22eaae96687902d66.tar.gz |
SERVER-32349 Change streams over sharded collections may produce merged oplog entries
with the same timestamps if the operations are coming from multiple shards. When we
resume the change stream we have to position it at the right place - the position is
determined by both the timestamp and the document id. Previously we checked only the
timestamp; now we loop over the entries with equal timestamps to find the right document.
Diffstat (limited to 'jstests')
-rw-r--r-- | jstests/sharding/resume_change_stream.js | 55 |
1 file changed, 53 insertions, 2 deletions
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js index edd127e81d5..41a2426796a 100644 --- a/jstests/sharding/resume_change_stream.js +++ b/jstests/sharding/resume_change_stream.js @@ -59,8 +59,6 @@ // We awaited the replication of the first writes, so the change stream shouldn't return them. assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}})); assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}})); - st.rs0.awaitReplication(); - st.rs1.awaitReplication(); // Test that we see the two writes, and remember their resume tokens. assert.soon(() => changeStream.hasNext()); @@ -134,5 +132,58 @@ expectedCode: 40576 }); + // Drop the collection. + assert(mongosColl.drop()); + + // Shard the test collection on shardKey. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}})); + + // Split the collection into 2 chunks: [MinKey, 50), [50, MaxKey]. + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 50}})); + + // Move the [50, MaxKey] chunk to shard0001. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {shardKey: 51}, to: st.rs1.getURL()})); + + const numberOfDocs = 100; + + // Insert test documents. 
+ for (let counter = 0; counter < numberOfDocs / 5; ++counter) { + assert.writeOK(mongosColl.insert({_id: "abcd" + counter, shardKey: counter * 5 + 0}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "Abcd" + counter, shardKey: counter * 5 + 1}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "aBcd" + counter, shardKey: counter * 5 + 2}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "abCd" + counter, shardKey: counter * 5 + 3}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "abcD" + counter, shardKey: counter * 5 + 4}, + {writeConcern: {w: "majority"}})); + } + + let allChangesCursor = mongosColl.aggregate([{$changeStream: {}}]); + + // Perform the multi-update that will induce timestamp collisions + assert.writeOK(mongosColl.update({}, {$set: {updated: true}}, {multi: true})); + + // Loop over documents and open inner change streams resuming from a specified position. + // Note we skip the last document as it does not have the next document so we would + // hang indefinitely. + for (let counter = 0; counter < numberOfDocs - 1; ++counter) { + assert.soon(() => allChangesCursor.hasNext()); + let next = allChangesCursor.next(); + + const resumeToken = next._id; + const caseInsensitive = {locale: "en_US", strength: 2}; + let resumedCaseInsensitiveCursor = mongosColl.aggregate( + [{$changeStream: {resumeAfter: resumeToken}}], {collation: caseInsensitive}); + assert.soon(() => resumedCaseInsensitiveCursor.hasNext()); + resumedCaseInsensitiveCursor.close(); + } + + allChangesCursor.close(); + st.stop(); })(); |