diff options
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-11-14 17:47:04 -0500 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-11-20 11:36:57 -0500 |
commit | dc03360c19a272542ac7a73b931f3b7222b97d46 (patch) | |
tree | 1db2e3406a2eb225dc000ac351c5c60b793e0d27 | |
parent | 1473da0d8984295f6298384299fc6be1cc51f0f5 (diff) | |
download | mongo-dc03360c19a272542ac7a73b931f3b7222b97d46.tar.gz |
SERVER-37027 Adapt change stream resume token when a stream detects FCV changes.
5 files changed, 159 insertions, 8 deletions
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js index 790da8b5fc4..346a03d4b38 100644 --- a/jstests/multiVersion/change_streams_feature_compatibility_version.js +++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js @@ -116,12 +116,11 @@ assert.writeOK(coll.insert({_id: 2})); - // Test that the stream opened in FCV 3.6 continues to work and still generates tokens in the - // old format. + // Test that the stream opened in FCV 3.6 continues to work. change = cst.getOneChange(streamOnOldVersion); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 2); - assertResumeTokenUsesBinDataFormat(change._id); + assertResumeTokenUsesStringFormat(change._id); // Test all the newly created streams can see an insert. change = cst.getOneChange(wholeDbCursor); @@ -169,22 +168,23 @@ // Set the feature compatibility version to 3.6. assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"})); - // Test that existing streams continue, but still generate resume tokens in the new format. + // Test that existing streams continue, but switch back to the BinData format after observing + // the change to FCV. assert.writeOK(coll.insert({_id: 3})); change = cst.getOneChange(wholeDbCursor); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 3); - assertResumeTokenUsesStringFormat(change._id); + assertResumeTokenUsesBinDataFormat(change._id); change = adminCST.getOneChange(wholeClusterCursor); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 3); - assertResumeTokenUsesStringFormat(change._id); + assertResumeTokenUsesBinDataFormat(change._id); change = cst.getOneChange(cursorStartedWithTime); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 3); - assertResumeTokenUsesStringFormat(change._id); + assertResumeTokenUsesBinDataFormat(change._id); // Creating a new change stream with a 4.0 feature should fail. assert.commandFailedWithCode( diff --git a/jstests/noPassthrough/change_fcv_during_change_stream.js b/jstests/noPassthrough/change_fcv_during_change_stream.js new file mode 100644 index 00000000000..5eedbf32ccb --- /dev/null +++ b/jstests/noPassthrough/change_fcv_during_change_stream.js @@ -0,0 +1,102 @@ +// Tests that a change stream's resume token format will adapt as the server's feature compatibility +// version changes. +// @tags: [requires_replication] +(function() { + "use strict"; + + const rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + + const primary = rst.getPrimary(); + const testDB = primary.getDB("test"); + + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "3.6"})); + + const coll = testDB.change_fcv_during_change_stream; + const changeStream36 = coll.watch([], {cursor: {batchSize: 1}}); + + assert.commandWorked(coll.insert({_id: "first in 3.6"})); + assert.commandWorked(coll.insert({_id: "second in 3.6"})); + + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"})); + + assert.commandWorked(coll.insert({_id: "first in 4.0"})); + + const session = testDB.getMongo().startSession({causalConsistency: false}); + const sessionDb = session.getDatabase("test"); + const sessionColl = sessionDb.change_fcv_during_change_stream; + + session.startTransaction(); + + assert.commandWorked(sessionColl.insert({_id: "first in transaction"})); + assert.commandWorked(sessionColl.insert({_id: "second in transaction"})); + assert.commandWorked(sessionColl.remove({_id: "second in transaction"})); + assert.commandWorked(sessionColl.insert({_id: "second in transaction"})); + assert.commandWorked(sessionColl.insert({_id: "third in transaction"})); + + session.commitTransaction(); + session.endSession(); + + function assertResumeTokenUsesStringFormat(resumeToken) { + assert.neq(resumeToken._data, undefined); + assert.eq(typeof resumeToken._data, + "string", + () => "Resume token: " + tojson(resumeToken) + ", oplog contents: " + + tojson(testDB.getSiblingDB("local").oplog.rs.find().toArray())); + } + + function assertResumeTokenUsesBinDataFormat(resumeToken) { + assert.neq(resumeToken._data, undefined); + assert(resumeToken._data instanceof BinData, tojson(resumeToken)); + } + + // Start reading the change stream. The stream has been opened in 3.6 so make sure that the + // resumeToken is in the appropriate format. + assert.soon(() => changeStream36.hasNext()); + let nextChange = changeStream36.next(); + assert.eq(nextChange.documentKey._id, "first in 3.6"); + assertResumeTokenUsesBinDataFormat(nextChange._id); + + assert.soon(() => changeStream36.hasNext()); + nextChange = changeStream36.next(); + assert.eq(nextChange.documentKey._id, "second in 3.6"); + assertResumeTokenUsesBinDataFormat(nextChange._id); + + // At this point the server is switched to 4.0 and the resumeToken format should switch in 4.0 + // on the fly too. + assert.soon(() => changeStream36.hasNext()); + nextChange = changeStream36.next(); + assert.eq(nextChange.documentKey._id, "first in 4.0"); + assertResumeTokenUsesStringFormat(nextChange._id); + + assert.soon(() => changeStream36.hasNext()); + nextChange = changeStream36.next(); + assert.eq(nextChange.documentKey._id, "first in transaction"); + assertResumeTokenUsesStringFormat(nextChange._id); + + assert.soon(() => changeStream36.hasNext()); + nextChange = changeStream36.next(); + assert.eq(nextChange.documentKey._id, "second in transaction"); + assertResumeTokenUsesStringFormat(nextChange._id); + + // Open a new change stream and position it in the middle of a transaction. Only the new 4.0 + // format resumeTokens are able to position correctly. + const changeStream40 = coll.watch([], {resumeAfter: nextChange._id, cursor: {batchSize: 1}}); + assert.soon(() => changeStream40.hasNext()); + nextChange = changeStream40.next(); + assert.eq(nextChange.documentKey._id, "second in transaction"); + assertResumeTokenUsesStringFormat(nextChange._id); + + assert.soon(() => changeStream40.hasNext()); + nextChange = changeStream40.next(); + assert.eq(nextChange.documentKey._id, "second in transaction"); + assertResumeTokenUsesStringFormat(nextChange._id); + + assert.soon(() => changeStream40.hasNext()); + nextChange = changeStream40.next(); + assert.eq(nextChange.documentKey._id, "third in transaction"); + assertResumeTokenUsesStringFormat(nextChange._id); + + rst.stopSet(); +}()); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index cf6137b2ebe..af54cce4ffa 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -273,11 +273,21 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // 3) Look for 'applyOps' which were created as part of a transaction. BSONObj applyOps = getTxnApplyOpsFilter(nsMatch["ns"], nss); + // 4) Look for changes to the feature compatibility version. These show up as updates to the + // admin.system.version collection. + BSONObj fcvChange = BSON( + "$and" << BSON_ARRAY( + BSON("ns" << NamespaceString::kServerConfigurationNamespace.ns()) + // Ignore entries which correspond to the beginning of a transition. We only need to + // care about those which actually commit a change to the feature compatibility version. + << BSON("o.targetVersion" << BSON("$exists" << false)) + << BSON("o.version" << BSON("$exists" << true)))); + // Match oplog entries after "start" and are either supported (1) commands or (2) operations, // excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify // it was still present in the oplog. return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom) - << BSON(OR(opMatch, commandMatch, applyOps)) + << BSON(OR(opMatch, commandMatch, applyOps, fcvChange)) << BSON("fromMigrate" << NE << true))); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 4ea3aa9ea7f..6643c186ebc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -67,6 +67,11 @@ using std::vector; namespace { constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; + +bool isFCVChange(const Document& input) { + const auto ns = input[repl::OplogEntry::kNamespaceFieldName]; + return NamespaceString(ns.getStringData()).isServerConfigurationCollection(); +} } // namespace DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( @@ -385,6 +390,27 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document return doc.freeze(); } +void DocumentSourceChangeStreamTransform::switchResumeTokenFormat( + const Document& fcvUpdateOplogEntry) { + checkValueType(fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName], + repl::OplogEntry::kObjectFieldName, + BSONType::Object); + Document opObject = fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName].getDocument(); + // The FCV update is done with a replacement-style update, so there will be no $set. Instead, + // 'opObject' is the entire new document. + Value newVersion = opObject["version"]; + + checkValueType(newVersion, "o.version", BSONType::String); + if (newVersion.getStringData() == "3.6") { + _resumeTokenFormat = ResumeToken::SerializationFormat::kBinData; + } else { + uassert(65537, + "Feature compatibility version updated to an unknown version", + newVersion.getStringData() == "4.0"); + _resumeTokenFormat = ResumeToken::SerializationFormat::kHexString; + } +} + Value DocumentSourceChangeStreamTransform::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { Document changeStreamOptions(_changeStreamSpec); @@ -447,6 +473,13 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { // Get the next input document. auto input = pSource->getNext(); + + // Check for FCV changes. + while (input.isAdvanced() && isFCVChange(input.getDocument())) { + switchResumeTokenFormat(input.getDocument()); + input = pSource->getNext(); + } + if (!input.isAdvanced()) { return input; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 11e058544cf..84219ed4f90 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -116,6 +116,12 @@ private: void initializeTransactionContext(const Document& input); /** + * Switches the format used to encode the resume token. This must happen if the stream sees an + * update to the admin.system.version collection. + */ + void switchResumeTokenFormat(const Document& fcvUpdateOplogEntry); + + /** * Gets the next relevant applyOps entry that should be returned. If there is none, returns * empty document. */ |