-rw-r--r--  jstests/noPassthrough/change_stream_resume_before_add_shard.js   120
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp      13
2 files changed, 130 insertions(+), 3 deletions(-)
diff --git a/jstests/noPassthrough/change_stream_resume_before_add_shard.js b/jstests/noPassthrough/change_stream_resume_before_add_shard.js
new file mode 100644
index 00000000000..ddbdc6f3d35
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_resume_before_add_shard.js
@@ -0,0 +1,120 @@
+/**
+ * Tests that a change stream can be resumed from a point in time before a new shard was added to
+ * the cluster. Exercises the fix for SERVER-42232.
+ * @tags: [uses_change_streams, requires_sharding]
+ */
+(function() {
+ "use strict";
+
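+ // Periodic no-ops advance each node's cluster time even when there are no user writes,
+ // which allows change streams on the mongoS to make progress across otherwise-idle shards.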
+ const rsNodeOptions = {setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}};
+ const st =
+ new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});
+
+ const mongosDB = st.s.getDB(jsTestName());
+ const coll = mongosDB.test;
+
+ // Helper function to confirm that a stream sees an expected sequence of documents. If an
+ // 'eventList' array is supplied, each observed event is also pushed onto it.
+ function assertAllEventsObserved(changeStream, expectedDocs, eventList) {
+ for (let expectedDoc of expectedDocs) {
+ assert.soon(() => changeStream.hasNext());
+ const nextEvent = changeStream.next();
+ assert.eq(nextEvent.fullDocument, expectedDoc);
+ if (eventList) {
+ eventList.push(nextEvent);
+ }
+ }
+ }
+
+ // Helper function to add a new ReplSetTest shard into the cluster. Using single-node shards
+ // ensures that the "initiating set" entry cannot be rolled back.
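+ // (On a single-node set every write is immediately majority-committed, so the first oplog
+ // entry can never be rolled back.)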
+ function addShardToCluster(shardName) {
+ const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
+ replTest.startSet({shardsvr: ""});
+ replTest.initiate();
+ assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName}));
+
+ // Verify that the new shard's first oplog entry contains the string "initiating set". This
+ // is used by change streams as a sentinel to indicate that no writes have occurred on the
+ // replica set before this point.
+ const firstOplogEntry = replTest.getPrimary().getCollection("local.oplog.rs").findOne();
+ assert.docEq(firstOplogEntry.o, {msg: "initiating set"});
+ assert.eq(firstOplogEntry.op, "n");
+
+ return replTest;
+ }
+
+ // Helper function to resume from each event in a given list and confirm that the resumed stream
+ // sees the subsequent events in the correct expected order. We specify an explicit collation
+ // because in some cases we will be testing resumability after the collection is dropped.
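+ // (Once the collection is dropped, its default collation can no longer be looked up, so we
+ // supply one explicitly.)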
+ function assertCanResumeFromEachEvent(eventList) {
+ for (let i = 0; i < eventList.length; ++i) {
+ const resumedStream =
+ coll.watch([], {resumeAfter: eventList[i]._id, collation: {locale: "simple"}});
+ for (let j = i + 1; j < eventList.length; ++j) {
+ assert.soon(() => resumedStream.hasNext());
+ assert.docEq(resumedStream.next(), eventList[j]);
+ }
+ resumedStream.close();
+ }
+ }
+
+ // Open a change stream on the unsharded test collection.
+ const csCursor = coll.watch();
+ assert(!csCursor.hasNext());
+ const changeList = [];
+
+ // Insert some docs into the unsharded collection, and obtain a change stream event for each.
+ const insertedDocs = [{_id: 1}, {_id: 2}, {_id: 3}];
+ assert.commandWorked(coll.insert(insertedDocs));
+ assertAllEventsObserved(csCursor, insertedDocs, changeList);
+
+ // Verify that, for a brand new shard, we can start at an operation time before the set existed.
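+ // Timestamp(1, 1) is assumed here to be the earliest meaningful cluster time, predating any
+ // entry that could exist in a shard's oplog.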
+ let startAtDawnOfTimeCursor = coll.watch([], {startAtOperationTime: Timestamp(1, 1)});
+ assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs);
+ startAtDawnOfTimeCursor.close();
+
+ // Add a new shard into the cluster. Wait three seconds so that its initiation time is
+ // guaranteed to be later than any of the events in the existing shard's oplog.
+ sleep(3000);
+ const newShard1 = addShardToCluster("newShard1");
+
+ // ... and confirm that we can resume from any point before the shard was added.
+ assertCanResumeFromEachEvent(changeList);
+
+ // Now shard the collection on _id and move one chunk to the new shard.
+ st.shardColl(coll, {_id: 1}, {_id: 3}, false);
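+ // (shardColl shards on {_id: 1} and splits at {_id: 3}; the final 'false' means no chunk is
+ // moved yet, so the explicit moveChunk below handles placement.)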
+ assert.commandWorked(st.s.adminCommand(
+ {moveChunk: coll.getFullName(), find: {_id: 3}, to: "newShard1", _waitForDelete: true}));
+
+ // Insert some new documents into the new shard and verify that the original stream sees them.
+ const newInsertedDocs = [{_id: 4}, {_id: 5}];
+ assert.commandWorked(coll.insert(newInsertedDocs));
+ assertAllEventsObserved(csCursor, newInsertedDocs, changeList);
+
+ // Add a third shard into the cluster...
+ sleep(3000);
+ const newShard2 = addShardToCluster("newShard2");
+
+ // ... and verify that we can resume the stream from any of the preceding events.
+ assertCanResumeFromEachEvent(changeList);
+
+ // Now drop the collection, and verify that we can still resume from any point.
+ assert(coll.drop());
+ for (let expectedEvent of ["drop", "invalidate"]) {
+ assert.soon(() => csCursor.hasNext());
+ assert.eq(csCursor.next().operationType, expectedEvent);
+ }
+ assertCanResumeFromEachEvent(changeList);
+
+ // Verify that we can start at an operation time before the cluster existed and see all events.
+ // We include an explicit collation because the collection has now been dropped.
+ startAtDawnOfTimeCursor =
+ coll.watch([], {startAtOperationTime: Timestamp(1, 1), collation: {locale: "simple"}});
+ assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs.concat(newInsertedDocs));
+ startAtDawnOfTimeCursor.close();
+
+ st.stop();
+
+ // Stop the new shards manually since the ShardingTest doesn't know anything about them.
+ newShard1.stopSet();
+ newShard2.stopSet();
+})();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index fab7fcf2e5f..8f9928631a9 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -297,10 +297,17 @@ void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory(
pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
if (auto first = pipeline->getNext()) {
auto firstOplogEntry = Value(*first);
+ // If the first entry in the oplog is the replset initialization, then it doesn't matter
+ // if its timestamp is later than the resume token. No events earlier than the token can
+ // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that
+ // the timestamp of the first oplog entry is earlier than that of the resume token.
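+ // (Passing a null comparator to Value::compare below requests a simple binary string
+ // comparison, which is what we want for these internal oplog fields.)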
+ const bool isNewRS =
+ Value::compare(firstOplogEntry["o"]["msg"], Value("initiating set"_sd), nullptr) == 0 &&
+ Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0;
uassert(40576,
- "Resume of change stream was not possible, as the resume point may no longer "
- "be in the oplog. ",
- firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime);
+ "Resume of change stream was not possible, as the resume point may no longer be in "
+ "the oplog. ",
+ isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime);
} else {
// Very unusual case: the oplog is empty. We can always resume. However, it should never be
// possible to have obtained a document that matched the filter if the oplog is empty.