diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-07-15 11:15:19 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-07-19 02:24:50 +0100 |
commit | 417d1a712e9f040d54beca8e4943edce218e9a8c (patch) | |
tree | fc98767f6cedbc0b600d9f075fcd1b6fd1761542 | |
parent | c6b7ac3206ec53add5dc735e75adef22d931c699 (diff) | |
download | mongo-417d1a712e9f040d54beca8e4943edce218e9a8c.tar.gz |
SERVER-42232 Adding a new shard renders all preceding resume tokens invalid (tags: r4.0.11-rc0, r4.0.11)
(cherry picked from commit ffdb59938db0dfc8ec48e8b74df7a54d07b3a128)
-rw-r--r-- | jstests/noPassthrough/change_stream_resume_before_add_shard.js | 120 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.cpp | 13 |
2 files changed, 130 insertions(+), 3 deletions(-)
diff --git a/jstests/noPassthrough/change_stream_resume_before_add_shard.js b/jstests/noPassthrough/change_stream_resume_before_add_shard.js new file mode 100644 index 00000000000..ddbdc6f3d35 --- /dev/null +++ b/jstests/noPassthrough/change_stream_resume_before_add_shard.js @@ -0,0 +1,120 @@ +/** + * Tests that a change stream can be resumed from a point in time before a new shard was added to + * the cluster. Exercises the fix for SERVER-42232. + * @tags: [uses_change_streams, requires_sharding] + */ +(function() { + "use strict"; + + const rsNodeOptions = {setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}; + const st = + new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}}); + + const mongosDB = st.s.getDB(jsTestName()); + const coll = mongosDB.test; + + // Helper function to confirm that a stream sees an expected sequence of documents. This + // function also pushes all observed changes into the supplied 'eventList' array. + function assertAllEventsObserved(changeStream, expectedDocs, eventList) { + for (let expectedDoc of expectedDocs) { + assert.soon(() => changeStream.hasNext()); + const nextEvent = changeStream.next(); + assert.eq(nextEvent.fullDocument, expectedDoc); + if (eventList) { + eventList.push(nextEvent); + } + } + } + + // Helper function to add a new ReplSetTest shard into the cluster. Using single-node shards + // ensures that the "initiating set" entry cannot be rolled back. + function addShardToCluster(shardName) { + const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions}); + replTest.startSet({shardsvr: ""}); + replTest.initiate(); + assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName})); + + // Verify that the new shard's first oplog entry contains the string "initiating set". This + // is used by change streams as a sentinel to indicate that no writes have occurred on the + // replica set before this point. 
+ const firstOplogEntry = replTest.getPrimary().getCollection("local.oplog.rs").findOne(); + assert.docEq(firstOplogEntry.o, {msg: "initiating set"}); + assert.eq(firstOplogEntry.op, "n"); + + return replTest; + } + + // Helper function to resume from each event in a given list and confirm that the resumed stream + // sees the subsequent events in the correct expected order. We specify an explicit collation + // because in some cases we will be testing resumability after the collection is dropped. + function assertCanResumeFromEachEvent(eventList) { + for (let i = 0; i < eventList.length; ++i) { + const resumedStream = + coll.watch([], {resumeAfter: eventList[i]._id, collation: {locale: "simple"}}); + for (let j = i + 1; j < eventList.length; ++j) { + assert.soon(() => resumedStream.hasNext()); + assert.docEq(resumedStream.next(), eventList[j]); + } + resumedStream.close(); + } + } + + // Open a change stream on the unsharded test collection. + const csCursor = coll.watch(); + assert(!csCursor.hasNext()); + const changeList = []; + + // Insert some docs into the unsharded collection, and obtain a change stream event for each. + const insertedDocs = [{_id: 1}, {_id: 2}, {_id: 3}]; + assert.commandWorked(coll.insert(insertedDocs)); + assertAllEventsObserved(csCursor, insertedDocs, changeList); + + // Verify that, for a brand new shard, we can start at an operation time before the set existed. + let startAtDawnOfTimeCursor = coll.watch([], {startAtOperationTime: Timestamp(1, 1)}); + assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs); + startAtDawnOfTimeCursor.close(); + + // Add a new shard into the cluster. Wait three seconds so that its initiation time is + // guaranteed to be later than any of the events in the existing shard's oplog. + const newShard1 = sleep(3000) || addShardToCluster("newShard1"); + + // .. and confirm that we can resume from any point before the shard was added. 
+ assertCanResumeFromEachEvent(changeList); + + // Now shard the collection on _id and move one chunk to the new shard. + st.shardColl(coll, {_id: 1}, {_id: 3}, false); + assert.commandWorked(st.s.adminCommand( + {moveChunk: coll.getFullName(), find: {_id: 3}, to: "newShard1", _waitForDelete: true})); + + // Insert some new documents into the new shard and verify that the original stream sees them. + const newInsertedDocs = [{_id: 4}, {_id: 5}]; + assert.commandWorked(coll.insert(newInsertedDocs)); + assertAllEventsObserved(csCursor, newInsertedDocs, changeList); + + // Add a third shard into the cluster... + const newShard2 = sleep(3000) || addShardToCluster("newShard2"); + + // ... and verify that we can resume the stream from any of the preceding events. + assertCanResumeFromEachEvent(changeList); + + // Now drop the collection, and verify that we can still resume from any point. + assert(coll.drop()); + for (let expectedEvent of["drop", "invalidate"]) { + assert.soon(() => csCursor.hasNext()); + assert.eq(csCursor.next().operationType, expectedEvent); + } + assertCanResumeFromEachEvent(changeList); + + // Verify that we can start at an operation time before the cluster existed and see all events. + // We include an explicit collation because the collection has now been dropped. + startAtDawnOfTimeCursor = + coll.watch([], {startAtOperationTime: Timestamp(1, 1), collation: {locale: "simple"}}); + assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs.concat(newInsertedDocs)); + startAtDawnOfTimeCursor.close(); + + st.stop(); + + // Stop the new shards manually since the ShardingTest doesn't know anything about them. 
+ newShard1.stopSet(); + newShard2.stopSet(); +})(); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index fab7fcf2e5f..8f9928631a9 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -297,10 +297,17 @@ void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory( pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); if (auto first = pipeline->getNext()) { auto firstOplogEntry = Value(*first); + // If the first entry in the oplog is the replset initialization, then it doesn't matter + // if its timestamp is later than the resume token. No events earlier than the token can + // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that + // the timestamp of the first oplog entry is earlier than that of the resume token. + const bool isNewRS = + Value::compare(firstOplogEntry["o"]["msg"], Value("initiating set"_sd), nullptr) == 0 && + Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0; uassert(40576, - "Resume of change stream was not possible, as the resume point may no longer " - "be in the oplog. ", - firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime); + "Resume of change stream was not possible, as the resume point may no longer be in " + "the oplog. ", + isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime); } else { // Very unusual case: the oplog is empty. We can always resume. However, it should never be // possible to have obtained a document that matched the filter if the oplog is empty. |