diff options
6 files changed, 411 insertions, 4 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index bd30ef03e88..2cd6843cb66 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -53,6 +53,7 @@ selector: - jstests/sharding/agg_explain_fmt.js - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/change_stream_transaction_sharded.js + - jstests/sharding/change_stream_empty_apply_ops.js - jstests/sharding/change_streams.js - jstests/sharding/collation_lookup.js - jstests/sharding/collation_targeting.js diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 9dd115218fb..9b307bfe98d 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -88,6 +88,8 @@ all: test_file: jstests/replsets/read_operations_during_step_down.js - ticket: SERVER-50417 test_file: jstests/replsets/read_operations_during_step_up.js + - ticket: SERVER-50769 + test_file: jstests/sharding/change_stream_empty_apply_ops.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: diff --git a/jstests/sharding/change_stream_empty_apply_ops.js b/jstests/sharding/change_stream_empty_apply_ops.js new file mode 100644 index 00000000000..836bf9fad69 --- /dev/null +++ b/jstests/sharding/change_stream_empty_apply_ops.js @@ -0,0 +1,75 @@ +// Confirms that change streams correctly handle prepared transactions with an empty applyOps entry. +// This test creates a multi-shard transaction in which one of the participating shards has only a +// no-op write, resulting in the empty applyOps scenario we wish to test. Exercises the fix for +// SERVER-50769. 
+// @tags: [ +// requires_sharding, +// uses_change_streams, +// uses_multi_shard_transaction, +// uses_transactions, +// ] +(function() { +"use strict"; + +const dbName = "test"; +const collName = "change_stream_empty_apply_ops"; +const namespace = dbName + "." + collName; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); +const coll = db.getCollection(collName); + +assert.commandWorked(coll.createIndex({shard: 1})); +st.ensurePrimaryShard(dbName, st.shard0.shardName); +// Shard the test collection and split it into two chunks: one that contains all {shard: 1} +// documents and one that contains all {shard: 2} documents. +st.shardColl(collName, + {shard: 1} /* shard key */, + {shard: 2} /* split at */, + {shard: 2} /* move the chunk containing {shard: 2} to its own shard */, + dbName, + true); +// Seed each chunk with an initial document. +assert.commandWorked(coll.insert({shard: 1}, {writeConcern: {w: "majority"}})); +assert.commandWorked(coll.insert({shard: 2}, {writeConcern: {w: "majority"}})); + +// Open up change streams. +const changeStreamCursorColl = coll.watch(); +const changeStreamCursorDB = db.watch(); +const changeStreamCursorCluster = mongosConn.watch(); + +// Start a transaction, which will include both shards. +const sesion = db.getMongo().startSession({causalConsistency: true}); +const sessionDb = sesion.getDatabase(dbName); +const sessionColl = sessionDb[collName]; + +sesion.startTransaction({readConcern: {level: "majority"}}); + +// This no-op will make one of the shards a transaction participant without generating an actual +// write. The transaction will send an empty prepared transaction to the shard, in the form of an +// applyOps command with no operations. 
+sessionColl.findAndModify({query: {shard: 1}, update: {$setOnInsert: {a: 1}}}); + +// This write, which is not a no-op, occurs on the other shard. +sessionColl.findAndModify({query: {shard: 2}, update: {$set: {a: 1}}}); + +assert.commandWorked(sesion.commitTransaction_forTesting()); + +// Each change stream should see exactly one update, resulting from the valid write on shard 2. +[changeStreamCursorColl, changeStreamCursorDB, changeStreamCursorCluster].forEach(function( + changeStreamCursor) { + assert.soon(() => changeStreamCursor.hasNext()); + const changeDoc = changeStreamCursor.next(); + assert.eq(changeDoc.documentKey.shard, 2); + assert.eq(changeDoc.operationType, "update"); + + assert(!changeStreamCursor.hasNext()); +}); + +st.stop(); +})(); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 012c16ffbaa..82a1b177e92 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -177,8 +177,10 @@ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) { BSONObjBuilder applyOpsBuilder; applyOpsBuilder.append("op", "c"); - // "o.applyOps" must be an array with at least one element - applyOpsBuilder.append("o.applyOps.0", BSON("$exists" << true)); + // "o.applyOps" stores the list of operations, so it must be an array. 
+ applyOpsBuilder.append("o.applyOps", + BSON("$type" + << "array")); applyOpsBuilder.append("lsid", BSON("$exists" << true)); applyOpsBuilder.append("txnNumber", BSON("$exists" << true)); applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true))); diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index eb2583f705b..e9c0082f632 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1090,7 +1090,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { V{std::vector<Document>{ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}}, }}}, - /* The abscence of the "partialTxn" and "prepare" fields indicates that this command commits + /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits the transaction. */ }; @@ -1169,6 +1169,216 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { 2)); } +TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create a transaction that is chained across 5 applyOps oplog entries. The first, third, and + // final oplog entries in the transaction chain contain empty applyOps arrays. The test verifies + // that change streams (1) correctly detect the transaction chain despite the fact that the + // final applyOps, which implicitly commits the transaction, is empty; and (2) behaves correctly + // upon encountering empty applyOps at other stages of the transaction chain. 
+ repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1); + Document applyOps1{ + {"applyOps", V{std::vector<Document>{}}}, + {"partialTxn", true}, + }; + + auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps1.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime1, + sessionInfo, + repl::OpTime()); + + repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1); + Document applyOps2{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{Document{{"_id", 123}}}}}, + }}}, + {"partialTxn", true}, + }; + + auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps2.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime2, + sessionInfo, + applyOpsOpTime1); + + repl::OpTime applyOpsOpTime3(Timestamp(100, 3), 1); + Document applyOps3{ + {"applyOps", V{std::vector<Document>{}}}, + {"partialTxn", true}, + }; + + auto transactionEntry3 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps3.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime3, + sessionInfo, + applyOpsOpTime2); + + repl::OpTime applyOpsOpTime4(Timestamp(100, 4), 1); + Document applyOps4{ + {"applyOps", + V{std::vector<Document>{D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{Document{{"_id", 456}}}}}}}}, + {"partialTxn", true}, + }; + + auto transactionEntry4 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps4.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime4, + sessionInfo, + applyOpsOpTime3); + + repl::OpTime applyOpsOpTime5(Timestamp(100, 5), 1); + Document applyOps5{ + {"applyOps", V{std::vector<Document>{}}}, + /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits + the transaction. 
*/ +    };
+
+    auto transactionEntry5 = makeOplogEntry(OpTypeEnum::kCommand,
+                                            nss.getCommandNS(),
+                                            applyOps5.toBson(),
+                                            testUuid(),
+                                            boost::none,  // fromMigrate
+                                            boost::none,  // o2 field
+                                            applyOpsOpTime5,
+                                            sessionInfo,
+                                            applyOpsOpTime4);
+
+    // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+    // documents to be returned from one applyOps.
+    auto stages = makeStages(transactionEntry5);
+    auto transform = stages[2].get();
+    invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+    // Populate the MockTransactionHistoryEditor in reverse chronological order.
+    getExpCtx()->mongoProcessInterface =
+        std::make_unique<MockMongoInterface>(std::vector<FieldPath>{},
+                                             std::vector<repl::OplogEntry>{transactionEntry5,
+                                                                           transactionEntry4,
+                                                                           transactionEntry3,
+                                                                           transactionEntry2,
+                                                                           transactionEntry1});
+
+    // We should get two documents from the change stream, based on the documents in the two
+    // non-empty applyOps entries. 
+ auto next = transform->getNext(); + ASSERT(next.isAdvanced()); + auto nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ(resumeToken, + makeResumeToken(applyOpsOpTime5.getTimestamp(), + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 0)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ(resumeToken, + makeResumeToken(applyOpsOpTime5.getTimestamp(), + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 1)); +} + +TEST_F(ChangeStreamStageTest, TransactionWithOnlyEmptyOplogEntries) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create a transaction that is chained across 2 applyOps oplog entries. This test verifies that + // a change stream correctly reads an empty transaction and does not observe any events from it. 
+    repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1);
+    Document applyOps1{
+        {"applyOps", V{std::vector<Document>{}}},
+        {"partialTxn", true},
+    };
+
+    auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+                                            nss.getCommandNS(),
+                                            applyOps1.toBson(),
+                                            testUuid(),
+                                            boost::none,  // fromMigrate
+                                            boost::none,  // o2 field
+                                            applyOpsOpTime1,
+                                            sessionInfo,
+                                            repl::OpTime());
+
+    repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1);
+    Document applyOps2{
+        {"applyOps", V{std::vector<Document>{}}},
+        /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
+           the transaction. */
+    };
+
+    auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+                                            nss.getCommandNS(),
+                                            applyOps2.toBson(),
+                                            testUuid(),
+                                            boost::none,  // fromMigrate
+                                            boost::none,  // o2 field
+                                            applyOpsOpTime2,
+                                            sessionInfo,
+                                            applyOpsOpTime1);
+
+    // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+    // documents to be returned from one applyOps.
+    auto stages = makeStages(transactionEntry2);
+    auto transform = stages[2].get();
+    invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+    // Populate the MockTransactionHistoryEditor in reverse chronological order.
+    getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
+        std::vector<FieldPath>{},
+        std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1});
+
+    // We should get no documents from the change stream, since every applyOps entry in this
+    // transaction is empty. 
+ auto next = transform->getNext(); + ASSERT(!next.isAdvanced()); +} + TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { OperationSessionInfo sessionInfo; sessionInfo.setTxnNumber(1); @@ -1299,6 +1509,124 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { V{D{}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 2)); + + next = transform->getNext(); + ASSERT(!next.isAdvanced()); +} + +TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create two applyOps entries that together represent a whole transaction. + repl::OpTime applyOpsOpTime1(Timestamp(99, 1), 1); + Document applyOps1{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}}, + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}}, + }}}, + {"partialTxn", true}, + }; + + auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps1.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime1, + sessionInfo, + repl::OpTime()); + + repl::OpTime applyOpsOpTime2(Timestamp(99, 2), 1); + Document applyOps2{ + {"applyOps", V{std::vector<Document>{}}}, + {"prepare", true}, + }; + + // The second applyOps is empty. + auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps2.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime2, + sessionInfo, + applyOpsOpTime1); + + // Create an oplog entry representing the commit for the prepared transaction. 
+ auto commitEntry = + repl::OplogEntry(kDefaultOpTime, // optime + 1LL, // hash + OpTypeEnum::kCommand, // opType + nss.getCommandNS(), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSON("commitTransaction" << 1), // o + boost::none, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + applyOpsOpTime2, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime + + // We do not use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + auto stages = makeStages(commitEntry); + auto transform = stages[2].get(); + invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); + + // Populate the MockTransactionHistoryEditor in reverse chronological order. + getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( + std::vector<FieldPath>{}, + std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1}); + + // We should get two documents from the change stream, based on the documents in the non-empty + // applyOps entry. + auto next = transform->getNext(); + ASSERT(next.isAdvanced()); + auto nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ( + resumeToken, + makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. 
+ testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 0)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ( + resumeToken, + makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 1)); + + next = transform->getNext(); + ASSERT(!next.isAdvanced()); } TEST_F(ChangeStreamStageTest, TransformApplyOps) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 0773a2fb347..e055104b91a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -534,7 +534,6 @@ DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterato } checkValueType(_currentApplyOps, "applyOps", BSONType::Array); - invariant(_currentApplyOps.getArrayLength() > 0); // Initialize iterators at the beginning of the transaction. _currentApplyOpsIt = _currentApplyOps.getArray().begin(); |