17 files changed, 744 insertions, 264 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 7a8451dab0a..20c146a73ca 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -159,19 +159,46 @@ void DocumentSourceChangeStream::checkValueType(const Value v, namespace { /** - * Constructs a filter matching 'applyOps' oplog entries that: - * 1) Represent a committed transaction (i.e., not just the "prepare" part of a two-phase - * transaction). - * 2) Have sub-entries which should be returned in the change stream. + * Constructs a filter matching any 'applyOps' commands that commit a transaction. An 'applyOps' + * command implicitly commits a transaction if _both_ of the following are true: + * 1) it is not marked with the 'partialTxn' field, which would indicate that there are more entries + * to come in the transaction and + * 2) it is not marked with the 'prepare' field, which would indicate that the transaction is only + * committed if there is a follow-up 'commitTransaction' command in the oplog. + * + * This filter will ignore all but the last 'applyOps' command in a transaction comprising multiple + * 'applyOps' commands, and it will ignore all 'applyOps' commands in a prepared transaction. The + * change stream traverses back through the oplog to recover the ignored commands when it sees an + * entry that commits a transaction. + * + * As an optimization, this filter also ignores any transaction with just a single 'applyOps' if + * that 'applyOps' does not contain any updates that modify the namespace that the change stream is + * watching. */ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) { BSONObjBuilder applyOpsBuilder; applyOpsBuilder.append("op", "c"); + + // "o.applyOps" must be an array with at least one element + applyOpsBuilder.append("o.applyOps.0", BSON("$exists" << true)); applyOpsBuilder.append("lsid", BSON("$exists" << true)); applyOpsBuilder.append("txnNumber", BSON("$exists" << true)); applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true))); - const std::string& kApplyOpsNs = "o.applyOps.ns"; - applyOpsBuilder.appendAs(nsMatch, kApplyOpsNs); + applyOpsBuilder.append("o.partialTxn", BSON("$not" << BSON("$eq" << true))); + { + // Include this 'applyOps' if it has an operation with a matching namespace _or_ if it has a + // 'prevOpTime' link to another 'applyOps' command, indicating a multi-entry transaction. + BSONArrayBuilder orBuilder(applyOpsBuilder.subarrayStart("$or")); + { + { + BSONObjBuilder nsMatchBuilder(orBuilder.subobjStart()); + nsMatchBuilder.appendAs(nsMatch, "o.applyOps.ns"_sd); + } + // The default repl::OpTime is the value used to indicate a null "prevOpTime" link. 
+ orBuilder.append(BSON(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName + << BSON("$ne" << repl::OpTime().toBSON()))); + } + } return applyOpsBuilder.obj(); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 7f093290eb1..016d7132283 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -53,6 +53,7 @@ #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/transaction_history_iterator.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -75,8 +76,6 @@ using DSChangeStream = DocumentSourceChangeStream; static const Timestamp kDefaultTs(100, 1); static const repl::OpTime kDefaultOpTime(kDefaultTs, 1); -static const Timestamp kPreparedTransactionTs(99, 1); -static const repl::OpTime kPreparedTransactionOpTime(kPreparedTransactionTs, 1); static const NamespaceString nss("unittests.change_stream"); static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}"); @@ -89,15 +88,47 @@ public: struct MockMongoInterface final : public StubMongoProcessInterface { + // This mock iterator simulates a traversal of transaction history in the oplog by returning + // mock oplog entries from a list. + struct MockTransactionHistoryIterator : public TransactionHistoryIteratorBase { + bool hasNext() const final { + return (mockEntriesIt != mockEntries.end()); + } + + repl::OplogEntry next(OperationContext* opCtx) final { + ASSERT(hasNext()); + return *(mockEntriesIt++); + } + + repl::OpTime nextOpTime(OperationContext* opCtx) final { + ASSERT(hasNext()); + return (mockEntriesIt++)->getOpTime(); + } + + std::vector<repl::OplogEntry> mockEntries; + std::vector<repl::OplogEntry>::const_iterator mockEntriesIt; + }; + MockMongoInterface(std::vector<FieldPath> fields, - boost::optional<repl::OplogEntry> preparedTransaction = {}) - : _fields(std::move(fields)), _preparedTransaction(preparedTransaction) {} - - // For tests of "commitTransaction" commands. - repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx, - repl::OpTime lookupTime) final { - invariant(_preparedTransaction && (lookupTime == _preparedTransaction->getOpTime())); - return *_preparedTransaction; + std::vector<repl::OplogEntry> transactionEntries = {}) + : _fields(std::move(fields)), _transactionEntries(std::move(transactionEntries)) {} + + // For tests of transactions that involve multiple oplog entries. + std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( + repl::OpTime time) const { + auto iterator = stdx::make_unique<MockTransactionHistoryIterator>(); + + // Simulate a lookup on the oplog timestamp by manually advancing the iterator until we + // reach the desired timestamp. + iterator->mockEntries = _transactionEntries; + ASSERT(iterator->mockEntries.size() > 0); + for (iterator->mockEntriesIt = iterator->mockEntries.begin(); + iterator->mockEntriesIt->getOpTime() != time; + ++iterator->mockEntriesIt) { + ASSERT(iterator->mockEntriesIt != iterator->mockEntries.end()); + } + + return iterator; } // For "insert" tests. 
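For illustration: assuming the change stream is watching the test namespace unittests.change_stream (used throughout the tests below), and given that OplogEntry::kPrevWriteOpTimeInTransactionFieldName is "prevOpTime", the predicate built by getTxnApplyOpsFilter() above comes out roughly as the sketch below. This is not the verbatim output: the namespace clause takes whatever form the 'nsMatch' element has (plain string or regex), and the null-link value assumes the default repl::OpTime serializes as {ts: Timestamp(0, 0), t: -1}.

    #include "mongo/bson/json.h"

    // Rough shape of the filter produced by getTxnApplyOpsFilter() -- illustration only.
    const mongo::BSONObj kApproxTxnApplyOpsFilter = mongo::fromjson(R"({
        "op": "c",
        "o.applyOps.0": {"$exists": true},
        "lsid": {"$exists": true},
        "txnNumber": {"$exists": true},
        "o.prepare": {"$not": {"$eq": true}},
        "o.partialTxn": {"$not": {"$eq": true}},
        "$or": [
            {"o.applyOps.ns": "unittests.change_stream"},
            {"prevOpTime": {"$ne": {"ts": {"$timestamp": {"t": 0, "i": 0}}, "t": -1}}}
        ]
    })");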
@@ -107,7 +138,17 @@ struct MockMongoInterface final : public StubMongoProcessInterface { } std::vector<FieldPath> _fields; - boost::optional<repl::OplogEntry> _preparedTransaction; + + // Stores oplog entries associated with a commit operation, including the oplog entries that a + // real DocumentSourceChangeStream would not see, because they are marked with a "prepare" or + // "partialTxn" flag. When the DocumentSourceChangeStream sees the commit for the transaction, + // either an explicit "commitCommand" or an implicit commit represented by an "applyOps" that is + // not marked with the "prepare" or "partialTxn" flag, it uses a TransactionHistoryIterator to + // go back and look up these entries. + // + // These entries are stored in the order they would be returned by the + // TransactionHistoryIterator, which is the _reverse_ of the order they appear in the oplog. + std::vector<repl::OplogEntry> _transactionEntries; }; class ChangeStreamStageTest : public ChangeStreamStageTestNoSetup { @@ -129,12 +170,12 @@ public: std::vector<FieldPath> docKeyFields = {}, const BSONObj& spec = kDefaultSpec, const boost::optional<Document> expectedInvalidate = {}, - const boost::optional<repl::OplogEntry> preparedTransaction = {}) { + const std::vector<repl::OplogEntry> transactionEntries = {}) { vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec); auto closeCursor = stages.back(); getExpCtx()->mongoProcessInterface = - stdx::make_unique<MockMongoInterface>(docKeyFields, preparedTransaction); + stdx::make_unique<MockMongoInterface>(docKeyFields, transactionEntries); auto next = closeCursor->getNext(); // Match stage should pass the doc down if expectedDoc is given. @@ -218,11 +259,13 @@ public: ImplicitValue uuid = Value(), ImplicitValue docKey = Value(), ResumeTokenData::FromInvalidate fromInvalidate = - ResumeTokenData::FromInvalidate::kNotFromInvalidate) { + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + size_t txnOpIndex = 0) { ResumeTokenData tokenData; tokenData.clusterTime = ts; tokenData.documentKey = docKey; tokenData.fromInvalidate = fromInvalidate; + tokenData.txnOpIndex = txnOpIndex; if (!uuid.missing()) tokenData.uuid = uuid.getUuid(); return ResumeToken(tokenData).toDocument(); @@ -289,7 +332,9 @@ public: boost::optional<UUID> uuid = testUuid(), boost::optional<bool> fromMigrate = boost::none, boost::optional<BSONObj> object2 = boost::none, - boost::optional<repl::OpTime> opTime = boost::none) { + boost::optional<repl::OpTime> opTime = boost::none, + OperationSessionInfo sessionInfo = {}, + boost::optional<repl::OpTime> prevOpTime = {}) { long long hash = 1LL; return repl::OplogEntry(opTime ? 
*opTime : kDefaultOpTime, // optime hash, // hash @@ -300,11 +345,11 @@ public: repl::OplogEntry::kOplogVersion, // version object, // o object2, // o2 - {}, // sessionInfo + sessionInfo, // sessionInfo boost::none, // upsert boost::none, // wall clock time boost::none, // statement id - boost::none, // optime of previous write within same transaction + prevOpTime, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime } @@ -962,21 +1007,19 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact Document preparedApplyOps{ {"applyOps", Value{std::vector<Document>{ - Document{{"op", "i"_sd}, - {"ns", nss.ns()}, - {"ui", testUuid()}, - {"o", Value{Document{{"_id", 123}}}}}, + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}}, }}}, {"prepare", true}, }; + repl::OpTime applyOpsOpTime(Timestamp(99, 1), 1); auto preparedTransaction = makeOplogEntry(OpTypeEnum::kCommand, nss.getCommandNS(), preparedApplyOps.toBson(), testUuid(), boost::none, // fromMigrate boost::none, // o2 field - kPreparedTransactionOpTime); + applyOpsOpTime); // Create an oplog entry representing the commit for the prepared transaction. The commit has a // 'prevWriteOpTimeInTransaction' value that matches the 'preparedApplyOps' entry, which the @@ -984,23 +1027,23 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact OperationSessionInfo sessionInfo; sessionInfo.setTxnNumber(1); sessionInfo.setSessionId(makeLogicalSessionIdForTest()); - auto oplogEntry = repl::OplogEntry( - kDefaultOpTime, // optime - 1LL, // hash - OpTypeEnum::kCommand, // opType - nss.getCommandNS(), // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - BSON("commitTransaction" << 1), // o - boost::none, // o2 - sessionInfo, // sessionInfo - boost::none, // upsert - boost::none, // wall clock time - boost::none, // statement id - kPreparedTransactionOpTime, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none); // post-image optime + auto oplogEntry = + repl::OplogEntry(kDefaultOpTime, // optime + 1LL, // hash + OpTypeEnum::kCommand, // opType + nss.getCommandNS(), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSON("commitTransaction" << 1), // o + boost::none, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + boost::none, // wall clock time + boost::none, // statement id + applyOpsOpTime, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime // When the DocumentSourceChangeStreamTransform sees the "commitTransaction" oplog entry, we // expect it to return the insert op within our 'preparedApplyOps' oplog entry. @@ -1015,7 +1058,256 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact {DSChangeStream::kDocumentKeyField, D{}}, }; - checkTransformation(oplogEntry, expectedResult, {}, kDefaultSpec, {}, preparedTransaction); + checkTransformation(oplogEntry, expectedResult, {}, kDefaultSpec, {}, {preparedTransaction}); +} + +TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create two applyOps entries that together represent a whole transaction. 
+ repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1); + Document applyOps1{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{Document{{"_id", 123}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{Document{{"_id", 456}}}}}, + }}}, + {"partialTxn", true}, + }; + + auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps1.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime1, + sessionInfo, + repl::OpTime()); + + repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1); + Document applyOps2{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}}, + }}}, + /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits + the transaction. */ + }; + + auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps2.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime2, + sessionInfo, + applyOpsOpTime1); + + // We do not use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + auto stages = makeStages(transactionEntry2); + auto transform = stages[2].get(); + invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); + + // Populate the MockTransactionHistoryIterator in reverse chronological order. + getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>( + std::vector<FieldPath>{}, + std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1}); + + // We should get three documents from the change stream, based on the documents in the two + // applyOps entries. 
+ auto next = transform->getNext(); + ASSERT(next.isAdvanced()); + auto nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ(resumeToken, + makeResumeToken(applyOpsOpTime2.getTimestamp(), + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 0)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ(resumeToken, + makeResumeToken(applyOpsOpTime2.getTimestamp(), + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 1)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 789); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ(resumeToken, + makeResumeToken(applyOpsOpTime2.getTimestamp(), + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 2)); +} + +TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(1); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + // Create two applyOps entries that together represent a whole transaction. 
+ repl::OpTime applyOpsOpTime1(Timestamp(99, 1), 1); + Document applyOps1{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}}, + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}}, + }}}, + {"partialTxn", true}, + }; + + auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps1.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime1, + sessionInfo, + repl::OpTime()); + + repl::OpTime applyOpsOpTime2(Timestamp(99, 2), 1); + Document applyOps2{ + {"applyOps", + V{std::vector<Document>{ + D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}}, + }}}, + {"prepare", true}, + }; + + auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOps2.toBson(), + testUuid(), + boost::none, // fromMigrate + boost::none, // o2 field + applyOpsOpTime2, + sessionInfo, + applyOpsOpTime1); + + // Create an oplog entry representing the commit for the prepared transaction. + auto commitEntry = + repl::OplogEntry(kDefaultOpTime, // optime + 1LL, // hash + OpTypeEnum::kCommand, // opType + nss.getCommandNS(), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSON("commitTransaction" << 1), // o + boost::none, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + boost::none, // wall clock time + boost::none, // statement id + applyOpsOpTime2, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime + + // We do not use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + auto stages = makeStages(commitEntry); + auto transform = stages[2].get(); + invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); + + // Populate the MockTransactionHistoryIterator in reverse chronological order. + getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>( + std::vector<FieldPath>{}, + std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1}); + + // We should get three documents from the change stream, based on the documents in the two + // applyOps entries. + auto next = transform->getNext(); + ASSERT(next.isAdvanced()); + auto nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ( + resumeToken, + makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. 
+ testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 0)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ( + resumeToken, + makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 1)); + + next = transform->getNext(); + ASSERT(next.isAdvanced()); + nextDoc = next.releaseDocument(); + ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 789); + ASSERT_EQ( + nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0); + resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument(); + ASSERT_DOCUMENT_EQ( + resumeToken, + makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. + testUuid(), + V{D{}}, + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + 2)); } TEST_F(ChangeStreamStageTest, TransformApplyOps) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 1220286da9e..0773a2fb347 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -141,19 +141,15 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts, Value uuid, Value documentKey) { ResumeTokenData resumeTokenData; - if (_txnContext) { + if (_txnIterator) { // We're in the middle of unwinding an 'applyOps'. // Use the clusterTime from the higher level applyOps - resumeTokenData.clusterTime = _txnContext->clusterTime; - - // 'pos' points to the _next_ applyOps index, so we must subtract one to get the index of - // the entry being examined right now. - invariant(_txnContext->pos >= 1); - resumeTokenData.applyOpsIndex = _txnContext->pos - 1; + resumeTokenData.clusterTime = _txnIterator->clusterTime(); + resumeTokenData.txnOpIndex = _txnIterator->txnOpIndex(); } else { resumeTokenData.clusterTime = ts.getTimestamp(); - resumeTokenData.applyOpsIndex = 0; + resumeTokenData.txnOpIndex = 0; } resumeTokenData.documentKey = documentKey; @@ -328,10 +324,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document auto resumeToken = ResumeToken(resumeTokenData).toDocument(); // Add some additional fields only relevant to transactions. 
- if (_txnContext) { + if (_txnIterator) { doc.addField(DocumentSourceChangeStream::kTxnNumberField, - Value(static_cast<long long>(_txnContext->txnNumber))); - doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid)); + Value(static_cast<long long>(_txnIterator->txnNumber()))); + doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnIterator->lsid())); } doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken)); @@ -409,55 +405,6 @@ DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifi return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}}; } -void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Document& input) { - // The only two commands we will see here are an applyOps or a commit, which both mean we - // need to open a "transaction context" representing a group of updates that all occurred at - // once as part of a transaction. If we already have a transaction context open, that would - // mean we are looking at an applyOps or commit nested within an applyOps, which is not - // allowed in the oplog. - invariant(!_txnContext); - - Value lsid = input["lsid"]; - checkValueType(lsid, "lsid", BSONType::Object); - - Value txnNumber = input["txnNumber"]; - checkValueType(txnNumber, "txnNumber", BSONType::NumberLong); - - Value ts = input[repl::OplogEntry::kTimestampFieldName]; - Timestamp txnApplyTime = ts.getTimestamp(); - - auto commandObj = input["o"].getDocument(); - Value applyOps = commandObj["applyOps"]; - if (!applyOps.missing()) { - // An "applyOps" command represents an immediately-committed transaction. We place the - // operations within the "applyOps" array directly into the transaction context. - applyOps = input.getNestedField("o.applyOps"); - } else { - invariant(!commandObj["commitTransaction"].missing()); - - // A "commit" command is the second part of a transaction that has been split up into - // two oplog entries. The lsid, txnNumber, and timestamp are in this entry, but the - // "applyOps" array is in a previous entry, which we must look up. - repl::OpTime opTime; - uassertStatusOK(bsonExtractOpTimeField(input.toBson(), "prevOpTime", &opTime)); - - auto applyOpsEntry = - pExpCtx->mongoProcessInterface->lookUpOplogEntryByOpTime(pExpCtx->opCtx, opTime); - invariant(applyOpsEntry.isCommand() && - (repl::OplogEntry::CommandType::kApplyOps == applyOpsEntry.getCommandType())); - invariant(applyOpsEntry.shouldPrepare()); - - auto bsonOp = applyOpsEntry.getOperationToApply(); - invariant(BSONType::Array == bsonOp["applyOps"].type()); - applyOps = Value(bsonOp["applyOps"]); - } - - checkValueType(applyOps, "applyOps", BSONType::Array); - invariant(applyOps.getArrayLength() > 0); - - _txnContext.emplace(applyOps, txnApplyTime, lsid.getDocument(), txnNumber.getLong()); -} - DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { pExpCtx->checkForInterrupt(); @@ -469,10 +416,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { while (1) { // If we're unwinding an 'applyOps' from a transaction, check if there are any documents we // have stored that can be returned. - if (_txnContext) { - if (auto next = extractNextApplyOpsEntry()) { + if (_txnIterator) { + if (auto next = _txnIterator->getNextTransactionOp(pExpCtx->opCtx)) { return applyTransformation(*next); } + _txnIterator = boost::none; } // Get the next input document. 
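The TransactionOpIterator introduced in the next hunk relies on a simple inversion: the TransactionHistoryIterator follows 'prevOpTime' links backwards through the oplog (newest entry first), while the change stream must emit operations in chronological order, so the collected OpTimes are pushed onto a stack and popped oldest-first. A minimal, self-contained sketch of that reversal, with plain integers standing in for OpTimes:

    #include <iostream>
    #include <stack>
    #include <vector>

    int main() {
        // OpTimes in the order a TransactionHistoryIterator-style walk visits them: newest first.
        const std::vector<int> reverseChronological{3, 2, 1};

        // Push them as they are visited; the oldest entry ends up on top of the stack.
        std::stack<int> txnOplogEntries;
        for (int opTime : reverseChronological) {
            txnOplogEntries.push(opTime);
        }

        // Popping now yields chronological order: 1 2 3.
        while (!txnOplogEntries.empty()) {
            std::cout << txnOplogEntries.top() << ' ';
            txnOplogEntries.pop();
        }
        std::cout << '\n';
    }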
@@ -498,16 +446,103 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { return applyTransformation(doc); } - initializeTransactionContext(doc); + // The only two commands we will see here are an applyOps or a commit, which both mean we + // need to open a "transaction context" representing a group of updates that all occurred at + // once as part of a transaction. If we already have a transaction context open, that would + // mean we are looking at an applyOps or commit nested within an applyOps, which is not + // allowed in the oplog. + invariant(!_txnIterator); + _txnIterator.emplace(pExpCtx->opCtx, pExpCtx->mongoProcessInterface, doc, *_nsRegex); + + // Once we initialize the transaction iterator, we can loop back to the top in order to call + // 'getNextTransactionOp' on it. Note that it is possible for the transaction iterator + // to be empty of any relevant operations, meaning that this loop may need to execute + // multiple times before it encounters a relevant change to return. + } +} + +DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterator( + OperationContext* opCtx, + std::shared_ptr<MongoProcessInterface> mongoProcessInterface, + const Document& input, + const pcrecpp::RE& nsRegex) + : _mongoProcessInterface(mongoProcessInterface), _nsRegex(nsRegex) { + Value lsidValue = input["lsid"]; + checkValueType(lsidValue, "lsid", BSONType::Object); + _lsid = lsidValue.getDocument(); + + Value txnNumberValue = input["txnNumber"]; + checkValueType(txnNumberValue, "txnNumber", BSONType::NumberLong); + _txnNumber = txnNumberValue.getLong(); + + // We want to parse the OpTime out of this document using the BSON OpTime parser. Instead of + // converting the entire Document back to BSON, we convert only the fields we need. + repl::OpTime txnOpTime = repl::OpTime::parse(BSON(repl::OpTime::kTimestampFieldName + << input[repl::OpTime::kTimestampFieldName] + << repl::OpTime::kTermFieldName + << input[repl::OpTime::kTermFieldName])); + _clusterTime = txnOpTime.getTimestamp(); + + auto commandObj = input["o"].getDocument(); + Value applyOps = commandObj["applyOps"]; + + if (!applyOps.missing()) { + // We found an applyOps that implicitly commits a transaction. We include it in the + // '_txnOplogEntries' stack of applyOps entries that the change stream should process as + // part of this transaction. There may be additional applyOps entries linked through the + // 'prevOpTime' field, which will also get added to '_txnOplogEntries' later in this + // function. Note that this style of transaction does not have a 'commitTransaction' + // command. + _txnOplogEntries.push(txnOpTime); + } else { + // This must be a "commitTransaction" command, which commits a prepared transaction. This + // style of transaction does not have an applyOps entry that implicitly commits it, as in + // the previous case. We're going to iterate through the other oplog entries in the + // transaction, but this entry does not have any updates in it, so we do not include it in + // the '_txnOplogEntries' stack. + invariant(!commandObj["commitTransaction"].missing()); + } - // Once we initialize the transaction context, we can loop back to the top in order to call - // 'extractNextApplyOpsEntry' on it. Note that is possible for the transaction context to be - // empty of any relevant operations, meaning that this loop may need to execute multiple - // times before it encounters a relevant change to return. 
+ if (BSONType::Object == + input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getType()) { + // As with the 'txnOpTime' parsing above, we convert a portion of 'input' back to BSON in + // order to parse an OpTime, this time from the "prevOpTime" field. + repl::OpTime prevOpTime = repl::OpTime::parse( + input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getDocument().toBson()); + _collectAllOpTimesFromTransaction(opCtx, prevOpTime); } + + // Pop the first OpTime off the stack and use it to load the first oplog entry into the + // '_currentApplyOps' field. + invariant(_txnOplogEntries.size() > 0); + const auto firstTimestamp = _txnOplogEntries.top(); + _txnOplogEntries.pop(); + + if (firstTimestamp == txnOpTime) { + // This transaction consists of only one oplog entry, from which we have already extracted + // the "applyOps" array, so there is no need to do any more work. + invariant(_txnOplogEntries.size() == 0); + _currentApplyOps = std::move(applyOps); + } else { + // This transaction consists of multiple oplog entries; grab the chronologically first entry + // and extract its "applyOps" array. + auto firstApplyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, firstTimestamp); + + auto bsonOp = firstApplyOpsEntry.getOperationToApply(); + invariant(BSONType::Array == bsonOp["applyOps"].type()); + _currentApplyOps = Value(bsonOp["applyOps"]); + } + + checkValueType(_currentApplyOps, "applyOps", BSONType::Array); + invariant(_currentApplyOps.getArrayLength() > 0); + + // Initialize iterators at the beginning of the transaction. + _currentApplyOpsIt = _currentApplyOps.getArray().begin(); + _txnOpIndex = 0; } -bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) { +bool DocumentSourceChangeStreamTransform::TransactionOpIterator::_isDocumentRelevant( + const Document& d) const { invariant( d["op"].getType() == BSONType::String, str::stream() @@ -519,21 +554,71 @@ bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) Value nsField = d["ns"]; invariant(!nsField.missing()); - return _nsRegex->PartialMatch(nsField.getString()); + return _nsRegex.PartialMatch(nsField.getString()); } -boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() { +boost::optional<Document> +DocumentSourceChangeStreamTransform::TransactionOpIterator::getNextTransactionOp( + OperationContext* opCtx) { + while (true) { + while (_currentApplyOpsIt != _currentApplyOps.getArray().end()) { + Document d = (_currentApplyOpsIt++)->getDocument(); + ++_txnOpIndex; + if (_isDocumentRelevant(d)) { + return d; + } + } - while (_txnContext && _txnContext->pos < _txnContext->arr.size()) { - Document d = _txnContext->arr[_txnContext->pos++].getDocument(); - if (isDocumentRelevant(d)) { - return d; + if (_txnOplogEntries.empty()) { + // There are no more operations in this transaction. + return boost::none; } + + // We've processed all the operations in the previous applyOps entry, but we have a new one + // to process. 
+ auto applyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, _txnOplogEntries.top()); + _txnOplogEntries.pop(); + + auto bsonOp = applyOpsEntry.getOperationToApply(); + invariant(BSONType::Array == bsonOp["applyOps"].type()); + + _currentApplyOps = Value(bsonOp["applyOps"]); + _currentApplyOpsIt = _currentApplyOps.getArray().begin(); } +} + +repl::OplogEntry +DocumentSourceChangeStreamTransform::TransactionOpIterator::_lookUpOplogEntryByOpTime( + OperationContext* opCtx, repl::OpTime lookupTime) const { + invariant(!lookupTime.isNull()); + + std::unique_ptr<TransactionHistoryIteratorBase> iterator( + _mongoProcessInterface->createTransactionHistoryIterator(lookupTime)); + try { + return iterator->next(opCtx); + } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) { + ex.addContext( + "Oplog no longer has history necessary for $changeStream to observe operations from a " + "committed transaction."); + uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason()); + } +} - _txnContext = boost::none; +void DocumentSourceChangeStreamTransform::TransactionOpIterator::_collectAllOpTimesFromTransaction( + OperationContext* opCtx, repl::OpTime firstOpTime) { + std::unique_ptr<TransactionHistoryIteratorBase> iterator( + _mongoProcessInterface->createTransactionHistoryIterator(firstOpTime)); - return boost::none; + try { + while (iterator->hasNext()) { + _txnOplogEntries.push(iterator->nextOpTime(opCtx)); + } + } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) { + ex.addContext( + "Oplog no longer has history necessary for $changeStream to observe operations from a " + "committed transaction."); + uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason()); + } } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 415dacac16d..c20f5864e67 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -86,60 +86,116 @@ private: }; /** - * Represents the DocumentSource's state if it's currently reading from an 'applyOps' entry - * which was created as part of a transaction. + * Represents the DocumentSource's state if it's currently reading from a transaction. + * Transaction operations are packed into 'applyOps' entries in the oplog. + * + * This iterator returns operations from a transaction that are relevant to the change stream in + * the same order they appear on the oplog (chronological order). Note that the + * TransactionHistoryIterator, which this class uses to query the oplog, returns the oplog + * entries in _reverse_ order. We internally reverse the output of the + * TransactionHistoryIterator in order to get the desired order. + * + * Note that our view of a transaction in the oplog is like an array of arrays with an "outer" + * array of applyOps entries represented by the 'txnOplogEntries' field and "inner" arrays of + * applyOps entries. Each applyOps entry gets loaded on demand, with only a single applyOps + * loaded into '_applyOpsValue' and '_currentApplyOps' at any time. + * + * Likewise, there are "outer" and "inner" iterators, 'txnOplogEntriesIt' and + * '_currentApplyOpsIt' respectively, that together reference the current transaction operation. 
*/ - struct TransactionContext { - TransactionContext(const TransactionContext&) = delete; - TransactionContext& operator=(const TransactionContext&) = delete; - - // The array of oplog entries from an 'applyOps' representing the transaction. Only kept - // around so that the underlying memory of 'arr' isn't freed. - Value opArray; - - // Array representation of the 'opArray' field. Stored like this to avoid re-typechecking - // each call to next(), or copying the entire array. - const std::vector<Value>& arr; - - // Our current place in the 'opArray'. - size_t pos; - - // The clusterTime of the applyOps. - Timestamp clusterTime; - - // Fields that were taken from the 'applyOps' oplog entry. - Document lsid; - TxnNumber txnNumber; - - TransactionContext(const Value& applyOpsVal, - Timestamp ts, - const Document& lsidDoc, - TxnNumber n) - : opArray(applyOpsVal), - arr(opArray.getArray()), - pos(0), - clusterTime(ts), - lsid(lsidDoc), - txnNumber(n) {} + class TransactionOpIterator { + public: + TransactionOpIterator(const TransactionOpIterator&) = delete; + TransactionOpIterator& operator=(const TransactionOpIterator&) = delete; + + TransactionOpIterator(OperationContext* opCtx, + std::shared_ptr<MongoProcessInterface> mongoProcessInterface, + const Document& input, + const pcrecpp::RE& nsRegex); + + // Returns the index for the last operation returned by getNextTransactionOp(). It is + // illegal to call this before calling getNextTransactionOp() at least once. + size_t txnOpIndex() const { + // '_txnOpIndex' points to the _next_ transaction index, so we must subtract one to get + // the index of the entry being examined right now. + invariant(_txnOpIndex >= 1); + return _txnOpIndex - 1; + } + + Timestamp clusterTime() const { + return _clusterTime; + } + + Document lsid() const { + return _lsid; + } + + TxnNumber txnNumber() const { + return _txnNumber; + } + + // Extract one Document from the transaction and advance the iterator. Returns boost::none + // to indicate that there are no operations left. + boost::optional<Document> getNextTransactionOp(OperationContext* opCtx); + + private: + // Perform a find on the oplog to find an OplogEntry by its OpTime. + repl::OplogEntry _lookUpOplogEntryByOpTime(OperationContext* opCtx, + repl::OpTime lookupTime) const; + + // Helper for getNextTransactionOp(). Checks the namespace of the given document to see if + // it should be returned in the change stream. + bool _isDocumentRelevant(const Document& d) const; + + // Traverse backwards through the oplog by starting at the entry at 'firstOpTime' and + // following "prevOpTime" links until reaching the terminal "prevOpTime" value, and push the + // OpTime value to '_txnOplogEntries' for each entry traversed, including the 'firstOpTime' + // entry. Note that we follow the oplog links _backwards_ through the oplog (i.e., in + // reverse chronological order) but because this is a stack, the iterator will process them + // in the opposite order, allowing iteration to proceed forwards and return operations in + // chronological order. + void _collectAllOpTimesFromTransaction(OperationContext* opCtx, repl::OpTime firstOpTime); + + // This stack contains the timestamps for all oplog entries in this transaction that have + // yet to be processed by the iterator. 
Each time the TransactionOpIterator finishes + // iterating the contents of the '_currentApplyOps' array, it pops an entry off the stack + // and uses it to load the next applyOps entry in the '_currentApplyOps' array, meaning that + // the top entry is always the next entry to be processed. From top-to-bottom, the stack is + // ordered chronologically, in the same order as entries appear in the oplog. + std::stack<repl::OpTime> _txnOplogEntries; + + // The '_currentApplyOps' field stores the applyOps array that the TransactionOpIterator is + // currently iterating. + Value _currentApplyOps; + + // This iterator references the next operation within the '_currentApplyOps' array that + // the getNextTransactionOp() method will return. When there are no more operations to + // iterate, this iterator will point to the array's "end" sentinel, and '_txnOplogEntries' + // will be empty. + typename std::vector<Value>::const_iterator _currentApplyOpsIt; + + // Our current place within the entire transaction, which may consist of multiple 'applyOps' + // arrays. + size_t _txnOpIndex; + + // The clusterTime of the _applyOps. + Timestamp _clusterTime; + + // Fields that were taken from the '_applyOps' oplog entry. + Document _lsid; + TxnNumber _txnNumber; + + // Used for traversing the oplog with the TransactionHistoryIterator. + std::shared_ptr<MongoProcessInterface> _mongoProcessInterface; + + // An operation is relevant to a change stream iff its namespace matches this regex. + const pcrecpp::RE& _nsRegex; }; /** * Helper used for determining what resume token to return. */ ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey); - void initializeTransactionContext(const Document& input); - - /** - * Gets the next relevant applyOps entry that should be returned. If there is none, returns - * empty document. - */ - boost::optional<Document> extractNextApplyOpsEntry(); - - /** - * Helper for extractNextApplyOpsEntry(). Checks the namespace of the given document to see - * if it should be returned in the change stream. - */ - bool isDocumentRelevant(const Document& d); BSONObj _changeStreamSpec; @@ -151,8 +207,8 @@ private: // boost::none, and an exact string equality check is used instead. boost::optional<pcrecpp::RE> _nsRegex; - // Represents if the current 'applyOps' we're unwinding, if any. - boost::optional<TransactionContext> _txnContext; + // Represents the current transaction we're unwinding, if any. + boost::optional<TransactionOpIterator> _txnIterator; // Set to true if this transformation stage can be run on the collectionless namespace. bool _isIndependentOfAnyCollection; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 3ab2ec80191..c3383dfb9d9 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -42,17 +42,17 @@ using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus; // the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token, // and ResumeToken::kCannotResume if it is more recent than the client's resume token (indicating // that we will never see the token). If the resume token's documentKey contains only the _id field -// while the pipeline documentKey contains additional fields, then the collection has become -// sharded since the resume token was generated. 
In that case, we relax the requirements such that -// only the timestamp, version, applyOpsIndex, UUID and documentKey._id need match. This remains -// correct, since the only circumstances under which the resume token omits the shard key is if it -// was generated either (1) before the collection was sharded, (2) after the collection was sharded -// but before the primary shard became aware of that fact, implying that it was before the first -// chunk moved off the shard, or (3) by a malicious client who has constructed their own resume -// token. In the first two cases, we can be guaranteed that the _id is unique and the stream can -// therefore be resumed seamlessly; in the third case, the worst that can happen is that some -// entries are missed or duplicated. Note that the simple collation is used to compare the resume -// tokens, and that we purposefully avoid the user's requested collation if present. +// while the pipeline documentKey contains additional fields, then the collection has become sharded +// since the resume token was generated. In that case, we relax the requirements such that only the +// timestamp, version, txnOpIndex, UUID and documentKey._id need match. This remains correct, since +// the only circumstances under which the resume token omits the shard key is if it was generated +// either (1) before the collection was sharded, (2) after the collection was sharded but before the +// primary shard became aware of that fact, implying that it was before the first chunk moved off +// the shard, or (3) by a malicious client who has constructed their own resume token. In the first +// two cases, we can be guaranteed that the _id is unique and the stream can therefore be resumed +// seamlessly; in the third case, the worst that can happen is that some entries are missed or +// duplicated. Note that the simple collation is used to compare the resume tokens, and that we +// purposefully avoid the user's requested collation if present. ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx, const Document& documentFromResumedStream, const ResumeTokenData& tokenDataFromClient) { @@ -78,14 +78,14 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte : ResumeStatus::kCheckNextDoc; } - if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) { + if (tokenDataFromResumedStream.txnOpIndex < tokenDataFromClient.txnOpIndex) { return ResumeStatus::kCheckNextDoc; - } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) { - // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in - // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being + } else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) { + // This could happen if the client provided a txnOpIndex of 0, yet the 0th document in the + // applyOps was irrelevant (meaning it was an operation on a collection or DB not being // watched). If we are looking for the resume token on a shard then this simply means that // the resume token may be on a different shard; otherwise, it indicates a corrupt token. - uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge); + uassert(50792, "Invalid resumeToken: txnOpIndex was skipped", expCtx->needsMerge); // We are running on a merging shard. Signal that we have read beyond the resume token. 
return ResumeStatus::kSurpassedToken; } @@ -96,7 +96,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte // resumable; we are past the point in the stream where the token should have appeared. if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) { // If we are running on a replica set deployment, we don't ever expect to see identical time - // stamps and applyOpsIndex but differing UUIDs, and we reject the resume attempt at once. + // stamps and txnOpIndex but differing UUIDs, and we reject the resume attempt at once. if (!expCtx->inMongos && !expCtx->needsMerge) { return ResumeStatus::kSurpassedToken; } diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index ff7c7674425..85d424b3bdd 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -63,19 +63,19 @@ public: protected: /** * Pushes a document with a resume token corresponding to the given timestamp, version, - * applyOpsIndex, docKey, and namespace into the mock queue. + * txnOpIndex, docKey, and namespace into the mock queue. */ void addDocument( - Timestamp ts, int version, std::size_t applyOpsIndex, Document docKey, UUID uuid) { + Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) { _mock->push_back( Document{{"_id", - ResumeToken(ResumeTokenData(ts, version, applyOpsIndex, uuid, Value(docKey))) + ResumeToken(ResumeTokenData(ts, version, txnOpIndex, uuid, Value(docKey))) .toDocument()}}); } /** * Pushes a document with a resume token corresponding to the given timestamp, version, - * applyOpsIndex, docKey, and namespace into the mock queue. + * txnOpIndex, docKey, and namespace into the mock queue. */ void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) { addDocument(ts, 0, 0, docKey, uuid); @@ -99,11 +99,11 @@ protected: intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( Timestamp ts, int version, - std::size_t applyOpsIndex, + std::size_t txnOpIndex, boost::optional<Document> docKey, UUID uuid) { auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create( - getExpCtx(), {ts, version, applyOpsIndex, uuid, docKey ? Value(*docKey) : Value()}); + getExpCtx(), {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()}); checkResumeToken->setSource(_mock.get()); return checkResumeToken; } @@ -409,7 +409,7 @@ TEST_F(CheckResumeTokenTest, ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585); } -TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) { +TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { Timestamp resumeTimestamp(100, 1); // Create an ordered array of 3 UUIDs. diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 1e19144b4f5..87dce6017e9 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -59,6 +59,7 @@ namespace mongo { class ExpressionContext; class Pipeline; class PipelineDeleter; +class TransactionHistoryIteratorBase; /** * Any functionality needed by an aggregation stage that is either context specific to a mongod or @@ -119,10 +120,11 @@ public: virtual DBClientBase* directClient() = 0; /** - * Query the oplog for an entry with a matching OpTime. 
+ * Creates a new TransactionHistoryIterator object. Only applicable in processes which support + * locally traversing the oplog. */ - virtual repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx, - repl::OpTime lookupTime) = 0; + virtual std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( + repl::OpTime time) const = 0; /** * Note that in some rare cases this could return a false negative but will never return a false diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 16780eb1398..4c3b06ad30c 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -89,8 +89,8 @@ public: std::vector<GenericCursor> getIdleCursors(const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const final; - repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx, - repl::OpTime lookupTime) final { + std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( + repl::OpTime time) const override { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index ca12adc1640..7d493a6ba7a 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -147,28 +147,11 @@ DBClientBase* MongoInterfaceStandalone::directClient() { return &_client; } -repl::OplogEntry MongoInterfaceStandalone::lookUpOplogEntryByOpTime(OperationContext* opCtx, - repl::OpTime lookupTime) { - invariant(!lookupTime.isNull()); - +std::unique_ptr<TransactionHistoryIteratorBase> +MongoInterfaceStandalone::createTransactionHistoryIterator(repl::OpTime time) const { bool permitYield = true; - TransactionHistoryIterator iterator(lookupTime, permitYield); - try { - auto result = iterator.next(opCtx); - - // This function is intended to link a "commit" command to its corresponding "applyOps" - // command, which represents a prepared transaction. There should be no additional entries - // in the transaction's chain of operations. Note that when the oplog changes gated by - // 'useMultipleOplogEntryFormatForTransactions' become permanent, these assumptions about - // iterating transactions will no longer hold. 
- invariant(!iterator.hasNext()); - return result; - } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) { - ex.addContext( - "Oplog no longer has history necessary for $changeStream to observe operations from a " - "committed transaction."); - uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason()); - } + return std::unique_ptr<TransactionHistoryIteratorBase>( + new TransactionHistoryIterator(time, permitYield)); } bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) { diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index d6b15346f97..9c9fc301c18 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -54,8 +54,8 @@ public: void setOperationContext(OperationContext* opCtx) final; DBClientBase* directClient() final; - virtual repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx, - repl::OpTime lookupTime) final; + std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( + repl::OpTime time) const final; bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index e9b06a74e9e..02a3fdbccf3 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -59,7 +59,7 @@ ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime, bool ResumeTokenData::operator==(const ResumeTokenData& other) const { return clusterTime == other.clusterTime && version == other.version && - tokenType == other.tokenType && applyOpsIndex == other.applyOpsIndex && + tokenType == other.tokenType && txnOpIndex == other.txnOpIndex && fromInvalidate == other.fromInvalidate && uuid == other.uuid && (Value::compare(this->documentKey, other.documentKey, nullptr) == 0); } @@ -70,7 +70,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { if (tokenData.version > 0) { out << ", tokenType: " << tokenData.tokenType; } - out << ", applyOpsIndex: " << tokenData.applyOpsIndex; + out << ", txnOpIndex: " << tokenData.txnOpIndex; if (tokenData.version > 0) { out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate); } @@ -95,8 +95,8 @@ ResumeToken::ResumeToken(const Document& resumeDoc) { } // We encode the resume token as a KeyString with the sequence: -// clusterTime, version, applyOpsIndex, fromInvalidate, uuid, documentKey -// Only the clusterTime, version, applyOpsIndex, and fromInvalidate are required. +// clusterTime, version, txnOpIndex, fromInvalidate, uuid, documentKey Only the clusterTime, +// version, txnOpIndex, and fromInvalidate are required. ResumeToken::ResumeToken(const ResumeTokenData& data) { BSONObjBuilder builder; builder.append("", data.clusterTime); @@ -104,7 +104,7 @@ ResumeToken::ResumeToken(const ResumeTokenData& data) { if (data.version >= 1) { builder.appendNumber("", data.tokenType); } - builder.appendNumber("", data.applyOpsIndex); + builder.appendNumber("", data.txnOpIndex); if (data.version >= 1) { builder.appendBool("", data.fromInvalidate); } @@ -186,15 +186,15 @@ ResumeTokenData ResumeToken::getData() const { result.tokenType = static_cast<ResumeTokenData::TokenType>(typeInt); } - // Next comes the applyOps index. 
- uassert(50793, "Resume Token does not contain applyOpsIndex", i.more()); - auto applyOpsElt = i.next(); + // Next comes the txnOpIndex value. + uassert(50793, "Resume Token does not contain txnOpIndex", i.more()); + auto txnOpIndexElt = i.next(); uassert(50855, - "Resume Token applyOpsIndex is not an integer", - applyOpsElt.type() == BSONType::NumberInt); - const int applyOpsInd = applyOpsElt.numberInt(); - uassert(50794, "Invalid Resume Token: applyOpsIndex should be non-negative", applyOpsInd >= 0); - result.applyOpsIndex = applyOpsInd; + "Resume Token txnOpIndex is not an integer", + txnOpIndexElt.type() == BSONType::NumberInt); + const int txnOpIndexInd = txnOpIndexElt.numberInt(); + uassert(50794, "Invalid Resume Token: txnOpIndex should be non-negative", txnOpIndexInd >= 0); + result.txnOpIndex = txnOpIndexInd; if (result.version >= 1) { // The 'fromInvalidate' bool was added in version 1 resume tokens. We don't expect to see it diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 52ae1b1ab1e..236b39aa39a 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -60,12 +60,12 @@ struct ResumeTokenData { ResumeTokenData(){}; ResumeTokenData(Timestamp clusterTimeIn, int versionIn, - size_t applyOpsIndexIn, + size_t txnOpIndexIn, const boost::optional<UUID>& uuidIn, Value documentKeyIn) : clusterTime(clusterTimeIn), version(versionIn), - applyOpsIndex(applyOpsIndexIn), + txnOpIndex(txnOpIndexIn), uuid(uuidIn), documentKey(std::move(documentKeyIn)){}; @@ -77,7 +77,11 @@ struct ResumeTokenData { Timestamp clusterTime; int version = 1; TokenType tokenType = TokenType::kEventToken; - size_t applyOpsIndex = 0; + // When a resume token references an operation in a transaction, the 'clusterTime' stores the + // commit time of the transaction, and the 'txnOpIndex' field stores the index of the operation + // within its transaction. Operations that are not in a transaction always have a value of 0 for + // this field. + size_t txnOpIndex = 0; // Flag to indicate that this resume token is from an "invalidate" entry. This will not be set // on a token from a command that *would* invalidate a change stream, but rather the invalidate // notification itself. @@ -93,7 +97,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData); * This token has the following format: * { * _data: String, A hex encoding of the binary generated by keystring encoding the clusterTime, - * version, applyOpsIndex, UUID, then documentKey in that order. + * version, txnOpIndex, UUID, then documentKey in that order. * _typeBits: BinData - The keystring type bits used for deserialization. * } * The _data field data is encoded such that string comparisons provide the correct ordering of diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp index b52d53764dc..72894880953 100644 --- a/src/mongo/db/pipeline/resume_token_test.cpp +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -257,20 +257,20 @@ TEST(ResumeToken, WrongVersionToken) { ASSERT_THROWS(rtToken.getData(), AssertionException); } -TEST(ResumeToken, InvalidApplyOpsIndex) { +TEST(ResumeToken, InvalidTxnOpIndex) { Timestamp ts(1001, 3); ResumeTokenData resumeTokenDataIn; resumeTokenDataIn.clusterTime = ts; - resumeTokenDataIn.applyOpsIndex = 1234; + resumeTokenDataIn.txnOpIndex = 1234; - // Should round trip with a non-negative applyOpsIndex. + // Should round trip with a non-negative txnOpIndex. 
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); ResumeTokenData tokenData = rtToken.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); - // Should fail with a negative applyOpsIndex. - resumeTokenDataIn.applyOpsIndex = std::numeric_limits<size_t>::max(); + // Should fail with a negative txnOpIndex. + resumeTokenDataIn.txnOpIndex = std::numeric_limits<size_t>::max(); rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); ASSERT_THROWS(rtToken.getData(), AssertionException); @@ -278,7 +278,7 @@ TEST(ResumeToken, InvalidApplyOpsIndex) { TEST(ResumeToken, StringEncodingSortsCorrectly) { // Make sure that the string encoding of the resume tokens will compare in the correct order, - // namely timestamp, version, applyOpsIndex, uuid, then documentKey. + // namely timestamp, version, txnOpIndex, uuid, then documentKey. Timestamp ts2_2(2, 2); Timestamp ts10_4(10, 4); Timestamp ts10_5(10, 5); @@ -310,8 +310,7 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) { assertLt({ts2_2, 0, 0, boost::none, Value()}, {ts2_2, 1, 0, boost::none, Value()}); assertLt({ts10_4, 5, 0, boost::none, Value()}, {ts10_4, 10, 0, boost::none, Value()}); - // Test that the Timestamp is more important than the version, applyOpsIndex, UUID and - // documentKey. + // Test that the Timestamp is more important than the version, txnOpIndex, UUID and documentKey. assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, {ts10_5, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}); assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, @@ -331,14 +330,13 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) { assertLt({ts10_4, 1, 0, lower_uuid, Value(Document{{"_id", 1}})}, {ts10_4, 2, 0, lower_uuid, Value(Document{{"_id", 0}})}); - // Test that when the Timestamp and version are the same, the applyOpsIndex breaks the tie. + // Test that when the Timestamp and version are the same, the txnOpIndex breaks the tie. assertLt({ts10_4, 1, 6, lower_uuid, Value(Document{{"_id", 0}})}, {ts10_4, 1, 50, lower_uuid, Value(Document{{"_id", 0}})}); assertLt({ts2_2, 0, 0, higher_uuid, Value(Document{{"_id", 0}})}, {ts2_2, 0, 4, lower_uuid, Value(Document{{"_id", 0}})}); - // Test that when the Timestamp, version, and applyOpsIndex are the same, the UUID breaks the - // tie. + // Test that when the Timestamp, version, and txnOpIndex are the same, the UUID breaks the tie. assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, {ts2_2, 0, 0, higher_uuid, Value(Document{{"_id", 0}})}); assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, @@ -350,7 +348,7 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) { assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 1}})}, {ts10_4, 0, 0, higher_uuid, Value(Document{{"_id", 2}})}); - // Test that when the Timestamp, version, applyOpsIndex, and UUID are the same, the documentKey + // Test that when the Timestamp, version, txnOpIndex, and UUID are the same, the documentKey // breaks the tie. 
assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, {ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 1}})}); diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 4647a8e2cb8..77f76eb5c9b 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -54,8 +54,8 @@ public: MONGO_UNREACHABLE; } - repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx, - repl::OpTime lookupTime) override { + std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( + repl::OpTime time) const override { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/oplog_interface_mock.cpp b/src/mongo/db/repl/oplog_interface_mock.cpp index 8b96460e1e2..6352fa7566a 100644 --- a/src/mongo/db/repl/oplog_interface_mock.cpp +++ b/src/mongo/db/repl/oplog_interface_mock.cpp @@ -97,6 +97,10 @@ public: MONGO_UNREACHABLE; } + repl::OpTime nextOpTime(OperationContext*) override { + MONGO_UNREACHABLE; + } + virtual ~TransactionHistoryIteratorMock() {} bool hasNext() const override { diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp index 8d4e40cc87b..901fc41d05b 100644 --- a/src/mongo/db/transaction_history_iterator.cpp +++ b/src/mongo/db/transaction_history_iterator.cpp @@ -47,7 +47,10 @@ namespace { /** * Query the oplog for an entry with the given timestamp. */ -BSONObj findOneOplogEntry(OperationContext* opCtx, const repl::OpTime& opTime, bool permitYield) { +BSONObj findOneOplogEntry(OperationContext* opCtx, + const repl::OpTime& opTime, + bool permitYield, + bool prevOpOnly = false) { BSONObj oplogBSON; invariant(!opTime.isNull()); @@ -55,6 +58,11 @@ BSONObj findOneOplogEntry(OperationContext* opCtx, const repl::OpTime& opTime, b qr->setFilter(opTime.asQuery()); qr->setOplogReplay(true); // QueryOption_OplogReplay + if (prevOpOnly) { + qr->setProj( + BSON("_id" << 0 << repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName << 1LL)); + } + const boost::intrusive_ptr<ExpressionContext> expCtx; auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, @@ -106,15 +114,30 @@ repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) { auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction(); - uassert( - ErrorCodes::FailedToParse, - str::stream() << "Missing prevTs field on oplog entry of previous write in transaction: " - << redact(oplogBSON), - oplogPrevTsOption); + uassert(ErrorCodes::FailedToParse, + str::stream() + << "Missing prevOpTime field on oplog entry of previous write in transaction: " + << redact(oplogBSON), + oplogPrevTsOption); _nextOpTime = oplogPrevTsOption.value(); return oplogEntry; } +repl::OpTime TransactionHistoryIterator::nextOpTime(OperationContext* opCtx) { + BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime, _permitYield, true /* prevOpOnly */); + + auto prevOpTime = oplogBSON[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName]; + uassert(ErrorCodes::FailedToParse, + str::stream() + << "Missing prevOpTime field on oplog entry of previous write in transaction: " + << redact(oplogBSON), + !prevOpTime.eoo() && prevOpTime.isABSONObj()); + + auto returnOpTime = _nextOpTime; + _nextOpTime = repl::OpTime::parse(prevOpTime.Obj()); + return returnOpTime; +} + } // namespace mongo diff --git a/src/mongo/db/transaction_history_iterator.h 
b/src/mongo/db/transaction_history_iterator.h index daca8370f89..f3583eebc44 100644 --- a/src/mongo/db/transaction_history_iterator.h +++ b/src/mongo/db/transaction_history_iterator.h @@ -36,8 +36,8 @@ namespace mongo { class OperationContext; /** - * An iterator class that can traverse through the oplog entries that are linked via the prevTs - * field. + * An iterator class that traverses backwards through a transaction's oplog entries by following the + * "prevOpTime" link in each entry. */ class TransactionHistoryIteratorBase { public: @@ -49,12 +49,17 @@ public: virtual bool hasNext() const = 0; /** - * Returns the next oplog entry. + * Returns an oplog entry and advances the iterator one step back through the oplog. * Should not be called if hasNext is false. * Throws if next oplog entry is in an unrecognized format or if it can't find the next oplog * entry. */ virtual repl::OplogEntry next(OperationContext* opCtx) = 0; + + /** + * Same as next() but returns only the OpTime, instead of the entire entry. + */ + virtual repl::OpTime nextOpTime(OperationContext* opCtx) = 0; }; class TransactionHistoryIterator : public TransactionHistoryIteratorBase { @@ -66,7 +71,8 @@ public: virtual ~TransactionHistoryIterator() = default; bool hasNext() const override; - repl::OplogEntry next(OperationContext* opCtx); + repl::OplogEntry next(OperationContext* opCtx) override; + repl::OpTime nextOpTime(OperationContext* opCtx) override; private: // Clients can set this to allow PlanExecutors created by this TransactionHistoryIterator to |
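
The following sketch is illustrative only and is not part of the patch above. It shows one way a consumer of the new interface, such as the change stream's transaction unwinding, might use createTransactionHistoryIterator() and the TransactionHistoryIteratorBase API to gather the entries of a multi-'applyOps' transaction in commit order. The helper name collectTransactionEntries, the include paths, and the use of MongoProcessInterface as the calling surface are assumptions made for the example, not code from the patch.

// Illustrative sketch only; not part of this patch.
#include <algorithm>
#include <vector>

#include "mongo/db/pipeline/mongo_process_interface.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/transaction_history_iterator.h"

namespace mongo {

// Hypothetical helper: collect all oplog entries of a transaction, given the opTime of
// its final (committing) 'applyOps' entry.
std::vector<repl::OplogEntry> collectTransactionEntries(OperationContext* opCtx,
                                                        const MongoProcessInterface& process,
                                                        repl::OpTime lastApplyOpsOpTime) {
    // Start at the final 'applyOps' of the transaction and follow the 'prevOpTime' links
    // back through the oplog. hasNext() returns false once the null (default) OpTime,
    // which terminates the chain, is reached.
    auto iterator = process.createTransactionHistoryIterator(lastApplyOpsOpTime);

    std::vector<repl::OplogEntry> entries;
    while (iterator->hasNext()) {
        entries.push_back(iterator->next(opCtx));
    }

    // The traversal runs newest-to-oldest, so reverse to obtain the order in which the
    // operations were originally applied.
    std::reverse(entries.begin(), entries.end());
    return entries;
}

}  // namespace mongo

When only the positions of the entries are needed rather than their contents, nextOpTime() can be used in place of next(); per the change to transaction_history_iterator.cpp above, it fetches each entry with a projection on the 'prevOpTime' field, avoiding the cost of parsing full oplog entries.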