diff options
author | Justin Seyster <justin.seyster@mongodb.com> | 2021-01-11 18:37:53 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-12 18:43:54 +0000 |
commit | 5593fd8e33b60c75802edab304e23998fa0ce8a5 (patch) | |
tree | 137d5d51d5ca9fc0b881054128d9a4cd142921af /src/mongo | |
parent | 425a5ff8f6a4566aa17a2b25875a1cb12037c797 (diff) | |
download | mongo-5593fd8e33b60c75802edab304e23998fa0ce8a5.tar.gz |
SERVER-50769 Change streams no longer balk at empty applyOpsr4.2.12-rc0r4.2.12
(cherry picked from commit e9122ba5078eca4fbc7ea858221dba6af00e90a9)
Diffstat (limited to 'src/mongo')
3 files changed, 333 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 012c16ffbaa..82a1b177e92 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -177,8 +177,10 @@ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) { BSONObjBuilder applyOpsBuilder; applyOpsBuilder.append("op", "c"); - // "o.applyOps" must be an array with at least one element - applyOpsBuilder.append("o.applyOps.0", BSON("$exists" << true)); + // "o.applyOps" stores the list of operations, so it must be an array. + applyOpsBuilder.append("o.applyOps", + BSON("$type" + << "array")); applyOpsBuilder.append("lsid", BSON("$exists" << true)); applyOpsBuilder.append("txnNumber", BSON("$exists" << true)); applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true))); diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index eb2583f705b..e9c0082f632 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1090,7 +1090,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { V{std::vector<Document>{ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}}, }}}, - /* The abscence of the "partialTxn" and "prepare" fields indicates that this command commits + /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits the transaction. */ }; @@ -1169,6 +1169,216 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { 2)); } +TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create a transaction that is chained across 5 applyOps oplog entries. The first, third, and + // final oplog entries in the transaction chain contain empty applyOps arrays. The test verifies + // that change streams (1) correctly detect the transaction chain despite the fact that the + // final applyOps, which implicitly commits the transaction, is empty; and (2) behaves correctly + // upon encountering empty applyOps at other stages of the transaction chain. + repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1); + Document applyOps1{ + {"applyOps", V{std::vector<Document>{}}}, + {"partialTxn", true}, + }; + + auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps1.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime1, + sessionInfo, + repl::OpTime()); + + repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1); + Document applyOps2{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{Document{{"_id", 123}}}}}, + }}}, + {"partialTxn", true}, + }; + + auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps2.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime2, + sessionInfo, + applyOpsOpTime1); + + repl::OpTime applyOpsOpTime3(Timestamp(100, 3), 1); + Document applyOps3{ + {"applyOps", V{std::vector<Document>{}}}, + {"partialTxn", true}, + }; + + auto transactionEntry3 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps3.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime3, + sessionInfo, + applyOpsOpTime2); + + repl::OpTime applyOpsOpTime4(Timestamp(100, 4), 1); + Document applyOps4{ + {"applyOps", + V{std::vector<Document>{D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{Document{{"_id", 456}}}}}}}}, + {"partialTxn", true}, + }; + + auto transactionEntry4 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps4.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime4, + sessionInfo, + applyOpsOpTime3); + + repl::OpTime applyOpsOpTime5(Timestamp(100, 5), 1); + Document applyOps5{ + {"applyOps", V{std::vector<Document>{}}}, + /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits + the transaction. */ + }; + + auto transactionEntry5 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps5.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime5, + sessionInfo, + applyOpsOpTime4); + + // We do not use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + auto stages = makeStages(transactionEntry5); + auto transform = stages[2].get(); + invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); + + // Populate the MockTransactionHistoryEditor in reverse chronological order. + getExpCtx()->mongoProcessInterface = + std::make_unique<MockMongoInterface>(std::vector<FieldPath>{}, + std::vector<repl::OplogEntry>{transactionEntry5, + transactionEntry4, + transactionEntry3, + transactionEntry2, + transactionEntry1}); + + // We should get three documents from the change stream, based on the documents in the two + // applyOps entries. + auto next = transform->getNext(); + ASSERT(next.isAdvanced()); + auto nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ(resumeToken, + makeResumeToken(applyOpsOpTime5.getTimestamp(), + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 0)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ(resumeToken, + makeResumeToken(applyOpsOpTime5.getTimestamp(), + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 1)); +} + +TEST_F(ChangeStreamStageTest, TransactionWithOnlyEmptyOplogEntries) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create a transaction that is chained across 2 applyOps oplog entries. This test verifies that + // a change stream correctly reads an empty transaction and does not observe any events from it. + repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1); + Document applyOps1{ + {"applyOps", V{std::vector<Document>{}}}, + {"partialTxn", true}, + }; + + auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps1.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime1, + sessionInfo, + repl::OpTime()); + + repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1); + Document applyOps2{ + {"applyOps", V{std::vector<Document>{}}}, + /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits + the transaction. */ + }; + + auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps2.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime2, + sessionInfo, + applyOpsOpTime1); + + // We do not use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + auto stages = makeStages(transactionEntry2); + auto transform = stages[2].get(); + invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); + + // Populate the MockTransactionHistoryEditor in reverse chronological order. + getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( + std::vector<FieldPath>{}, + std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1}); + + // We should get three documents from the change stream, based on the documents in the two + // applyOps entries. + auto next = transform->getNext(); + ASSERT(!next.isAdvanced()); +} + TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { OperationSessionInfo sessionInfo; sessionInfo.setTxnNumber(1); @@ -1299,6 +1509,124 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { V{D{}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 2)); + + next = transform->getNext(); + ASSERT(!next.isAdvanced()); +} + +TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create two applyOps entries that together represent a whole transaction. + repl::OpTime applyOpsOpTime1(Timestamp(99, 1), 1); + Document applyOps1{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}}, + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}}, + }}}, + {"partialTxn", true}, + }; + + auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps1.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime1, + sessionInfo, + repl::OpTime()); + + repl::OpTime applyOpsOpTime2(Timestamp(99, 2), 1); + Document applyOps2{ + {"applyOps", V{std::vector<Document>{}}}, + {"prepare", true}, + }; + + // The second applyOps is empty. + auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps2.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime2, + sessionInfo, + applyOpsOpTime1); + + // Create an oplog entry representing the commit for the prepared transaction. + auto commitEntry = + repl::OplogEntry(kDefaultOpTime, // optime + 1LL, // hash + OpTypeEnum::kCommand, // opType + nss.getCommandNS(), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSON("commitTransaction" << 1), // o + boost::none, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + applyOpsOpTime2, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime + + // We do not use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + auto stages = makeStages(commitEntry); + auto transform = stages[2].get(); + invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); + + // Populate the MockTransactionHistoryEditor in reverse chronological order. + getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( + std::vector<FieldPath>{}, + std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1}); + + // We should get two documents from the change stream, based on the documents in the non-empty + // applyOps entry. + auto next = transform->getNext(); + ASSERT(next.isAdvanced()); + auto nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ( + resumeToken, + makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 0)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ( + resumeToken, + makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 1)); + + next = transform->getNext(); + ASSERT(!next.isAdvanced()); } TEST_F(ChangeStreamStageTest, TransformApplyOps) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 0773a2fb347..e055104b91a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -534,7 +534,6 @@ DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterato } checkValueType(_currentApplyOps, "applyOps", BSONType::Array); - invariant(_currentApplyOps.getArrayLength() > 0); // Initialize iterators at the beginning of the transaction. _currentApplyOpsIt = _currentApplyOps.getArray().begin(); |