From 1b82c812a9c0bbf6dc79d5400de9ea99e6ffa025 Mon Sep 17 00:00:00 2001
From: Bernard Gorman
Date: Tue, 12 Mar 2019 14:19:53 -0400
Subject: SERVER-40094 Do not prematurely reject resume attempt in DSShardCheckResumability

(cherry picked from commit 4fb1c39e803ea1876ac330b80e2ec5a443b29d33)
---
 ...reams_resume_same_clustertime_different_uuid.js | 92 ++++++++++++++++++++++
 .../document_source_check_resume_token.cpp         | 25 ++++--
 2 files changed, 110 insertions(+), 7 deletions(-)
 create mode 100644 jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js

diff --git a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
new file mode 100644
index 00000000000..7fd5aa2abc4
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
@@ -0,0 +1,92 @@
+/**
+ * Confirms that resuming from an event which has the same clusterTime but a different UUID than on
+ * another shard does not cause the resume attempt to be prematurely rejected. Reproduction script
+ * for the bug described in SERVER-40094.
+ * @tags: [requires_sharding, uses_change_streams]
+ */
+(function() {
+    "use strict";
+
+    // Asserts that the expected operation type and documentKey are found on the change stream
+    // cursor. Returns the change stream document.
+    function assertWriteVisible({cursor, opType, docKey}) {
+        assert.soon(() => cursor.hasNext());
+        const changeDoc = cursor.next();
+        assert.eq(opType, changeDoc.operationType, changeDoc);
+        assert.eq(docKey, changeDoc.documentKey, changeDoc);
+        return changeDoc;
+    }
+
+    // Create a new cluster with 2 shards. Disable periodic no-ops to ensure that we have control
+    // over the ordering of events across the cluster.
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {nodes: 1, setParameter: {writePeriodicNoops: false, periodicNoopIntervalSecs: 1}}
+    });
+
+    // Create two databases. We will place one of these on each shard.
+    const mongosDB0 = st.s.getDB(`${jsTestName()}_0`);
+    const mongosDB1 = st.s.getDB(`${jsTestName()}_1`);
+    const adminDB = st.s.getDB("admin");
+
+    // Enable sharding on mongosDB0 and ensure its primary is shard0.
+    assert.commandWorked(mongosDB0.adminCommand({enableSharding: mongosDB0.getName()}));
+    st.ensurePrimaryShard(mongosDB0.getName(), st.rs0.getURL());
+
+    // Enable sharding on mongosDB1 and ensure its primary is shard1.
+    assert.commandWorked(mongosDB1.adminCommand({enableSharding: mongosDB1.getName()}));
+    st.ensurePrimaryShard(mongosDB1.getName(), st.rs1.getURL());
+
+    // Open a connection to a different collection on each shard. We use direct connections to
+    // ensure that the oplog timestamps across the shards overlap.
+    const coll0 = st.rs0.getPrimary().getCollection(`${mongosDB0.getName()}.test`);
+    const coll1 = st.rs1.getPrimary().getCollection(`${mongosDB1.getName()}.test`);
+
+    // Open a change stream on the test cluster. We will capture events in 'changeList'.
+    const changeStreamCursor = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);
+    const changeList = [];
+
+    // Insert ten documents on each shard, alternating between the two collections.
+    for (let i = 0; i < 20; ++i) {
+        const coll = (i % 2 ? coll1 : coll0);
+        assert.commandWorked(coll.insert({shard: (i % 2)}));
+    }
+
+    // Verify that each shard now has ten total documents present in the associated collection.
+    assert.eq(st.rs0.getPrimary().getCollection(coll0.getFullName()).count(), 10);
+    assert.eq(st.rs1.getPrimary().getCollection(coll1.getFullName()).count(), 10);
+
+    // Read the stream of events, capture them in 'changeList', and confirm that all events occurred
+    // at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that
+    // corresponding events occurred at the same clusterTime on both shards; we expect, however,
+    // that this will be true in the vast majority of runs, and so there is value in testing.
+    for (let i = 0; i < 19; ++i) {
+        assert.soon(() => changeStreamCursor.hasNext());
+        changeList.push(changeStreamCursor.next());
+    }
+    const clusterTime = changeList[0].clusterTime;
+    for (let event of changeList) {
+        assert.gte(event.clusterTime, clusterTime);
+    }
+
+    // Test that resuming from each event returns the expected set of subsequent documents.
+    for (let i = 0; i < changeList.length; ++i) {
+        const resumeCursor = adminDB.aggregate(
+            [{$changeStream: {allChangesForCluster: true, resumeAfter: changeList[i]._id}}]);
+
+        // Confirm that the first event in the resumed stream matches the next event recorded in
+        // 'changeList' from the original stream. The order of the events should be stable across
+        // resumes from any point.
+        for (let x = (i + 1); x < changeList.length; ++x) {
+            const expectedChangeDoc = changeList[x];
+            assertWriteVisible({
+                cursor: resumeCursor,
+                opType: expectedChangeDoc.operationType,
+                docKey: expectedChangeDoc.documentKey
+            });
+        }
+        resumeCursor.close();
+    }
+
+    st.stop();
+})();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index eea032da4f5..fab7fcf2e5f 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -73,8 +73,10 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
-    if (tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType) {
-        return ResumeStatus::kSurpassedToken;
+    if (tokenDataFromResumedStream.tokenType != tokenDataFromClient.tokenType) {
+        return tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType
+            ? ResumeStatus::kSurpassedToken
+            : ResumeStatus::kCheckNextDoc;
     }
     if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
@@ -82,8 +84,11 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
     } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
         // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
         // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
-        // watched). This indicates a corrupt resume token.
-        uasserted(50792, "Invalid resumeToken: applyOpsIndex was skipped");
+        // watched). If we are looking for the resume token on a shard then this simply means that
+        // the resume token may be on a different shard; otherwise, it indicates a corrupt token.
+        uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge);
+        // We are running on a merging shard. Signal that we have read beyond the resume token.
+        return ResumeStatus::kSurpassedToken;
     }
     // It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
@@ -91,17 +96,23 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
     if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) {
-        return expCtx->inMongos || tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
+        // If we are running on a replica set deployment, we don't ever expect to see identical time
+        // stamps and applyOpsIndex but differing UUIDs, and we reject the resume attempt at once.
+        if (!expCtx->inMongos && !expCtx->needsMerge) {
+            return ResumeStatus::kSurpassedToken;
+        }
+        // Otherwise, return a ResumeStatus based on the sort-order of the client and stream UUIDs.
+        return tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
             ? ResumeStatus::kSurpassedToken
             : ResumeStatus::kCheckNextDoc;
     }
+    // If all the fields match exactly, then we have found the token.
     if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey ==
                                             tokenDataFromClient.documentKey)) {
         return ResumeStatus::kFoundToken;
     }
+
     // At this point, we know that the tokens differ only by documentKey. The status we return will
     // depend on whether the stream token is logically before or after the client token. If the
     // latter, then we will never see the resume token and the stream cannot be resumed. However,
-- 
cgit v1.2.1
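The standalone C++ sketch below sits after the patch delimiter, so it is not part of the applied change. It models the comparison order that the patched compareAgainstClientResumeToken appears to implement, judging only from the hunks above; the initial clusterTime check is assumed rather than shown in this diff, and every type, field, and the partOfShardedStream flag are simplified stand-ins invented for illustration (the real code works on ResumeTokenData and consults expCtx->inMongos and expCtx->needsMerge).

    // Illustrative model only, not MongoDB source. Names are assumptions for this sketch.
    #include <cstdint>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    enum class ResumeStatus { kFoundToken, kCheckNextDoc, kSurpassedToken };

    struct TokenData {
        std::uint64_t clusterTime;  // stand-in for the oplog timestamp
        int tokenType;              // e.g. high-water-mark vs. event token
        int applyOpsIndex;          // position within an applyOps entry
        std::string uuid;           // collection UUID, compared lexicographically here
        std::string documentKey;    // serialized document key
    };

    // 'partOfShardedStream' stands in for the expCtx->inMongos / expCtx->needsMerge
    // checks in the real code; a plain replica set deployment would pass false.
    ResumeStatus compareTokens(const TokenData& stream,
                               const TokenData& client,
                               bool partOfShardedStream) {
        if (stream.clusterTime != client.clusterTime) {
            return ResumeStatus::kSurpassedToken;
        }
        if (stream.tokenType != client.tokenType) {
            return stream.tokenType > client.tokenType ? ResumeStatus::kSurpassedToken
                                                       : ResumeStatus::kCheckNextDoc;
        }
        if (stream.applyOpsIndex < client.applyOpsIndex) {
            return ResumeStatus::kCheckNextDoc;
        }
        if (stream.applyOpsIndex > client.applyOpsIndex) {
            if (!partOfShardedStream) {
                // A single replica set never skips an applyOpsIndex; treat as corrupt.
                throw std::runtime_error("Invalid resumeToken: applyOpsIndex was skipped");
            }
            // On a shard, the client's token may simply live on another shard; this
            // stream has read past the point where it would have appeared.
            return ResumeStatus::kSurpassedToken;
        }
        if (stream.uuid != client.uuid) {
            if (!partOfShardedStream) {
                // Identical clusterTime and applyOpsIndex with differing UUIDs cannot be
                // resolved later on a replica set, so the resume attempt fails here.
                return ResumeStatus::kSurpassedToken;
            }
            // Sharded case (the SERVER-40094 fix): keep scanning unless the stream's
            // UUID already sorts after the client's.
            return stream.uuid > client.uuid ? ResumeStatus::kSurpassedToken
                                             : ResumeStatus::kCheckNextDoc;
        }
        return stream.documentKey == client.documentKey ? ResumeStatus::kFoundToken
                                                        : ResumeStatus::kCheckNextDoc;
    }

    int main() {
        // Same clusterTime, different UUID: the sharded stream keeps scanning instead
        // of rejecting the resume attempt outright.
        const TokenData client{100, 0, 0, "uuid-bbbb", "{_id: 1}"};
        const TokenData stream{100, 0, 0, "uuid-aaaa", "{_id: 7}"};
        const bool keepScanning =
            compareTokens(stream, client, /*partOfShardedStream=*/true) ==
            ResumeStatus::kCheckNextDoc;
        std::cout << (keepScanning ? "check next doc" : "unexpected") << std::endl;
        return 0;
    }

In this model, the effect of the patch shows up in the tokenType, applyOpsIndex, and uuid branches: on a sharded stream a mismatch now yields kCheckNextDoc (keep scanning, since the token may surface on another shard) rather than rejecting the resume outright, which is the behaviour the new jstest exercises.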