summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2021-01-11 18:37:53 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-12 05:00:32 +0000
commite9122ba5078eca4fbc7ea858221dba6af00e90a9 (patch)
treee45d27dc9d477e10c9b8f01a4a900a3572fbf123 /src/mongo/db/pipeline
parentb6cb60db7b6e3de8ebee94a9711334d3fc037d0c (diff)
downloadmongo-e9122ba5078eca4fbc7ea858221dba6af00e90a9.tar.gz
SERVER-50769 Change streams no longer balk at empty applyOps
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp332
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp1
3 files changed, 335 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index e9a31a8912a..bad92703daa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -173,8 +173,10 @@ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) {
BSONObjBuilder applyOpsBuilder;
applyOpsBuilder.append("op", "c");
- // "o.applyOps" must be an array with at least one element
- applyOpsBuilder.append("o.applyOps.0", BSON("$exists" << true));
+ // "o.applyOps" stores the list of operations, so it must be an array.
+ applyOpsBuilder.append("o.applyOps",
+ BSON("$type"
+ << "array"));
applyOpsBuilder.append("lsid", BSON("$exists" << true));
applyOpsBuilder.append("txnNumber", BSON("$exists" << true));
applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true)));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index deb4088d97c..f84e4279257 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1287,7 +1287,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
V{std::vector<Document>{
D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}},
}}},
- /* The abscence of the "partialTxn" and "prepare" fields indicates that this command commits
+ /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
the transaction. */
};
@@ -1366,6 +1366,216 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
2));
}
+TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create a transaction that is chained across 5 applyOps oplog entries. The first, third, and
+ // final oplog entries in the transaction chain contain empty applyOps arrays. The test verifies
+ // that change streams (1) correctly detect the transaction chain despite the fact that the
+ // final applyOps, which implicitly commits the transaction, is empty; and (2) behaves correctly
+ // upon encountering empty applyOps at other stages of the transaction chain.
+ repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1);
+ Document applyOps1{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1);
+ Document applyOps2{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{Document{{"_id", 123}}}}},
+ }}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ repl::OpTime applyOpsOpTime3(Timestamp(100, 3), 1);
+ Document applyOps3{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry3 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps3.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime3,
+ sessionInfo,
+ applyOpsOpTime2);
+
+ repl::OpTime applyOpsOpTime4(Timestamp(100, 4), 1);
+ Document applyOps4{
+ {"applyOps",
+ V{std::vector<Document>{D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{Document{{"_id", 456}}}}}}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry4 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps4.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime4,
+ sessionInfo,
+ applyOpsOpTime3);
+
+ repl::OpTime applyOpsOpTime5(Timestamp(100, 5), 1);
+ Document applyOps5{
+ {"applyOps", V{std::vector<Document>{}}},
+ /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
+ the transaction. */
+ };
+
+ auto transactionEntry5 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps5.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime5,
+ sessionInfo,
+ applyOpsOpTime4);
+
+ // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+ auto stages = makeStages(transactionEntry5);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryEditor in reverse chronological order.
+ getExpCtx()->mongoProcessInterface =
+ std::make_unique<MockMongoInterface>(std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{transactionEntry5,
+ transactionEntry4,
+ transactionEntry3,
+ transactionEntry2,
+ transactionEntry1});
+
+ // We should get three documents from the change stream, based on the documents in the two
+ // applyOps entries.
+ auto next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ auto nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(resumeToken,
+ makeResumeToken(applyOpsOpTime5.getTimestamp(),
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 0));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(resumeToken,
+ makeResumeToken(applyOpsOpTime5.getTimestamp(),
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 1));
+}
+
+TEST_F(ChangeStreamStageTest, TransactionWithOnlyEmptyOplogEntries) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create a transaction that is chained across 2 applyOps oplog entries. This test verifies that
+ // a change stream correctly reads an empty transaction and does not observe any events from it.
+ repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1);
+ Document applyOps1{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1);
+ Document applyOps2{
+ {"applyOps", V{std::vector<Document>{}}},
+ /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
+ the transaction. */
+ };
+
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+ auto stages = makeStages(transactionEntry2);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryEditor in reverse chronological order.
+ getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
+ std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1});
+
+ // We should get three documents from the change stream, based on the documents in the two
+ // applyOps entries.
+ auto next = transform->getNext();
+ ASSERT(!next.isAdvanced());
+}
+
TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
OperationSessionInfo sessionInfo;
sessionInfo.setTxnNumber(1);
@@ -1498,6 +1708,126 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
V{D{}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
2));
+
+ next = transform->getNext();
+ ASSERT(!next.isAdvanced());
+}
+
+TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create two applyOps entries that together represent a whole transaction.
+ repl::OpTime applyOpsOpTime1(Timestamp(99, 1), 1);
+ Document applyOps1{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}},
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}},
+ }}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(99, 2), 1);
+ Document applyOps2{
+ {"applyOps", V{std::vector<Document>{}}},
+ {"prepare", true},
+ };
+
+ // The second applyOps is empty.
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ // Create an oplog entry representing the commit for the prepared transaction.
+ auto commitEntry = repl::DurableOplogEntry(
+ kDefaultOpTime, // optime
+ 1LL, // hash
+ OpTypeEnum::kCommand, // opType
+ nss.getCommandNS(), // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSON("commitTransaction" << 1), // o
+ boost::none, // o2
+ sessionInfo, // sessionInfo
+ boost::none, // upsert
+ Date_t(), // wall clock time
+ boost::none, // statement id
+ applyOpsOpTime2, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none, // post-image optime
+ boost::none, // ShardId of resharding recipient
+ boost::none); // _id
+
+ // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+ auto stages = makeStages(commitEntry);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryEditor in reverse chronological order.
+ getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
+ std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1});
+
+ // We should get two documents from the change stream, based on the documents in the non-empty
+ // applyOps entry.
+ auto next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ auto nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(
+ resumeToken,
+ makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 0));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(
+ resumeToken,
+ makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 1));
+
+ next = transform->getNext();
+ ASSERT(!next.isAdvanced());
}
TEST_F(ChangeStreamStageTest, TransformApplyOps) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index fd12c8822c3..693035d2417 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -552,7 +552,6 @@ DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterato
}
checkValueType(_currentApplyOps, "applyOps", BSONType::Array);
- invariant(_currentApplyOps.getArrayLength() > 0);
// Initialize iterators at the beginning of the transaction.
_currentApplyOpsIt = _currentApplyOps.getArray().begin();