summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@mongodb.com>2018-11-14 17:47:04 -0500
committerMartin Neupauer <martin.neupauer@mongodb.com>2018-11-20 11:36:57 -0500
commitdc03360c19a272542ac7a73b931f3b7222b97d46 (patch)
tree1db2e3406a2eb225dc000ac351c5c60b793e0d27
parent1473da0d8984295f6298384299fc6be1cc51f0f5 (diff)
downloadmongo-dc03360c19a272542ac7a73b931f3b7222b97d46.tar.gz
SERVER-37027 Adapt change stream resume token when a stream detects FCV changes.
-rw-r--r--jstests/multiVersion/change_streams_feature_compatibility_version.js14
-rw-r--r--jstests/noPassthrough/change_fcv_during_change_stream.js102
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h6
5 files changed, 159 insertions, 8 deletions
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js
index 790da8b5fc4..346a03d4b38 100644
--- a/jstests/multiVersion/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js
@@ -116,12 +116,11 @@
assert.writeOK(coll.insert({_id: 2}));
- // Test that the stream opened in FCV 3.6 continues to work and still generates tokens in the
- // old format.
+ // Test that the stream opened in FCV 3.6 continues to work.
change = cst.getOneChange(streamOnOldVersion);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 2);
- assertResumeTokenUsesBinDataFormat(change._id);
+ assertResumeTokenUsesStringFormat(change._id);
// Test all the newly created streams can see an insert.
change = cst.getOneChange(wholeDbCursor);
@@ -169,22 +168,23 @@
// Set the feature compatibility version to 3.6.
assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"}));
- // Test that existing streams continue, but still generate resume tokens in the new format.
+ // Test that existing streams continue, but switch back to the BinData format after observing
+ // the change to FCV.
assert.writeOK(coll.insert({_id: 3}));
change = cst.getOneChange(wholeDbCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 3);
- assertResumeTokenUsesStringFormat(change._id);
+ assertResumeTokenUsesBinDataFormat(change._id);
change = adminCST.getOneChange(wholeClusterCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 3);
- assertResumeTokenUsesStringFormat(change._id);
+ assertResumeTokenUsesBinDataFormat(change._id);
change = cst.getOneChange(cursorStartedWithTime);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 3);
- assertResumeTokenUsesStringFormat(change._id);
+ assertResumeTokenUsesBinDataFormat(change._id);
// Creating a new change stream with a 4.0 feature should fail.
assert.commandFailedWithCode(
diff --git a/jstests/noPassthrough/change_fcv_during_change_stream.js b/jstests/noPassthrough/change_fcv_during_change_stream.js
new file mode 100644
index 00000000000..5eedbf32ccb
--- /dev/null
+++ b/jstests/noPassthrough/change_fcv_during_change_stream.js
@@ -0,0 +1,102 @@
+// Tests that a change stream's resume token format will adapt as the server's feature compatibility
+// version changes.
+// @tags: [requires_replication]
+(function() {
+ "use strict";
+
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+
+ const primary = rst.getPrimary();
+ const testDB = primary.getDB("test");
+
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "3.6"}));
+
+ const coll = testDB.change_fcv_during_change_stream;
+ const changeStream36 = coll.watch([], {cursor: {batchSize: 1}});
+
+ assert.commandWorked(coll.insert({_id: "first in 3.6"}));
+ assert.commandWorked(coll.insert({_id: "second in 3.6"}));
+
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"}));
+
+ assert.commandWorked(coll.insert({_id: "first in 4.0"}));
+
+ const session = testDB.getMongo().startSession({causalConsistency: false});
+ const sessionDb = session.getDatabase("test");
+ const sessionColl = sessionDb.change_fcv_during_change_stream;
+
+ session.startTransaction();
+
+ assert.commandWorked(sessionColl.insert({_id: "first in transaction"}));
+ assert.commandWorked(sessionColl.insert({_id: "second in transaction"}));
+ assert.commandWorked(sessionColl.remove({_id: "second in transaction"}));
+ assert.commandWorked(sessionColl.insert({_id: "second in transaction"}));
+ assert.commandWorked(sessionColl.insert({_id: "third in transaction"}));
+
+ session.commitTransaction();
+ session.endSession();
+
+ function assertResumeTokenUsesStringFormat(resumeToken) {
+ assert.neq(resumeToken._data, undefined);
+ assert.eq(typeof resumeToken._data,
+ "string",
+ () => "Resume token: " + tojson(resumeToken) + ", oplog contents: " +
+ tojson(testDB.getSiblingDB("local").oplog.rs.find().toArray()));
+ }
+
+ function assertResumeTokenUsesBinDataFormat(resumeToken) {
+ assert.neq(resumeToken._data, undefined);
+ assert(resumeToken._data instanceof BinData, tojson(resumeToken));
+ }
+
+ // Start reading the change stream. The stream has been opened in 3.6 so make sure that the
+ // resumeToken is in the appropriate format.
+ assert.soon(() => changeStream36.hasNext());
+ let nextChange = changeStream36.next();
+ assert.eq(nextChange.documentKey._id, "first in 3.6");
+ assertResumeTokenUsesBinDataFormat(nextChange._id);
+
+ assert.soon(() => changeStream36.hasNext());
+ nextChange = changeStream36.next();
+ assert.eq(nextChange.documentKey._id, "second in 3.6");
+ assertResumeTokenUsesBinDataFormat(nextChange._id);
+
+ // At this point the server is switched to 4.0 and the resumeToken format should switch in 4.0
+ // on the fly too.
+ assert.soon(() => changeStream36.hasNext());
+ nextChange = changeStream36.next();
+ assert.eq(nextChange.documentKey._id, "first in 4.0");
+ assertResumeTokenUsesStringFormat(nextChange._id);
+
+ assert.soon(() => changeStream36.hasNext());
+ nextChange = changeStream36.next();
+ assert.eq(nextChange.documentKey._id, "first in transaction");
+ assertResumeTokenUsesStringFormat(nextChange._id);
+
+ assert.soon(() => changeStream36.hasNext());
+ nextChange = changeStream36.next();
+ assert.eq(nextChange.documentKey._id, "second in transaction");
+ assertResumeTokenUsesStringFormat(nextChange._id);
+
+ // Open a new change stream and position it in the middle of a transaction. Only the new 4.0
+ // format resumeTokens are able to position correctly.
+ const changeStream40 = coll.watch([], {resumeAfter: nextChange._id, cursor: {batchSize: 1}});
+ assert.soon(() => changeStream40.hasNext());
+ nextChange = changeStream40.next();
+ assert.eq(nextChange.documentKey._id, "second in transaction");
+ assertResumeTokenUsesStringFormat(nextChange._id);
+
+ assert.soon(() => changeStream40.hasNext());
+ nextChange = changeStream40.next();
+ assert.eq(nextChange.documentKey._id, "second in transaction");
+ assertResumeTokenUsesStringFormat(nextChange._id);
+
+ assert.soon(() => changeStream40.hasNext());
+ nextChange = changeStream40.next();
+ assert.eq(nextChange.documentKey._id, "third in transaction");
+ assertResumeTokenUsesStringFormat(nextChange._id);
+
+ rst.stopSet();
+}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index cf6137b2ebe..af54cce4ffa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -273,11 +273,21 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// 3) Look for 'applyOps' which were created as part of a transaction.
BSONObj applyOps = getTxnApplyOpsFilter(nsMatch["ns"], nss);
+ // 4) Look for changes to the feature compatibility version. These show up as updates to the
+ // admin.system.version collection.
+ BSONObj fcvChange = BSON(
+ "$and" << BSON_ARRAY(
+ BSON("ns" << NamespaceString::kServerConfigurationNamespace.ns())
+ // Ignore entries which correspond to the beginning of a transition. We only need to
+ // care about those which actually commit a change to the feature compatibility version.
+ << BSON("o.targetVersion" << BSON("$exists" << false))
+ << BSON("o.version" << BSON("$exists" << true))));
+
// Match oplog entries after "start" and are either supported (1) commands or (2) operations,
// excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify
// it was still present in the oplog.
return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
- << BSON(OR(opMatch, commandMatch, applyOps))
+ << BSON(OR(opMatch, commandMatch, applyOps, fcvChange))
<< BSON("fromMigrate" << NE << true)));
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 4ea3aa9ea7f..6643c186ebc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -67,6 +67,11 @@ using std::vector;
namespace {
constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
+
+bool isFCVChange(const Document& input) {
+ const auto ns = input[repl::OplogEntry::kNamespaceFieldName];
+ return NamespaceString(ns.getStringData()).isServerConfigurationCollection();
+}
} // namespace
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
@@ -385,6 +390,27 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
return doc.freeze();
}
+void DocumentSourceChangeStreamTransform::switchResumeTokenFormat(
+ const Document& fcvUpdateOplogEntry) {
+ checkValueType(fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName],
+ repl::OplogEntry::kObjectFieldName,
+ BSONType::Object);
+ Document opObject = fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName].getDocument();
+ // The FCV update is done with a replacement-style update, so there will be no $set. Instead,
+ // 'opObject' is the entire new document.
+ Value newVersion = opObject["version"];
+
+ checkValueType(newVersion, "o.version", BSONType::String);
+ if (newVersion.getStringData() == "3.6") {
+ _resumeTokenFormat = ResumeToken::SerializationFormat::kBinData;
+ } else {
+ uassert(65537,
+ "Feature compatibility version updated to an unknown version",
+ newVersion.getStringData() == "4.0");
+ _resumeTokenFormat = ResumeToken::SerializationFormat::kHexString;
+ }
+}
+
Value DocumentSourceChangeStreamTransform::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
Document changeStreamOptions(_changeStreamSpec);
@@ -447,6 +473,13 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
// Get the next input document.
auto input = pSource->getNext();
+
+ // Check for FCV changes.
+ while (input.isAdvanced() && isFCVChange(input.getDocument())) {
+ switchResumeTokenFormat(input.getDocument());
+ input = pSource->getNext();
+ }
+
if (!input.isAdvanced()) {
return input;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 11e058544cf..84219ed4f90 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -116,6 +116,12 @@ private:
void initializeTransactionContext(const Document& input);
/**
+ * Switches the format used to encode the resume token. This must happen if the stream sees an
+ * update to the admin.system.version collection.
+ */
+ void switchResumeTokenFormat(const Document& fcvUpdateOplogEntry);
+
+ /**
* Gets the next relevant applyOps entry that should be returned. If there is none, returns
* empty document.
*/