summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-08-27 18:12:31 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-09-04 11:04:17 -0400
commit92f108d9379c8b7f7d5769ed38be517b45858b28 (patch)
tree93c1fca99110af4b07a3459895fc901250f1b028
parent23794c14b03bb272daad7a2b25eca0b80c03a31c (diff)
downloadmongo-92f108d9379c8b7f7d5769ed38be517b45858b28.tar.gz
SERVER-35776 Basic 4.0-4.2 up/downgrade for change streams
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml18
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml18
-rw-r--r--buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j218
-rw-r--r--jstests/multiVersion/change_streams_feature_compatibility_version.js104
-rw-r--r--jstests/multiVersion/change_streams_resume_token_version.js107
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h12
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp35
-rw-r--r--src/mongo/db/pipeline/resume_token.h2
-rw-r--r--src/mongo/db/pipeline/resume_token_test.cpp14
11 files changed, 275 insertions, 76 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 0fce9a14973..adfe330650d 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -43,24 +43,6 @@ selector:
- jstests/sharding/shard_collection_existing_zones.js
- jstests/sharding/snapshot_cursor_commands_mongos.js
- jstests/sharding/update_sharded.js
- # SERVER-35977
- - jstests/sharding/change_stream_lookup_single_shard_cluster.js
- - jstests/sharding/change_stream_metadata_notifications.js
- - jstests/sharding/change_stream_read_preference.js
- - jstests/sharding/change_stream_resume_from_different_mongos.js
- - jstests/sharding/change_streams.js
- - jstests/sharding/change_stream_shard_failover.js
- - jstests/sharding/change_streams_primary_shard_unaware.js
- - jstests/sharding/change_streams_unsharded_becomes_sharded.js
- - jstests/sharding/change_streams_whole_db.js
- - jstests/sharding/change_stream_update_lookup_collation.js
- - jstests/sharding/change_stream_update_lookup_read_concern.js
- - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
- - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
- - jstests/sharding/resume_change_stream.js
- - jstests/sharding/resume_change_stream_from_stale_mongos.js
- - jstests/sharding/resume_change_stream_on_subset_of_shards.js
# Enable if SERVER-20865 is backported or 4.2 becomes last-stable
- jstests/sharding/sharding_statistics_server_status.js
executor:
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index 89252167a40..54d69dbd0e7 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -358,24 +358,6 @@ selector:
- jstests/sharding/shard_collection_existing_zones.js
- jstests/sharding/snapshot_cursor_commands_mongos.js
- jstests/sharding/update_sharded.js
- # SERVER-35977
- - jstests/sharding/change_stream_lookup_single_shard_cluster.js
- - jstests/sharding/change_stream_metadata_notifications.js
- - jstests/sharding/change_stream_read_preference.js
- - jstests/sharding/change_stream_resume_from_different_mongos.js
- - jstests/sharding/change_streams.js
- - jstests/sharding/change_stream_shard_failover.js
- - jstests/sharding/change_streams_primary_shard_unaware.js
- - jstests/sharding/change_streams_unsharded_becomes_sharded.js
- - jstests/sharding/change_streams_whole_db.js
- - jstests/sharding/change_stream_update_lookup_collation.js
- - jstests/sharding/change_stream_update_lookup_read_concern.js
- - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
- - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
- - jstests/sharding/resume_change_stream.js
- - jstests/sharding/resume_change_stream_from_stale_mongos.js
- - jstests/sharding/resume_change_stream_on_subset_of_shards.js
# Enable if SERVER-20865 is backported or 4.2 becomes last-stable
- jstests/sharding/sharding_statistics_server_status.js
diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
index 5859dd6f677..da50c0baef0 100644
--- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
+++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
@@ -56,24 +56,6 @@ selector:
- jstests/sharding/shard_collection_existing_zones.js
- jstests/sharding/snapshot_cursor_commands_mongos.js
- jstests/sharding/update_sharded.js
- # SERVER-35977
- - jstests/sharding/change_stream_lookup_single_shard_cluster.js
- - jstests/sharding/change_stream_metadata_notifications.js
- - jstests/sharding/change_stream_read_preference.js
- - jstests/sharding/change_stream_resume_from_different_mongos.js
- - jstests/sharding/change_streams.js
- - jstests/sharding/change_stream_shard_failover.js
- - jstests/sharding/change_streams_primary_shard_unaware.js
- - jstests/sharding/change_streams_unsharded_becomes_sharded.js
- - jstests/sharding/change_streams_whole_db.js
- - jstests/sharding/change_stream_update_lookup_collation.js
- - jstests/sharding/change_stream_update_lookup_read_concern.js
- - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
- - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
- - jstests/sharding/resume_change_stream.js
- - jstests/sharding/resume_change_stream_from_stale_mongos.js
- - jstests/sharding/resume_change_stream_on_subset_of_shards.js
# Enable if SERVER-20865 is backported or 4.2 becomes last-stable
- jstests/sharding/sharding_statistics_server_status.js
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js
new file mode 100644
index 00000000000..69615ff0a30
--- /dev/null
+++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js
@@ -0,0 +1,104 @@
+// Test that a change stream is able to survive an upgrade. This is the most basic test to
+// demonstrate the survival of a stream, presuming the driver will attempt to retry and resume the
+// stream after network errors.
+(function() {
+ "use strict";
+
+ load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+ const rst = new ReplSetTest({
+ nodes: 2,
+ nodeOptions: {binVersion: "last-stable"},
+ });
+
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+
+ rst.initiate();
+
+ let testDB = rst.getPrimary().getDB(jsTestName());
+ let coll = testDB.change_stream_upgrade;
+
+ // Open a change stream against a 4.0 binary. We will use the resume token from this stream to
+ // resume the stream once the set has been upgraded.
+ let streamStartedOnOldVersion = coll.watch();
+ assert.commandWorked(coll.insert({_id: "first insert, just for resume token"}));
+
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ let change = streamStartedOnOldVersion.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change));
+ const resumeTokenFromLastStable = change._id;
+
+ assert.commandWorked(coll.insert({_id: "before binary upgrade"}));
+ // Upgrade the set to the new binary version, but keep the feature compatibility version at 4.0.
+ rst.upgradeSet({binVersion: "latest"});
+ testDB = rst.getPrimary().getDB(jsTestName());
+ coll = testDB.change_stream_upgrade;
+
+ // Test that we can resume the stream on the new binaries.
+ streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ change = streamStartedOnOldVersion.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
+
+ let streamStartedOnNewVersionOldFCV = coll.watch();
+
+ assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"}));
+
+ let resumeTokenFromNewVersionOldFCV;
+ [streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => {
+ assert.soon(() => stream.hasNext());
+ change = stream.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(
+ change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
+ if (resumeTokenFromNewVersionOldFCV === undefined) {
+ resumeTokenFromNewVersionOldFCV = change._id;
+ } else {
+ assert.eq(resumeTokenFromNewVersionOldFCV, change._id);
+ }
+ });
+
+ // Explicitly set feature compatibility version to 4.2.
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"}));
+
+ const streamStartedOnNewVersion = coll.watch();
+
+ // Test that we can still resume with the token from the old version. We should see the same
+ // document again.
+ streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ change = streamStartedOnOldVersion.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
+
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ change = streamStartedOnOldVersion.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
+
+ assert.commandWorked(coll.insert({_id: "after fcv upgrade"}));
+ const resumedStreamOnNewVersion =
+ coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV});
+
+ // Test that all open streams continue to produce change events, and that the newly resumed
+ // stream sees the write that just happened since it comes after the resume token used.
+ for (let stream of[streamStartedOnOldVersion,
+ streamStartedOnNewVersionOldFCV,
+ streamStartedOnNewVersion,
+ resumedStreamOnNewVersion]) {
+ assert.soon(() => stream.hasNext());
+ change = stream.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change));
+ stream.close();
+ }
+
+ rst.stopSet();
+}());
diff --git a/jstests/multiVersion/change_streams_resume_token_version.js b/jstests/multiVersion/change_streams_resume_token_version.js
new file mode 100644
index 00000000000..62e97127c78
--- /dev/null
+++ b/jstests/multiVersion/change_streams_resume_token_version.js
@@ -0,0 +1,107 @@
+// Tests that a resume token from FCV 4.0 cannot be used with the new 'startAfter' option, because
+// the old version of the resume token doesn't contain enough information to distinguish an
+// invalidate event from the event which generated the invalidate.
+(function() {
+ "use strict";
+
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+ const rst = new ReplSetTest({nodes: 1});
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+ rst.initiate();
+
+ let testDB = rst.getPrimary().getDB(jsTestName());
+ let coll = testDB.change_stream_upgrade;
+
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"}));
+
+ (function testInvalidatesOldFCV() {
+ // Open a change stream in 4.0 compatibility version. This stream should be using the old
+ // version of resume tokens which cannot distinguish between a drop and the invalidate that
+ // follows the drop.
+ assert.commandWorked(testDB.runCommand({create: coll.getName()}));
+ const streamStartedOnOldFCV = coll.watch();
+ coll.drop();
+
+ assert.soon(() => streamStartedOnOldFCV.hasNext());
+ let change = streamStartedOnOldFCV.next();
+ assert.eq(change.operationType, "drop", tojson(change));
+ const resumeTokenFromDrop = change._id;
+
+ assert.soon(() => streamStartedOnOldFCV.hasNext());
+ change = streamStartedOnOldFCV.next();
+ assert.eq(change.operationType, "invalidate", tojson(change));
+ const resumeTokenFromInvalidate = change._id;
+
+ // Only on FCV 4.0 - these two resume tokens should be the same. Because they cannot be
+ // distinguished, any attempt to resume or start a new stream should immediately return
+ // invalidate.
+ assert.eq(resumeTokenFromDrop, resumeTokenFromInvalidate);
+ for (let token of[resumeTokenFromDrop, resumeTokenFromInvalidate]) {
+ let newStream = coll.watch([], {startAfter: token, collation: {locale: "simple"}});
+ assert.soon(() => newStream.hasNext());
+ assert.eq(newStream.next().operationType, "invalidate");
+
+ // Test the same thing but with 'resumeAfter' instead of 'startAfter'.
+ newStream = coll.watch([], {resumeAfter: token, collation: {locale: "simple"}});
+ assert.soon(() => newStream.hasNext());
+ assert.eq(newStream.next().operationType, "invalidate");
+ }
+ }());
+
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"}));
+
+ (function testInvalidatesNewFCV() {
+ // Open a change stream in 4.2 compatibility version. This stream should be using the new
+ // version of resume tokens which *can* distinguish between a drop and the invalidate that
+ // follows the drop.
+ assert.commandWorked(testDB.runCommand({create: coll.getName()}));
+ const changeStream = coll.watch();
+ coll.drop();
+
+ assert.soon(() => changeStream.hasNext());
+ let change = changeStream.next();
+ assert.eq(change.operationType, "drop", tojson(change));
+ const resumeTokenFromDrop = change._id;
+
+ assert.soon(() => changeStream.hasNext());
+ change = changeStream.next();
+ assert.eq(change.operationType, "invalidate", tojson(change));
+ const resumeTokenFromInvalidate = change._id;
+
+ assert.commandWorked(coll.insert({_id: "insert after drop"}));
+
+ assert.neq(resumeTokenFromDrop,
+ resumeTokenFromInvalidate,
+ () => tojson(resumeTokenFromDrop) + " should not equal " +
+ tojson(resumeTokenFromInvalidate));
+ let newStream =
+ coll.watch([], {startAfter: resumeTokenFromDrop, collation: {locale: "simple"}});
+ assert.soon(() => newStream.hasNext());
+ assert.eq(newStream.next().operationType, "invalidate");
+
+ newStream =
+ coll.watch([], {startAfter: resumeTokenFromInvalidate, collation: {locale: "simple"}});
+ assert.soon(() => newStream.hasNext());
+ change = newStream.next();
+ assert.eq(change.operationType, "insert");
+ assert.eq(change.documentKey._id, "insert after drop");
+
+ // Test the same thing but with 'resumeAfter' instead of 'startAfter'. This should see an
+ // invalidate on the first, and reject the second.
+ newStream =
+ coll.watch([], {resumeAfter: resumeTokenFromDrop, collation: {locale: "simple"}});
+ assert.soon(() => newStream.hasNext());
+ assert.eq(newStream.next().operationType, "invalidate");
+ const error = assert.throws(
+ () => coll.watch(
+ [], {resumeAfter: resumeTokenFromInvalidate, collation: {locale: "simple"}}));
+ assert.eq(error.code, ErrorCodes.InvalidResumeToken);
+ }());
+
+ rst.stopSet();
+}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 448ac608724..5ad06bd9329 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -386,7 +386,9 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
expCtx));
}
- stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, elem.embeddedObject()));
+ const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
+ stages.push_back(
+ DocumentSourceChangeStreamTransform::create(expCtx, fcv, elem.embeddedObject()));
stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate));
// The resume stage must come after the check invalidate stage to allow the check invalidate
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index c1432f4d960..a7f6e01d235 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -68,16 +68,21 @@ constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
} // namespace
boost::intrusive_ptr<DocumentSourceChangeStreamTransform>
-DocumentSourceChangeStreamTransform::create(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- BSONObj changeStreamSpec) {
- return new DocumentSourceChangeStreamTransform(expCtx, changeStreamSpec);
+DocumentSourceChangeStreamTransform::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ServerGlobalParams::FeatureCompatibility::Version& fcv,
+ BSONObj changeStreamSpec) {
+ return new DocumentSourceChangeStreamTransform(expCtx, fcv, changeStreamSpec);
}
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec)
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ServerGlobalParams::FeatureCompatibility::Version& fcv,
+ BSONObj changeStreamSpec)
: DocumentSource(expCtx),
_changeStreamSpec(changeStreamSpec.getOwned()),
- _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) {
+ _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()),
+ _fcv(fcv) {
_nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns));
@@ -174,6 +179,10 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts,
if (!uuid.missing())
resumeTokenData.uuid = uuid.getUuid();
+ if (_fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
+ resumeTokenData.version = 0;
+ }
+
return resumeTokenData;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 543a03aedd5..d0b2450fb91 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -42,7 +42,9 @@ public:
* Creates a new transformation stage from the given specification.
*/
static boost::intrusive_ptr<DocumentSourceChangeStreamTransform> create(
- const boost::intrusive_ptr<ExpressionContext>&, BSONObj changeStreamSpec);
+ const boost::intrusive_ptr<ExpressionContext>&,
+ const ServerGlobalParams::FeatureCompatibility::Version&,
+ BSONObj changeStreamSpec);
Document applyTransformation(const Document& input);
DepsTracker::State getDependencies(DepsTracker* deps) const final;
@@ -57,7 +59,8 @@ public:
private:
// This constructor is private, callers should use the 'create()' method above.
- DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>&,
+ const ServerGlobalParams::FeatureCompatibility::Version&,
BSONObj changeStreamSpec);
struct DocumentKeyCacheEntry {
@@ -146,6 +149,11 @@ private:
// Set to true if this transformation stage can be run on the collectionless namespace.
bool _isIndependentOfAnyCollection;
+
+ // '_fcv' is used to determine which version of the resume token to generate for each change.
+ // This is a snapshot of what the feature compatibility version was at the time the stream was
+ // opened or resumed.
+ ServerGlobalParams::FeatureCompatibility::Version _fcv;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index 77643c51f34..626ae556d18 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -45,13 +45,18 @@ constexpr StringData ResumeToken::kDataFieldName;
constexpr StringData ResumeToken::kTypeBitsFieldName;
bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
- return clusterTime == other.clusterTime &&
+ return clusterTime == other.clusterTime && version == other.version &&
+ fromInvalidate == other.fromInvalidate &&
(Value::compare(this->documentKey, other.documentKey, nullptr) == 0) && uuid == other.uuid;
}
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
- return out << "{clusterTime: " << tokenData.clusterTime.toString()
- << " documentKey: " << tokenData.documentKey << " uuid: " << tokenData.uuid << "}";
+ out << "{clusterTime: " << tokenData.clusterTime.toString()
+ << ", version: " << tokenData.version << ", applyOpsIndex" << tokenData.applyOpsIndex;
+ if (tokenData.version > 0) {
+ out << ", fromInvalidate: " << (tokenData.fromInvalidate ? "0" : "1");
+ }
+ return out << ", documentKey: " << tokenData.documentKey << ", uuid: " << tokenData.uuid << "}";
}
ResumeToken::ResumeToken(const Document& resumeDoc) {
@@ -77,7 +82,9 @@ ResumeToken::ResumeToken(const ResumeTokenData& data) {
builder.append("", data.clusterTime);
builder.append("", data.version);
builder.appendNumber("", data.applyOpsIndex);
- builder.appendBool("", data.fromInvalidate);
+ if (data.version >= 1) {
+ builder.appendBool("", data.fromInvalidate);
+ }
uassert(50788,
"Unexpected resume token with a documentKey but no UUID",
data.uuid || data.documentKey.missing());
@@ -137,7 +144,9 @@ ResumeTokenData ResumeToken::getData() const {
"Invalid resume token: wrong type for version",
versionElt.type() == BSONType::NumberInt);
result.version = versionElt.numberInt();
- uassert(50795, "Invalid Resume Token: only supports version 0", result.version == 0);
+ uassert(50795,
+ "Invalid Resume Token: only supports version 0 or 1",
+ result.version == 0 || result.version == 1);
// Next comes the applyOps index.
uassert(50793, "Resume Token does not contain applyOpsIndex", i.more());
@@ -149,12 +158,16 @@ ResumeTokenData ResumeToken::getData() const {
uassert(50794, "Invalid Resume Token: applyOpsIndex should be non-negative", applyOpsInd >= 0);
result.applyOpsIndex = applyOpsInd;
- uassert(50872, "Resume Token does not contain fromInvalidate", i.more());
- auto fromInvalidate = i.next();
- uassert(50870,
- "Resume Token fromInvalidate is not a boolean.",
- fromInvalidate.type() == BSONType::Bool);
- result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean());
+ if (result.version >= 1) {
+ // The 'fromInvalidate' bool was added in version 1 resume tokens. We don't expect to see it
+ // on version 0 tokens. After this bool, the remaining fields should be the same.
+ uassert(50872, "Resume Token does not contain fromInvalidate", i.more());
+ auto fromInvalidate = i.next();
+ uassert(50870,
+ "Resume Token fromInvalidate is not a boolean.",
+ fromInvalidate.type() == BSONType::Bool);
+ result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean());
+ }
// The UUID and documentKey are not required.
if (!i.more()) {
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 25318719e67..6c2b5fd5965 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -66,7 +66,7 @@ struct ResumeTokenData {
};
Timestamp clusterTime;
- int version = 0;
+ int version = 1;
size_t applyOpsIndex = 0;
Value documentKey;
boost::optional<UUID> uuid;
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
index 21883a4c190..dda65275c3c 100644
--- a/src/mongo/db/pipeline/resume_token_test.cpp
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -206,15 +206,25 @@ TEST(ResumeToken, WrongVersionToken) {
ResumeTokenData resumeTokenDataIn;
resumeTokenDataIn.clusterTime = ts;
resumeTokenDataIn.version = 0;
+ resumeTokenDataIn.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
- // This one with version 0 should succeed.
+ // This one with version 0 should succeed. Version 0 cannot encode the fromInvalidate bool, so
+ // we expect it to be set to the default 'kNotFromInvalidate' after serialization.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_NE(resumeTokenDataIn, tokenData);
+ tokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
ASSERT_EQ(resumeTokenDataIn, tokenData);
- // With version 1 it should fail.
+ // Version 1 should include the 'fromInvalidate' bool through serialization.
resumeTokenDataIn.version = 1;
rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
+ tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+
+ // With version 2 it should fail - the maximum supported version is 1.
+ resumeTokenDataIn.version = 2;
+ rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
ASSERT_THROWS(rtToken.getData(), AssertionException);
}