author     Bernard Gorman <bernard.gorman@gmail.com>   2019-03-12 14:19:53 -0400
committer  Bernard Gorman <bernard.gorman@gmail.com>   2019-03-20 02:39:11 +0000
commit     1b82c812a9c0bbf6dc79d5400de9ea99e6ffa025 (patch)
tree       1423c9a61395b83fd6df9dd094e5f7d5125c815c
parent     d3c65df80079bcd6fa74ceed033690c04ec16156 (diff)
download   mongo-r4.0.7-rc1.tar.gz
SERVER-40094 Do not prematurely reject resume attempt in DSShardCheckResumability (tags: r4.0.7-rc1, r4.0.7)
(cherry picked from commit 4fb1c39e803ea1876ac330b80e2ec5a443b29d33)
-rw-r--r--  jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js  92
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp                    25
2 files changed, 110 insertions(+), 7 deletions(-)
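
Background for the patch: a change-stream resume token identifies an event by several fields, and events from different shards can share a clusterTime while differing only in later fields such as the collection UUID. As a reading aid, here is a minimal, self-contained C++ sketch of that field ordering. The struct name, field names and types are simplified stand-ins chosen for illustration, not MongoDB's actual ResumeTokenData; the patch's comments indicate that the clusterTime check itself happens earlier in the function, outside the hunks shown below.

// Minimal sketch (assumed names/types) of the fields a resume token carries, in the
// order the resume check compares them. Not MongoDB's real ResumeTokenData.
#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>

struct SketchTokenData {
    uint64_t clusterTime;     // stand-in for the oplog Timestamp
    int tokenType;            // e.g. synthetic high-water-mark token vs. ordinary event token
    int applyOpsIndex;        // position of the event inside a transaction's applyOps
    std::string uuid;         // collection UUID, as a lexicographically comparable stand-in
    std::string documentKey;  // serialized documentKey (_id and shard-key fields)
};

// Tokens order field-by-field, so two events from different shards can share a
// clusterTime and still differ in every later field.
bool sortsBefore(const SketchTokenData& a, const SketchTokenData& b) {
    return std::tie(a.clusterTime, a.tokenType, a.applyOpsIndex, a.uuid, a.documentKey) <
           std::tie(b.clusterTime, b.tokenType, b.applyOpsIndex, b.uuid, b.documentKey);
}

int main() {
    // Same clusterTime, different collection UUIDs: the situation the new test creates by
    // writing to an unsharded collection on each of two shards.
    SketchTokenData shard0Event{100, 0, 0, "uuid-AAA", "{_id: 1}"};
    SketchTokenData shard1Event{100, 0, 0, "uuid-BBB", "{_id: 1}"};
    std::cout << std::boolalpha << sortsBefore(shard0Event, shard1Event) << "\n";  // prints true
    return 0;
}

This ordering is why a shard must not give up merely because a later field differs at the resume point's clusterTime: the client's token may simply belong to a different shard.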
diff --git a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
new file mode 100644
index 00000000000..7fd5aa2abc4
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
@@ -0,0 +1,92 @@
+/**
+ * Confirms that resuming from an event which has the same clusterTime but a different UUID than on
+ * another shard does not cause the resume attempt to be prematurely rejected. Reproduction script
+ * for the bug described in SERVER-40094.
+ * @tags: [requires_sharding, uses_change_streams]
+ */
+(function() {
+    "use strict";
+
+    // Asserts that the expected operation type and documentKey are found on the change stream
+    // cursor. Returns the change stream document.
+    function assertWriteVisible({cursor, opType, docKey}) {
+        assert.soon(() => cursor.hasNext());
+        const changeDoc = cursor.next();
+        assert.eq(opType, changeDoc.operationType, changeDoc);
+        assert.eq(docKey, changeDoc.documentKey, changeDoc);
+        return changeDoc;
+    }
+
+    // Create a new cluster with 2 shards. Disable periodic no-ops to ensure that we have control
+    // over the ordering of events across the cluster.
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {nodes: 1, setParameter: {writePeriodicNoops: false, periodicNoopIntervalSecs: 1}}
+    });
+
+    // Create two databases. We will place one of these on each shard.
+    const mongosDB0 = st.s.getDB(`${jsTestName()}_0`);
+    const mongosDB1 = st.s.getDB(`${jsTestName()}_1`);
+    const adminDB = st.s.getDB("admin");
+
+    // Enable sharding on mongosDB0 and ensure its primary is shard0.
+    assert.commandWorked(mongosDB0.adminCommand({enableSharding: mongosDB0.getName()}));
+    st.ensurePrimaryShard(mongosDB0.getName(), st.rs0.getURL());
+
+    // Enable sharding on mongosDB1 and ensure its primary is shard1.
+    assert.commandWorked(mongosDB1.adminCommand({enableSharding: mongosDB1.getName()}));
+    st.ensurePrimaryShard(mongosDB1.getName(), st.rs1.getURL());
+
+    // Open a connection to a different collection on each shard. We use direct connections to
+    // ensure that the oplog timestamps across the shards overlap.
+    const coll0 = st.rs0.getPrimary().getCollection(`${mongosDB0.getName()}.test`);
+    const coll1 = st.rs1.getPrimary().getCollection(`${mongosDB1.getName()}.test`);
+
+    // Open a change stream on the test cluster. We will capture events in 'changeList'.
+    const changeStreamCursor = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);
+    const changeList = [];
+
+    // Insert ten documents on each shard, alternating between the two collections.
+    for (let i = 0; i < 20; ++i) {
+        const coll = (i % 2 ? coll1 : coll0);
+        assert.commandWorked(coll.insert({shard: (i % 2)}));
+    }
+
+    // Verify that each shard now has ten total documents present in the associated collection.
+    assert.eq(st.rs0.getPrimary().getCollection(coll0.getFullName()).count(), 10);
+    assert.eq(st.rs1.getPrimary().getCollection(coll1.getFullName()).count(), 10);
+
+    // Read the stream of events, capture them in 'changeList', and confirm that all events occurred
+    // at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that
+    // corresponding events occurred at the same clusterTime on both shards; we expect, however,
+    // that this will be true in the vast majority of runs, and so there is value in testing.
+    for (let i = 0; i < 20; ++i) {
+        assert.soon(() => changeStreamCursor.hasNext());
+        changeList.push(changeStreamCursor.next());
+    }
+    const clusterTime = changeList[0].clusterTime;
+    for (let event of changeList) {
+        assert.gte(event.clusterTime, clusterTime);
+    }
+
+    // Test that resuming from each event returns the expected set of subsequent documents.
+    for (let i = 0; i < changeList.length; ++i) {
+        const resumeCursor = adminDB.aggregate(
+            [{$changeStream: {allChangesForCluster: true, resumeAfter: changeList[i]._id}}]);
+
+        // Confirm that the sequence of events in the resumed stream matches the events recorded
+        // in 'changeList' after the resume point; the order of the events should be stable
+        // across resumes from any point.
+        for (let x = (i + 1); x < changeList.length; ++x) {
+            const expectedChangeDoc = changeList[x];
+            assertWriteVisible({
+                cursor: resumeCursor,
+                opType: expectedChangeDoc.operationType,
+                docKey: expectedChangeDoc.documentKey
+            });
+        }
+        resumeCursor.close();
+    }
+
+    st.stop();
+})();
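
The server-side change below teaches compareAgainstClientResumeToken (the helper behind DSShardCheckResumability) to keep scanning when a field after clusterTime differs on a shard, instead of declaring the resume point surpassed. The following standalone sketch paraphrases that decision ladder; SketchToken, compareSketch, and the inMongos/onShard flags (stand-ins for expCtx->inMongos and expCtx->needsMerge) are illustrative names, and the sketch assumes the caller has already matched the clusterTime. Treat it as a reading aid, not the actual implementation.

// Paraphrase of the patched decision ladder; simplified stand-in types, not MongoDB code.
#include <iostream>
#include <stdexcept>
#include <string>

enum class ResumeStatus { kFoundToken, kCheckNextDoc, kSurpassedToken };

struct SketchToken {
    int tokenType;
    int applyOpsIndex;
    std::string uuid;
    std::string documentKey;
};

// 'inMongos' and 'onShard' stand in for expCtx->inMongos and expCtx->needsMerge.
ResumeStatus compareSketch(const SketchToken& stream,
                           const SketchToken& client,
                           bool inMongos,
                           bool onShard) {
    if (stream.tokenType != client.tokenType) {
        // Before the patch only the greater-than case was handled; a lower tokenType now
        // explicitly tells the shard to keep scanning rather than fail the resume attempt.
        return stream.tokenType > client.tokenType ? ResumeStatus::kSurpassedToken
                                                   : ResumeStatus::kCheckNextDoc;
    }
    if (stream.applyOpsIndex < client.applyOpsIndex) {
        return ResumeStatus::kCheckNextDoc;
    } else if (stream.applyOpsIndex > client.applyOpsIndex) {
        // On a shard this just means the client's token may live on another shard; only a
        // non-sharded stream still treats the skipped applyOpsIndex as a corrupt token.
        if (!onShard) {
            throw std::runtime_error("Invalid resumeToken: applyOpsIndex was skipped");
        }
        return ResumeStatus::kSurpassedToken;
    }
    if (stream.uuid != client.uuid) {
        // A plain replica set never sees identical clusterTime and applyOpsIndex with
        // differing UUIDs, so it rejects at once; shards and mongos compare UUID order.
        if (!inMongos && !onShard) {
            return ResumeStatus::kSurpassedToken;
        }
        return stream.uuid > client.uuid ? ResumeStatus::kSurpassedToken
                                         : ResumeStatus::kCheckNextDoc;
    }
    if (stream.documentKey == client.documentKey) {
        return ResumeStatus::kFoundToken;
    }
    // Tokens now differ only by documentKey; the real function chooses between kCheckNextDoc
    // and kSurpassedToken from their relative order (that code falls outside the hunks shown).
    return stream.documentKey < client.documentKey ? ResumeStatus::kCheckNextDoc
                                                   : ResumeStatus::kSurpassedToken;
}

int main() {
    // The case the test above reproduces: equal clusterTime (checked by the caller) and
    // applyOpsIndex, different collection UUIDs, evaluated on a shard of a sharded cluster.
    SketchToken client{0, 0, "uuid-BBB", "{_id: 1}"};
    SketchToken streamEvent{0, 0, "uuid-AAA", "{_id: 7}"};
    ResumeStatus status = compareSketch(streamEvent, client, /*inMongos=*/false, /*onShard=*/true);
    std::cout << (status == ResumeStatus::kCheckNextDoc ? "keep scanning" : "resume rejected")
              << "\n";  // prints "keep scanning" after this patch
    return 0;
}

With the pre-patch logic, that same shard would have reported kSurpassedToken purely because it was not mongos, which is the premature rejection the commit title refers to.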
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index eea032da4f5..fab7fcf2e5f 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -73,8 +73,10 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
    // If the tokenType exceeds the client token's type, then we have passed the resume token point.
    // This can happen if the client resumes from a synthetic 'high water mark' token from another
    // shard which happens to have the same clusterTime as an actual change on this shard.
-    if (tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType) {
-        return ResumeStatus::kSurpassedToken;
+    if (tokenDataFromResumedStream.tokenType != tokenDataFromClient.tokenType) {
+        return tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType
+            ? ResumeStatus::kSurpassedToken
+            : ResumeStatus::kCheckNextDoc;
    }
    if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
@@ -82,8 +84,11 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
    } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
        // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
        // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
-        // watched). This indicates a corrupt resume token.
-        uasserted(50792, "Invalid resumeToken: applyOpsIndex was skipped");
+        // watched). If we are looking for the resume token on a shard then this simply means that
+        // the resume token may be on a different shard; otherwise, it indicates a corrupt token.
+        uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge);
+        // We are running on a merging shard. Signal that we have read beyond the resume token.
+        return ResumeStatus::kSurpassedToken;
    }
// It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
@@ -91,17 +96,23 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
// clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
// resumable; we are past the point in the stream where the token should have appeared.
if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) {
- // If we're not in mongos then this must be a replica set deployment, in which case we don't
- // ever expect to see identical timestamps and we reject the resume attempt immediately.
- return !expCtx->inMongos || tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
+ // If we are running on a replica set deployment, we don't ever expect to see identical time
+ // stamps and applyOpsIndex but differing UUIDs, and we reject the resume attempt at once.
+ if (!expCtx->inMongos && !expCtx->needsMerge) {
+ return ResumeStatus::kSurpassedToken;
+ }
+ // Otherwise, return a ResumeStatus based on the sort-order of the client and stream UUIDs.
+ return tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
? ResumeStatus::kSurpassedToken
: ResumeStatus::kCheckNextDoc;
}
+
// If all the fields match exactly, then we have found the token.
if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey ==
tokenDataFromClient.documentKey)) {
return ResumeStatus::kFoundToken;
}
+
// At this point, we know that the tokens differ only by documentKey. The status we return will
// depend on whether the stream token is logically before or after the client token. If the
// latter, then we will never see the resume token and the stream cannot be resumed. However,