diff options
author    | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-27 18:12:31 -0400
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-09-04 11:04:17 -0400
commit    | 92f108d9379c8b7f7d5769ed38be517b45858b28 (patch)
tree      | 93c1fca99110af4b07a3459895fc901250f1b028
parent    | 23794c14b03bb272daad7a2b25eca0b80c03a31c (diff)
download  | mongo-92f108d9379c8b7f7d5769ed38be517b45858b28.tar.gz
SERVER-35776 Basic 4.0-4.2 up/downgrade for change streams
11 files changed, 275 insertions, 76 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 0fce9a14973..adfe330650d 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -43,24 +43,6 @@ selector: - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/update_sharded.js - # SERVER-35977 - - jstests/sharding/change_stream_lookup_single_shard_cluster.js - - jstests/sharding/change_stream_metadata_notifications.js - - jstests/sharding/change_stream_read_preference.js - - jstests/sharding/change_stream_resume_from_different_mongos.js - - jstests/sharding/change_streams.js - - jstests/sharding/change_stream_shard_failover.js - - jstests/sharding/change_streams_primary_shard_unaware.js - - jstests/sharding/change_streams_unsharded_becomes_sharded.js - - jstests/sharding/change_streams_whole_db.js - - jstests/sharding/change_stream_update_lookup_collation.js - - jstests/sharding/change_stream_update_lookup_read_concern.js - - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js - - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js - - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js - - jstests/sharding/resume_change_stream.js - - jstests/sharding/resume_change_stream_from_stale_mongos.js - - jstests/sharding/resume_change_stream_on_subset_of_shards.js # Enable if SERVER-20865 is backported or 4.2 becomes last-stable - jstests/sharding/sharding_statistics_server_status.js executor: diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index 89252167a40..54d69dbd0e7 100644 --- 
a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -358,24 +358,6 @@ selector: - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/update_sharded.js - # SERVER-35977 - - jstests/sharding/change_stream_lookup_single_shard_cluster.js - - jstests/sharding/change_stream_metadata_notifications.js - - jstests/sharding/change_stream_read_preference.js - - jstests/sharding/change_stream_resume_from_different_mongos.js - - jstests/sharding/change_streams.js - - jstests/sharding/change_stream_shard_failover.js - - jstests/sharding/change_streams_primary_shard_unaware.js - - jstests/sharding/change_streams_unsharded_becomes_sharded.js - - jstests/sharding/change_streams_whole_db.js - - jstests/sharding/change_stream_update_lookup_collation.js - - jstests/sharding/change_stream_update_lookup_read_concern.js - - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js - - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js - - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js - - jstests/sharding/resume_change_stream.js - - jstests/sharding/resume_change_stream_from_stale_mongos.js - - jstests/sharding/resume_change_stream_on_subset_of_shards.js # Enable if SERVER-20865 is backported or 4.2 becomes last-stable - jstests/sharding/sharding_statistics_server_status.js diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 index 5859dd6f677..da50c0baef0 100644 --- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 +++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 @@ 
-56,24 +56,6 @@ selector: - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/update_sharded.js - # SERVER-35977 - - jstests/sharding/change_stream_lookup_single_shard_cluster.js - - jstests/sharding/change_stream_metadata_notifications.js - - jstests/sharding/change_stream_read_preference.js - - jstests/sharding/change_stream_resume_from_different_mongos.js - - jstests/sharding/change_streams.js - - jstests/sharding/change_stream_shard_failover.js - - jstests/sharding/change_streams_primary_shard_unaware.js - - jstests/sharding/change_streams_unsharded_becomes_sharded.js - - jstests/sharding/change_streams_whole_db.js - - jstests/sharding/change_stream_update_lookup_collation.js - - jstests/sharding/change_stream_update_lookup_read_concern.js - - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js - - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js - - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js - - jstests/sharding/resume_change_stream.js - - jstests/sharding/resume_change_stream_from_stale_mongos.js - - jstests/sharding/resume_change_stream_on_subset_of_shards.js # Enable if SERVER-20865 is backported or 4.2 becomes last-stable - jstests/sharding/sharding_statistics_server_status.js diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js new file mode 100644 index 00000000000..69615ff0a30 --- /dev/null +++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js @@ -0,0 +1,104 @@ +// Test that a change stream is able to survive an upgrade. This is the most basic test to +// demonstrate the survival of a stream, presuming the driver will attempt to retry and resume the +// stream after network errors. +(function() { + "use strict"; + + load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet. 
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + + const rst = new ReplSetTest({ + nodes: 2, + nodeOptions: {binVersion: "last-stable"}, + }); + + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + + rst.initiate(); + + let testDB = rst.getPrimary().getDB(jsTestName()); + let coll = testDB.change_stream_upgrade; + + // Open a change stream against a 4.0 binary. We will use the resume token from this stream to + // resume the stream once the set has been upgraded. + let streamStartedOnOldVersion = coll.watch(); + assert.commandWorked(coll.insert({_id: "first insert, just for resume token"})); + + assert.soon(() => streamStartedOnOldVersion.hasNext()); + let change = streamStartedOnOldVersion.next(); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change)); + const resumeTokenFromLastStable = change._id; + + assert.commandWorked(coll.insert({_id: "before binary upgrade"})); + // Upgrade the set to the new binary version, but keep the feature compatibility version at 4.0. + rst.upgradeSet({binVersion: "latest"}); + testDB = rst.getPrimary().getDB(jsTestName()); + coll = testDB.change_stream_upgrade; + + // Test that we can resume the stream on the new binaries. 
+ streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable}); + assert.soon(() => streamStartedOnOldVersion.hasNext()); + change = streamStartedOnOldVersion.next(); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, "before binary upgrade", tojson(change)); + + let streamStartedOnNewVersionOldFCV = coll.watch(); + + assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"})); + + let resumeTokenFromNewVersionOldFCV; + [streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => { + assert.soon(() => stream.hasNext()); + change = stream.next(); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq( + change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change)); + if (resumeTokenFromNewVersionOldFCV === undefined) { + resumeTokenFromNewVersionOldFCV = change._id; + } else { + assert.eq(resumeTokenFromNewVersionOldFCV, change._id); + } + }); + + // Explicitly set feature compatibility version to 4.2. + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"})); + + const streamStartedOnNewVersion = coll.watch(); + + // Test that we can still resume with the token from the old version. We should see the same + // document again. 
+ streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable}); + assert.soon(() => streamStartedOnOldVersion.hasNext()); + change = streamStartedOnOldVersion.next(); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, "before binary upgrade", tojson(change)); + + assert.soon(() => streamStartedOnOldVersion.hasNext()); + change = streamStartedOnOldVersion.next(); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change)); + + assert.commandWorked(coll.insert({_id: "after fcv upgrade"})); + const resumedStreamOnNewVersion = + coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV}); + + // Test that all open streams continue to produce change events, and that the newly resumed + // stream sees the write that just happened since it comes after the resume token used. + for (let stream of[streamStartedOnOldVersion, + streamStartedOnNewVersionOldFCV, + streamStartedOnNewVersion, + resumedStreamOnNewVersion]) { + assert.soon(() => stream.hasNext()); + change = stream.next(); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change)); + stream.close(); + } + + rst.stopSet(); +}()); diff --git a/jstests/multiVersion/change_streams_resume_token_version.js b/jstests/multiVersion/change_streams_resume_token_version.js new file mode 100644 index 00000000000..62e97127c78 --- /dev/null +++ b/jstests/multiVersion/change_streams_resume_token_version.js @@ -0,0 +1,107 @@ +// Tests that a resume token from FCV 4.0 cannot be used with the new 'startAfter' option, because +// the old version of the resume token doesn't contain enough information to distinguish an +// invalidate event from the event which generated the invalidate. +(function() { + "use strict"; + + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. 
+ + const rst = new ReplSetTest({nodes: 1}); + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + rst.initiate(); + + let testDB = rst.getPrimary().getDB(jsTestName()); + let coll = testDB.change_stream_upgrade; + + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"})); + + (function testInvalidatesOldFCV() { + // Open a change stream in 4.0 compatibility version. This stream should be using the old + // version of resume tokens which cannot distinguish between a drop and the invalidate that + // follows the drop. + assert.commandWorked(testDB.runCommand({create: coll.getName()})); + const streamStartedOnOldFCV = coll.watch(); + coll.drop(); + + assert.soon(() => streamStartedOnOldFCV.hasNext()); + let change = streamStartedOnOldFCV.next(); + assert.eq(change.operationType, "drop", tojson(change)); + const resumeTokenFromDrop = change._id; + + assert.soon(() => streamStartedOnOldFCV.hasNext()); + change = streamStartedOnOldFCV.next(); + assert.eq(change.operationType, "invalidate", tojson(change)); + const resumeTokenFromInvalidate = change._id; + + // Only on FCV 4.0 - these two resume tokens should be the same. Because they cannot be + // distinguished, any attempt to resume or start a new stream should immediately return + // invalidate. + assert.eq(resumeTokenFromDrop, resumeTokenFromInvalidate); + for (let token of[resumeTokenFromDrop, resumeTokenFromInvalidate]) { + let newStream = coll.watch([], {startAfter: token, collation: {locale: "simple"}}); + assert.soon(() => newStream.hasNext()); + assert.eq(newStream.next().operationType, "invalidate"); + + // Test the same thing but with 'resumeAfter' instead of 'startAfter'. 
+ newStream = coll.watch([], {resumeAfter: token, collation: {locale: "simple"}}); + assert.soon(() => newStream.hasNext()); + assert.eq(newStream.next().operationType, "invalidate"); + } + }()); + + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"})); + + (function testInvalidatesNewFCV() { + // Open a change stream in 4.2 compatibility version. This stream should be using the new + // version of resume tokens which *can* distinguish between a drop and the invalidate that + // follows the drop. + assert.commandWorked(testDB.runCommand({create: coll.getName()})); + const changeStream = coll.watch(); + coll.drop(); + + assert.soon(() => changeStream.hasNext()); + let change = changeStream.next(); + assert.eq(change.operationType, "drop", tojson(change)); + const resumeTokenFromDrop = change._id; + + assert.soon(() => changeStream.hasNext()); + change = changeStream.next(); + assert.eq(change.operationType, "invalidate", tojson(change)); + const resumeTokenFromInvalidate = change._id; + + assert.commandWorked(coll.insert({_id: "insert after drop"})); + + assert.neq(resumeTokenFromDrop, + resumeTokenFromInvalidate, + () => tojson(resumeTokenFromDrop) + " should not equal " + + tojson(resumeTokenFromInvalidate)); + let newStream = + coll.watch([], {startAfter: resumeTokenFromDrop, collation: {locale: "simple"}}); + assert.soon(() => newStream.hasNext()); + assert.eq(newStream.next().operationType, "invalidate"); + + newStream = + coll.watch([], {startAfter: resumeTokenFromInvalidate, collation: {locale: "simple"}}); + assert.soon(() => newStream.hasNext()); + change = newStream.next(); + assert.eq(change.operationType, "insert"); + assert.eq(change.documentKey._id, "insert after drop"); + + // Test the same thing but with 'resumeAfter' instead of 'startAfter'. This should see an + // invalidate on the first, and reject the second. 
+ newStream = + coll.watch([], {resumeAfter: resumeTokenFromDrop, collation: {locale: "simple"}}); + assert.soon(() => newStream.hasNext()); + assert.eq(newStream.next().operationType, "invalidate"); + const error = assert.throws( + () => coll.watch( + [], {resumeAfter: resumeTokenFromInvalidate, collation: {locale: "simple"}})); + assert.eq(error.code, ErrorCodes.InvalidResumeToken); + }()); + + rst.stopSet(); +}()); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 448ac608724..5ad06bd9329 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -386,7 +386,9 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression expCtx)); } - stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, elem.embeddedObject())); + const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); + stages.push_back( + DocumentSourceChangeStreamTransform::create(expCtx, fcv, elem.embeddedObject())); stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate)); // The resume stage must come after the check invalidate stage to allow the check invalidate diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index c1432f4d960..a7f6e01d235 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -68,16 +68,21 @@ constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; } // namespace boost::intrusive_ptr<DocumentSourceChangeStreamTransform> -DocumentSourceChangeStreamTransform::create(const boost::intrusive_ptr<ExpressionContext>& expCtx, - BSONObj changeStreamSpec) { - return new DocumentSourceChangeStreamTransform(expCtx, changeStreamSpec); 
+DocumentSourceChangeStreamTransform::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ServerGlobalParams::FeatureCompatibility::Version& fcv, + BSONObj changeStreamSpec) { + return new DocumentSourceChangeStreamTransform(expCtx, fcv, changeStreamSpec); } DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( - const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec) + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ServerGlobalParams::FeatureCompatibility::Version& fcv, + BSONObj changeStreamSpec) : DocumentSource(expCtx), _changeStreamSpec(changeStreamSpec.getOwned()), - _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) { + _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()), + _fcv(fcv) { _nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)); @@ -174,6 +179,10 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts, if (!uuid.missing()) resumeTokenData.uuid = uuid.getUuid(); + if (_fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { + resumeTokenData.version = 0; + } + return resumeTokenData; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 543a03aedd5..d0b2450fb91 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -42,7 +42,9 @@ public: * Creates a new transformation stage from the given specification. 
*/ static boost::intrusive_ptr<DocumentSourceChangeStreamTransform> create( - const boost::intrusive_ptr<ExpressionContext>&, BSONObj changeStreamSpec); + const boost::intrusive_ptr<ExpressionContext>&, + const ServerGlobalParams::FeatureCompatibility::Version&, + BSONObj changeStreamSpec); Document applyTransformation(const Document& input); DepsTracker::State getDependencies(DepsTracker* deps) const final; @@ -57,7 +59,8 @@ public: private: // This constructor is private, callers should use the 'create()' method above. - DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx, + DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>&, + const ServerGlobalParams::FeatureCompatibility::Version&, BSONObj changeStreamSpec); struct DocumentKeyCacheEntry { @@ -146,6 +149,11 @@ private: // Set to true if this transformation stage can be run on the collectionless namespace. bool _isIndependentOfAnyCollection; + + // '_fcv' is used to determine which version of the resume token to generate for each change. + // This is a snapshot of what the feature compatibility version was at the time the stream was + // opened or resumed. 
+ ServerGlobalParams::FeatureCompatibility::Version _fcv; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index 77643c51f34..626ae556d18 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -45,13 +45,18 @@ constexpr StringData ResumeToken::kDataFieldName; constexpr StringData ResumeToken::kTypeBitsFieldName; bool ResumeTokenData::operator==(const ResumeTokenData& other) const { - return clusterTime == other.clusterTime && + return clusterTime == other.clusterTime && version == other.version && + fromInvalidate == other.fromInvalidate && (Value::compare(this->documentKey, other.documentKey, nullptr) == 0) && uuid == other.uuid; } std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { - return out << "{clusterTime: " << tokenData.clusterTime.toString() - << " documentKey: " << tokenData.documentKey << " uuid: " << tokenData.uuid << "}"; + out << "{clusterTime: " << tokenData.clusterTime.toString() + << ", version: " << tokenData.version << ", applyOpsIndex" << tokenData.applyOpsIndex; + if (tokenData.version > 0) { + out << ", fromInvalidate: " << (tokenData.fromInvalidate ? 
"0" : "1"); + } + return out << ", documentKey: " << tokenData.documentKey << ", uuid: " << tokenData.uuid << "}"; } ResumeToken::ResumeToken(const Document& resumeDoc) { @@ -77,7 +82,9 @@ ResumeToken::ResumeToken(const ResumeTokenData& data) { builder.append("", data.clusterTime); builder.append("", data.version); builder.appendNumber("", data.applyOpsIndex); - builder.appendBool("", data.fromInvalidate); + if (data.version >= 1) { + builder.appendBool("", data.fromInvalidate); + } uassert(50788, "Unexpected resume token with a documentKey but no UUID", data.uuid || data.documentKey.missing()); @@ -137,7 +144,9 @@ ResumeTokenData ResumeToken::getData() const { "Invalid resume token: wrong type for version", versionElt.type() == BSONType::NumberInt); result.version = versionElt.numberInt(); - uassert(50795, "Invalid Resume Token: only supports version 0", result.version == 0); + uassert(50795, + "Invalid Resume Token: only supports version 0 or 1", + result.version == 0 || result.version == 1); // Next comes the applyOps index. uassert(50793, "Resume Token does not contain applyOpsIndex", i.more()); @@ -149,12 +158,16 @@ ResumeTokenData ResumeToken::getData() const { uassert(50794, "Invalid Resume Token: applyOpsIndex should be non-negative", applyOpsInd >= 0); result.applyOpsIndex = applyOpsInd; - uassert(50872, "Resume Token does not contain fromInvalidate", i.more()); - auto fromInvalidate = i.next(); - uassert(50870, - "Resume Token fromInvalidate is not a boolean.", - fromInvalidate.type() == BSONType::Bool); - result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean()); + if (result.version >= 1) { + // The 'fromInvalidate' bool was added in version 1 resume tokens. We don't expect to see it + // on version 0 tokens. After this bool, the remaining fields should be the same. 
+ uassert(50872, "Resume Token does not contain fromInvalidate", i.more()); + auto fromInvalidate = i.next(); + uassert(50870, + "Resume Token fromInvalidate is not a boolean.", + fromInvalidate.type() == BSONType::Bool); + result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean()); + } // The UUID and documentKey are not required. if (!i.more()) { diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 25318719e67..6c2b5fd5965 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -66,7 +66,7 @@ struct ResumeTokenData { }; Timestamp clusterTime; - int version = 0; + int version = 1; size_t applyOpsIndex = 0; Value documentKey; boost::optional<UUID> uuid; diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp index 21883a4c190..dda65275c3c 100644 --- a/src/mongo/db/pipeline/resume_token_test.cpp +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -206,15 +206,25 @@ TEST(ResumeToken, WrongVersionToken) { ResumeTokenData resumeTokenDataIn; resumeTokenDataIn.clusterTime = ts; resumeTokenDataIn.version = 0; + resumeTokenDataIn.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate; - // This one with version 0 should succeed. + // This one with version 0 should succeed. Version 0 cannot encode the fromInvalidate bool, so + // we expect it to be set to the default 'kNotFromInvalidate' after serialization. auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); ResumeTokenData tokenData = rtToken.getData(); + ASSERT_NE(resumeTokenDataIn, tokenData); + tokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate; ASSERT_EQ(resumeTokenDataIn, tokenData); - // With version 1 it should fail. + // Version 1 should include the 'fromInvalidate' bool through serialization. 
resumeTokenDataIn.version = 1; rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); + tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); + + // With version 2 it should fail - the maximum supported version is 1. + resumeTokenDataIn.version = 2; + rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); ASSERT_THROWS(rtToken.getData(), AssertionException); } |