summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2021-01-11 18:37:53 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-12 18:43:54 +0000
commit5593fd8e33b60c75802edab304e23998fa0ce8a5 (patch)
tree137d5d51d5ca9fc0b881054128d9a4cd142921af
parent425a5ff8f6a4566aa17a2b25875a1cb12037c797 (diff)
downloadmongo-5593fd8e33b60c75802edab304e23998fa0ce8a5.tar.gz
SERVER-50769 Change streams no longer balk at empty applyOps (tags: r4.2.12-rc0, r4.2.12)
(cherry picked from commit e9122ba5078eca4fbc7ea858221dba6af00e90a9)
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml1
-rw-r--r--etc/backports_required_for_multiversion_tests.yml2
-rw-r--r--jstests/sharding/change_stream_empty_apply_ops.js75
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp330
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp1
6 files changed, 411 insertions, 4 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index bd30ef03e88..2cd6843cb66 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -53,6 +53,7 @@ selector:
- jstests/sharding/agg_explain_fmt.js
- jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/change_stream_transaction_sharded.js
+ - jstests/sharding/change_stream_empty_apply_ops.js
- jstests/sharding/change_streams.js
- jstests/sharding/collation_lookup.js
- jstests/sharding/collation_targeting.js
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 9dd115218fb..9b307bfe98d 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -88,6 +88,8 @@ all:
test_file: jstests/replsets/read_operations_during_step_down.js
- ticket: SERVER-50417
test_file: jstests/replsets/read_operations_during_step_up.js
+ - ticket: SERVER-50769
+ test_file: jstests/sharding/change_stream_empty_apply_ops.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
diff --git a/jstests/sharding/change_stream_empty_apply_ops.js b/jstests/sharding/change_stream_empty_apply_ops.js
new file mode 100644
index 00000000000..836bf9fad69
--- /dev/null
+++ b/jstests/sharding/change_stream_empty_apply_ops.js
@@ -0,0 +1,75 @@
+// Confirms that change streams correctly handle prepared transactions with an empty applyOps entry.
+// This test creates a multi-shard transaction in which one of the participating shards has only a
+// no-op write, resulting in the empty applyOps scenario we wish to test. Exercises the fix for
+// SERVER-50769.
+// @tags: [
+// requires_sharding,
+// uses_change_streams,
+// uses_multi_shard_transaction,
+// uses_transactions,
+// ]
+(function() {
+"use strict";
+
+const dbName = "test";
+const collName = "change_stream_empty_apply_ops";
+const namespace = dbName + "." + collName;
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+const coll = db.getCollection(collName);
+
+assert.commandWorked(coll.createIndex({shard: 1}));
+st.ensurePrimaryShard(dbName, st.shard0.shardName);
+// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
+// documents and one that contains all {shard: 2} documents.
+st.shardColl(collName,
+ {shard: 1} /* shard key */,
+ {shard: 2} /* split at */,
+ {shard: 2} /* move the chunk containing {shard: 2} to its own shard */,
+ dbName,
+ true);
+// Seed each chunk with an initial document.
+assert.commandWorked(coll.insert({shard: 1}, {writeConcern: {w: "majority"}}));
+assert.commandWorked(coll.insert({shard: 2}, {writeConcern: {w: "majority"}}));
+
+// Open up change streams.
+const changeStreamCursorColl = coll.watch();
+const changeStreamCursorDB = db.watch();
+const changeStreamCursorCluster = mongosConn.watch();
+
+// Start a transaction, which will include both shards.
+const sesion = db.getMongo().startSession({causalConsistency: true});
+const sessionDb = sesion.getDatabase(dbName);
+const sessionColl = sessionDb[collName];
+
+sesion.startTransaction({readConcern: {level: "majority"}});
+
+// This no-op will make one of the shards a transaction participant without generating an actual
+// write. The transaction will send an empty prepared transaction to the shard, in the form of an
+// applyOps command with no operations.
+sessionColl.findAndModify({query: {shard: 1}, update: {$setOnInsert: {a: 1}}});
+
+// This write, which is not a no-op, occurs on the other shard.
+sessionColl.findAndModify({query: {shard: 2}, update: {$set: {a: 1}}});
+
+assert.commandWorked(sesion.commitTransaction_forTesting());
+
+// Each change stream should see exactly one update, resulting from the valid write on shard 2.
+[changeStreamCursorColl, changeStreamCursorDB, changeStreamCursorCluster].forEach(function(
+ changeStreamCursor) {
+ assert.soon(() => changeStreamCursor.hasNext());
+ const changeDoc = changeStreamCursor.next();
+ assert.eq(changeDoc.documentKey.shard, 2);
+ assert.eq(changeDoc.operationType, "update");
+
+ assert(!changeStreamCursor.hasNext());
+});
+
+st.stop();
+})();
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 012c16ffbaa..82a1b177e92 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -177,8 +177,10 @@ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) {
BSONObjBuilder applyOpsBuilder;
applyOpsBuilder.append("op", "c");
- // "o.applyOps" must be an array with at least one element
- applyOpsBuilder.append("o.applyOps.0", BSON("$exists" << true));
+ // "o.applyOps" stores the list of operations, so it must be an array.
+ applyOpsBuilder.append("o.applyOps",
+ BSON("$type"
+ << "array"));
applyOpsBuilder.append("lsid", BSON("$exists" << true));
applyOpsBuilder.append("txnNumber", BSON("$exists" << true));
applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true)));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index eb2583f705b..e9c0082f632 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1090,7 +1090,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
V{std::vector<Document>{
D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}},
}}},
- /* The abscence of the "partialTxn" and "prepare" fields indicates that this command commits
+ /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
the transaction. */
};
@@ -1169,6 +1169,216 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
2));
}
+TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create a transaction that is chained across 5 applyOps oplog entries. The first, third, and
+ // final oplog entries in the transaction chain contain empty applyOps arrays. The test verifies
+ // that change streams (1) correctly detect the transaction chain despite the fact that the
+ // final applyOps, which implicitly commits the transaction, is empty; and (2) behaves correctly
+ // upon encountering empty applyOps at other stages of the transaction chain.
+ repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1);
+ Document applyOps1{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1);
+ Document applyOps2{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{Document{{"_id", 123}}}}},
+ }}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ repl::OpTime applyOpsOpTime3(Timestamp(100, 3), 1);
+ Document applyOps3{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry3 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps3.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime3,
+ sessionInfo,
+ applyOpsOpTime2);
+
+ repl::OpTime applyOpsOpTime4(Timestamp(100, 4), 1);
+ Document applyOps4{
+ {"applyOps",
+ V{std::vector<Document>{D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{Document{{"_id", 456}}}}}}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry4 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps4.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime4,
+ sessionInfo,
+ applyOpsOpTime3);
+
+ repl::OpTime applyOpsOpTime5(Timestamp(100, 5), 1);
+ Document applyOps5{
+ {"applyOps", V{std::vector<Document>{}}},
+ /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
+ the transaction. */
+ };
+
+ auto transactionEntry5 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps5.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime5,
+ sessionInfo,
+ applyOpsOpTime4);
+
+ // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+ auto stages = makeStages(transactionEntry5);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryEditor in reverse chronological order.
+ getExpCtx()->mongoProcessInterface =
+ std::make_unique<MockMongoInterface>(std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{transactionEntry5,
+ transactionEntry4,
+ transactionEntry3,
+ transactionEntry2,
+ transactionEntry1});
+
+ // We should get two documents from the change stream, based on the inserts in the two
+ // non-empty applyOps entries.
+ auto next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ auto nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(resumeToken,
+ makeResumeToken(applyOpsOpTime5.getTimestamp(),
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 0));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(resumeToken,
+ makeResumeToken(applyOpsOpTime5.getTimestamp(),
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 1));
+}
+
+TEST_F(ChangeStreamStageTest, TransactionWithOnlyEmptyOplogEntries) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create a transaction that is chained across 2 applyOps oplog entries. This test verifies that
+ // a change stream correctly reads an empty transaction and does not observe any events from it.
+ repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1);
+ Document applyOps1{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1);
+ Document applyOps2{
+ {"applyOps", V{std::vector<Document>{}}},
+ /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
+ the transaction. */
+ };
+
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ // We do not use the checkTransformation() pattern that other tests use since we need to
+ // iterate the change stream directly and verify that it produces no documents.
+ auto stages = makeStages(transactionEntry2);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryEditor in reverse chronological order.
+ getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
+ std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1});
+
+ // We should not get any documents from the change stream, because every applyOps entry in
+ // this transaction is empty.
+ auto next = transform->getNext();
+ ASSERT(!next.isAdvanced());
+}
+
TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
OperationSessionInfo sessionInfo;
sessionInfo.setTxnNumber(1);
@@ -1299,6 +1509,124 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
V{D{}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
2));
+
+ next = transform->getNext();
+ ASSERT(!next.isAdvanced());
+}
+
+TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create two applyOps entries that together represent a whole transaction.
+ repl::OpTime applyOpsOpTime1(Timestamp(99, 1), 1);
+ Document applyOps1{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}},
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}},
+ }}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(99, 2), 1);
+ Document applyOps2{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"prepare", true},
+ };
+
+ // The second applyOps is empty.
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ // Create an oplog entry representing the commit for the prepared transaction.
+ auto commitEntry =
+ repl::OplogEntry(kDefaultOpTime, // optime
+ 1LL, // hash
+ OpTypeEnum::kCommand, // opType
+ nss.getCommandNS(), // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSON("commitTransaction" << 1), // o
+ boost::none, // o2
+ sessionInfo, // sessionInfo
+ boost::none, // upsert
+ Date_t(), // wall clock time
+ boost::none, // statement id
+ applyOpsOpTime2, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+
+ // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+ auto stages = makeStages(commitEntry);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryEditor in reverse chronological order.
+ getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
+ std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1});
+
+ // We should get two documents from the change stream, based on the documents in the non-empty
+ // applyOps entry.
+ auto next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ auto nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(
+ resumeToken,
+ makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 0));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(
+ resumeToken,
+ makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 1));
+
+ next = transform->getNext();
+ ASSERT(!next.isAdvanced());
}
TEST_F(ChangeStreamStageTest, TransformApplyOps) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 0773a2fb347..e055104b91a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -534,7 +534,6 @@ DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterato
}
checkValueType(_currentApplyOps, "applyOps", BSONType::Array);
- invariant(_currentApplyOps.getArrayLength() > 0);
// Initialize iterators at the beginning of the transaction.
_currentApplyOpsIt = _currentApplyOps.getArray().begin();